-
Notifications
You must be signed in to change notification settings - Fork 421
feat: Instrument @platformatic/kafka #3486
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| /* | ||
| * Copyright 2025 New Relic Corporation. All rights reserved. | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| 'use strict' | ||
|
|
||
| module.exports = { | ||
| '@platformatic/kafka': [{ | ||
| path: './platformatic-kafka', | ||
| instrumentations: [] | ||
| }] | ||
| } |
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
| @@ -0,0 +1,126 @@ | ||||
| /* | ||||
| * Copyright 2025 New Relic Corporation. All rights reserved. | ||||
| * SPDX-License-Identifier: Apache-2.0 | ||||
| */ | ||||
|
|
||||
| 'use strict' | ||||
|
|
||||
| // TODO: add test to cover once this is resolved https://github.com/platformatic/kafka/issues/155 | ||||
|
|
||||
| const recorder = require('#agentlib/metrics/recorders/generic.js') | ||||
| const { | ||||
| TracingChannelSubscription, | ||||
| TracingChannelSubscriber | ||||
| } = require('../tracing-channel-subscriber.js') | ||||
|
|
||||
| const connectsSub = new TracingChannelSubscription({ | ||||
| channel: 'tracing:plt:kafka:connections:connects', | ||||
| start(event) { | ||||
| // TODO: is this invoked for every broker that was defined during | ||||
| // initialization? If so, should we be creating a segment for each broker | ||||
| // send? | ||||
|
|
||||
| const context = this.agent.tracer.getContext() | ||||
| const { segment, transaction } = context | ||||
|
|
||||
| if (!segment || !transaction) { | ||||
| this.logger.trace('Not capturing connection details due to missing transaction.') | ||||
| return | ||||
| } | ||||
|
|
||||
| const { topic, clientKind } = context.extras | ||||
| const { host: broker } = event | ||||
| this.agent.metrics | ||||
| .getOrCreateMetric(`MessageBroker/Kafka/Nodes/${broker}/${clientKind}/${topic}`) | ||||
| .incrementCallCount() | ||||
|
|
||||
| const externalSegment = this.agent.tracer.createSegment({ | ||||
| name: 'connect', | ||||
| parent: segment, | ||||
| recorder, | ||||
| transaction | ||||
| }) | ||||
| externalSegment.shimId = this.id | ||||
| externalSegment.start() | ||||
|
|
||||
| return context.enterSegment({ segment: externalSegment }) | ||||
| }, | ||||
| error(event) { | ||||
| const context = this.agent.tracer.getContext() | ||||
| context?.transaction?.agent.errors.add(context.transaction, event.error) | ||||
| } | ||||
| }) | ||||
|
|
||||
| const producerSendsSub = new TracingChannelSubscription({ | ||||
| channel: 'tracing:plt:kafka:producer:sends', | ||||
| start(event) { | ||||
| const context = this.agent.tracer.getContext() | ||||
| const { segment, transaction } = context | ||||
|
|
||||
| if (!segment || !transaction) { | ||||
| this.logger.trace('Not recording producer send due to missing transaction.') | ||||
| return | ||||
| } | ||||
|
|
||||
| // TODO: this module only has a single `.send` method that works like | ||||
| // kafkajs's `sendBatch` method. We should probably figure out a better way | ||||
| // to generate metrics for batches. | ||||
| const firstMessage = event?.options?.messages?.[0] | ||||
| const topic = firstMessage?.topic ?? 'unknown' | ||||
|
|
||||
| // Order of operations is: | ||||
| // 1. `producer.send` is invoked which gets us to here | ||||
| // 2. Connection is made to the remote system. | ||||
| // 3. The `connections:connects` channel is triggered. | ||||
| // 4. In the other channel, we need to the topic in order to generate the | ||||
| // linking metric. So we have to store that information in the context. | ||||
| context.extras.topic = topic | ||||
| context.extras.clientKind = 'Produce' | ||||
|
|
||||
| const externalSegment = this.agent.tracer.createSegment({ | ||||
| name: `MessageBroker/Kafka/topic/Produce/Named/${topic}`, | ||||
| parent: segment, | ||||
| recorder, | ||||
| transaction | ||||
| }) | ||||
| segment.opaque = transaction.opaque | ||||
| externalSegment.shimId = this.id | ||||
| externalSegment.type = 'message' | ||||
| externalSegment.start() | ||||
|
|
||||
| return context.enterSegment({ segment: externalSegment }) | ||||
| }, | ||||
| end() { | ||||
| this.touch() | ||||
| }, | ||||
|
|
||||
| asyncStart() { | ||||
| // TODO: are we getting the correct context? Do we need to do the crazy | ||||
| // `isActiveTx` context propagation as we had to do in other cases? | ||||
| this.touch() | ||||
| }, | ||||
|
|
||||
| asyncEnd() { | ||||
| this.touch() | ||||
| } | ||||
| }) | ||||
|
|
||||
| class PlatformaticKafkaSubscriber extends TracingChannelSubscriber { | ||||
| constructor({ agent, logger, ...rest }) { | ||||
| super({ agent, logger, packageName: '@platformatic/kafka', ...rest }) | ||||
| this.subscriptions = [ | ||||
| connectsSub, | ||||
| producerSendsSub | ||||
| ] | ||||
| } | ||||
|
|
||||
| /** | ||||
| * Update the current context's timing tracker. | ||||
| */ | ||||
| touch() { | ||||
| const context = this.agent.tracer.getContext() | ||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this isn't the segment you created above. it's already into the net library. I'm unclear why you created an entirely new subscriber. All that logic to bind the segment to tracing channel event, etc lives in our base subscriber. The only thing that had to be edited there was to change the id it subscribes to events. But this is your issue why you see Truncated names because the segment that's being created isn't getting touched. Doing this fixes your name though:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Because the existing code is all either tied to Orchestrion based channels or to baseline diagnostics channel. This module exports baseline tracing channel stuff. So we need something that handles that.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right but the only thing tied to orchestrion is node-newrelic/lib/subscribers/base.js Line 78 in 63e531b
|
||||
| context?.segment?.touch() | ||||
| } | ||||
| } | ||||
|
|
||||
| module.exports = PlatformaticKafkaSubscriber | ||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,124 @@ | ||
| /* | ||
| * Copyright 2025 New Relic Corporation. All rights reserved. | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| 'use strict' | ||
|
|
||
| /** | ||
| * This is an interface class. It defines the methods each subclass _must_ | ||
| * implement and override in order for the diagnostics channel or tracing | ||
| * channel subscriptions to work. | ||
| * | ||
| * @property {object} agent A New Relic Node.js agent instance. | ||
| * @property {object} logger An agent logger instance. | ||
| * @property {object} config The agent configuration object. | ||
| * @property {string} packageName The name of the module being instrumented. | ||
| * This is the same string one would pass to the `require` function. | ||
| * @property {string} id An alias for `packageName`. | ||
| * | ||
| * @private | ||
| * @interface | ||
| */ | ||
| class Subscriber { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm assuming this interface class is meant to be the true parent of all three subscribers: Subscriber (base.js), DiagnosticsChannelSubscriber (dc-base.js) and now TracingChannelSubscriber?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Possibly. The intention is to figure out the commonality once I figure out how all of the details here. |
||
| #agent | ||
| #config | ||
| #logger | ||
| #packageName | ||
|
|
||
| constructor({ agent, logger, packageName }) { | ||
| this.#agent = agent | ||
| this.#config = agent.config | ||
| this.#logger = logger.child({ component: `${packageName}-subscriber` }) | ||
| this.#packageName = packageName | ||
| } | ||
|
|
||
| get [Symbol.toStringTag]() { | ||
| return 'Subscriber' | ||
| } | ||
|
|
||
| get agent() { | ||
| return this.#agent | ||
| } | ||
|
|
||
| get config() { | ||
| return this.#config | ||
| } | ||
|
|
||
| get id() { | ||
| return this.#packageName | ||
| } | ||
|
|
||
| get logger() { | ||
| return this.#logger | ||
| } | ||
|
|
||
| get packageName() { | ||
| return this.#packageName | ||
| } | ||
|
|
||
| /** | ||
| * Indicates if the subscription should be enabled or not. The most likely | ||
| * scenario is that an implementation will consult the agent configuration | ||
| * to determine the result. | ||
| * | ||
| * @returns {boolean} Subscriber is enabled or not. | ||
| */ | ||
| get enabled() { | ||
| throw Error('enabled is not implemented on class: ' + this.constructor.name) | ||
| } | ||
|
|
||
| /** | ||
| * Implementations should utilize subclass specific configuration or logic | ||
| * to enable the subscriber. This is basically a start-up lifecycle hook | ||
| * that the implementation can use to perform necessary actions, e.g. | ||
| * creating an asynchronous context and binding it to an appropriate channel. | ||
| * | ||
| * @returns {void | Function | boolean} Result of the enablement. Not likely | ||
| * to be used. | ||
| */ | ||
| enable() { | ||
| throw Error('enable is not implemented on class: ' + this.constructor.name) | ||
| } | ||
|
|
||
| /** | ||
| * The inverse of the `enable` method. It's basically an agent shutdown | ||
| * lifecycle hook. Any clean up logic required as a result of the work | ||
| * performed in the `enable` method should be hosted here. | ||
| * | ||
| * @returns {void | boolean} Result of the disablement. Not likely to be | ||
| * used. | ||
| */ | ||
| disable() { | ||
| throw Error('disable is not implemented on class: ' + this.constructor.name) | ||
| } | ||
|
|
||
| /** | ||
| * Classes must implement this method. It is expected to read some | ||
| * configuration data, specific to the subclass, and utilize it to | ||
| * perform the channel subscriptions. | ||
| * | ||
| * @returns {void} | ||
| * | ||
| * @example A basic "Diagnostics Channel" based method. | ||
| * const dc = require('node:diagnostics_channel') | ||
| * for (const sub of this.#subscriptions) { | ||
| * dc.subscribe(sub.channelName, sub.hook.bind(this)) | ||
| * } | ||
| */ | ||
| subscribe() { | ||
| throw Error('subscribe is not implemented on class: ' + this.constructor.name) | ||
| } | ||
|
|
||
| /** | ||
| * The inverse of the `subscribe` method. This should iterate through the | ||
| * subscribed channels and issue any unsubscribe and clean-up logic for them. | ||
| * | ||
| * @returns {void} | ||
| */ | ||
| unsubscribe() { | ||
| throw Error('unsubscribe is not implemented on class: ' + this.constructor.name) | ||
| } | ||
| } | ||
|
|
||
| module.exports = Subscriber | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This segment is getting named as
Truncated/connectas well because segment is never touched