Commit 3dadb7f
Parent: 314b7ef
Checkpoint 62 - fix flaky test

3 files changed: +133 −77 lines

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/coordinator/MultiTableStreamWriteOperatorCoordinator.java

Lines changed: 57 additions & 21 deletions
@@ -17,14 +17,13 @@

 package org.apache.flink.cdc.connectors.hudi.sink.coordinator;

-import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.connectors.hudi.sink.event.EnhancedWriteMetadataEvent;
-import org.apache.flink.cdc.connectors.hudi.sink.event.SchemaChangeOperatorEvent;
 import org.apache.flink.cdc.connectors.hudi.sink.event.TableAwareCorrespondent.CreateTableRequest;
-import org.apache.flink.cdc.connectors.hudi.sink.event.TableAwareCorrespondent.CreateTableResponse;
 import org.apache.flink.cdc.connectors.hudi.sink.event.TableAwareCorrespondent.MultiTableInstantTimeRequest;
+import org.apache.flink.cdc.connectors.hudi.sink.event.TableAwareCorrespondent.SchemaChangeRequest;
+import org.apache.flink.cdc.connectors.hudi.sink.event.TableAwareCorrespondent.SchemaChangeResponse;
 import org.apache.flink.cdc.connectors.hudi.sink.util.RowDataUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -392,13 +391,30 @@ public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
             CompletableFuture<CoordinationResponse> future = new CompletableFuture<>();
             executor.execute(
                     () -> {
-                        boolean isSuccess = handleCreateTableEvent(((CreateTableRequest) request));
+                        CreateTableRequest createTableRequest = (CreateTableRequest) request;
+                        boolean isSuccess = handleCreateTableRequest(createTableRequest);
                         future.complete(
-                                CoordinationResponseSerDe.wrap(new CreateTableResponse(isSuccess)));
+                                CoordinationResponseSerDe.wrap(
+                                        SchemaChangeResponse.of(
+                                                createTableRequest.getTableId(), isSuccess)));
                     },
                     "Handling create table request: ",
                     request);
             return future;
+        } else if (request instanceof SchemaChangeRequest) {
+            CompletableFuture<CoordinationResponse> future = new CompletableFuture<>();
+            executor.execute(
+                    () -> {
+                        SchemaChangeRequest schemaChangeRequest = (SchemaChangeRequest) request;
+                        boolean isSuccess = handleSchemaChangeRequest(schemaChangeRequest);
+                        future.complete(
+                                CoordinationResponseSerDe.wrap(
+                                        SchemaChangeResponse.of(
+                                                schemaChangeRequest.getTableId(), isSuccess)));
+                    },
+                    "Handling schema change request: ",
+                    request);
+            return future;
         } else {
             LOG.warn("Received an unknown coordination request: {}", request.getClass().getName());
             return super.handleCoordinationRequest(request);
@@ -423,9 +439,7 @@ public void handleEventFromOperator(
             int subtask, int attemptNumber, OperatorEvent operatorEvent) {
         executor.execute(
                 () -> {
-                    if (operatorEvent instanceof SchemaChangeOperatorEvent) {
-                        handleSchemaChangeEvent((SchemaChangeOperatorEvent) operatorEvent);
-                    } else if (operatorEvent instanceof EnhancedWriteMetadataEvent) {
+                    if (operatorEvent instanceof EnhancedWriteMetadataEvent) {
                         handleEnhancedWriteMetadataEvent(
                                 (EnhancedWriteMetadataEvent) operatorEvent);
                     } else {
@@ -438,16 +452,14 @@ public void handleEventFromOperator(
                                 operatorEvent);
                     }

-    private boolean handleCreateTableEvent(CreateTableRequest createTableRequest) {
-        CreateTableEvent event = createTableRequest.getCreateTableEvent();
-        TableId tableId = event.tableId();
-
+    private boolean handleCreateTableRequest(CreateTableRequest createTableRequest) {
+        TableId tableId = createTableRequest.getTableId();
         // Store the schema for this table
-        tableSchemas.put(tableId, event.getSchema());
+        tableSchemas.put(tableId, createTableRequest.getSchema());
         LOG.info(
                 "Cached schema for table {}: {} columns",
                 tableId,
-                event.getSchema().getColumnCount());
+                createTableRequest.getSchema().getColumnCount());

         TableContext tableContext =
                 tableContexts.computeIfAbsent(
@@ -493,28 +505,36 @@ private boolean handleCreateTableEvent(CreateTableRequest createTableRequest) {
      * Handles schema change events from the sink functions. Updates the cached schema and recreates
      * the write client to ensure it uses the new schema.
      *
-     * @param event The schema change event containing the table ID and new schema
+     * @param request The schema change request containing the table ID and new schema
      */
-    private void handleSchemaChangeEvent(SchemaChangeOperatorEvent event) {
-        TableId tableId = event.getTableId();
-        Schema newSchema = event.getNewSchema();
+    private boolean handleSchemaChangeRequest(SchemaChangeRequest request) {
+        TableId tableId = request.getTableId();
+        Schema newSchema = request.getSchema();

         LOG.info(
                 "Received schema change event for table {}: {} columns",
                 tableId,
                 newSchema.getColumnCount());

+        Schema oldSchema = tableSchemas.get(tableId);
+        if (Objects.equals(oldSchema, newSchema)) {
+            LOG.warn("Schema change already applied, tableId: {}, schema: {}.", tableId, newSchema);
+            return true;
+        }
         // Update the cached schema
         tableSchemas.put(tableId, newSchema);
-        LOG.info("Updated coordinator's schema cache for table: {}", tableId);
+        LOG.info(
+                "Updated coordinator's schema cache for table: {}, new schema: {}",
+                tableId,
+                newSchema);

         // Get the existing table context
         TableContext oldContext = tableContexts.get(tableId);
         if (oldContext == null) {
             LOG.warn(
                     "Received schema change for unknown table: {}. Skipping write client update.",
                     tableId);
-            return;
+            return true;
         }

         try {
@@ -543,6 +563,7 @@ private void handleSchemaChangeEvent(SchemaChangeOperatorEvent event) {
             tableContexts.put(tableId, newContext);

             LOG.info("Successfully updated write client for table {} after schema change", tableId);
+            return true;
         } catch (Exception e) {
             LOG.error("Failed to update write client for table {} after schema change", tableId, e);
             context.failJob(
@@ -551,6 +572,7 @@ private void handleSchemaChangeEvent(SchemaChangeOperatorEvent event) {
                             + tableId
                             + " after schema change",
                             e));
+            return false;
         }
     }

@@ -736,7 +758,21 @@ private void commitInstantForTable(
             long checkpointId,
             String instant,
             WriteMetadataEvent[] eventBuffer) {
-
+        final HoodieTimeline completedTimeline =
+                tableContext
+                        .writeClient
+                        .getHoodieTable()
+                        .getMetaClient()
+                        .getActiveTimeline()
+                        .filterCompletedInstants();
+        if (completedTimeline.containsInstant(instant)) {
+            LOG.info(
+                    "Instant {} already committed, table {}, checkpoint id: {}.",
+                    instant,
+                    tableId,
+                    checkpointId);
+            return;
+        }
         if (Arrays.stream(eventBuffer).allMatch(Objects::isNull)) {
             LOG.info("No events for instant {}, table {}. Resetting buffer.", instant, tableId);
             tableContext.eventBuffers.reset(checkpointId);
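Both coordinator-side guards added above follow the same idempotency pattern: before applying an action that a retried coordination request or a replayed checkpoint can deliver twice, check whether its effect is already present and, if so, report success without re-applying it — which is presumably what de-flakes the test, since duplicate deliveries become no-ops. A minimal self-contained sketch of the pattern; the class and its members (IdempotentCoordinator, applySchemaChange, commitInstant) are hypothetical stand-ins, not the connector's API:

import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the coordinator's dedupe logic; illustration only.
final class IdempotentCoordinator {

    private final Map<String, String> tableSchemas = new ConcurrentHashMap<>();
    private final Set<String> committedInstants = ConcurrentHashMap.newKeySet();

    // Mirrors handleSchemaChangeRequest: a duplicate request reports success
    // without touching the write client again.
    boolean applySchemaChange(String tableId, String newSchema) {
        if (Objects.equals(tableSchemas.get(tableId), newSchema)) {
            return true; // already applied by an earlier delivery of the same request
        }
        tableSchemas.put(tableId, newSchema);
        // ... recreate the per-table write client here ...
        return true;
    }

    // Mirrors the commitInstantForTable guard: an instant already present in the
    // completed timeline must not be committed a second time.
    void commitInstant(String instant) {
        if (!committedInstants.add(instant)) {
            return; // already committed; a second commit would double-apply the batch
        }
        // ... perform the actual commit here ...
    }
}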

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/event/TableAwareCorrespondent.java

Lines changed: 69 additions & 12 deletions
@@ -20,6 +20,7 @@

 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.connectors.hudi.sink.coordinator.MultiTableStreamWriteOperatorCoordinator;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
@@ -117,7 +118,31 @@ public TableId getTableId() {
     public boolean requestCreatingTable(CreateTableEvent createTableEvent) {
         try {
             CreateTableRequest request = new CreateTableRequest(createTableEvent);
-            CreateTableResponse response =
+            SchemaChangeResponse response =
+                    CoordinationResponseSerDe.unwrap(
+                            this.gateway
+                                    .sendRequestToCoordinator(
+                                            this.operatorID, new SerializedValue<>(request))
+                                    .get());
+            return response.isSuccess();
+        } catch (Exception e) {
+            throw new HoodieException(
+                    "Error requesting table creation from the coordinator for table " + tableId,
+                    e);
+        }
+    }
+
+    /**
+     * Sends a request to the coordinator to apply the schema change.
+     *
+     * @param tableId the id of the table
+     * @param newSchema the new table schema
+     * @return whether the schema change was applied successfully
+     */
+    public boolean requestSchemaChange(TableId tableId, Schema newSchema) {
+        try {
+            SchemaChangeRequest request = new SchemaChangeRequest(tableId, newSchema);
+            SchemaChangeResponse response =
                     CoordinationResponseSerDe.unwrap(
                             this.gateway
                                     .sendRequestToCoordinator(
@@ -139,38 +164,70 @@ public boolean requestCreatingTable(CreateTableEvent createTableEvent) {
      * the CDC stream. The coordinator uses this event to initialize all necessary resources for the
      * new table, such as its dedicated write client and event buffers, before any data is written.
      */
-    public static class CreateTableRequest implements CoordinationRequest {
+    public static class CreateTableRequest extends SchemaChangeRequest {
         private static final long serialVersionUID = 1L;
-        private final CreateTableEvent createTableEvent;

-        public CreateTableRequest(CreateTableEvent createtableEvent) {
-            this.createTableEvent = createtableEvent;
+        public CreateTableRequest(CreateTableEvent createTableEvent) {
+            super(createTableEvent.tableId(), createTableEvent.getSchema());
         }
+    }

-        public CreateTableEvent getCreateTableEvent() {
-            return createTableEvent;
+    /**
+     * A CoordinationRequest that represents a request to change a table's schema.
+     *
+     * <p>This request is sent from the {@code MultiTableEventStreamWriteFunction} to the {@code
+     * MultiTableStreamWriteOperatorCoordinator} to signal that a schema change has been discovered
+     * in the CDC stream.
+     */
+    public static class SchemaChangeRequest implements CoordinationRequest {
+        private static final long serialVersionUID = 1L;
+
+        private final TableId tableId;
+        private final Schema schema;
+
+        public SchemaChangeRequest(TableId tableId, Schema schema) {
+            this.tableId = tableId;
+            this.schema = schema;
+        }
+
+        public TableId getTableId() {
+            return tableId;
+        }
+
+        public Schema getSchema() {
+            return schema;
         }

         @Override
         public String toString() {
-            return "CreateTableRequest{" + "tableId=" + createTableEvent.tableId() + '}';
+            return "SchemaChangeRequest{" + "tableId=" + tableId + ", schema=" + schema + '}';
         }
     }

     /**
-     * Response for a {@link CreateTableRequest}. This response is sent from writer coordinator to
-     * indicate whether the table is created successfully.
+     * Response for a {@link CreateTableRequest} or {@link SchemaChangeRequest}. This response is
+     * sent from the writer coordinator to indicate whether the schema change was applied
+     * successfully.
      */
-    public static class CreateTableResponse implements CoordinationResponse {
+    public static class SchemaChangeResponse implements CoordinationResponse {
         private static final long serialVersionUID = 1L;
+        private final TableId tableId;
         private final boolean success;

-        public CreateTableResponse(boolean success) {
+        private SchemaChangeResponse(TableId tableId, boolean success) {
+            this.tableId = tableId;
             this.success = success;
         }

         public boolean isSuccess() {
             return success;
         }
+
+        public TableId getTableId() {
+            return tableId;
+        }
+
+        public static SchemaChangeResponse of(TableId tableId, boolean success) {
+            return new SchemaChangeResponse(tableId, success);
+        }
     }
 }
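The substantive change in this file is the switch from a one-way OperatorEvent to a blocking coordination round trip: the writer now waits for the coordinator's SchemaChangeResponse before continuing, and CreateTableRequest becomes a subclass of SchemaChangeRequest, since creating a table is effectively delivering its first schema. A simplified sketch of the round trip; Gateway, Request, and Response below are stand-ins for Flink's TaskOperatorEventGateway and CoordinationRequest/CoordinationResponse, not the real types:

import java.io.Serializable;
import java.util.concurrent.CompletableFuture;

// Simplified stand-ins for Flink's coordination types; illustration only.
final class SchemaChangeRoundTrip {

    interface Gateway {
        CompletableFuture<Response> sendRequestToCoordinator(Request request);
    }

    record Request(String tableId, String newSchema) implements Serializable {}

    record Response(String tableId, boolean success) implements Serializable {}

    // Blocking on get() is what distinguishes this from the removed one-way
    // SchemaChangeOperatorEvent: the writer does not proceed until the
    // coordinator has acknowledged (or rejected) the schema change.
    static boolean requestSchemaChange(Gateway gateway, String tableId, String newSchema) {
        try {
            return gateway
                    .sendRequestToCoordinator(new Request(tableId, newSchema))
                    .get()
                    .success();
        } catch (Exception e) {
            throw new RuntimeException("Schema change request failed for table " + tableId, e);
        }
    }
}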

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/function/MultiTableEventStreamWriteFunction.java

Lines changed: 7 additions & 44 deletions
@@ -32,7 +32,6 @@
 import org.apache.flink.cdc.common.utils.SchemaUtils;
 import org.apache.flink.cdc.connectors.hudi.sink.event.EnhancedWriteMetadataEvent;
 import org.apache.flink.cdc.connectors.hudi.sink.event.HudiRecordEventSerializer;
-import org.apache.flink.cdc.connectors.hudi.sink.event.SchemaChangeOperatorEvent;
 import org.apache.flink.cdc.connectors.hudi.sink.event.TableAwareCorrespondent;
 import org.apache.flink.cdc.connectors.hudi.sink.util.RowDataUtils;
 import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
@@ -312,8 +311,7 @@ public void processSchemaChange(SchemaChangeEvent event) throws Exception {

         // Notify coordinator about schema change so it can update its write client
         try {
-            getOperatorEventGateway()
-                    .sendEventToCoordinator(new SchemaChangeOperatorEvent(tableId, newSchema));
+            getTableAwareCorrespondent(tableId).requestSchemaChange(tableId, newSchema);
             LOG.info("Sent SchemaChangeOperatorEvent to coordinator for table: {}", tableId);
         } catch (Exception e) {
             LOG.error(
@@ -493,6 +491,9 @@ private ExtendedBucketStreamWriteFunction createTableFunction(TableId tableId)

         try {
             tableFunction.initializeState(functionInitializationContext);
+            if (this.checkpointId != -1) {
+                tableFunction.setCheckpointId(this.checkpointId);
+            }
             LOG.info("Successfully initialized state for table function: {}", tableId);
         } catch (Exception e) {
             LOG.error("Failed to initialize state for table function: {}", tableId, e);
@@ -566,61 +567,23 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
             LOG.info("Persisted {} schemas to state", schemaMaps.size());
         }

-        // Delegate snapshot to table functions
-        // Child functions are composition objects, not Flink operators, so they shouldn't
-        // go through the full snapshotState(FunctionSnapshotContext) lifecycle which
-        // includes state reloading. Instead, we:
-        // 1. Call their abstract snapshotState() to flush buffers
-        // 2. Manually update their checkpointId for instant requests
-        long checkpointId = context.getCheckpointId();
         for (Map.Entry<TableId, ExtendedBucketStreamWriteFunction> entry :
                 tableFunctions.entrySet()) {
             try {
                 ExtendedBucketStreamWriteFunction tableFunction = entry.getValue();
                 LOG.debug(
                         "Delegating snapshotState for table: {} with checkpointId: {}",
                         entry.getKey(),
-                        checkpointId);
-
-                // Call abstract snapshotState() to flush buffers
-                tableFunction.snapshotState();
-
-                // Update the child function's checkpointId
-                // This is necessary because child functions need the current checkpointId
-                // when requesting instants from the coordinator
-                tableFunction.setCheckpointId(checkpointId);
-
+                        context.getCheckpointId());
+                tableFunction.snapshotState(context);
                 LOG.debug("Successfully snapshotted state for table: {}", entry.getKey());
             } catch (Exception e) {
                 LOG.error("Failed to snapshot state for table: {}", entry.getKey(), e);
                 throw new RuntimeException(
                         "Failed to snapshot state for table: " + entry.getKey(), e);
             }
         }
-    }
-
-    protected void flushRemaining(boolean endInput) {
-        boolean hasData = !tableFunctions.isEmpty();
-        this.currentInstant = instantToWrite(hasData);
-
-        if (this.currentInstant == null) {
-            if (hasData) {
-                throw new RuntimeException(
-                        "No inflight instant when flushing data for multi-table function!");
-            }
-        }
-
-        LOG.debug(
-                "Multi-table function requested instant: {} for {} table functions",
-                this.currentInstant,
-                tableFunctions.size());
-
-        // This method is intentionally overridden to be a no-op.
-        // The MultiTableEventStreamWriteFunction is a dispatcher and does not have its own
-        // data buffers to flush. Flushing is handled by the individual, table-specific
-        // write functions it manages. Calling the parent's flushRemaining would cause
-        // an erroneous, non-table-specific instant request to be sent to the coordinator,
-        // resulting in the NullPointerException.
+        this.checkpointId = context.getCheckpointId();
     }

     @Override
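The writer-side fix closes a gap for table functions created lazily between checkpoints — plausibly the source of the flakiness: the dispatcher now remembers the last checkpoint id from snapshotState and seeds it into any child function created afterwards, so a child's first instant request does not use a stale or uninitialized checkpoint id. A minimal sketch of that propagation, using a hypothetical ChildFunction in place of ExtendedBucketStreamWriteFunction:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the dispatcher's checkpoint-id propagation; illustration only.
final class DispatchingWriter {

    interface ChildFunction {
        void setCheckpointId(long checkpointId);
        void snapshotState(long checkpointId) throws Exception;
    }

    private final Map<String, ChildFunction> tableFunctions = new ConcurrentHashMap<>();
    private long checkpointId = -1; // -1 means no checkpoint has completed yet

    // A child created after a checkpoint starts from the last seen checkpoint id,
    // matching the new guard in createTableFunction.
    void register(String tableId, ChildFunction child) {
        if (checkpointId != -1) {
            child.setCheckpointId(checkpointId);
        }
        tableFunctions.put(tableId, child);
    }

    // Delegates the snapshot to each child, then records the id for children
    // that will be created before the next checkpoint.
    void snapshotState(long currentCheckpointId) throws Exception {
        for (ChildFunction child : tableFunctions.values()) {
            child.snapshotState(currentCheckpointId);
        }
        this.checkpointId = currentCheckpointId;
    }
}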
