|
19 | 19 | package org.apache.paimon.catalog; |
20 | 20 |
|
21 | 21 | import org.apache.paimon.CoreOptions; |
| 22 | +import org.apache.paimon.Snapshot; |
22 | 23 | import org.apache.paimon.TableType; |
23 | 24 | import org.apache.paimon.fs.FileIO; |
24 | 25 | import org.apache.paimon.fs.Path; |
25 | 26 | import org.apache.paimon.manifest.PartitionEntry; |
26 | 27 | import org.apache.paimon.options.Options; |
27 | 28 | import org.apache.paimon.partition.Partition; |
| 29 | +import org.apache.paimon.rest.exceptions.NotImplementedException; |
28 | 30 | import org.apache.paimon.schema.Schema; |
29 | 31 | import org.apache.paimon.schema.SchemaManager; |
30 | 32 | import org.apache.paimon.schema.TableSchema; |
@@ -323,6 +325,18 @@ private static List<Pair<Table, TableSnapshot>> toTableAndSnapshots( |
323 | 325 | snapshot = optional.get(); |
324 | 326 | } |
325 | 327 | } catch (Catalog.TableNotExistException ignored) { |
| 328 | + } catch (NotImplementedException e) { |
| 329 | + // does not support load external paimon table snapshot from rest server |
| 330 | + // construct TableSnapshot from local snapshot and table scan |
| 331 | + if (table instanceof FileStoreTable) { |
| 332 | + FileStoreTable fileStoreTable = (FileStoreTable) table; |
| 333 | + Snapshot lastSnapshot = table.latestSnapshot().orElse(null); |
| 334 | + if (lastSnapshot != null) { |
| 335 | + snapshot = tableSnapshotIdFromFileSystem(fileStoreTable, lastSnapshot); |
| 336 | + } |
| 337 | + } else { |
| 338 | + throw new NotImplementedException(e.getMessage()); |
| 339 | + } |
326 | 340 | } |
327 | 341 | } |
328 | 342 | tableAndSnapshots.add(Pair.of(table, snapshot)); |
@@ -427,4 +441,29 @@ private static IcebergTable toIcebergTable( |
427 | 441 | .comment(schema.comment()) |
428 | 442 | .build(); |
429 | 443 | } |
| 444 | + |
| 445 | + private static TableSnapshot tableSnapshotIdFromFileSystem( |
| 446 | + FileStoreTable fileStoreTable, Snapshot snapshot) { |
| 447 | + Long totalRecordCount = snapshot.totalRecordCount(); |
| 448 | + long recordCount = totalRecordCount != null ? totalRecordCount : 0L; |
| 449 | + long fileSizeInBytes = 0L; |
| 450 | + long fileCount = 0L; |
| 451 | + long lastFileCreationTime = 0L; |
| 452 | + |
| 453 | + List<PartitionEntry> partitionEntries = |
| 454 | + fileStoreTable.newSnapshotReader().withSnapshot(snapshot).partitionEntries(); |
| 455 | + if (partitionEntries != null) { |
| 456 | + for (PartitionEntry entry : partitionEntries) { |
| 457 | + if (entry != null) { |
| 458 | + fileSizeInBytes += entry.fileSizeInBytes(); |
| 459 | + fileCount += entry.fileCount(); |
| 460 | + lastFileCreationTime = |
| 461 | + Math.max(lastFileCreationTime, entry.lastFileCreationTime()); |
| 462 | + } |
| 463 | + } |
| 464 | + } |
| 465 | + |
| 466 | + return new TableSnapshot( |
| 467 | + snapshot, recordCount, fileSizeInBytes, fileCount, lastFileCreationTime); |
| 468 | + } |
430 | 469 | } |
0 commit comments