Skip to content
This repository was archived by the owner on Mar 29, 2020. It is now read-only.

Commit 575eaa7

Browse files
committed
Merge remote-tracking branch 'cobli/pr-add-span-reporter'
2 parents 055868b + 54a9df3 commit 575eaa7

File tree

15 files changed

+565
-172
lines changed

15 files changed

+565
-172
lines changed

.travis.yml

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,23 @@
1+
# Use container-based infrastructure
2+
sudo: false
3+
14
language: scala
2-
script:
3-
- ./travis-test.sh
5+
6+
jdk: oraclejdk8
7+
48
scala:
5-
- 2.11.8
6-
jdk:
7-
- oraclejdk8
8-
before_script:
9-
- mkdir $TRAVIS_BUILD_DIR/tmp
10-
- export SBT_OPTS="-Djava.io.tmpdir=$TRAVIS_BUILD_DIR/tmp"
11-
sudo: false
12-
9+
- 2.12.8
10+
11+
# These directories are cached to S3 at the end of the build
12+
cache:
13+
directories:
14+
- $HOME/.ivy2/cache
15+
- $HOME/.sbt/boot/
16+
17+
before_cache:
18+
# Cleanup the cached directories to avoid unnecessary cache updates
19+
- rm -fv $HOME/.ivy2/.sbt.ivy.lock
20+
- find $HOME/.ivy2/cache -name "ivydata-*.properties" -print -delete
21+
- find $HOME/.sbt -name "*.lock" -print -delete
22+
23+

README.md

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,19 @@ Kamon.addReporter(new DatadogAgentReporter())
3232
Kamon.addReporter(new DatadogAPIReporter())
3333
```
3434

35+
In order to get [APM tracing](https://docs.datadoghq.com/tracing/) working, add
36+
37+
```
38+
Kamon.addReporter(new DatadogSpanReporter())
39+
```
40+
3541
Configuration
3642
-------------
3743

3844
#### Agent Reporter
3945

4046
By default, the Agent reporter assumes that you have an instance of the Datadog Agent running in localhost and listening on
41-
port 8125. If that is not the case the you can use the `kamon.datadog.agent.hostname` and `kamon.datadog.agent.port` configuration
47+
port 8125. If that is not the case, you can use the `kamon.datadog.agent.hostname` and `kamon.datadog.agent.port` configuration
4248
keys to point the module at your Datadog Agent installation.
4349

4450
#### API Reporter
@@ -56,6 +62,15 @@ follows:
5662

5763
You can refer to the [Datadog documentation](https://docs.datadoghq.com/developers/metrics/#histograms) for more details.
5864

65+
66+
#### Span Reporter
67+
As [recommended](https://docs.datadoghq.com/api/?lang=python#tracing) , the Span reporter assumes that you have an instance of the Datadog Agent running in localhost and listening on
68+
port 8125. If that is not the case, you can use the `kamon.datadog.trace.http.api-url` configuration
69+
key to point the module at your Datadog Agent installation.
70+
71+
Don't forget to configure [actors filtering for message tracing](https://kamon.io/docs/latest/instrumentation/akka/tracing/)
72+
73+
5974
### Metric Units ###
6075

6176
Kamon keeps all timing measurements in nanoseconds and memory measurements in bytes. In order to scale those to other

build.sbt

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,29 +13,42 @@
1313
* =========================================================================================
1414
*/
1515

16-
import scalariform.formatter.preferences._
1716
import com.typesafe.sbt.SbtScalariform.ScalariformKeys
17+
import scalariform.formatter.preferences._
18+
1819

19-
val kamonCore = "io.kamon" %% "kamon-core" % "1.1.6"
20-
val kamonTestKit = "io.kamon" %% "kamon-testkit" % "1.1.6"
21-
val asyncHttpClient = "com.squareup.okhttp3" % "okhttp" % "3.10.0"
20+
val kamonCore = "io.kamon" %% "kamon-core" % "1.1.6"
21+
val kamonTestKit = "io.kamon" %% "kamon-testkit" % "1.1.6"
22+
val asyncHttpClient = "com.squareup.okhttp3" % "okhttp" % "3.10.0"
23+
val asyncHttpClientMock = "com.squareup.okhttp3" % "mockwebserver" % "3.10.0"
2224

2325
lazy val root = (project in file("."))
2426
.settings(name := "kamon-datadog")
2527
.settings(
2628
libraryDependencies ++=
27-
compileScope(kamonCore, asyncHttpClient, scalaCompact.value) ++
28-
testScope(scalatest, slf4jApi, slf4jnop, kamonCore, kamonTestKit),
29+
compileScope(kamonCore, asyncHttpClient, scalaCompact.value, playJsonVersion.value) ++
30+
testScope(scalatest, slf4jApi, slf4jnop, kamonCore, kamonTestKit, asyncHttpClientMock),
2931
ScalariformKeys.preferences := formatSettings(ScalariformKeys.preferences.value))
3032

3133

34+
def playJsonVersion = Def.setting {
35+
scalaBinaryVersion.value match {
36+
case "2.10" => "com.typesafe.play" %% "play-json" % "2.4.11"
37+
case "2.12" | "2.11" => "com.typesafe.play" %% "play-json" % "2.6.9"
38+
}
39+
}
40+
41+
3242
def scalaCompact = Def.setting {
3343
scalaBinaryVersion.value match {
3444
case "2.10" | "2.11" => "org.scala-lang.modules" %% "scala-java8-compat" % "0.5.0"
3545
case "2.12" => "org.scala-lang.modules" %% "scala-java8-compat" % "0.8.0"
3646
}
3747
}
3848

49+
/* Changing Kamon configuration in real-time seems to turn tests unstable */
50+
parallelExecution in Test := false
51+
3952
def formatSettings(prefs: IFormattingPreferences) = prefs
4053
.setPreference(AlignParameters, true)
4154
.setPreference(AlignSingleLineCaseStatements, true)

project/build.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
sbt.version=1.2.8
1+
sbt.version=1.2.8

src/main/resources/reference.conf

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,39 @@ kamon {
2828
#
2929
http {
3030

31-
# Datadog API key to use to send metrics to datadog directly over HTTPS.
31+
api-url = "https://app.datadoghq.com/api/v1/series"
32+
33+
34+
# Datadog API key to use to send metrics to datadog directly over HTTPS.
3235
# If this is not set, metrics are sent as statsd packets over UDP to dogstatsd.
3336
api-key = ""
3437

35-
# The Datadog api endpoint to send the data to
36-
api-url = "https://app.datadoghq.com/api/v1/series?api_key="
38+
using-agent = false
3739

3840
connect-timeout = 5 seconds
3941
read-timeout = 5 seconds
4042
request-timeout = 5 seconds
4143
}
4244

45+
46+
#
47+
# Settings relevant to the DatadogSpanReporter
48+
#
49+
trace.http {
50+
51+
# Default to agent URL (https://docs.datadoghq.com/api/?lang=python#tracing)
52+
api-url = "http://localhost:8126/v0.3/traces"
53+
54+
api-key = ${kamon.datadog.http.api-key}
55+
56+
using-agent = true
57+
58+
connect-timeout = ${kamon.datadog.http.connect-timeout}
59+
read-timeout = ${kamon.datadog.http.read-timeout}
60+
request-timeout = ${kamon.datadog.http.request-timeout}
61+
}
62+
63+
4364
# All time values are collected in nanoseconds,
4465
# to scale before sending to datadog set "time-units" to "s" or "ms" or "µs".
4566
# Value "n" is equivalent to omitting the setting

src/main/scala/kamon/datadog/DatadogAPIReporter.scala

Lines changed: 17 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -17,32 +17,30 @@
1717
package kamon.datadog
1818

1919
import java.lang.StringBuilder
20+
import java.nio.charset.StandardCharsets
2021
import java.text.{ DecimalFormat, DecimalFormatSymbols }
2122
import java.time.Duration
2223
import java.util.Locale
23-
import java.util.concurrent.TimeUnit
2424

2525
import com.typesafe.config.Config
26-
import kamon.metric._
27-
import kamon.metric.MeasurementUnit
2826
import kamon.metric.MeasurementUnit.Dimension.{ Information, Time }
29-
import kamon.metric.MeasurementUnit.{ information, time }
27+
import kamon.metric.{ MeasurementUnit, MetricDistribution, MetricValue, PeriodSnapshot }
3028
import kamon.util.{ EnvironmentTagBuilder, Matcher }
3129
import kamon.{ Kamon, MetricReporter }
32-
import okhttp3.{ MediaType, OkHttpClient, Request, RequestBody }
3330
import org.slf4j.LoggerFactory
3431

32+
import scala.util.{ Failure, Success }
33+
3534
class DatadogAPIReporter extends MetricReporter {
3635
import DatadogAPIReporter._
3736

3837
private val logger = LoggerFactory.getLogger(classOf[DatadogAPIReporter])
3938
private val symbols = DecimalFormatSymbols.getInstance(Locale.US)
4039
symbols.setDecimalSeparator('.') // Just in case there is some weird locale config we are not aware of.
4140

42-
private val jsonType = MediaType.parse("application/json; charset=utf-8")
4341
private val valueFormat = new DecimalFormat("#0.#########", symbols)
4442
private var configuration = readConfiguration(Kamon.config())
45-
private var httpClient: OkHttpClient = createHttpClient(configuration)
43+
private var httpClient: HttpClient = new HttpClient(configuration.httpConfig)
4644

4745
override def start(): Unit = {
4846
logger.info("Started the Datadog API reporter.")
@@ -54,24 +52,20 @@ class DatadogAPIReporter extends MetricReporter {
5452

5553
override def reconfigure(config: Config): Unit = {
5654
val newConfiguration = readConfiguration(config)
57-
httpClient = createHttpClient(readConfiguration(Kamon.config()))
5855
configuration = newConfiguration
56+
httpClient = new HttpClient(configuration.httpConfig)
5957
}
6058

6159
override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = {
62-
val url = configuration.apiUrl + configuration.apiKey
63-
val body = RequestBody.create(jsonType, buildRequestBody(snapshot))
64-
val request = new Request.Builder().url(url).post(body).build
65-
val response = httpClient.newCall(request).execute()
66-
67-
if (!response.isSuccessful()) {
68-
logger.error(s"Failed to POST metrics to Datadog ${url} with status code [${response.code()}], Body: [${response.body().string()}]")
60+
httpClient.doPost("application/json; charset=utf-8", buildRequestBody(snapshot)) match {
61+
case Failure(e) =>
62+
logger.error(e.getMessage)
63+
case Success(response) =>
64+
logger.info(response)
6965
}
70-
71-
response.close()
7266
}
7367

74-
private[datadog] def buildRequestBody(snapshot: PeriodSnapshot): String = {
68+
private[datadog] def buildRequestBody(snapshot: PeriodSnapshot): Array[Byte] = {
7569
val timestamp = snapshot.from.getEpochSecond.toString
7670

7771
val host = Kamon.environment.host
@@ -111,7 +105,9 @@ class DatadogAPIReporter extends MetricReporter {
111105
seriesBuilder
112106
.insert(0, "{\"series\":[")
113107
.append("]}")
114-
.toString
108+
.toString()
109+
.getBytes(StandardCharsets.UTF_8)
110+
115111
}
116112

117113
private def scale(value: Long, unit: MeasurementUnit): Double = unit.dimension match {
@@ -124,23 +120,10 @@ class DatadogAPIReporter extends MetricReporter {
124120
case _ => value.toDouble
125121
}
126122

127-
// Apparently okhttp doesn't require explicit closing of the connection
128-
private def createHttpClient(config: Configuration): OkHttpClient = {
129-
new OkHttpClient.Builder()
130-
.connectTimeout(config.connectTimeout.toMillis, TimeUnit.MILLISECONDS)
131-
.readTimeout(config.connectTimeout.toMillis, TimeUnit.MILLISECONDS)
132-
.writeTimeout(config.connectTimeout.toMillis, TimeUnit.MILLISECONDS).build()
133-
}
134-
135123
private def readConfiguration(config: Config): Configuration = {
136124
val datadogConfig = config.getConfig("kamon.datadog")
137-
138125
Configuration(
139-
apiUrl = datadogConfig.getString("http.api-url"),
140-
apiKey = datadogConfig.getString("http.api-key"),
141-
connectTimeout = datadogConfig.getDuration("http.connect-timeout"),
142-
readTimeout = datadogConfig.getDuration("http.read-timeout"),
143-
requestTimeout = datadogConfig.getDuration("http.request-timeout"),
126+
datadogConfig.getConfig("http"),
144127
timeUnit = readTimeUnit(datadogConfig.getString("time-unit")),
145128
informationUnit = readInformationUnit(datadogConfig.getString("information-unit")),
146129
// Remove the "host" tag since it gets added to the datadog payload separately
@@ -154,8 +137,7 @@ private object DatadogAPIReporter {
154137
val count = "count"
155138
val gauge = "gauge"
156139

157-
case class Configuration(apiUrl: String, apiKey: String, connectTimeout: Duration, readTimeout: Duration, requestTimeout: Duration,
158-
timeUnit: MeasurementUnit, informationUnit: MeasurementUnit, extraTags: Map[String, String], tagFilter: Matcher)
140+
case class Configuration(httpConfig: Config, timeUnit: MeasurementUnit, informationUnit: MeasurementUnit, extraTags: Map[String, String], tagFilter: Matcher)
159141

160142
implicit class QuoteInterp(val sc: StringContext) extends AnyVal {
161143
def quote(args: Any*): String = "\"" + sc.s(args: _*) + "\""
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package kamon.datadog
2+
3+
import java.time.Duration
4+
5+
import com.typesafe.config.Config
6+
import kamon.trace.IdentityProvider.Identifier
7+
import kamon.trace.{ IdentityProvider, Span }
8+
import kamon.{ Kamon, SpanReporter }
9+
import org.slf4j.LoggerFactory
10+
import play.api.libs.json.{ JsObject, Json }
11+
12+
trait KamonDataDogTranslator {
13+
def translate(span: Span.FinishedSpan): DdSpan
14+
}
15+
16+
object KamonDataDogTranslatorDefault$ extends KamonDataDogTranslator {
17+
def translate(span: Span.FinishedSpan): DdSpan = {
18+
val traceId = BigInt(span.context.traceID.string, 16)
19+
val spanId = BigInt(span.context.spanID.string, 16)
20+
21+
val parentId = span.context.parentID match {
22+
case IdentityProvider.NoIdentifier => None
23+
case Identifier(value, _) => Some(BigInt(value, 16))
24+
}
25+
26+
val name = span.tags.get("component")
27+
.getOrElse(Span.TagValue.String("kamon.trace"))
28+
.asInstanceOf[Span.TagValue.String]
29+
.string
30+
val resource = span.operationName
31+
val service = Kamon.environment.service
32+
val from = span.from
33+
val start = from.getEpochNano
34+
val duration = Duration.between(from, span.to)
35+
val marks = span.marks.map { m => m.key -> m.instant.getEpochNano }.toMap
36+
val tags = span.tags.map { m =>
37+
m._1 -> {
38+
m._2 match {
39+
case v: Span.TagValue.Boolean => {
40+
v.text match {
41+
case "true" => true
42+
case "false" => false
43+
}
44+
}
45+
case v: Span.TagValue.String => v.string
46+
case v: Span.TagValue.Number => v.number
47+
case null => null
48+
}
49+
}
50+
}
51+
val error: Boolean = tags.get("error") match {
52+
case Some(v: Boolean) => v
53+
case _ => false
54+
}
55+
val meta = marks ++ tags
56+
new DdSpan(traceId, spanId, parentId, name, resource, service, "custom", start, duration, meta, error)
57+
58+
}
59+
}
60+
61+
class DatadogSpanReporter extends SpanReporter {
62+
private val translator: KamonDataDogTranslator = KamonDataDogTranslatorDefault$
63+
private val logger = LoggerFactory.getLogger(classOf[DatadogAPIReporter])
64+
final private val httpConfigPath = "kamon.datadog.trace.http"
65+
private var httpClient = new HttpClient(Kamon.config().getConfig(httpConfigPath))
66+
67+
override def reportSpans(spans: Seq[Span.FinishedSpan]): Unit = if (spans.nonEmpty) {
68+
val spanList: List[Seq[JsObject]] = spans
69+
.map(span => translator.translate(span).toJson())
70+
.groupBy { _.\("trace_id").get.toString() }
71+
.values
72+
.toList
73+
httpClient.doJsonPut(Json.toJson(spanList))
74+
}
75+
76+
override def start(): Unit = {
77+
logger.info("Started the Kamon DataDog reporter")
78+
}
79+
80+
override def stop(): Unit = {
81+
logger.info("Stopped the Kamon DataDog reporter")
82+
}
83+
84+
override def reconfigure(config: Config): Unit = {
85+
logger.info("Reconfigured the Kamon DataDog reporter")
86+
httpClient = new HttpClient(config.getConfig(httpConfigPath))
87+
}
88+
89+
}

0 commit comments

Comments
 (0)