Skip to content

Commit 6ddf259

Browse files
committed
NuOpenLineageLog - Raise existing debug log statements to INFO level and add new INFO logs to help debug the metrics degradation issue. This version should not be merged to main.
1 parent 44ebf0d commit 6ddf259

File tree

3 files changed

+45
-34
lines changed

3 files changed

+45
-34
lines changed

integration/spark/app/src/main/java/io/openlineage/spark/agent/NuEventEmitter.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,15 @@ public class NuEventEmitter {
3535
private static Boolean isPermittedJobType(RunEvent event) {
3636
String jobType = event.getJob().getFacets().getJobType().getJobType();
3737
if (WANTED_JOB_TYPES.stream().noneMatch(jobType::equals)) {
38-
log.debug("OpenLineage event with job type {} has no lineage value and should not be emitted", jobType);
38+
log.info("NuOpenLineageLog: OpenLineage event with job type {} has no lineage value and should not be emitted", jobType);
3939
return false;
4040
}
4141
return true;
4242
}
4343

4444
private static Boolean isPermitedEventType(RunEvent event) {
4545
if (RUNNING.equals(event.getEventType())) {
46-
log.debug("OpenLineage event is {} and should not be emitted", RUNNING);
46+
log.info("NuOpenLineageLog: OpenLineage event is {} and should not be emitted", RUNNING);
4747
return false;
4848
}
4949
return true;
@@ -52,11 +52,11 @@ private static Boolean isPermitedEventType(RunEvent event) {
5252
private static Boolean isPermittedJobName(RunEvent event) {
5353
String jobName = event.getJob().getName();
5454
if (isNull(jobName)) {
55-
log.debug("OpenLineage event has no job name and should not be emitted");
55+
log.info("NuOpenLineageLog: OpenLineage event has no job name and should not be emitted");
5656
return false;
5757
}
5858
if (WANTED_EVENT_NAME_SUBSTRINGS.stream().noneMatch(jobName::contains)) {
59-
log.debug("OpenLineage event job name {} has no permitted substring and should not be emitted", jobName);
59+
log.info("NuOpenLineageLog: OpenLineage event job name {} has no permitted substring and should not be emitted", jobName);
6060
return false;
6161
}
6262
return true;
@@ -83,27 +83,34 @@ private static void discardColumnLineageFacet(RunEvent event) {
8383
.collect(Collectors.toList())
8484
.forEach(dataset -> {
8585
try {
86-
log.debug("Discarding column lineage facet for dataset {} {} {}",
86+
log.info("NuOpenLineageLog: Discarding column lineage facet for dataset {} {} {}",
8787
dataset.getClass().getSimpleName(), dataset.getNamespace(), dataset.getName());
8888
columnLineageFacetField.set(dataset.getFacets(), null);
8989
} catch (IllegalAccessException e) {
90-
log.error("Failed to discard column lineage facet", e);
90+
log.info("NuOpenLineageLog: Failed to discard column lineage facet", e);
9191
}
9292
});
9393
} catch (NoSuchFieldException e) {
94-
log.error("Failed to discard column lineage facet: columnLineage field not found at OpenLineage.DatasetFacets", e);
94+
log.info("NuOpenLineageLog: Failed to discard column lineage facet: columnLineage field not found at OpenLineage.DatasetFacets", e);
9595
}
9696
}
9797

9898
public static void emit(RunEvent event, EventEmitter eventEmitter) {
99+
log.info("NuOpenLineageLog: Begin: Emitting OpenLineage event {} with job name {} and job type {}",
100+
event.getEventType(), event.getJob().getName(), event.getJob().getFacets().getJobType().getJobType());
99101
if (!shouldEmit(event)) {
102+
log.info("NuOpenLineageLog: OpenLineage event {} has no lineage value and should not be emitted", event.getEventType());
100103
return;
101104
}
102105

103106
if (shouldDiscardColumnLineageFacet(event.getEventType())) {
107+
log.info("NuOpenLineageLog: Discarding column lineage facet for event {}", event.getEventType());
104108
discardColumnLineageFacet(event);
105109
}
106110

107111
eventEmitter.emit(event);
112+
113+
log.info("NuOpenLineageLog: End: Emitting OpenLineage event {} with job name {} and job type {}",
114+
event.getEventType(), event.getJob().getName(), event.getJob().getFacets().getJobType().getJobType());
108115
}
109116
}

integration/spark/app/src/main/java/io/openlineage/spark/agent/OpenLineageSparkListener.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ public void onOtherEvent(SparkListenerEvent event) {
111111

112112
/** called by the SparkListener when a spark-sql (Dataset api) execution starts */
113113
private void sparkSQLExecStart(SparkListenerSQLExecutionStart startEvent) {
114+
log.info("NuOpenLineageLog: sparkSQLExecStart with activeJobId {}", activeJobId);
114115
getSparkSQLExecutionContext(startEvent.executionId())
115116
.ifPresent(
116117
context -> {
@@ -126,7 +127,7 @@ private void sparkSQLExecStart(SparkListenerSQLExecutionStart startEvent) {
126127

127128
/** called by the SparkListener when a spark-sql (Dataset api) execution ends */
128129
private void sparkSQLExecEnd(SparkListenerSQLExecutionEnd endEvent) {
129-
log.debug("sparkSQLExecEnd with activeJobId {}", activeJobId);
130+
log.info("NuOpenLineageLog: sparkSQLExecEnd with activeJobId {}", activeJobId);
130131
ExecutionContext context = sparkSqlExecutionRegistry.remove(endEvent.executionId());
131132
meterRegistry.counter("openlineage.spark.event.sql.end").increment();
132133
if (context != null) {
@@ -157,7 +158,7 @@ public void onJobStart(SparkListenerJobStart jobStart) {
157158
return;
158159
}
159160
activeJobId = Optional.of(jobStart.jobId());
160-
log.debug("onJobStart called {}", jobStart);
161+
log.info("NuOpenLineageLog: onJobStart called {}", jobStart);
161162
initializeContextFactoryIfNotInitialized();
162163
meterRegistry.counter("openlineage.spark.event.job.start").increment();
163164
Optional<ActiveJob> activeJob =
@@ -218,6 +219,7 @@ public void onJobEnd(SparkListenerJobEnd jobEnd) {
218219
if (isDisabled) {
219220
return;
220221
}
222+
log.info("NuOpenLineageLog: onJobEnd called {}", jobEnd);
221223
ExecutionContext context = rddExecutionRegistry.remove(jobEnd.jobId());
222224
meterRegistry.counter("openlineage.spark.event.job.end").increment();
223225
circuitBreaker.run(
@@ -237,7 +239,7 @@ public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
237239
if (isDisabled || sparkVersion.startsWith("2")) {
238240
return;
239241
}
240-
log.debug("onTaskEnd {}", taskEnd);
242+
log.info("NuOpenLineageLog: onTaskEnd called {}", taskEnd);
241243
jobMetrics.addMetrics(taskEnd.stageId(), taskEnd.taskMetrics());
242244
}
243245

@@ -284,6 +286,7 @@ public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
284286
if (isDisabled) {
285287
return;
286288
}
289+
log.info("NuOpenLineageLog: onApplicationEnd called {}", applicationEnd);
287290
meterRegistry.counter("openlineage.spark.event.app.end").increment();
288291
meterRegistry
289292
.counter("openlineage.spark.event.app.end.memoryusage")
@@ -308,6 +311,7 @@ public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
308311
if (isDisabled) {
309312
return;
310313
}
314+
log.info("NuOpenLineageLog: onApplicationStart called {}", applicationStart);
311315
initializeContextFactoryIfNotInitialized(applicationStart.appName());
312316
meterRegistry.counter("openlineage.spark.event.app.start").increment();
313317
meterRegistry

integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -79,15 +79,15 @@ public SparkSQLExecutionContext(
7979

8080
@Override
8181
public void start(SparkListenerSQLExecutionStart startEvent) {
82-
if (log.isDebugEnabled()) {
83-
log.debug("SparkListenerSQLExecutionStart - executionId: {}", startEvent.executionId());
84-
}
82+
// if (log.isDebugEnabled()) {
83+
log.info("NuOpenLineageLog: SparkListenerSQLExecutionStart - executionId: {}", startEvent.executionId());
84+
// }
8585
if (!olContext.getQueryExecution().isPresent()) {
8686
log.info(NO_EXECUTION_INFO, olContext);
8787
return;
8888
} else if (EventFilterUtils.isDisabled(olContext, startEvent)) {
8989
log.info(
90-
"OpenLineage received Spark event that is configured to be skipped: SparkListenerSQLExecutionStart");
90+
"NuOpenLineageLog: OpenLineage received Spark event that is configured to be skipped: SparkListenerSQLExecutionStart");
9191
// return;
9292
}
9393
olContext.setActiveJobId(activeJobId);
@@ -110,15 +110,15 @@ public void start(SparkListenerSQLExecutionStart startEvent) {
110110
.jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get()))
111111
.build());
112112

113-
log.debug("Posting event for start {}: {}", executionId, event);
113+
log.info("NuOpenLineageLog: SparkListenerSQLExecutionStart Posting event for start {}: {}", executionId, event);
114114
NuEventEmitter.emit(event, eventEmitter);
115115
}
116116

117117
@Override
118118
public void end(SparkListenerSQLExecutionEnd endEvent) {
119-
if (log.isDebugEnabled()) {
120-
log.debug("SparkListenerSQLExecutionEnd - executionId: {}", endEvent.executionId());
121-
}
119+
// if (log.isDebugEnabled()) {
120+
log.info("NuOpenLineageLog: SparkListenerSQLExecutionEnd - executionId: {}", endEvent.executionId());
121+
// }
122122
// TODO: can we get failed event here?
123123
// If not, then we probably need to use this only for LogicalPlans that emit no Job events.
124124
// Maybe use QueryExecutionListener?
@@ -128,7 +128,7 @@ public void end(SparkListenerSQLExecutionEnd endEvent) {
128128
return;
129129
} else if (EventFilterUtils.isDisabled(olContext, endEvent)) {
130130
log.info(
131-
"OpenLineage received Spark event that is configured to be skipped: SparkListenerSQLExecutionEnd");
131+
"NuOpenLineageLog: OpenLineage received Spark event that is configured to be skipped: SparkListenerSQLExecutionEnd");
132132
// return;
133133
}
134134

@@ -157,9 +157,9 @@ public void end(SparkListenerSQLExecutionEnd endEvent) {
157157
.jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get()))
158158
.build());
159159

160-
if (log.isDebugEnabled()) {
161-
log.debug("Posting event for end {}: {}", executionId, OpenLineageClientUtils.toJson(event));
162-
}
160+
// if (log.isDebugEnabled()) {
161+
log.info("NuOpenLineageLog: SparkListenerSQLExecutionEnd Posting event for end {}: {}", executionId, OpenLineageClientUtils.toJson(event));
162+
// }
163163
NuEventEmitter.emit(event, eventEmitter);
164164
}
165165

@@ -171,7 +171,7 @@ public void start(SparkListenerStageSubmitted stageSubmitted) {
171171
return;
172172
} else if (EventFilterUtils.isDisabled(olContext, stageSubmitted)) {
173173
log.info(
174-
"OpenLineage received Spark event that is configured to be skipped: SparkListenerStageSubmitted");
174+
"NuOpenLineageLog: OpenLineage received Spark event that is configured to be skipped: SparkListenerStageSubmitted");
175175
return;
176176
}
177177

@@ -190,7 +190,7 @@ public void start(SparkListenerStageSubmitted stageSubmitted) {
190190
.jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get()))
191191
.build());
192192

193-
log.debug("Posting event for stage submitted {}: {}", executionId, event);
193+
log.info("NuOpenLineageLog: SparkListenerStageSubmitted Posting event for stage submitted {}: {}", executionId, event);
194194
NuEventEmitter.emit(event, eventEmitter);
195195
}
196196

@@ -202,7 +202,7 @@ public void end(SparkListenerStageCompleted stageCompleted) {
202202
return;
203203
} else if (EventFilterUtils.isDisabled(olContext, stageCompleted)) {
204204
log.info(
205-
"OpenLineage received Spark event that is configured to be skipped: SparkListenerStageCompleted");
205+
"NuOpenLineageLog: OpenLineage received Spark event that is configured to be skipped: SparkListenerStageCompleted");
206206
return;
207207
}
208208
RunEvent event =
@@ -220,7 +220,7 @@ public void end(SparkListenerStageCompleted stageCompleted) {
220220
.jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get()))
221221
.build());
222222

223-
log.debug("Posting event for stage completed {}: {}", executionId, event);
223+
log.info("NuOpenLineageLog: Posting event for stage completed {}: {}", executionId, event);
224224
NuEventEmitter.emit(event, eventEmitter);
225225
}
226226

@@ -238,18 +238,18 @@ public void setActiveJobId(Integer activeJobId) {
238238
public void setActiveJob(ActiveJob activeJob) {
239239
olContext.setActiveJobId(activeJob.jobId());
240240
runEventBuilder.registerJob(activeJob);
241-
log.debug("Registering jobId: {} into runUid: {}", activeJob, olContext.getRunUuid());
241+
log.info("Registering jobId: {} into runUid: {}", activeJob, olContext.getRunUuid());
242242
}
243243

244244
@Override
245245
public void start(SparkListenerJobStart jobStart) {
246-
log.debug("SparkListenerJobStart - executionId: {}", executionId);
246+
log.info("NuOpenLineageLog: SparkListenerJobStart - executionId: {}", executionId);
247247
if (!olContext.getQueryExecution().isPresent()) {
248248
log.info(NO_EXECUTION_INFO, olContext);
249249
return;
250250
} else if (EventFilterUtils.isDisabled(olContext, jobStart)) {
251251
log.info(
252-
"OpenLineage received Spark event that is configured to be skipped: SparkListenerJobStart");
252+
"NuOpenLineageLog: OpenLineage received Spark event that is configured to be skipped: SparkListenerJobStart");
253253
// return;
254254
}
255255

@@ -273,16 +273,16 @@ public void start(SparkListenerJobStart jobStart) {
273273
.jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get()))
274274
.build());
275275

276-
log.debug("Posting event for start {}: {}", executionId, event);
276+
log.info("NuOpenLineageLog: SparkListenerJobStart Posting event for start {}: {}", executionId, event);
277277
NuEventEmitter.emit(event, eventEmitter);
278278
}
279279

280280
@Override
281281
public void end(SparkListenerJobEnd jobEnd) {
282-
log.debug("SparkListenerJobEnd - executionId: {}", executionId);
282+
log.info("NuOpenLineageLog: SparkListenerJobEnd - executionId: {}", executionId);
283283
olContext.setActiveJobId(jobEnd.jobId());
284284
if (!finished.compareAndSet(false, true)) {
285-
log.debug("Event already finished, returning");
285+
log.info("NuOpenLineageLog: Event already finished, returning");
286286
return;
287287
}
288288

@@ -291,7 +291,7 @@ public void end(SparkListenerJobEnd jobEnd) {
291291
return;
292292
} else if (EventFilterUtils.isDisabled(olContext, jobEnd)) {
293293
log.info(
294-
"OpenLineage received Spark event that is configured to be skipped: SparkListenerJobEnd");
294+
"NuOpenLineageLog: OpenLineage received Spark event that is configured to be skipped: SparkListenerJobEnd");
295295
// return;
296296
}
297297

@@ -322,7 +322,7 @@ public void end(SparkListenerJobEnd jobEnd) {
322322
.jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get()))
323323
.build());
324324

325-
log.debug("Posting event for end {}: {}", executionId, event);
325+
log.info("NuOpenLineageLog: SparkListenerJobEnd Posting event for end {}: {}", executionId, event);
326326
NuEventEmitter.emit(event, eventEmitter);
327327
}
328328

0 commit comments

Comments
 (0)