Skip to content

Commit 97df376

Browse files
committed
NuOpenLineageLog - Adding or setting log level to info for debugging the metrics degradation issue. This version should not be merged to main/master
1 parent 6ddf259 commit 97df376

File tree

2 files changed

+34
-16
lines changed

2 files changed

+34
-16
lines changed

client/java/src/main/java/io/openlineage/client/OpenLineageClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public void emit(@NonNull OpenLineage.RunEvent runEvent) {
7777
}
7878
if (circuitBreaker.isPresent() && circuitBreaker.get().currentState().isClosed()) {
7979
engagedCircuitBreaker.set(1);
80-
log.warn("OpenLineageClient disabled with circuit breaker");
80+
log.warn("NuOpenLineageLog: OpenLineageClient: emit: OpenLineageClient disabled with circuit breaker");
8181
return;
8282
} else {
8383
engagedCircuitBreaker.set(0);

integration/spark/app/src/main/java/io/openlineage/spark/agent/NuEventEmitter.java

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.openlineage.spark.agent;
22

33
import io.openlineage.client.OpenLineage;
4+
import io.openlineage.client.OpenLineageClientUtils;
45
import lombok.extern.slf4j.Slf4j;
56

67
import java.lang.reflect.Field;
@@ -19,6 +20,19 @@
1920
@Slf4j
2021
public class NuEventEmitter {
2122

23+
public static long estimateShallowSize() {
24+
return 24;
25+
}
26+
27+
public static long estimateDeepSize(String str) {
28+
long shallowSize = estimateShallowSize();
29+
boolean isLatin1 = str.chars().allMatch(ch -> ch <= 0xFF);
30+
long dataSize = isLatin1 ? str.length() : str.length() * 2L;
31+
long arraySize = 12 + 4 + dataSize;
32+
arraySize = (arraySize + 7) & ~7;
33+
return shallowSize + arraySize;
34+
}
35+
2236
private static final Set<String> WANTED_JOB_TYPES = new HashSet<>(
2337
Collections.singletonList(
2438
"SQL_JOB" // as defined in SparkSQLExecutionContext.SPARK_JOB_TYPE
@@ -35,15 +49,15 @@ public class NuEventEmitter {
3549
private static Boolean isPermittedJobType(RunEvent event) {
3650
String jobType = event.getJob().getFacets().getJobType().getJobType();
3751
if (WANTED_JOB_TYPES.stream().noneMatch(jobType::equals)) {
38-
log.info("NuOpenLineageLog: OpenLineage event with job type {} has no lineage value and should not be emitted", jobType);
52+
log.info("NuOpenLineageLog: NuEventEmitter: isPermittedJobType: OpenLineage event with job type {} has no lineage value and should not be emitted", jobType);
3953
return false;
4054
}
4155
return true;
4256
}
4357

44-
private static Boolean isPermitedEventType(RunEvent event) {
58+
private static Boolean isPermittedEventType(RunEvent event) {
4559
if (RUNNING.equals(event.getEventType())) {
46-
log.info("NuOpenLineageLog: OpenLineage event is {} and should not be emitted", RUNNING);
60+
log.info("NuOpenLineageLog: NuEventEmitter: isPermittedEventType: OpenLineage event is {} and should not be emitted", RUNNING);
4761
return false;
4862
}
4963
return true;
@@ -52,11 +66,11 @@ private static Boolean isPermitedEventType(RunEvent event) {
5266
private static Boolean isPermittedJobName(RunEvent event) {
5367
String jobName = event.getJob().getName();
5468
if (isNull(jobName)) {
55-
log.info("NuOpenLineageLog: OpenLineage event has no job name and should not be emitted");
69+
log.info("NuOpenLineageLog: NuEventEmitter: isPermittedJobName: OpenLineage event has no job name and should not be emitted");
5670
return false;
5771
}
5872
if (WANTED_EVENT_NAME_SUBSTRINGS.stream().noneMatch(jobName::contains)) {
59-
log.info("NuOpenLineageLog: OpenLineage event job name {} has no permitted substring and should not be emitted", jobName);
73+
log.info("NuOpenLineageLog: NuEventEmitter: isPermittedJobName: OpenLineage event job name {} has no permitted substring and should not be emitted", jobName);
6074
return false;
6175
}
6276
return true;
@@ -65,7 +79,7 @@ private static Boolean isPermittedJobName(RunEvent event) {
6579
private static Boolean shouldEmit(RunEvent event) {
6680
return Stream.of(
6781
isPermittedJobType(event),
68-
isPermitedEventType(event),
82+
isPermittedEventType(event),
6983
isPermittedJobName(event)
7084
).noneMatch(Boolean.FALSE::equals);
7185
}
@@ -83,34 +97,38 @@ private static void discardColumnLineageFacet(RunEvent event) {
8397
.collect(Collectors.toList())
8498
.forEach(dataset -> {
8599
try {
86-
log.info("NuOpenLineageLog: Discarding column lineage facet for dataset {} {} {}",
100+
log.info("NuOpenLineageLog: NuEventEmitter: discardColumnLineageFacet: Discarding column lineage facet for dataset {} {} {}",
87101
dataset.getClass().getSimpleName(), dataset.getNamespace(), dataset.getName());
88102
columnLineageFacetField.set(dataset.getFacets(), null);
89103
} catch (IllegalAccessException e) {
90-
log.info("NuOpenLineageLog: Failed to discard column lineage facet", e);
104+
log.info("NuOpenLineageLog: NuEventEmitter: discardColumnLineageFacet: Failed to discard column lineage facet", e);
91105
}
92106
});
93107
} catch (NoSuchFieldException e) {
94-
log.info("NuOpenLineageLog: Failed to discard column lineage facet: columnLineage field not found at OpenLineage.DatasetFacets", e);
108+
log.info("NuOpenLineageLog: NuEventEmitter: discardColumnLineageFacet: Failed to discard column lineage facet: columnLineage field not found at OpenLineage.DatasetFacets", e);
95109
}
96110
}
97111

98112
public static void emit(RunEvent event, EventEmitter eventEmitter) {
99-
log.info("NuOpenLineageLog: Begin: Emitting OpenLineage event {} with job name {} and job type {}",
100-
event.getEventType(), event.getJob().getName(), event.getJob().getFacets().getJobType().getJobType());
113+
String jsonEvent = OpenLineageClientUtils.toJson(event);
114+
double estimatedEventSize = estimateDeepSize(jsonEvent) / 1024.0;
115+
116+
log.info("NuOpenLineageLog: NuEventEmitter: emit: Begin: Emitting OpenLineage event {} with job name {} and job type {} and size {} KB",
117+
event.getEventType(), event.getJob().getName(), event.getJob().getFacets().getJobType().getJobType(), estimatedEventSize);
101118
if (!shouldEmit(event)) {
102-
log.info("NuOpenLineageLog: OpenLineage event {} has no lineage value and should not be emitted", event.getEventType());
119+
log.info("NuOpenLineageLog: NuEventEmitter: emit: OpenLineage event {} has no lineage value and should not be emitted", event.getEventType());
103120
return;
104121
}
105122

106123
if (shouldDiscardColumnLineageFacet(event.getEventType())) {
107-
log.info("NuOpenLineageLog: Discarding column lineage facet for event {}", event.getEventType());
124+
log.info("NuOpenLineageLog: NuEventEmitter: emit: Discarding column lineage facet for event {}", event.getEventType());
108125
discardColumnLineageFacet(event);
109126
}
110127

111128
eventEmitter.emit(event);
112129

113-
log.info("NuOpenLineageLog: End: Emitting OpenLineage event {} with job name {} and job type {}",
114-
event.getEventType(), event.getJob().getName(), event.getJob().getFacets().getJobType().getJobType());
130+
log.info("NuOpenLineageLog: NuEventEmitter: emit: End: Emitting OpenLineage event {} with job name {} and job type {} and size {} KB",
131+
event.getEventType(), event.getJob().getName(), event.getJob().getFacets().getJobType().getJobType(), estimatedEventSize);
115132
}
133+
116134
}

0 commit comments

Comments
 (0)