Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
"js-base64": "^3.7.5",
"lodash": "^4.17.21",
"merge2": "^1.4.1",
"micromatch": "^4.0.5",
"p-limit": "^3.1.0",
"p-memoize": "4.0.3",
"rxjs": "8.0.0-alpha.12",
Expand Down Expand Up @@ -85,6 +86,7 @@
"@types/jest": "^29.5.0",
"@types/lodash": "^4.14.191",
"@types/merge2": "^1.4.0",
"@types/micromatch": "^4.0.6",
"@types/node": "^16.18.39",
"@types/node-fetch": "^2.6.2",
"@types/secp256k1": "^4.0.3",
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/Plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export type NetworkModeConfig = {
recoveryStream: Stream;
systemStream: Stream;
nodeManager: LogStoreNodeManager;
includeOnly?: string[];
};

export type StandaloneModeConfig = {
Expand Down
7 changes: 7 additions & 0 deletions packages/core/src/config/config.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@
"type": "string",
"enum": ["network"]
},
"includeOnly": {
"type": "array",
"description": "List of glob-like patterns representing the streams to be tracked by this node. Note: it act as a filter, if the list is empty, all streams fetched from the contract will be tracked.",
"items": {
"type": "string"
}
},
"pool": {
"type": "object",
"description": "Kyve Pool configuration for network participant mode",
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ export type NetworkParticipantMode = {
url: string;
pollInterval: number;
};
// glob pattern like list of streams that should be tracked by this node
includeOnly?: string[];
};
type StandaloneMode = {
type: 'standalone';
Expand Down
7 changes: 4 additions & 3 deletions packages/core/src/plugins/logStore/LogStorePlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { MessageListener } from './MessageListener';
import { MessageProcessor } from './MessageProcessor';
import { NodeStreamsRegistry } from './NodeStreamsRegistry';
import { ValidationSchemaManager } from './validation-schema/ValidationSchemaManager';
import { Request } from 'express';

const logger = new Logger(module);

Expand Down Expand Up @@ -156,9 +157,9 @@ export abstract class LogStorePlugin extends Plugin<LogStorePluginConfig> {
data: Readable;
}>;

public abstract validateUserQueryAccess(
address: EthereumAddress
): Promise<{ valid: true } | { valid: false; message: string }>;
public abstract validateQueryRequest(
request: Request
): Promise<{ valid: true } | { valid: false; message: string, errorCode?: number }>;

private async startStorage(
metricsContext: MetricsContext
Expand Down
13 changes: 3 additions & 10 deletions packages/core/src/plugins/logStore/http/dataQueryEndpoint.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
/**
* Endpoints for RESTful data requests
*/
import {
MetricsContext,
MetricsDefinition,
RateMetric,
toEthereumAddress,
} from '@streamr/utils';
import { MetricsContext, MetricsDefinition, RateMetric } from '@streamr/utils';
import { Request, RequestHandler, Response } from 'express';

import { HttpServerEndpoint } from '../../../Plugin';
Expand Down Expand Up @@ -111,18 +106,16 @@ const createHandler = (metrics: MetricsDefinition): RequestHandler => {

const format = getFormat(req.query.format as string | undefined);

const consumer = toEthereumAddress(req.consumer!);

const store = logStoreContext.getStore();
if (!store) {
throw new Error('LogStore context was not initialized');
}

const { logStorePlugin } = store;

const validation = await logStorePlugin.validateUserQueryAccess(consumer);
const validation = await logStorePlugin.validateQueryRequest(req);
if (!validation.valid) {
sendError(validation.message, res);
sendError(validation.message, res, validation.errorCode);
return;
}

Expand Down
8 changes: 6 additions & 2 deletions packages/core/src/plugins/logStore/http/httpHelpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,13 @@ export const sendSuccess = (
});
};

export const sendError = (message: string, res: Response) => {
export const sendError = (
message: string,
res: Response,
errorCode?: number
) => {
logger.error(message);
res.status(400).json({
res.status(errorCode ?? 400).json({
error: message,
});
};
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ export class LogStoreNetworkConfig implements LogStoreConfig {
pollInterval: number,
logStoreClient: LogStoreClient,
streamrClient: StreamrClient,
listener: LogStoreConfigListener
listener: LogStoreConfigListener,
// helps us not even include stream parts that we're not interested in
streamFilter?: (streamPart: StreamPartID) => boolean
) {
this.clusterSize = clusterSize;
this.myIndexInCluster = myIndexInCluster;
Expand All @@ -57,9 +59,10 @@ export class LogStoreNetworkConfig implements LogStoreConfig {
pollInterval,
logStoreClient,
(streams, block) => {
const streamParts = streams.flatMap((stream: Stream) => [
...this.createMyStreamParts(stream),
]);
const streamParts = streams
.flatMap((stream: Stream) => [...this.createMyStreamParts(stream)])
// if there's a filter, apply it, otherwise include everything
.filter((streamPart) => streamFilter?.(streamPart) ?? true);
this.handleDiff(
this.synchronizer.ingestSnapshot(
new Set<StreamPartID>(streamParts),
Expand Down
61 changes: 51 additions & 10 deletions packages/core/src/plugins/logStore/network/LogStoreNetworkPlugin.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
import { QueryRequest } from '@logsn/protocol';
import { StreamPartIDUtils } from '@streamr/protocol';
import { EthereumAddress, Logger } from '@streamr/utils';
import { StreamPartID, StreamPartIDUtils, toStreamID } from '@streamr/protocol';
import { Logger, toEthereumAddress } from '@streamr/utils';
import { Schema } from 'ajv';
import { Stream } from 'streamr-client';
import { Request } from 'express';
import { Subscription } from 'rxjs';
import { Stream, StreamID } from 'streamr-client';

import { NetworkModeConfig, PluginOptions } from '../../../Plugin';
import { BroadbandPublisher } from '../../../shared/BroadbandPublisher';
import { BroadbandSubscriber } from '../../../shared/BroadbandSubscriber';
import { globsMatch } from '../../../utils/filterByGlob';
import PLUGIN_CONFIG_SCHEMA from '../config.schema.json';
import { createRecoveryEndpoint } from '../http/recoveryEndpoint';
import { LogStorePlugin } from '../LogStorePlugin';
import { Heartbeat } from './Heartbeat';
import { KyvePool } from './KyvePool';
import { createIncompatibleNodeUrlLogger } from './log-utils/checkNodeUrlCompatibility';
import { LogStoreNetworkConfig } from './LogStoreNetworkConfig';
import { MessageMetricsCollector } from './MessageMetricsCollector';
import { NetworkQueryRequestManager } from './NetworkQueryRequestManager';
Expand Down Expand Up @@ -41,6 +45,7 @@ export class LogStoreNetworkPlugin extends LogStorePlugin {
private readonly propagationResolver: PropagationResolver;
private readonly propagationDispatcher: PropagationDispatcher;
private readonly reportPoller: ReportPoller;
private readonly otherSubscriptions: Subscription[] = [];

private metricsTimer?: NodeJS.Timer;

Expand Down Expand Up @@ -121,7 +126,8 @@ export class LogStoreNetworkPlugin extends LogStorePlugin {
this.queryResponseManager,
this.propagationResolver,
this.systemPublisher,
this.systemSubscriber
this.systemSubscriber,
(queryRequest) => this.isStreamIncluded(toStreamID(queryRequest.streamId))
);

this.reportPoller = new ReportPoller(
Expand All @@ -131,6 +137,22 @@ export class LogStoreNetworkPlugin extends LogStorePlugin {
this.systemPublisher,
this.systemSubscriber
);

this.otherSubscriptions.push(
createIncompatibleNodeUrlLogger(
this.logStoreClient,
this.streamrClient,
this.networkConfig
).subscribe()
);
}

private isStreamIncluded(streamId: StreamID): boolean {
const includeOnlyGlobs = this.networkConfig.includeOnly;
if (!includeOnlyGlobs) {
return true;
}
return globsMatch(streamId, ...includeOnlyGlobs);
}

get networkConfig(): NetworkModeConfig {
Expand Down Expand Up @@ -175,7 +197,6 @@ export class LogStoreNetworkPlugin extends LogStorePlugin {
)
);
}

this.metricsTimer = setInterval(
this.logMetrics.bind(this),
METRICS_INTERVAL
Expand Down Expand Up @@ -204,6 +225,8 @@ export class LogStoreNetworkPlugin extends LogStorePlugin {
: Promise.resolve(),
]);

this.otherSubscriptions.forEach((s) => s.unsubscribe());

await super.stop();
}

Expand All @@ -217,6 +240,9 @@ export class LogStoreNetworkPlugin extends LogStorePlugin {
): Promise<LogStoreNetworkConfig> {
const node = await this.streamrClient.getNode();

const streamFilter = (streamPartId: StreamPartID) =>
this.isStreamIncluded(StreamPartIDUtils.getStreamID(streamPartId));

const logStoreConfig = new LogStoreNetworkConfig(
this.pluginConfig.cluster.clusterSize,
this.pluginConfig.cluster.myIndexInCluster,
Expand Down Expand Up @@ -257,7 +283,8 @@ export class LogStoreNetworkPlugin extends LogStorePlugin {
StreamPartIDUtils.getStreamID(streamPart)
);
},
}
},
streamFilter
);
await logStoreConfig.start();
return logStoreConfig;
Expand All @@ -278,18 +305,32 @@ export class LogStoreNetworkPlugin extends LogStorePlugin {
};
}

public async validateUserQueryAccess(address: EthereumAddress) {
const balance = await this.logStoreClient.getQueryBalanceOf(address);
public async validateQueryRequest(req: Request) {
const consumerAddress = toEthereumAddress(req.consumer!);

const balance = await this.logStoreClient.getQueryBalanceOf(
consumerAddress
);
if (balance <= 0) {
return {
valid: false,
message: 'Not enough balance staked for query',
errorCode: 402,
} as const;
} else {
}

const streamId = req.params.id;
const isStreamIncluded = this.isStreamIncluded(toStreamID(streamId));

if (!isStreamIncluded) {
return {
valid: true,
valid: false,
message: 'Stream is excluded by the network configuration',
errorCode: 404,
} as const;
}

return { valid: true } as const;
}

private logMetrics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
import { createSignaturePayload, StreamMessage } from '@streamr/protocol';
import { Logger } from '@streamr/utils';
import { keccak256 } from 'ethers/lib/utils';
import { EMPTY } from 'rxjs';
import { Readable } from 'stream';
import { MessageMetadata } from 'streamr-client';

Expand All @@ -24,7 +25,12 @@ export class NetworkQueryRequestManager extends BaseQueryRequestManager {
private readonly queryResponseManager: QueryResponseManager,
private readonly propagationResolver: PropagationResolver,
private readonly publisher: BroadbandPublisher,
private readonly subscriber: BroadbandSubscriber
private readonly subscriber: BroadbandSubscriber,
// This function is used to determine whether the query response should be published.
// There are cases in which this node can choose not to participate in the query response propagation.
private readonly shouldPublishQueryResponseFn: (
queryRequest: QueryRequest
) => boolean
) {
//
super();
Expand All @@ -51,7 +57,9 @@ export class NetworkQueryRequestManager extends BaseQueryRequestManager {
content,
metadata,
});
const readableStream = this.getDataForQueryRequest(queryRequest);
const readableStream = this.shouldPublishQueryResponseFn(queryRequest)
? this.getDataForQueryRequest(queryRequest)
: Readable.from(EMPTY);

const hashMap = await this.getHashMap(readableStream);
const queryResponse = new QueryResponse({
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { LogStoreClient } from '@logsn/client';
import { Logger } from '@streamr/utils';
import { defer, EMPTY, filter, interval, map, shareReplay, tap } from 'rxjs';
import { switchMap } from 'rxjs/internal/operators/switchMap';
import StreamrClient from 'streamr-client';

import { NetworkModeConfig } from '../../../../Plugin';

const logger = new Logger(module);

/**
* Reports if the node is configured to include only specific streams, but also has an HTTP server enabled.
* This is not acceptable as it may produce undesired 404 responses from clients.
*
* However, this should not shutdown the node, as it's not a critical issue for its operation.
*/
export const createIncompatibleNodeUrlLogger = (
logStoreClient: LogStoreClient,
streamrClient: StreamrClient,
config: NetworkModeConfig
) => {
const address$ = defer(() => streamrClient.getAddress()).pipe(
// won't change, we cache it
shareReplay(1)
);
const hasNodeUrl$ = address$.pipe(
map((nodeAddress) => logStoreClient.getNodeUrl(nodeAddress))
);

const hasIncludeOnly =
config.includeOnly !== undefined && config.includeOnly.length > 0;

if (!hasIncludeOnly) {
// this config is static, won't change, so we can already return it
return EMPTY;
}

// run periodically
return interval(5 * 60 * 1000).pipe(
switchMap(() => hasNodeUrl$),
filter(Boolean),
tap(() => {
logger.error(
`This node is configured to include only specific streams, but also has an HTTP server enabled. This may produce undesired 404 responses from clients.`
);
})
);
};
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export class LogStoreStandalonePlugin extends LogStorePlugin {
};
}

public async validateUserQueryAccess() {
public async validateQueryRequest() {
return { valid: true } as const;
}

Expand Down
11 changes: 11 additions & 0 deletions packages/core/src/utils/filterByGlob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import micromatch from 'micromatch';

/**
* Check if the original string matches any of the globs
*/
export const globsMatch = (
originalStr: string,
...globs: string[]
): boolean => {
return globs.some((glob) => micromatch.isMatch(originalStr, glob));
};
Loading