Skip to content

Commit e566daf

Browse files
committed
Checkpoint 61 - fix table initialization
1 parent db61821 commit e566daf

File tree

4 files changed

+182
-153
lines changed

4 files changed

+182
-153
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/coordinator/MultiTableStreamWriteOperatorCoordinator.java

Lines changed: 56 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
import org.apache.flink.cdc.common.event.CreateTableEvent;
2121
import org.apache.flink.cdc.common.event.TableId;
2222
import org.apache.flink.cdc.common.schema.Schema;
23-
import org.apache.flink.cdc.connectors.hudi.sink.event.CreateTableOperatorEvent;
2423
import org.apache.flink.cdc.connectors.hudi.sink.event.EnhancedWriteMetadataEvent;
2524
import org.apache.flink.cdc.connectors.hudi.sink.event.SchemaChangeOperatorEvent;
25+
import org.apache.flink.cdc.connectors.hudi.sink.event.TableAwareCorrespondent.CreateTableRequest;
26+
import org.apache.flink.cdc.connectors.hudi.sink.event.TableAwareCorrespondent.CreateTableResponse;
27+
import org.apache.flink.cdc.connectors.hudi.sink.event.TableAwareCorrespondent.MultiTableInstantTimeRequest;
2628
import org.apache.flink.cdc.connectors.hudi.sink.util.RowDataUtils;
2729
import org.apache.flink.configuration.Configuration;
2830
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -96,29 +98,6 @@ public class MultiTableStreamWriteOperatorCoordinator extends StreamWriteOperato
9698
private static final Logger LOG =
9799
LoggerFactory.getLogger(MultiTableStreamWriteOperatorCoordinator.class);
98100

99-
/**
100-
* A custom coordination request that includes the TableId to request an instant for a specific
101-
* table.
102-
*/
103-
public static class MultiTableInstantTimeRequest implements CoordinationRequest, Serializable {
104-
private static final long serialVersionUID = 1L;
105-
private final long checkpointId;
106-
private final TableId tableId;
107-
108-
public MultiTableInstantTimeRequest(long checkpointId, TableId tableId) {
109-
this.checkpointId = checkpointId;
110-
this.tableId = tableId;
111-
}
112-
113-
public long getCheckpointId() {
114-
return checkpointId;
115-
}
116-
117-
public TableId getTableId() {
118-
return tableId;
119-
}
120-
}
121-
122101
/**
123102
* Encapsulates all state and resources for a single table. This simplifies management by
124103
* grouping related objects, making the coordinator logic cleaner and less prone to errors.
@@ -406,9 +385,20 @@ public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
406385
Correspondent.InstantTimeResponse.getInstance(
407386
instantTime)));
408387
},
409-
"handling instant time request for checkpoint %d",
388+
"Handling instant time request for checkpoint %d",
410389
((MultiTableInstantTimeRequest) request).getCheckpointId());
411390
return future;
391+
} else if (request instanceof CreateTableRequest) {
392+
CompletableFuture<CoordinationResponse> future = new CompletableFuture<>();
393+
executor.execute(
394+
() -> {
395+
boolean isSuccess = handleCreateTableEvent(((CreateTableRequest) request));
396+
future.complete(
397+
CoordinationResponseSerDe.wrap(new CreateTableResponse(isSuccess)));
398+
},
399+
"Handling create table request: ",
400+
request);
401+
return future;
412402
} else {
413403
LOG.warn("Received an unknown coordination request: {}", request.getClass().getName());
414404
return super.handleCoordinationRequest(request);
@@ -433,9 +423,7 @@ public void handleEventFromOperator(
433423
int subtask, int attemptNumber, OperatorEvent operatorEvent) {
434424
executor.execute(
435425
() -> {
436-
if (operatorEvent instanceof CreateTableOperatorEvent) {
437-
handleCreateTableEvent((CreateTableOperatorEvent) operatorEvent);
438-
} else if (operatorEvent instanceof SchemaChangeOperatorEvent) {
426+
if (operatorEvent instanceof SchemaChangeOperatorEvent) {
439427
handleSchemaChangeEvent((SchemaChangeOperatorEvent) operatorEvent);
440428
} else if (operatorEvent instanceof EnhancedWriteMetadataEvent) {
441429
handleEnhancedWriteMetadataEvent(
@@ -450,8 +438,8 @@ public void handleEventFromOperator(
450438
operatorEvent);
451439
}
452440

453-
private void handleCreateTableEvent(CreateTableOperatorEvent createTableOperatorEvent) {
454-
CreateTableEvent event = createTableOperatorEvent.getCreateTableEvent();
441+
private boolean handleCreateTableEvent(CreateTableRequest createTableRequest) {
442+
CreateTableEvent event = createTableRequest.getCreateTableEvent();
455443
TableId tableId = event.tableId();
456444

457445
// Store the schema for this table
@@ -461,37 +449,44 @@ private void handleCreateTableEvent(CreateTableOperatorEvent createTableOperator
461449
tableId,
462450
event.getSchema().getColumnCount());
463451

464-
tableContexts.computeIfAbsent(
465-
tableId,
466-
tId -> {
467-
LOG.info("New table detected: {}. Initializing Hudi resources.", tId);
468-
try {
469-
Configuration tableConfig = createTableSpecificConfig(tId);
470-
String tablePath = tableConfig.getString(FlinkOptions.PATH);
471-
pathToTableId.put(tablePath, tId);
472-
473-
// Create physical directory for Hudi table before initializing
474-
createHudiTablePath(tableConfig);
475-
476-
StreamerUtil.initTableIfNotExists(tableConfig);
477-
HoodieFlinkWriteClient<?> writeClient =
478-
FlinkWriteClients.createWriteClient(tableConfig);
479-
TableState tableState = new TableState(tableConfig);
480-
EventBuffers eventBuffers = EventBuffers.getInstance(tableConfig);
481-
482-
LOG.info(
483-
"Successfully initialized resources for table: {} at path: {}",
484-
tId,
485-
tablePath);
486-
return new TableContext(writeClient, eventBuffers, tableState, tablePath);
487-
} catch (Exception e) {
488-
LOG.error("Failed to initialize Hudi table resources for: {}", tId, e);
489-
context.failJob(
490-
new HoodieException(
491-
"Failed to initialize Hudi writer for table " + tId, e));
492-
return null;
493-
}
494-
});
452+
TableContext tableContext =
453+
tableContexts.computeIfAbsent(
454+
tableId,
455+
tId -> {
456+
LOG.info("New table detected: {}. Initializing Hudi resources.", tId);
457+
try {
458+
Configuration tableConfig = createTableSpecificConfig(tId);
459+
String tablePath = tableConfig.getString(FlinkOptions.PATH);
460+
pathToTableId.put(tablePath, tId);
461+
462+
// Create physical directory for Hudi table before initializing
463+
createHudiTablePath(tableConfig);
464+
465+
StreamerUtil.initTableIfNotExists(tableConfig);
466+
HoodieFlinkWriteClient<?> writeClient =
467+
FlinkWriteClients.createWriteClient(tableConfig);
468+
TableState tableState = new TableState(tableConfig);
469+
EventBuffers eventBuffers = EventBuffers.getInstance(tableConfig);
470+
471+
LOG.info(
472+
"Successfully initialized resources for table: {} at path: {}",
473+
tId,
474+
tablePath);
475+
return new TableContext(
476+
writeClient, eventBuffers, tableState, tablePath);
477+
} catch (Exception e) {
478+
LOG.error(
479+
"Failed to initialize Hudi table resources for: {}",
480+
tId,
481+
e);
482+
context.failJob(
483+
new HoodieException(
484+
"Failed to initialize Hudi writer for table " + tId,
485+
e));
486+
return null;
487+
}
488+
});
489+
return tableContext != null;
495490
}
496491

497492
/**

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/event/CreateTableOperatorEvent.java

Lines changed: 0 additions & 59 deletions
This file was deleted.

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/event/TableAwareCorrespondent.java

Lines changed: 90 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818

1919
package org.apache.flink.cdc.connectors.hudi.sink.event;
2020

21+
import org.apache.flink.cdc.common.event.CreateTableEvent;
2122
import org.apache.flink.cdc.common.event.TableId;
2223
import org.apache.flink.cdc.connectors.hudi.sink.coordinator.MultiTableStreamWriteOperatorCoordinator;
2324
import org.apache.flink.runtime.jobgraph.OperatorID;
2425
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
26+
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
27+
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
2528
import org.apache.flink.util.SerializedValue;
2629

2730
import org.apache.hudi.exception.HoodieException;
@@ -66,10 +69,8 @@ public static TableAwareCorrespondent getInstance(
6669
@Override
6770
public String requestInstantTime(long checkpointId) {
6871
try {
69-
MultiTableStreamWriteOperatorCoordinator.MultiTableInstantTimeRequest request =
70-
new MultiTableStreamWriteOperatorCoordinator.MultiTableInstantTimeRequest(
71-
checkpointId, tableId);
72-
72+
MultiTableInstantTimeRequest request =
73+
new MultiTableInstantTimeRequest(checkpointId, tableId);
7374
Correspondent.InstantTimeResponse response =
7475
CoordinationResponseSerDe.unwrap(
7576
this.gateway
@@ -83,4 +84,89 @@ public String requestInstantTime(long checkpointId) {
8384
e);
8485
}
8586
}
87+
88+
/**
89+
* A custom coordination request that includes the TableId to request an instant for a specific
90+
* table.
91+
*/
92+
public static class MultiTableInstantTimeRequest implements CoordinationRequest {
93+
private static final long serialVersionUID = 1L;
94+
private final long checkpointId;
95+
private final TableId tableId;
96+
97+
public MultiTableInstantTimeRequest(long checkpointId, TableId tableId) {
98+
this.checkpointId = checkpointId;
99+
this.tableId = tableId;
100+
}
101+
102+
public long getCheckpointId() {
103+
return checkpointId;
104+
}
105+
106+
public TableId getTableId() {
107+
return tableId;
108+
}
109+
}
110+
111+
/**
112+
* Send a request to coordinator to create a hudi table.
113+
*
114+
* @param createTableEvent The creating table event.
115+
* @return Whether the table is created successfully.
116+
*/
117+
public boolean requestCreatingTable(CreateTableEvent createTableEvent) {
118+
try {
119+
CreateTableRequest request = new CreateTableRequest(createTableEvent);
120+
CreateTableResponse response =
121+
CoordinationResponseSerDe.unwrap(
122+
this.gateway
123+
.sendRequestToCoordinator(
124+
this.operatorID, new SerializedValue<>(request))
125+
.get());
126+
return response.isSuccess();
127+
} catch (Exception e) {
128+
throw new HoodieException(
129+
"Error requesting the instant time from the coordinator for table " + tableId,
130+
e);
131+
}
132+
}
133+
134+
/**
135+
* An CoordinationRequest that encapsulates a {@link CreateTableEvent}.
136+
*
137+
* <p>This request is sent from the {@code MultiTableEventStreamWriteFunction} to the {@code
138+
* MultiTableStreamWriteOperatorCoordinator} to signal that a new table has been discovered in
139+
* the CDC stream. The coordinator uses this event to initialize all necessary resources for the
140+
* new table, such as its dedicated write client and event buffers, before any data is written.
141+
*/
142+
public static class CreateTableRequest implements CoordinationRequest {
143+
private static final long serialVersionUID = 1L;
144+
private final CreateTableEvent createTableEvent;
145+
146+
public CreateTableRequest(CreateTableEvent createtableEvent) {
147+
this.createTableEvent = createtableEvent;
148+
}
149+
150+
public CreateTableEvent getCreateTableEvent() {
151+
return createTableEvent;
152+
}
153+
154+
@Override
155+
public String toString() {
156+
return "CreateTableRequest{" + "tableId=" + createTableEvent.tableId() + '}';
157+
}
158+
}
159+
160+
public static class CreateTableResponse implements CoordinationResponse {
161+
private static final long serialVersionUID = 1L;
162+
private final boolean success;
163+
164+
public CreateTableResponse(boolean success) {
165+
this.success = success;
166+
}
167+
168+
public boolean isSuccess() {
169+
return success;
170+
}
171+
}
86172
}

0 commit comments

Comments
 (0)