From 09e5d0609d8a7bdda81122ceae8dc47887dfe935 Mon Sep 17 00:00:00 2001 From: James Sumners Date: Mon, 3 Nov 2025 12:22:45 -0500 Subject: [PATCH] feat: Instrument @platformatic/kafka --- eslint.config.js | 8 +- lib/subscriber-configs.js | 1 + lib/subscribers/dc-base.js | 8 +- lib/subscribers/platformatic-kafka/config.js | 13 ++ lib/subscribers/platformatic-kafka/index.js | 126 +++++++++++ lib/subscribers/subscriber.js | 124 +++++++++++ lib/subscribers/tracing-channel-subscriber.js | 203 ++++++++++++++++++ test/versioned/platformatic-kafka/newrelic.js | 28 +++ .../versioned/platformatic-kafka/package.json | 19 ++ .../platformatic-kafka/producer.test.js | 159 ++++++++++++++ 10 files changed, 687 insertions(+), 2 deletions(-) create mode 100644 lib/subscribers/platformatic-kafka/config.js create mode 100644 lib/subscribers/platformatic-kafka/index.js create mode 100644 lib/subscribers/subscriber.js create mode 100644 lib/subscribers/tracing-channel-subscriber.js create mode 100644 test/versioned/platformatic-kafka/newrelic.js create mode 100644 test/versioned/platformatic-kafka/package.json create mode 100644 test/versioned/platformatic-kafka/producer.test.js diff --git a/eslint.config.js b/eslint.config.js index cda253082d..947b71be15 100644 --- a/eslint.config.js +++ b/eslint.config.js @@ -143,5 +143,11 @@ module.exports = [ sharedConfig.configs.baselineNewRelicConfig, newrelicConfigOverrides, - globalIgnores + globalIgnores, + + { + rules: { + 'accessor-pairs': 'off' + } + } ] diff --git a/lib/subscriber-configs.js b/lib/subscriber-configs.js index dd294db36d..b96aec163c 100644 --- a/lib/subscriber-configs.js +++ b/lib/subscriber-configs.js @@ -18,6 +18,7 @@ const subscribers = { ...require('./subscribers/openai/config'), ...require('./subscribers/pino/config'), ...require('./subscribers/pg/config'), + ...require('./subscribers/platformatic-kafka/config'), ...require('./subscribers/undici/config') } diff --git a/lib/subscribers/dc-base.js b/lib/subscribers/dc-base.js index 17108701ef..2d5b98879c 100644 --- a/lib/subscribers/dc-base.js +++ b/lib/subscribers/dc-base.js @@ -26,6 +26,12 @@ const dc = require('node:diagnostics_channel') */ /** + * Provides a base interface for interacting with + * [Diagnostics Channel]{@link https://nodejs.org/docs/latest/api/diagnostics_channel.html#diagnostics-channel} + * instances. A diagnostics channel (DC) is _not_ a tracing channel. A DC + * is a simple channel with a singular event. This class is not suitable for + * interacting with libraries that publish _tracing channels_. + * * @property {object} agent A New Relic Node.js agent instance. * @property {ChannelDescriptor[]} channels The channels to subscribe to. * @property {object} logger An agent logger instance. @@ -46,7 +52,7 @@ class Subscriber { set channels(channels) { if (!Array.isArray(channels)) { - throw new Error('channels must be a collection of with propertiesof channel and hook') + throw new Error('channels must be a collection of ChannelDescriptor objects') } this._channels = channels } diff --git a/lib/subscribers/platformatic-kafka/config.js b/lib/subscribers/platformatic-kafka/config.js new file mode 100644 index 0000000000..69350c5b91 --- /dev/null +++ b/lib/subscribers/platformatic-kafka/config.js @@ -0,0 +1,13 @@ +/* + * Copyright 2025 New Relic Corporation. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +'use strict' + +module.exports = { + '@platformatic/kafka': [{ + path: './platformatic-kafka', + instrumentations: [] + }] +} diff --git a/lib/subscribers/platformatic-kafka/index.js b/lib/subscribers/platformatic-kafka/index.js new file mode 100644 index 0000000000..851018df92 --- /dev/null +++ b/lib/subscribers/platformatic-kafka/index.js @@ -0,0 +1,126 @@ +/* + * Copyright 2025 New Relic Corporation. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +'use strict' + +// TODO: add test to cover once this is resolved https://github.com/platformatic/kafka/issues/155 + +const recorder = require('#agentlib/metrics/recorders/generic.js') +const { + TracingChannelSubscription, + TracingChannelSubscriber +} = require('../tracing-channel-subscriber.js') + +const connectsSub = new TracingChannelSubscription({ + channel: 'tracing:plt:kafka:connections:connects', + start(event) { + // TODO: is this invoked for every broker that was defined during + // initialization? If so, should we be creating a segment for each broker + // send? + + const context = this.agent.tracer.getContext() + const { segment, transaction } = context + + if (!segment || !transaction) { + this.logger.trace('Not capturing connection details due to missing transaction.') + return + } + + const { topic, clientKind } = context.extras + const { host: broker } = event + this.agent.metrics + .getOrCreateMetric(`MessageBroker/Kafka/Nodes/${broker}/${clientKind}/${topic}`) + .incrementCallCount() + + const externalSegment = this.agent.tracer.createSegment({ + name: 'connect', + parent: segment, + recorder, + transaction + }) + externalSegment.shimId = this.id + externalSegment.start() + + return context.enterSegment({ segment: externalSegment }) + }, + error(event) { + const context = this.agent.tracer.getContext() + context?.transaction?.agent.errors.add(context.transaction, event.error) + } +}) + +const producerSendsSub = new TracingChannelSubscription({ + channel: 'tracing:plt:kafka:producer:sends', + start(event) { + const context = this.agent.tracer.getContext() + const { segment, transaction } = context + + if (!segment || !transaction) { + this.logger.trace('Not recording producer send due to missing transaction.') + return + } + + // TODO: this module only has a single `.send` method that works like + // kafkajs's `sendBatch` method. We should probably figure out a better way + // to generate metrics for batches. + const firstMessage = event?.options?.messages?.[0] + const topic = firstMessage?.topic ?? 'unknown' + + // Order of operations is: + // 1. `producer.send` is invoked which gets us to here + // 2. Connection is made to the remote system. + // 3. The `connections:connects` channel is triggered. + // 4. In the other channel, we need to the topic in order to generate the + // linking metric. So we have to store that information in the context. + context.extras.topic = topic + context.extras.clientKind = 'Produce' + + const externalSegment = this.agent.tracer.createSegment({ + name: `MessageBroker/Kafka/topic/Produce/Named/${topic}`, + parent: segment, + recorder, + transaction + }) + segment.opaque = transaction.opaque + externalSegment.shimId = this.id + externalSegment.type = 'message' + externalSegment.start() + + return context.enterSegment({ segment: externalSegment }) + }, + end() { + this.touch() + }, + + asyncStart() { + // TODO: are we getting the correct context? Do we need to do the crazy + // `isActiveTx` context propagation as we had to do in other cases? + this.touch() + }, + + asyncEnd() { + this.touch() + } +}) + +class PlatformaticKafkaSubscriber extends TracingChannelSubscriber { + constructor({ agent, logger, ...rest }) { + super({ agent, logger, packageName: '@platformatic/kafka', ...rest }) + this.subscriptions = [ + connectsSub, + producerSendsSub + ] + } + + /** + * Update the current context's timing tracker. + */ + touch() { + const context = this.agent.tracer.getContext() + context?.segment?.touch() + } +} + +module.exports = PlatformaticKafkaSubscriber diff --git a/lib/subscribers/subscriber.js b/lib/subscribers/subscriber.js new file mode 100644 index 0000000000..6539800732 --- /dev/null +++ b/lib/subscribers/subscriber.js @@ -0,0 +1,124 @@ +/* + * Copyright 2025 New Relic Corporation. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +'use strict' + +/** + * This is an interface class. It defines the methods each subclass _must_ + * implement and override in order for the diagnostics channel or tracing + * channel subscriptions to work. + * + * @property {object} agent A New Relic Node.js agent instance. + * @property {object} logger An agent logger instance. + * @property {object} config The agent configuration object. + * @property {string} packageName The name of the module being instrumented. + * This is the same string one would pass to the `require` function. + * @property {string} id An alias for `packageName`. + * + * @private + * @interface + */ +class Subscriber { + #agent + #config + #logger + #packageName + + constructor({ agent, logger, packageName }) { + this.#agent = agent + this.#config = agent.config + this.#logger = logger.child({ component: `${packageName}-subscriber` }) + this.#packageName = packageName + } + + get [Symbol.toStringTag]() { + return 'Subscriber' + } + + get agent() { + return this.#agent + } + + get config() { + return this.#config + } + + get id() { + return this.#packageName + } + + get logger() { + return this.#logger + } + + get packageName() { + return this.#packageName + } + + /** + * Indicates if the subscription should be enabled or not. The most likely + * scenario is that an implementation will consult the agent configuration + * to determine the result. + * + * @returns {boolean} Subscriber is enabled or not. + */ + get enabled() { + throw Error('enabled is not implemented on class: ' + this.constructor.name) + } + + /** + * Implementations should utilize subclass specific configuration or logic + * to enable the subscriber. This is basically a start-up lifecycle hook + * that the implementation can use to perform necessary actions, e.g. + * creating an asynchronous context and binding it to an appropriate channel. + * + * @returns {void | Function | boolean} Result of the enablement. Not likely + * to be used. + */ + enable() { + throw Error('enable is not implemented on class: ' + this.constructor.name) + } + + /** + * The inverse of the `enable` method. It's basically an agent shutdown + * lifecycle hook. Any clean up logic required as a result of the work + * performed in the `enable` method should be hosted here. + * + * @returns {void | boolean} Result of the disablement. Not likely to be + * used. + */ + disable() { + throw Error('disable is not implemented on class: ' + this.constructor.name) + } + + /** + * Classes must implement this method. It is expected to read some + * configuration data, specific to the subclass, and utilize it to + * perform the channel subscriptions. + * + * @returns {void} + * + * @example A basic "Diagnostics Channel" based method. + * const dc = require('node:diagnostics_channel') + * for (const sub of this.#subscriptions) { + * dc.subscribe(sub.channelName, sub.hook.bind(this)) + * } + */ + subscribe() { + throw Error('subscribe is not implemented on class: ' + this.constructor.name) + } + + /** + * The inverse of the `subscribe` method. This should iterate through the + * subscribed channels and issue any unsubscribe and clean-up logic for them. + * + * @returns {void} + */ + unsubscribe() { + throw Error('unsubscribe is not implemented on class: ' + this.constructor.name) + } +} + +module.exports = Subscriber diff --git a/lib/subscribers/tracing-channel-subscriber.js b/lib/subscribers/tracing-channel-subscriber.js new file mode 100644 index 0000000000..6786a77c6b --- /dev/null +++ b/lib/subscribers/tracing-channel-subscriber.js @@ -0,0 +1,203 @@ +/* + * Copyright 2025 New Relic Corporation. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +'use strict' + +// eslint-disable-next-line n/no-unsupported-features/node-builtins +const dc = require('node:diagnostics_channel') +const Subscriber = require('./subscriber.js') + +/** + * A `TracingChannelSubscription` is an object that provides the channel name + * and event handlers for a tracing channel. It provides an easy to validate + * object with access to necessary event handlers. + */ +class TracingChannelSubscription { + #channel + #start + #end + #asyncStart + #asyncEnd + #error + + /** + * Create a new instance. All event handlers are optional. Any event handler + * not provided will be registered to a no-operation function. + * + * @param {object} params Constructor parameters object. + * @param {string} params.channel The name of the tracing channel to monitor. + * @param {Function} [params.start] The callback to invoke on start events. + * @param {Function} [params.end] The callback to invoke on end events. + * @param {Function} [params.asyncStart] The callback to invoke on asyncStart events. + * @param {Function} [params.asyncEnd] The callback to invoke on asyncEnd events. + * @param {Function} [params.error] The callback to invoke on error events. + */ + constructor ({ channel, start, end, asyncStart, asyncEnd, error }) { + this.#channel = channel + + // It's annoying that we have to do it this way. If we were to use + // `Object.defineProperties` then we wouldn't get IDE/editor support + // for the methods. + this.#start = start ?? noop + this.#end = end ?? noop + this.#asyncStart = asyncStart ?? noop + this.#asyncEnd = asyncEnd ?? noop + this.#error = error ?? noop + + function noop() {} + } + + get [Symbol.toStringTag]() { + return 'TracingChannelSubscription' + } + + /** + * The name of the channel this subscription targets. It should be the + * fully qualified channel name, sans any event name. + * + * @example + * const chan = diagnosticsChannel.tracingChannel('foo:bar:baz') + * // name = `tracing:foo:bar:baz` + * const sub = new TracingChannelSubscription({ channel: 'tracing:foo:bar:baz' }) + * console.log(sub.channel) // "tracing:foo:bar:baz" + * + * @returns {string} The tracing channel name. + */ + get channel() { + return this.#channel + } + + /** + * Function to handle `start` events. + * + * @returns {Function} Start event handler. + */ + get start() { + return this.#start + } + + /** + * Function to handle `end` events. + * + * @returns {Function} End event handler. + */ + get end() { + return this.#end + } + + /** + * Function to handle `asyncStart` events. + * + * @returns {Function} Async start event handler. + */ + get asyncStart() { + return this.#asyncStart + } + + /** + * Function to handle `asyncEnd` events. + * + * @returns {Function} Async end event handler. + */ + get asyncEnd() { + return this.#asyncEnd + } + + /** + * Function to handle `error` events. + * + * @returns {Function} Error event handler. + */ + get error() { + return this.#error + } +} + +/** + * A `TracingChannelSubscriber` is used to interact with libraries that publish + * [Tracing Channel]{@link https://nodejs.org/docs/latest/api/diagnostics_channel.html#class-tracingchannel} + * instances. A tracing channel (TC) is a collection of diagnostics channels, + * where each channel corresponds to a specific event in the lifecycle of a + * traced operation. + * + * To be clear: this is meant to be used with libraries that publish their own + * channels. Not with libraries that we have dynamically patched with injected + * tracing channels. For those cases, use the class exported from `./base.js`, + * or one of the subclasses of it. + */ +class TracingChannelSubscriber extends Subscriber { + #tcSubs = [] + #registeredSubs = [] + #events = ['start', 'end', 'asyncStart', 'asyncEnd', 'error'] + + constructor({ agent, logger, packageName }) { + super({ agent, logger, packageName }) + } + + get [Symbol.toStringTag]() { + return 'TracingChannelSubscriber' + } + + /** + * Define the object that contains the subscription channel name and + * event callbacks. + * + * @param {TracingChannelSubscription[]} tcSubs An object with event listeners + * and the channel name. + */ + set subscriptions(tcSubs) { + const validated = [] + for (const sub of tcSubs) { + if (Object.prototype.toString.call(sub) !== '[object TracingChannelSubscription]') { + this.logger.warn('attempted to set subscriptions with an invalid object') + return + } + validated.push(sub) + } + Array.prototype.push.apply(this.#tcSubs, validated) + } + + /** + * Whether the instance is enabled or not. This is accomplished by matching + * the key name exported from the instrumentations `config.js` with the + * `packageName` provided at construction. + * + * @returns {boolean} `true` for an enabled subscriber. + */ + get enabled() { + return this.config.instrumentation[this.id].enabled === true + } + + enable() { + return true + } + + disable() { + return true + } + + subscribe() { + for (const sub of this.#tcSubs) { + const { channel } = sub + for (const event of this.#events) { + const chan = `${channel}:${event}` + const fn = sub[event].bind(this) + dc.subscribe(chan, fn) + this.#registeredSubs.push([chan, fn]) + } + } + } + + unsubscribe() { + for (const [chan, fn] of this.#registeredSubs) { + dc.unsubscribe(chan, fn) + } + } +} + +module.exports = { + TracingChannelSubscription, + TracingChannelSubscriber +} diff --git a/test/versioned/platformatic-kafka/newrelic.js b/test/versioned/platformatic-kafka/newrelic.js new file mode 100644 index 0000000000..af4b696e2f --- /dev/null +++ b/test/versioned/platformatic-kafka/newrelic.js @@ -0,0 +1,28 @@ +/* + * Copyright 2021 New Relic Corporation. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +'use strict' + +exports.config = { + app_name: ['My Application'], + license_key: 'license key here', + logging: { + level: 'trace', + filepath: '../../newrelic_agent.log' + }, + utilization: { + detect_aws: false, + detect_pcf: false, + detect_azure: false, + detect_gcp: false, + detect_docker: false + }, + distributed_tracing: { + enabled: true + }, + transaction_tracer: { + enabled: true + } +} diff --git a/test/versioned/platformatic-kafka/package.json b/test/versioned/platformatic-kafka/package.json new file mode 100644 index 0000000000..cae676b951 --- /dev/null +++ b/test/versioned/platformatic-kafka/package.json @@ -0,0 +1,19 @@ +{ + "name": "platformatic-kafka-tests", + "targets": [{"name":"@platformatic/kafka","minAgentVersion":"13.6.1"}], + "version": "0.0.0", + "private": true, + "tests": [ + { + "engines": { + "node": ">=20" + }, + "dependencies": { + "@platformatic/kafka": ">=1.19.0" + }, + "files": [ + "producer.test.js" + ] + } + ] +} diff --git a/test/versioned/platformatic-kafka/producer.test.js b/test/versioned/platformatic-kafka/producer.test.js new file mode 100644 index 0000000000..2fa61be963 --- /dev/null +++ b/test/versioned/platformatic-kafka/producer.test.js @@ -0,0 +1,159 @@ +/* + * Copyright 2025 New Relic Corporation. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +'use strict' + +const test = require('node:test') +const { randomUUID } = require('node:crypto') + +const promiseResolvers = require('../../lib/promise-resolvers') +const helper = require('../../lib/agent_helper') +const { + kafka_host: kHost, + kafka_port: kPort +} = require('../../lib/params') + +test.beforeEach(async (ctx) => { + ctx.nr = {} + ctx.nr.agent = helper.instrumentMockedAgent({ + feature_flag: { + pltkafka_instrumentation: true + } + }) + ctx.nr.topic = `topic-${randomUUID()}` + ctx.nr.clientId = `client-${randomUUID()}` + + const { + Admin, + Producer, + stringSerializers, + connectionsConnectsChannel + } = require('@platformatic/kafka') + ctx.nr.pltKafka = { Admin, Producer, stringSerializers } + ctx.nr.producer = new Producer({ + clientId: ctx.nr.clientId, + bootstrapBrokers: [`${kHost}:${kPort}`], + serializers: stringSerializers + }) + ctx.nr.connectionsConnectsChannel = connectionsConnectsChannel + + const admin = new Admin({ + clientId: 'test-admin', + bootstrapBrokers: [`${kHost}:${kPort}`] + }) + await admin.createTopics({ + topics: [ctx.nr.topic], + partitions: 1, + replicas: 1 + }) + ctx.nr.admin = admin +}) + +test.afterEach(async (ctx) => { + await ctx.nr.producer.close() + await ctx.nr.admin.deleteTopics({ topics: [ctx.nr.topic] }) + await ctx.nr.admin.close() + + helper.unloadAgent(ctx.nr.agent) +}) + +test('adds package tracking metrics', (t) => { + // TODO: assert that package tracking metrics are added during module instrumentation + t.diagnostic('test not implemented') +}) + +test('reports connection errors', async (t) => { + t.plan(4) + + // For some reason, the plan is not enough. Without the resolver, the + // asynchronous activity doesn't have time to complete. + const { promise, resolve } = promiseResolvers() + const expectedTxName = 'produce-tx-error' + + const { clientId } = t.nr + const { Producer, stringSerializers } = t.nr.pltKafka + await t.nr.producer.close() + t.nr.producer = new Producer({ + clientId, + bootstrapBrokers: [`${kHost}:13337`], // non-listening port to trigger connect error + serializers: stringSerializers, + retries: 0 + }) + const { agent, producer } = t.nr + + agent.on('transactionFinished', (tx) => { + t.assert.equal(tx.name, expectedTxName) + t.assert.equal(tx.exceptions.length, 1) + const event = tx.agent.errors.eventAggregator.getEvents().at(0).at(0) + t.assert.equal(event['error.message'], 'Connection to 127.0.0.1:13337 failed.') + }) + + helper.runInTransaction(agent, async (tx) => { + tx.name = expectedTxName + try { + await producer.send({ + messages: [{ + topic: 'does not matter', + key: 'key1', + value: 'value1' + }] + }) + t.assert.fail('should generate an error') + } catch (error) { + t.assert.ok(error) + } finally { + tx.end() + resolve() + } + }) + + await promise +}) + +test('tracks messages sent from a producer', (t, end) => { + t.plan(5) + + const { agent, producer, topic } = t.nr + const expectedTxName = 'success-case' + + agent.on('transactionFinished', (tx) => { + t.assert.equal(tx.name, expectedTxName) + + // TODO: I don't think it should start with "Truncated"? + const name = `Truncated/MessageBroker/Kafka/topic/Produce/Named/${topic}` + const segment = tx.agent.tracer.getSegment() + const segmentChildren = tx.trace.getChildren(segment.id) + + const foundSegment = segmentChildren.find((c) => c.name.endsWith(topic)) + t.assert.ok(foundSegment) + t.assert.equal(foundSegment.name, name) + + const metric = tx.metrics.getMetric(name) + t.assert.equal(metric.callCount, 1) + + const trackingMetric = tx.agent.metrics.getMetric(`MessageBroker/Kafka/Nodes/${kHost}/Produce/${topic}`) + t.assert.equal(trackingMetric.callCount, 1) + + end() + }) + + helper.runInTransaction(agent, async (tx) => { + tx.name = expectedTxName + try { + await producer.send({ + messages: [{ + topic, + key: 'user-123', + value: JSON.stringify({ name: 'John', action: 'login' }), + headers: { source: 'web-app' } + }] + }) + } catch (error) { + t.assert.fail('should not have generated an error: ' + error.message) + } finally { + tx.end() + } + }) +})