Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion eslint.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -143,5 +143,11 @@ module.exports = [

sharedConfig.configs.baselineNewRelicConfig,
newrelicConfigOverrides,
globalIgnores
globalIgnores,

{
rules: {
'accessor-pairs': 'off'
}
}
]
1 change: 1 addition & 0 deletions lib/subscriber-configs.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const subscribers = {
...require('./subscribers/openai/config'),
...require('./subscribers/pino/config'),
...require('./subscribers/pg/config'),
...require('./subscribers/platformatic-kafka/config'),
...require('./subscribers/undici/config')
}

Expand Down
8 changes: 7 additions & 1 deletion lib/subscribers/dc-base.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ const dc = require('node:diagnostics_channel')
*/

/**
* Provides a base interface for interacting with
* [Diagnostics Channel]{@link https://nodejs.org/docs/latest/api/diagnostics_channel.html#diagnostics-channel}
* instances. A diagnostics channel (DC) is _not_ a tracing channel. A DC
* is a simple channel with a singular event. This class is not suitable for
* interacting with libraries that publish _tracing channels_.
*
* @property {object} agent A New Relic Node.js agent instance.
* @property {ChannelDescriptor[]} channels The channels to subscribe to.
* @property {object} logger An agent logger instance.
Expand All @@ -46,7 +52,7 @@ class Subscriber {

set channels(channels) {
if (!Array.isArray(channels)) {
throw new Error('channels must be a collection of with propertiesof channel and hook')
throw new Error('channels must be a collection of ChannelDescriptor objects')
}
this._channels = channels
}
Expand Down
13 changes: 13 additions & 0 deletions lib/subscribers/platformatic-kafka/config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright 2025 New Relic Corporation. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/

'use strict'

module.exports = {
'@platformatic/kafka': [{
path: './platformatic-kafka',
instrumentations: []
}]
}
126 changes: 126 additions & 0 deletions lib/subscribers/platformatic-kafka/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright 2025 New Relic Corporation. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/

'use strict'

// TODO: add test to cover once this is resolved https://github.com/platformatic/kafka/issues/155

const recorder = require('#agentlib/metrics/recorders/generic.js')
const {
TracingChannelSubscription,
TracingChannelSubscriber
} = require('../tracing-channel-subscriber.js')

const connectsSub = new TracingChannelSubscription({
channel: 'tracing:plt:kafka:connections:connects',
start(event) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This segment is getting named as Truncated/connect as well because segment is never touched

// TODO: is this invoked for every broker that was defined during
// initialization? If so, should we be creating a segment for each broker
// send?

const context = this.agent.tracer.getContext()
const { segment, transaction } = context

if (!segment || !transaction) {
this.logger.trace('Not capturing connection details due to missing transaction.')
return
}

const { topic, clientKind } = context.extras
const { host: broker } = event
this.agent.metrics
.getOrCreateMetric(`MessageBroker/Kafka/Nodes/${broker}/${clientKind}/${topic}`)
.incrementCallCount()

const externalSegment = this.agent.tracer.createSegment({
name: 'connect',
parent: segment,
recorder,
transaction
})
externalSegment.shimId = this.id
externalSegment.start()

return context.enterSegment({ segment: externalSegment })
},
error(event) {
const context = this.agent.tracer.getContext()
context?.transaction?.agent.errors.add(context.transaction, event.error)
}
})

const producerSendsSub = new TracingChannelSubscription({
channel: 'tracing:plt:kafka:producer:sends',
start(event) {
const context = this.agent.tracer.getContext()
const { segment, transaction } = context

if (!segment || !transaction) {
this.logger.trace('Not recording producer send due to missing transaction.')
return
}

// TODO: this module only has a single `.send` method that works like
// kafkajs's `sendBatch` method. We should probably figure out a better way
// to generate metrics for batches.
const firstMessage = event?.options?.messages?.[0]
const topic = firstMessage?.topic ?? 'unknown'

// Order of operations is:
// 1. `producer.send` is invoked which gets us to here
// 2. Connection is made to the remote system.
// 3. The `connections:connects` channel is triggered.
// 4. In the other channel, we need to the topic in order to generate the
// linking metric. So we have to store that information in the context.
context.extras.topic = topic
context.extras.clientKind = 'Produce'

const externalSegment = this.agent.tracer.createSegment({
name: `MessageBroker/Kafka/topic/Produce/Named/${topic}`,
parent: segment,
recorder,
transaction
})
segment.opaque = transaction.opaque
externalSegment.shimId = this.id
externalSegment.type = 'message'
externalSegment.start()

return context.enterSegment({ segment: externalSegment })
},
end() {
this.touch()
},

asyncStart() {
// TODO: are we getting the correct context? Do we need to do the crazy
// `isActiveTx` context propagation as we had to do in other cases?
this.touch()
},

asyncEnd() {
this.touch()
}
})

class PlatformaticKafkaSubscriber extends TracingChannelSubscriber {
constructor({ agent, logger, ...rest }) {
super({ agent, logger, packageName: '@platformatic/kafka', ...rest })
this.subscriptions = [
connectsSub,
producerSendsSub
]
}

/**
* Update the current context's timing tracker.
*/
touch() {
const context = this.agent.tracer.getContext()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this isn't the segment you created above. it's already into the net library. I'm unclear why you created an entirely new subscriber. All that logic to bind the segment to tracing channel event, etc lives in our base subscriber. The only thing that had to be edited there was to change the id it subscribes to events. But this is your issue why you see Truncated names because the segment that's being created isn't getting touched. Doing this fixes your name though:

diff --git a/lib/subscribers/platformatic-kafka/index.js b/lib/subscribers/platformatic-kafka/index.js
index 851018df9..ec630fc9a 100644
--- a/lib/subscribers/platformatic-kafka/index.js
+++ b/lib/subscribers/platformatic-kafka/index.js
@@ -54,6 +54,7 @@ const connectsSub = new TracingChannelSubscription({
 const producerSendsSub = new TracingChannelSubscription({
   channel: 'tracing:plt:kafka:producer:sends',
   start(event) {
+    debugger
     const context = this.agent.tracer.getContext()
     const { segment, transaction } = context
 
@@ -88,20 +89,23 @@ const producerSendsSub = new TracingChannelSubscription({
     externalSegment.type = 'message'
     externalSegment.start()
 
+    event.segment = externalSegment
     return context.enterSegment({ segment: externalSegment })
   },
-  end() {
-    this.touch()
+  end(data) {
+    this.touch(data)
   },
 
-  asyncStart() {
+  asyncStart(data) {
+    debugger
     // TODO: are we getting the correct context? Do we need to do the crazy
     // `isActiveTx` context propagation as we had to do in other cases?
-    this.touch()
+    this.touch(data)
   },
 
-  asyncEnd() {
-    this.touch()
+  asyncEnd(data) {
+    debugger
+    this.touch(data)
   }
 })
 
@@ -117,9 +121,13 @@ class PlatformaticKafkaSubscriber extends TracingChannelSubscriber {
   /**
    * Update the current context's timing tracker.
    */
-  touch() {
-    const context = this.agent.tracer.getContext()
-    context?.segment?.touch()
+  touch(data) {
+    if (data.segment) {
+      data?.segment?.touch()
+    } else {
+      const context = this.agent.tracer.getContext()
+      context?.segment?.touch()
+    }
   }
 }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm unclear why you created an entirely new subscriber.

Because the existing code is all either tied to Orchestrion based channels or to baseline diagnostics channel. This module exports baseline tracing channel stuff. So we need something that handles that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right but the only thing tied to orchestrion is

this.id = `${this.prefix}${this.packageName}:${this.channelName}`

context?.segment?.touch()
}
}

module.exports = PlatformaticKafkaSubscriber
124 changes: 124 additions & 0 deletions lib/subscribers/subscriber.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright 2025 New Relic Corporation. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/

'use strict'

/**
* This is an interface class. It defines the methods each subclass _must_
* implement and override in order for the diagnostics channel or tracing
* channel subscriptions to work.
*
* @property {object} agent A New Relic Node.js agent instance.
* @property {object} logger An agent logger instance.
* @property {object} config The agent configuration object.
* @property {string} packageName The name of the module being instrumented.
* This is the same string one would pass to the `require` function.
* @property {string} id An alias for `packageName`.
*
* @private
* @interface
*/
class Subscriber {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming this interface class is meant to be the true parent of all three subscribers: Subscriber (base.js), DiagnosticsChannelSubscriber (dc-base.js) and now TracingChannelSubscriber?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly. The intention is to figure out the commonality once I figure out how all of the details here.

#agent
#config
#logger
#packageName

constructor({ agent, logger, packageName }) {
this.#agent = agent
this.#config = agent.config
this.#logger = logger.child({ component: `${packageName}-subscriber` })
this.#packageName = packageName
}

get [Symbol.toStringTag]() {
return 'Subscriber'
}

get agent() {
return this.#agent
}

get config() {
return this.#config
}

get id() {
return this.#packageName
}

get logger() {
return this.#logger
}

get packageName() {
return this.#packageName
}

/**
* Indicates if the subscription should be enabled or not. The most likely
* scenario is that an implementation will consult the agent configuration
* to determine the result.
*
* @returns {boolean} Subscriber is enabled or not.
*/
get enabled() {
throw Error('enabled is not implemented on class: ' + this.constructor.name)
}

/**
* Implementations should utilize subclass specific configuration or logic
* to enable the subscriber. This is basically a start-up lifecycle hook
* that the implementation can use to perform necessary actions, e.g.
* creating an asynchronous context and binding it to an appropriate channel.
*
* @returns {void | Function | boolean} Result of the enablement. Not likely
* to be used.
*/
enable() {
throw Error('enable is not implemented on class: ' + this.constructor.name)
}

/**
* The inverse of the `enable` method. It's basically an agent shutdown
* lifecycle hook. Any clean up logic required as a result of the work
* performed in the `enable` method should be hosted here.
*
* @returns {void | boolean} Result of the disablement. Not likely to be
* used.
*/
disable() {
throw Error('disable is not implemented on class: ' + this.constructor.name)
}

/**
* Classes must implement this method. It is expected to read some
* configuration data, specific to the subclass, and utilize it to
* perform the channel subscriptions.
*
* @returns {void}
*
* @example A basic "Diagnostics Channel" based method.
* const dc = require('node:diagnostics_channel')
* for (const sub of this.#subscriptions) {
* dc.subscribe(sub.channelName, sub.hook.bind(this))
* }
*/
subscribe() {
throw Error('subscribe is not implemented on class: ' + this.constructor.name)
}

/**
* The inverse of the `subscribe` method. This should iterate through the
* subscribed channels and issue any unsubscribe and clean-up logic for them.
*
* @returns {void}
*/
unsubscribe() {
throw Error('unsubscribe is not implemented on class: ' + this.constructor.name)
}
}

module.exports = Subscriber
Loading
Loading