Skip to content

Commit 290ccad

Browse files
author
wardli
committed
[Subtask]: Use a new configuration item to control whether master & slave mode is enabled. #3845
1 parent f5a403d commit 290ccad

File tree

2 files changed

+19
-11
lines changed

2 files changed

+19
-11
lines changed

amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutor.java

Lines changed: 17 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -101,12 +101,25 @@ protected TaskWriter<Record> dataWriter() {
101101
@Override
102102
protected long targetSize() {
103103
long targetSize = super.targetSize();
104-
long inputSize =
105-
Arrays.stream(input.rewrittenDataFiles()).mapToLong(DataFile::fileSizeInBytes).sum();
106-
// When the input files’ total size is below targetSize, remove the output file size limit to
104+
DataFile[] rewrittenDataFiles = input.rewrittenDataFiles();
105+
if (rewrittenDataFiles == null || rewrittenDataFiles.length == 0) {
106+
return targetSize;
107+
}
108+
long inputSize = Arrays.stream(rewrittenDataFiles).mapToLong(DataFile::fileSizeInBytes).sum();
109+
// When the input files' total size is below targetSize, remove the output file size limit to
107110
// avoid outputting multiple files.
108111
// For more details, please refer to: https://github.com/apache/amoro/issues/3645
109-
return inputSize < targetSize ? Long.MAX_VALUE : targetSize;
112+
if (inputSize < targetSize) {
113+
return Long.MAX_VALUE;
114+
}
115+
// Even if total size >= targetSize, if average file size is small (less than targetSize),
116+
// we should still merge files to avoid outputting too many small files.
117+
// This ensures that many small files can be merged effectively.
118+
long averageFileSize = inputSize / rewrittenDataFiles.length;
119+
if (averageFileSize < targetSize) {
120+
return Long.MAX_VALUE;
121+
}
122+
return targetSize;
110123
}
111124

112125
private PartitionSpec fileSpec() {

amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractPartitionPlan.java

Lines changed: 2 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -263,9 +263,7 @@ public Weight getWeight() {
263263
/**
264264
* When splitTask has only one undersized segment file, it needs to be triggered again to
265265
* determine whether to rewrite pos. If needed, add it to rewritePosDataFiles and bin-packing
266-
* together. If it doesn't need to rewrite pos delete, add it to rewriteDataFiles so it can be
267-
* merged with fragment files or other files during the second bin-packing, which prevents file
268-
* loss and ensures proper file consolidation.
266+
* together; otherwise, reserve the delete files.
269267
*/
270268
protected void disposeUndersizedSegmentFile(SplitTask splitTask) {
271269
Optional<DataFile> dataFile = splitTask.getRewriteDataFiles().stream().findFirst();
@@ -275,10 +273,7 @@ protected void disposeUndersizedSegmentFile(SplitTask splitTask) {
275273
if (evaluator().segmentShouldRewritePos(rewriteDataFile, deletes)) {
276274
rewritePosDataFiles.put(rewriteDataFile, deletes);
277275
} else {
278-
// Add to rewriteDataFiles so it can be merged with other files (fragment files, etc.)
279-
// during the second bin-packing. This prevents file loss and ensures files can be
280-
// consolidated when possible.
281-
rewriteDataFiles.put(rewriteDataFile, deletes);
276+
reservedDeleteFiles(deletes);
282277
}
283278
}
284279
}

0 commit comments

Comments
 (0)