diff --git a/packages/core/package.json b/packages/core/package.json index 8a29fed..b123bd6 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -58,6 +58,7 @@ "js-base64": "^3.7.5", "lodash": "^4.17.21", "merge2": "^1.4.1", + "micromatch": "^4.0.5", "p-limit": "^3.1.0", "p-memoize": "4.0.3", "rxjs": "8.0.0-alpha.12", @@ -85,6 +86,7 @@ "@types/jest": "^29.5.0", "@types/lodash": "^4.14.191", "@types/merge2": "^1.4.0", + "@types/micromatch": "^4.0.6", "@types/node": "^16.18.39", "@types/node-fetch": "^2.6.2", "@types/secp256k1": "^4.0.3", diff --git a/packages/core/src/Plugin.ts b/packages/core/src/Plugin.ts index 9ac0286..a3fc76f 100644 --- a/packages/core/src/Plugin.ts +++ b/packages/core/src/Plugin.ts @@ -14,6 +14,7 @@ export type NetworkModeConfig = { recoveryStream: Stream; systemStream: Stream; nodeManager: LogStoreNodeManager; + includeOnly?: string[]; }; export type StandaloneModeConfig = { diff --git a/packages/core/src/config/config.schema.json b/packages/core/src/config/config.schema.json index a5a863a..16b5f25 100644 --- a/packages/core/src/config/config.schema.json +++ b/packages/core/src/config/config.schema.json @@ -101,6 +101,13 @@ "type": "string", "enum": ["network"] }, + "includeOnly": { + "type": "array", + "description": "List of glob-like patterns representing the streams to be tracked by this node. Note: it act as a filter, if the list is empty, all streams fetched from the contract will be tracked.", + "items": { + "type": "string" + } + }, "pool": { "type": "object", "description": "Kyve Pool configuration for network participant mode", diff --git a/packages/core/src/config/config.ts b/packages/core/src/config/config.ts index 90cd25c..1cfa237 100644 --- a/packages/core/src/config/config.ts +++ b/packages/core/src/config/config.ts @@ -15,6 +15,8 @@ export type NetworkParticipantMode = { url: string; pollInterval: number; }; + // glob pattern like list of streams that should be tracked by this node + includeOnly?: string[]; }; type StandaloneMode = { type: 'standalone'; diff --git a/packages/core/src/plugins/logStore/LogStorePlugin.ts b/packages/core/src/plugins/logStore/LogStorePlugin.ts index 044f55c..7545705 100644 --- a/packages/core/src/plugins/logStore/LogStorePlugin.ts +++ b/packages/core/src/plugins/logStore/LogStorePlugin.ts @@ -19,6 +19,7 @@ import { MessageListener } from './MessageListener'; import { MessageProcessor } from './MessageProcessor'; import { NodeStreamsRegistry } from './NodeStreamsRegistry'; import { ValidationSchemaManager } from './validation-schema/ValidationSchemaManager'; +import { Request } from 'express'; const logger = new Logger(module); @@ -156,9 +157,9 @@ export abstract class LogStorePlugin extends Plugin { data: Readable; }>; - public abstract validateUserQueryAccess( - address: EthereumAddress - ): Promise<{ valid: true } | { valid: false; message: string }>; + public abstract validateQueryRequest( + request: Request + ): Promise<{ valid: true } | { valid: false; message: string, errorCode?: number }>; private async startStorage( metricsContext: MetricsContext diff --git a/packages/core/src/plugins/logStore/http/dataQueryEndpoint.ts b/packages/core/src/plugins/logStore/http/dataQueryEndpoint.ts index 6ba30ae..b1fccc3 100644 --- a/packages/core/src/plugins/logStore/http/dataQueryEndpoint.ts +++ b/packages/core/src/plugins/logStore/http/dataQueryEndpoint.ts @@ -1,12 +1,7 @@ /** * Endpoints for RESTful data requests */ -import { - MetricsContext, - MetricsDefinition, - RateMetric, - toEthereumAddress, -} from '@streamr/utils'; +import { MetricsContext, MetricsDefinition, RateMetric } from '@streamr/utils'; import { Request, RequestHandler, Response } from 'express'; import { HttpServerEndpoint } from '../../../Plugin'; @@ -111,8 +106,6 @@ const createHandler = (metrics: MetricsDefinition): RequestHandler => { const format = getFormat(req.query.format as string | undefined); - const consumer = toEthereumAddress(req.consumer!); - const store = logStoreContext.getStore(); if (!store) { throw new Error('LogStore context was not initialized'); @@ -120,9 +113,9 @@ const createHandler = (metrics: MetricsDefinition): RequestHandler => { const { logStorePlugin } = store; - const validation = await logStorePlugin.validateUserQueryAccess(consumer); + const validation = await logStorePlugin.validateQueryRequest(req); if (!validation.valid) { - sendError(validation.message, res); + sendError(validation.message, res, validation.errorCode); return; } diff --git a/packages/core/src/plugins/logStore/http/httpHelpers.ts b/packages/core/src/plugins/logStore/http/httpHelpers.ts index df29aca..f94dbcf 100644 --- a/packages/core/src/plugins/logStore/http/httpHelpers.ts +++ b/packages/core/src/plugins/logStore/http/httpHelpers.ts @@ -74,9 +74,13 @@ export const sendSuccess = ( }); }; -export const sendError = (message: string, res: Response) => { +export const sendError = ( + message: string, + res: Response, + errorCode?: number +) => { logger.error(message); - res.status(400).json({ + res.status(errorCode ?? 400).json({ error: message, }); }; diff --git a/packages/core/src/plugins/logStore/network/LogStoreNetworkConfig.ts b/packages/core/src/plugins/logStore/network/LogStoreNetworkConfig.ts index 02a25f7..922cda1 100644 --- a/packages/core/src/plugins/logStore/network/LogStoreNetworkConfig.ts +++ b/packages/core/src/plugins/logStore/network/LogStoreNetworkConfig.ts @@ -48,7 +48,9 @@ export class LogStoreNetworkConfig implements LogStoreConfig { pollInterval: number, logStoreClient: LogStoreClient, streamrClient: StreamrClient, - listener: LogStoreConfigListener + listener: LogStoreConfigListener, + // helps us not even include stream parts that we're not interested in + streamFilter?: (streamPart: StreamPartID) => boolean ) { this.clusterSize = clusterSize; this.myIndexInCluster = myIndexInCluster; @@ -57,9 +59,10 @@ export class LogStoreNetworkConfig implements LogStoreConfig { pollInterval, logStoreClient, (streams, block) => { - const streamParts = streams.flatMap((stream: Stream) => [ - ...this.createMyStreamParts(stream), - ]); + const streamParts = streams + .flatMap((stream: Stream) => [...this.createMyStreamParts(stream)]) + // if there's a filter, apply it, otherwise include everything + .filter((streamPart) => streamFilter?.(streamPart) ?? true); this.handleDiff( this.synchronizer.ingestSnapshot( new Set(streamParts), diff --git a/packages/core/src/plugins/logStore/network/LogStoreNetworkPlugin.ts b/packages/core/src/plugins/logStore/network/LogStoreNetworkPlugin.ts index 4bba71a..17fb55d 100644 --- a/packages/core/src/plugins/logStore/network/LogStoreNetworkPlugin.ts +++ b/packages/core/src/plugins/logStore/network/LogStoreNetworkPlugin.ts @@ -1,17 +1,21 @@ import { QueryRequest } from '@logsn/protocol'; -import { StreamPartIDUtils } from '@streamr/protocol'; -import { EthereumAddress, Logger } from '@streamr/utils'; +import { StreamPartID, StreamPartIDUtils, toStreamID } from '@streamr/protocol'; +import { Logger, toEthereumAddress } from '@streamr/utils'; import { Schema } from 'ajv'; -import { Stream } from 'streamr-client'; +import { Request } from 'express'; +import { Subscription } from 'rxjs'; +import { Stream, StreamID } from 'streamr-client'; import { NetworkModeConfig, PluginOptions } from '../../../Plugin'; import { BroadbandPublisher } from '../../../shared/BroadbandPublisher'; import { BroadbandSubscriber } from '../../../shared/BroadbandSubscriber'; +import { globsMatch } from '../../../utils/filterByGlob'; import PLUGIN_CONFIG_SCHEMA from '../config.schema.json'; import { createRecoveryEndpoint } from '../http/recoveryEndpoint'; import { LogStorePlugin } from '../LogStorePlugin'; import { Heartbeat } from './Heartbeat'; import { KyvePool } from './KyvePool'; +import { createIncompatibleNodeUrlLogger } from './log-utils/checkNodeUrlCompatibility'; import { LogStoreNetworkConfig } from './LogStoreNetworkConfig'; import { MessageMetricsCollector } from './MessageMetricsCollector'; import { NetworkQueryRequestManager } from './NetworkQueryRequestManager'; @@ -41,6 +45,7 @@ export class LogStoreNetworkPlugin extends LogStorePlugin { private readonly propagationResolver: PropagationResolver; private readonly propagationDispatcher: PropagationDispatcher; private readonly reportPoller: ReportPoller; + private readonly otherSubscriptions: Subscription[] = []; private metricsTimer?: NodeJS.Timer; @@ -121,7 +126,8 @@ export class LogStoreNetworkPlugin extends LogStorePlugin { this.queryResponseManager, this.propagationResolver, this.systemPublisher, - this.systemSubscriber + this.systemSubscriber, + (queryRequest) => this.isStreamIncluded(toStreamID(queryRequest.streamId)) ); this.reportPoller = new ReportPoller( @@ -131,6 +137,22 @@ export class LogStoreNetworkPlugin extends LogStorePlugin { this.systemPublisher, this.systemSubscriber ); + + this.otherSubscriptions.push( + createIncompatibleNodeUrlLogger( + this.logStoreClient, + this.streamrClient, + this.networkConfig + ).subscribe() + ); + } + + private isStreamIncluded(streamId: StreamID): boolean { + const includeOnlyGlobs = this.networkConfig.includeOnly; + if (!includeOnlyGlobs) { + return true; + } + return globsMatch(streamId, ...includeOnlyGlobs); } get networkConfig(): NetworkModeConfig { @@ -175,7 +197,6 @@ export class LogStoreNetworkPlugin extends LogStorePlugin { ) ); } - this.metricsTimer = setInterval( this.logMetrics.bind(this), METRICS_INTERVAL @@ -204,6 +225,8 @@ export class LogStoreNetworkPlugin extends LogStorePlugin { : Promise.resolve(), ]); + this.otherSubscriptions.forEach((s) => s.unsubscribe()); + await super.stop(); } @@ -217,6 +240,9 @@ export class LogStoreNetworkPlugin extends LogStorePlugin { ): Promise { const node = await this.streamrClient.getNode(); + const streamFilter = (streamPartId: StreamPartID) => + this.isStreamIncluded(StreamPartIDUtils.getStreamID(streamPartId)); + const logStoreConfig = new LogStoreNetworkConfig( this.pluginConfig.cluster.clusterSize, this.pluginConfig.cluster.myIndexInCluster, @@ -257,7 +283,8 @@ export class LogStoreNetworkPlugin extends LogStorePlugin { StreamPartIDUtils.getStreamID(streamPart) ); }, - } + }, + streamFilter ); await logStoreConfig.start(); return logStoreConfig; @@ -278,18 +305,32 @@ export class LogStoreNetworkPlugin extends LogStorePlugin { }; } - public async validateUserQueryAccess(address: EthereumAddress) { - const balance = await this.logStoreClient.getQueryBalanceOf(address); + public async validateQueryRequest(req: Request) { + const consumerAddress = toEthereumAddress(req.consumer!); + + const balance = await this.logStoreClient.getQueryBalanceOf( + consumerAddress + ); if (balance <= 0) { return { valid: false, message: 'Not enough balance staked for query', + errorCode: 402, } as const; - } else { + } + + const streamId = req.params.id; + const isStreamIncluded = this.isStreamIncluded(toStreamID(streamId)); + + if (!isStreamIncluded) { return { - valid: true, + valid: false, + message: 'Stream is excluded by the network configuration', + errorCode: 404, } as const; } + + return { valid: true } as const; } private logMetrics() { diff --git a/packages/core/src/plugins/logStore/network/NetworkQueryRequestManager.ts b/packages/core/src/plugins/logStore/network/NetworkQueryRequestManager.ts index 56a54bd..61cc848 100644 --- a/packages/core/src/plugins/logStore/network/NetworkQueryRequestManager.ts +++ b/packages/core/src/plugins/logStore/network/NetworkQueryRequestManager.ts @@ -7,6 +7,7 @@ import { import { createSignaturePayload, StreamMessage } from '@streamr/protocol'; import { Logger } from '@streamr/utils'; import { keccak256 } from 'ethers/lib/utils'; +import { EMPTY } from 'rxjs'; import { Readable } from 'stream'; import { MessageMetadata } from 'streamr-client'; @@ -24,7 +25,12 @@ export class NetworkQueryRequestManager extends BaseQueryRequestManager { private readonly queryResponseManager: QueryResponseManager, private readonly propagationResolver: PropagationResolver, private readonly publisher: BroadbandPublisher, - private readonly subscriber: BroadbandSubscriber + private readonly subscriber: BroadbandSubscriber, + // This function is used to determine whether the query response should be published. + // There are cases in which this node can choose not to participate in the query response propagation. + private readonly shouldPublishQueryResponseFn: ( + queryRequest: QueryRequest + ) => boolean ) { // super(); @@ -51,7 +57,9 @@ export class NetworkQueryRequestManager extends BaseQueryRequestManager { content, metadata, }); - const readableStream = this.getDataForQueryRequest(queryRequest); + const readableStream = this.shouldPublishQueryResponseFn(queryRequest) + ? this.getDataForQueryRequest(queryRequest) + : Readable.from(EMPTY); const hashMap = await this.getHashMap(readableStream); const queryResponse = new QueryResponse({ diff --git a/packages/core/src/plugins/logStore/network/log-utils/checkNodeUrlCompatibility.ts b/packages/core/src/plugins/logStore/network/log-utils/checkNodeUrlCompatibility.ts new file mode 100644 index 0000000..8daf1aa --- /dev/null +++ b/packages/core/src/plugins/logStore/network/log-utils/checkNodeUrlCompatibility.ts @@ -0,0 +1,48 @@ +import { LogStoreClient } from '@logsn/client'; +import { Logger } from '@streamr/utils'; +import { defer, EMPTY, filter, interval, map, shareReplay, tap } from 'rxjs'; +import { switchMap } from 'rxjs/internal/operators/switchMap'; +import StreamrClient from 'streamr-client'; + +import { NetworkModeConfig } from '../../../../Plugin'; + +const logger = new Logger(module); + +/** + * Reports if the node is configured to include only specific streams, but also has an HTTP server enabled. + * This is not acceptable as it may produce undesired 404 responses from clients. + * + * However, this should not shutdown the node, as it's not a critical issue for its operation. + */ +export const createIncompatibleNodeUrlLogger = ( + logStoreClient: LogStoreClient, + streamrClient: StreamrClient, + config: NetworkModeConfig +) => { + const address$ = defer(() => streamrClient.getAddress()).pipe( + // won't change, we cache it + shareReplay(1) + ); + const hasNodeUrl$ = address$.pipe( + map((nodeAddress) => logStoreClient.getNodeUrl(nodeAddress)) + ); + + const hasIncludeOnly = + config.includeOnly !== undefined && config.includeOnly.length > 0; + + if (!hasIncludeOnly) { + // this config is static, won't change, so we can already return it + return EMPTY; + } + + // run periodically + return interval(5 * 60 * 1000).pipe( + switchMap(() => hasNodeUrl$), + filter(Boolean), + tap(() => { + logger.error( + `This node is configured to include only specific streams, but also has an HTTP server enabled. This may produce undesired 404 responses from clients.` + ); + }) + ); +}; diff --git a/packages/core/src/plugins/logStore/standalone/LogStoreStandalonePlugin.ts b/packages/core/src/plugins/logStore/standalone/LogStoreStandalonePlugin.ts index 4a1e358..5e6b973 100644 --- a/packages/core/src/plugins/logStore/standalone/LogStoreStandalonePlugin.ts +++ b/packages/core/src/plugins/logStore/standalone/LogStoreStandalonePlugin.ts @@ -50,7 +50,7 @@ export class LogStoreStandalonePlugin extends LogStorePlugin { }; } - public async validateUserQueryAccess() { + public async validateQueryRequest() { return { valid: true } as const; } diff --git a/packages/core/src/utils/filterByGlob.ts b/packages/core/src/utils/filterByGlob.ts new file mode 100644 index 0000000..719f78a --- /dev/null +++ b/packages/core/src/utils/filterByGlob.ts @@ -0,0 +1,11 @@ +import micromatch from 'micromatch'; + +/** + * Check if the original string matches any of the globs + */ +export const globsMatch = ( + originalStr: string, + ...globs: string[] +): boolean => { + return globs.some((glob) => micromatch.isMatch(originalStr, glob)); +}; diff --git a/packages/core/test/integration/plugins/logStore/includeOnlyBehavior.test.ts b/packages/core/test/integration/plugins/logStore/includeOnlyBehavior.test.ts new file mode 100644 index 0000000..e5ceae8 --- /dev/null +++ b/packages/core/test/integration/plugins/logStore/includeOnlyBehavior.test.ts @@ -0,0 +1,98 @@ +import { Tracker } from '@streamr/network-tracker'; +import { KeyServer } from '@streamr/test-utils'; +import { providers, Wallet } from 'ethers'; +import { Stream } from 'streamr-client'; +import { CONFIG_TEST as STREAMR_CLIENT_CONFIG_TEST } from 'streamr-client/types/src/ConfigTest'; + +import { accountUtils, getLogStoreNodeTestUtils } from '../../../resourceUtils'; +import { CONTRACT_OWNER_PRIVATE_KEY, createTestStream, startTestTracker } from '../../../utils'; + +jest.setTimeout(60000); + +// There are two options to run the test managed by a value of the TRACKER_PORT constant: +// 1. TRACKER_PORT = undefined - run the test against the brokers running in dev-env and brokers run by the test script. +// 2. TRACKER_PORT = 17771 - run the test against only brokers run by the test script. +// In this case dev-env doesn't run any brokers and there is no brokers joined the network (NodeManager.totalNodes == 0) +const TRACKER_PORT = undefined; + +describe('Network Mode Queries', () => { + const provider = new providers.JsonRpcProvider( + STREAMR_CLIENT_CONFIG_TEST.contracts?.streamRegistryChainRPCs?.rpcs[0].url, + STREAMR_CLIENT_CONFIG_TEST.contracts?.streamRegistryChainRPCs?.chainId + ); + + const fullNodeUtils = getLogStoreNodeTestUtils( + provider, + { + plugins: { + logStore: { db: { type: 'sqlite' } } + }, + httpServerPort: 7171 + }, + TRACKER_PORT + ); + const partialNodeUtils = getLogStoreNodeTestUtils( + provider, + { + plugins: { + logStore: { db: { type: 'sqlite' } } + }, + httpServerPort: 7172, + mode: { + includeOnly: ['**/*-include-test-stream'], + type: 'network' + } + }, + TRACKER_PORT + ); + + // Accounts + const client = accountUtils(provider); + + let tracker: Tracker; + + let nonIncludedStream: Stream; + let includedStream: Stream; + const includedStreamId = `/${Date.now()}-include-test-stream`; + + beforeAll(async () => { + await client.setup(); + nonIncludedStream = await createTestStream(client.streamrClient, module); + includedStream = await client.streamrClient.createStream(includedStreamId); + + await client.stakeStream(nonIncludedStream.id); + await client.stakeStream(includedStream.id); + await client.stakeQuery(); + }); + + afterAll(async () => { + // TODO: Setup global tear-down + await KeyServer.stopIfRunning(); + await client.teardown(); + }); + + beforeEach(async () => { + if (TRACKER_PORT) { + tracker = await startTestTracker(TRACKER_PORT); + } + const adminWallet = new Wallet(CONTRACT_OWNER_PRIVATE_KEY, provider); + await fullNodeUtils.setup(adminWallet, { http: 'http://127.0.0.1:7171' }); + await partialNodeUtils.setup(adminWallet, {}); + }); + + afterEach(async () => { + await fullNodeUtils.teardown(); + await partialNodeUtils.teardown(); + await tracker?.stop(); + }); + + test('query partial node - included stream', async () => { + const messageStream = await client + .logStoreClientForNodeUrl('http://localhost:7172') + .query({ streamId: includedStreamId }, { last: 1 }); + + for await (const message of messageStream) { + expect(message).toBeDefined(); + } + }); +}); diff --git a/packages/core/test/resourceUtils.ts b/packages/core/test/resourceUtils.ts new file mode 100644 index 0000000..1a2e4e9 --- /dev/null +++ b/packages/core/test/resourceUtils.ts @@ -0,0 +1,138 @@ +import { LogStoreClient, NodeMetadata } from '@logsn/client'; +import { LogStoreNodeManager } from '@logsn/contracts'; +import { + getNodeManagerContract, + getQueryManagerContract, + getStoreManagerContract, + getTokenManagerContract, + prepareStakeForNodeManager, + prepareStakeForQueryManager, + prepareStakeForStoreManager, +} from '@logsn/shared'; +import { fetchPrivateKeyWithGas } from '@streamr/test-utils'; +import { providers, Wallet } from 'ethers'; +import StreamrClient from 'streamr-client'; + +import { LogStoreNode } from '../src/node'; +import { LogStoreBrokerTestConfig, sleep, startLogStoreBroker } from './utils'; + +const STAKE_AMOUNT = BigInt('1000000000000000000'); + +const getOrError = (value: T | undefined): T => { + if (value === undefined) { + throw new Error('Something is not initialized'); + } + return value; +}; +export const getLogStoreNodeTestUtils = ( + provider: providers.Provider, + logStoreConfig: Omit, + trackerPort: number | undefined +) => { + let logstoreNode: LogStoreNode | undefined; + let wallet: Wallet | undefined; + let nodeManager: LogStoreNodeManager | undefined; + return { + teardown: async () => { + await getOrError(nodeManager) + .leave() + .then((tx) => tx.wait()); + await logstoreNode?.stop(); + }, + wallet: () => getOrError(wallet), + setup: async ( + adminWallet: Wallet, + metadata: NodeMetadata + ): Promise => { + wallet = new Wallet(await fetchPrivateKeyWithGas(), provider); + const nodeAdminManager = await getNodeManagerContract(adminWallet); + const tokenAdminManager = await getTokenManagerContract(adminWallet); + nodeManager = await getNodeManagerContract(wallet); + + await nodeAdminManager + .whitelistApproveNode(wallet.address) + .then((tx) => tx.wait()); + await tokenAdminManager + .addWhitelist(wallet.address, nodeManager.address) + .then((tx) => tx.wait()); + + await prepareStakeForNodeManager(wallet, STAKE_AMOUNT); + await nodeManager + .join(STAKE_AMOUNT, JSON.stringify(metadata)) + .then((tx) => tx.wait()); + + await sleep(5000); + + logstoreNode = await startLogStoreBroker({ + privateKey: wallet.privateKey, + trackerPort: trackerPort, + ...logStoreConfig, + }); + await logstoreNode.start(); + }, + }; +}; +export const accountUtils = ( + provider: providers.Provider, + initialWallet?: Wallet +) => { + let wallet: Wallet | undefined = initialWallet; + let logStoreClient: LogStoreClient | undefined; + let streamrClient: StreamrClient | undefined; + + const getWallet = () => getOrError(wallet); + + function getStreamrClient() { + if (!streamrClient) { + streamrClient = new StreamrClient({ + auth: { + privateKey: getWallet().privateKey, + }, + }); + } + return streamrClient; + } + + const logStoreNodeForUrlMap = new Map(); + + return { + setup: async () => { + wallet = new Wallet(await fetchPrivateKeyWithGas(), provider); + }, + teardown: async () => { + logStoreClient?.destroy(); + await streamrClient?.destroy(); + Array.from(logStoreNodeForUrlMap.values()).map((client) => + client.destroy() + ); + }, + get streamrClient() { + return getStreamrClient(); + }, + get logStoreClient() { + if (!logStoreClient) { + logStoreClient = new LogStoreClient(getStreamrClient()); + } + return logStoreClient; + }, + logStoreClientForNodeUrl: (nodeUrl: string) => { + if (!logStoreNodeForUrlMap.has(nodeUrl)) { + logStoreNodeForUrlMap.set( + nodeUrl, + new LogStoreClient(getStreamrClient(), { nodeUrl }) + ); + } + return logStoreNodeForUrlMap.get(nodeUrl)!; + }, + stakeStream: async (streamId: string) => { + const storeManager = await getStoreManagerContract(getWallet()); + await prepareStakeForStoreManager(getWallet(), STAKE_AMOUNT); + await storeManager.stake(streamId, '1').then((tx) => tx.wait()); + }, + stakeQuery: async () => { + const queryManager = await getQueryManagerContract(getWallet()); + await prepareStakeForQueryManager(getWallet(), STAKE_AMOUNT); + await queryManager.stake('1').then((tx) => tx.wait()); + }, + }; +}; diff --git a/packages/core/test/utils.ts b/packages/core/test/utils.ts index 0b151f9..51921fc 100644 --- a/packages/core/test/utils.ts +++ b/packages/core/test/utils.ts @@ -58,6 +58,7 @@ export interface LogStorePluginTestConfig { type: 'sqlite'; dbPath?: string; }; + refreshInterval?: number; }