Skip to content

关于数据同步过程中对于源表指定字段的最大值的记录 #9999

@youlindu

Description

@youlindu

在使用zeta引擎进行分布式执行任务的过程中,如何记录一个指定的源表增量字段的最大值。
比如我指定了一个源表的字段incre_column,想知道任务执行成功后本次执行中该字段的最大值是多少,并记录到我自己的业务表里,这样下次执行时就可以只抽取incre_column大于这个值的记录了。
要实现这一功能,是不是可以修改seatunnel的所有source连接器,在其每读取一条数据后,更新incre_column的最大值,等任务执行成功后,调用我自己的接口来记录该最大值。由于任务是多节点执行,是不是每个节点都会产生一个incre_column最大值,我提供的接口里需要根据任务id来判断哪个是incre_column真正的最大值并进行更新。
另外,我看到seatunnel中有一个DynamicCompile的transformer,可以在这个里面使用java代码获取到读取的所有数据的incre_column最大值吗,然后想办法把它传给我的接口。比如直接在DynamicCompile中调用接口,但由于我在DynamicCompile中不知道jobId(可以获取吗),没法把jobId传给接口,所以接口中没办法正确更新incre_column最大值。
或者在DynamicCompile中发送一个事件(可以实现吗),将incre_column最大值传给该事件,最后事件处理器中调用我的接口,将incre_column最大值和jobId传给接口。
我对seatunnel中的各项机制及代码不甚熟悉,请问这个功能有什么好的实现方式,我上述的思路有可行性吗,如有赐教,不胜感激。

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions