Skip to content

Commit 7f1b811

Browse files
authored
[sui-indexer-alt-framework] Address first_checkpoint + pipeline with no watermark row commit stall (#23209) (#23754)
## Description Currently, if `first_checkpoint` is configured and a pipeline with no watermark row is added to the indexer, it'll essentially stall since the pipeline's committer task creates a default watermark with checkpoint_hi_inclusive = 0 and waits for this checkpoint. This prevents someone from using `first_checkpoint` to arbitrarily seed the starting point of pipelines for the first time. Consequently, indexer operators have to manually update the watermarks table to set a starting point that isn't 0. In this PR, the indexer is now responsible for determining the `next_checkpoint` to commit and make watermark updates for each pipeline. In practice, for a valid `first_checkpoint` (`next_checkpoint`), sequential pipelines will wait to commit the next processed checkpoint after its watermark, while concurrent pipelines will start from the same `first_checkpoint`. Meanwhile, the pipeline tasks initialize a default watermark variable for the logger, but in practice will always be overwritten by the next processed checkpoint for metrics and other reporting. ## Test plan - Unit tests for the behavior above - commit_watermark and sequential committer tests --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] gRPC: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: ## Description Describe the changes or additions included in this PR. ## Test plan How did you test the new or updated feature? --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] gRPC: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK:
1 parent 0cddd6b commit 7f1b811

File tree

10 files changed

+750
-230
lines changed

10 files changed

+750
-230
lines changed

crates/sui-indexer-alt-framework/src/lib.rs

Lines changed: 556 additions & 53 deletions
Large diffs are not rendered by default.
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
// Copyright (c) Mysten Labs, Inc.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
pub mod mock_store;
4+
pub mod store;

crates/sui-indexer-alt-framework/src/testing/mock_store.rs renamed to crates/sui-indexer-alt-framework/src/mocks/store.rs

Lines changed: 60 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,14 @@ pub struct Failures {
5252
pub attempts: AtomicUsize,
5353
}
5454

55-
/// A mock store for testing. It maintains a map of checkpoint sequence numbers to transaction
56-
/// sequence numbers, and a watermark that can be used to test the watermark task.
55+
/// A mock store for testing. Represents an indexer with a single pipeline. It maintains a map of
56+
/// checkpoint sequence numbers to transaction sequence numbers, and a watermark that can be used to
57+
/// test the watermark task.
5758
#[derive(Clone, Default)]
5859
pub struct MockStore {
59-
/// Tracks various watermark states (committer, reader, pruner)
60-
pub watermarks: Arc<Mutex<MockWatermark>>,
60+
/// Tracks various watermark states (committer, reader, pruner). This value can be optional
61+
/// until a checkpoint is committed.
62+
pub watermark: Arc<Mutex<Option<MockWatermark>>>,
6163
/// Stores the actual data, mapping checkpoint sequence numbers to transaction sequence numbers
6264
pub data: Arc<Mutex<HashMap<u64, Vec<u64>>>>,
6365
/// Tracks the order of checkpoint processing for testing sequential processing
@@ -88,23 +90,23 @@ impl Connection for MockConnection<'_> {
8890
&mut self,
8991
_pipeline: &'static str,
9092
) -> Result<Option<CommitterWatermark>, anyhow::Error> {
91-
let watermarks = self.0.watermarks.lock().unwrap();
92-
Ok(Some(CommitterWatermark {
93-
epoch_hi_inclusive: watermarks.epoch_hi_inclusive,
94-
checkpoint_hi_inclusive: watermarks.checkpoint_hi_inclusive,
95-
tx_hi: watermarks.tx_hi,
96-
timestamp_ms_hi_inclusive: watermarks.timestamp_ms_hi_inclusive,
93+
let watermark = self.0.watermark();
94+
Ok(watermark.map(|w| CommitterWatermark {
95+
epoch_hi_inclusive: w.epoch_hi_inclusive,
96+
checkpoint_hi_inclusive: w.checkpoint_hi_inclusive,
97+
tx_hi: w.tx_hi,
98+
timestamp_ms_hi_inclusive: w.timestamp_ms_hi_inclusive,
9799
}))
98100
}
99101

100102
async fn reader_watermark(
101103
&mut self,
102104
_pipeline: &'static str,
103105
) -> Result<Option<ReaderWatermark>, anyhow::Error> {
104-
let watermarks = self.0.watermarks.lock().unwrap();
105-
Ok(Some(ReaderWatermark {
106-
checkpoint_hi_inclusive: watermarks.checkpoint_hi_inclusive,
107-
reader_lo: watermarks.reader_lo,
106+
let watermark = self.0.watermark();
107+
Ok(watermark.map(|w| ReaderWatermark {
108+
checkpoint_hi_inclusive: w.checkpoint_hi_inclusive,
109+
reader_lo: w.reader_lo,
108110
}))
109111
}
110112

@@ -113,17 +115,19 @@ impl Connection for MockConnection<'_> {
113115
_pipeline: &'static str,
114116
delay: Duration,
115117
) -> Result<Option<PrunerWatermark>, anyhow::Error> {
116-
let watermarks = self.0.watermarks.lock().unwrap();
117-
let elapsed_ms = watermarks.pruner_timestamp as i64
118-
- SystemTime::now()
119-
.duration_since(UNIX_EPOCH)
120-
.unwrap()
121-
.as_millis() as i64;
122-
let wait_for_ms = delay.as_millis() as i64 + elapsed_ms;
123-
Ok(Some(PrunerWatermark {
124-
pruner_hi: watermarks.pruner_hi,
125-
reader_lo: watermarks.reader_lo,
126-
wait_for_ms,
118+
let watermark = self.0.watermark();
119+
Ok(watermark.map(|w| {
120+
let elapsed_ms = w.pruner_timestamp as i64
121+
- SystemTime::now()
122+
.duration_since(UNIX_EPOCH)
123+
.unwrap()
124+
.as_millis() as i64;
125+
let wait_for_ms = delay.as_millis() as i64 + elapsed_ms;
126+
PrunerWatermark {
127+
pruner_hi: w.pruner_hi,
128+
reader_lo: w.reader_lo,
129+
wait_for_ms,
130+
}
127131
}))
128132
}
129133

@@ -132,11 +136,16 @@ impl Connection for MockConnection<'_> {
132136
_pipeline: &'static str,
133137
watermark: CommitterWatermark,
134138
) -> anyhow::Result<bool> {
135-
let mut watermarks = self.0.watermarks.lock().unwrap();
136-
watermarks.epoch_hi_inclusive = watermark.epoch_hi_inclusive;
137-
watermarks.checkpoint_hi_inclusive = watermark.checkpoint_hi_inclusive;
138-
watermarks.tx_hi = watermark.tx_hi;
139-
watermarks.timestamp_ms_hi_inclusive = watermark.timestamp_ms_hi_inclusive;
139+
let mut curr = self.0.watermark.lock().unwrap();
140+
*curr = Some(MockWatermark {
141+
epoch_hi_inclusive: watermark.epoch_hi_inclusive,
142+
checkpoint_hi_inclusive: watermark.checkpoint_hi_inclusive,
143+
tx_hi: watermark.tx_hi,
144+
timestamp_ms_hi_inclusive: watermark.timestamp_ms_hi_inclusive,
145+
reader_lo: curr.as_ref().map(|w| w.reader_lo).unwrap_or(0),
146+
pruner_timestamp: curr.as_ref().map(|w| w.pruner_timestamp).unwrap_or(0),
147+
pruner_hi: curr.as_ref().map(|w| w.pruner_hi).unwrap_or(0),
148+
});
140149
Ok(true)
141150
}
142151

@@ -160,12 +169,15 @@ impl Connection for MockConnection<'_> {
160169
return Err(anyhow::anyhow!("set_reader_watermark failed"));
161170
}
162171

163-
let mut watermarks = self.0.watermarks.lock().unwrap();
164-
watermarks.reader_lo = reader_lo;
165-
watermarks.pruner_timestamp = SystemTime::now()
166-
.duration_since(UNIX_EPOCH)
167-
.unwrap()
168-
.as_millis() as u64;
172+
let mut curr = self.0.watermark.lock().unwrap();
173+
*curr = Some(MockWatermark {
174+
reader_lo,
175+
pruner_timestamp: SystemTime::now()
176+
.duration_since(UNIX_EPOCH)
177+
.unwrap()
178+
.as_millis() as u64,
179+
..curr.as_ref().unwrap().clone()
180+
});
169181
Ok(true)
170182
}
171183

@@ -174,8 +186,11 @@ impl Connection for MockConnection<'_> {
174186
_pipeline: &'static str,
175187
pruner_hi: u64,
176188
) -> anyhow::Result<bool> {
177-
let mut watermarks = self.0.watermarks.lock().unwrap();
178-
watermarks.pruner_hi = pruner_hi;
189+
let mut curr = self.0.watermark.lock().unwrap();
190+
*curr = Some(MockWatermark {
191+
pruner_hi,
192+
..curr.as_ref().unwrap().clone()
193+
});
179194
Ok(true)
180195
}
181196
}
@@ -350,9 +365,9 @@ impl MockStore {
350365
self.sequential_checkpoint_data.lock().unwrap().clone()
351366
}
352367

353-
/// Helper to get the current watermark state for testing
354-
pub fn get_watermark(&self) -> MockWatermark {
355-
self.watermarks.lock().unwrap().clone()
368+
/// Helper to get the current watermark state for testing.
369+
pub fn watermark(&self) -> Option<MockWatermark> {
370+
self.watermark.lock().unwrap().clone()
356371
}
357372

358373
/// Helper to get the number of connection attempts for testing
@@ -440,9 +455,10 @@ impl MockStore {
440455
) -> MockWatermark {
441456
let start = std::time::Instant::now();
442457
while start.elapsed() < timeout_duration {
443-
let watermark = self.get_watermark();
444-
if watermark.checkpoint_hi_inclusive >= checkpoint {
445-
return watermark;
458+
if let Some(watermark) = self.watermark() {
459+
if watermark.checkpoint_hi_inclusive >= checkpoint {
460+
return watermark;
461+
}
446462
}
447463
tokio::time::sleep(Duration::from_millis(50)).await;
448464
}

crates/sui-indexer-alt-framework/src/pipeline/concurrent/commit_watermark.rs

Lines changed: 28 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use super::Handler;
4444
/// the watermark cannot be progressed. If `skip_watermark` is set, the task will shutdown
4545
/// immediately.
4646
pub(super) fn commit_watermark<H: Handler + 'static>(
47-
initial_watermark: Option<CommitterWatermark>,
47+
mut next_checkpoint: u64,
4848
config: CommitterConfig,
4949
skip_watermark: bool,
5050
mut rx: mpsc::Receiver<Vec<WatermarkPart>>,
@@ -67,12 +67,9 @@ pub(super) fn commit_watermark<H: Handler + 'static>(
6767
// watermark as much as possible without going over any holes in the sequence of
6868
// checkpoints (entirely missing watermarks, or incomplete watermarks).
6969
let mut precommitted: BTreeMap<u64, WatermarkPart> = BTreeMap::new();
70-
let (mut watermark, mut next_checkpoint) = if let Some(watermark) = initial_watermark {
71-
let next = watermark.checkpoint_hi_inclusive + 1;
72-
(watermark, next)
73-
} else {
74-
(CommitterWatermark::default(), 0)
75-
};
70+
// Initially, this watermark is synthetic, and will be overwritten by a processed
71+
// checkpoint.
72+
let mut watermark = CommitterWatermark::default();
7673

7774
// The watermark task will periodically output a log message at a higher log level to
7875
// demonstrate that the pipeline is making progress.
@@ -84,7 +81,10 @@ pub(super) fn commit_watermark<H: Handler + 'static>(
8481
&metrics.watermark_checkpoint_in_db,
8582
);
8683

87-
info!(pipeline = H::NAME, ?watermark, "Starting commit watermark");
84+
info!(
85+
pipeline = H::NAME,
86+
next_checkpoint, "Starting commit watermark task"
87+
);
8888

8989
loop {
9090
tokio::select! {
@@ -275,9 +275,9 @@ mod tests {
275275

276276
use crate::{
277277
metrics::IndexerMetrics,
278+
mocks::store::*,
278279
pipeline::{CommitterConfig, Processor, WatermarkPart},
279280
store::CommitterWatermark,
280-
testing::mock_store::*,
281281
FieldCount,
282282
};
283283

@@ -318,7 +318,7 @@ mod tests {
318318

319319
fn setup_test<H: Handler<Store = MockStore> + 'static>(
320320
config: CommitterConfig,
321-
initial_watermark: Option<CommitterWatermark>,
321+
next_checkpoint: u64,
322322
store: MockStore,
323323
) -> TestSetup {
324324
let (watermark_tx, watermark_rx) = mpsc::channel(100);
@@ -329,7 +329,7 @@ mod tests {
329329
let cancel_clone = cancel.clone();
330330

331331
let commit_watermark_handle = commit_watermark::<H>(
332-
initial_watermark,
332+
next_checkpoint,
333333
config,
334334
false,
335335
watermark_rx,
@@ -360,11 +360,7 @@ mod tests {
360360
#[tokio::test]
361361
async fn test_basic_watermark_progression() {
362362
let config = CommitterConfig::default();
363-
let initial_watermark = Some(CommitterWatermark {
364-
checkpoint_hi_inclusive: 0,
365-
..Default::default()
366-
});
367-
let setup = setup_test::<DataPipeline>(config, initial_watermark, MockStore::default());
363+
let setup = setup_test::<DataPipeline>(config, 1, MockStore::default());
368364

369365
// Send watermark parts in order
370366
for cp in 1..4 {
@@ -376,7 +372,7 @@ mod tests {
376372
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
377373

378374
// Verify watermark progression
379-
let watermark = setup.store.get_watermark();
375+
let watermark = setup.store.watermark().unwrap();
380376
assert_eq!(watermark.checkpoint_hi_inclusive, 3);
381377

382378
// Clean up
@@ -387,11 +383,7 @@ mod tests {
387383
#[tokio::test]
388384
async fn test_out_of_order_watermarks() {
389385
let config = CommitterConfig::default();
390-
let initial_watermark = Some(CommitterWatermark {
391-
checkpoint_hi_inclusive: 0,
392-
..Default::default()
393-
});
394-
let setup = setup_test::<DataPipeline>(config, initial_watermark, MockStore::default());
386+
let setup = setup_test::<DataPipeline>(config, 1, MockStore::default());
395387

396388
// Send watermark parts out of order
397389
let parts = vec![
@@ -405,7 +397,7 @@ mod tests {
405397
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
406398

407399
// Verify watermark hasn't progressed past 2
408-
let watermark = setup.store.get_watermark();
400+
let watermark = setup.store.watermark().unwrap();
409401
assert_eq!(watermark.checkpoint_hi_inclusive, 2);
410402

411403
// Send checkpoint 3 to fill the gap
@@ -419,7 +411,7 @@ mod tests {
419411
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
420412

421413
// Verify watermark has progressed to 4
422-
let watermark = setup.store.get_watermark();
414+
let watermark = setup.store.watermark().unwrap();
423415
assert_eq!(watermark.checkpoint_hi_inclusive, 4);
424416

425417
// Clean up
@@ -433,12 +425,8 @@ mod tests {
433425
watermark_interval_ms: 1_000, // Long polling interval to test connection retry
434426
..Default::default()
435427
};
436-
let initial_watermark = Some(CommitterWatermark {
437-
checkpoint_hi_inclusive: 0,
438-
..Default::default()
439-
});
440428
let store = MockStore::default().with_connection_failures(1);
441-
let setup = setup_test::<DataPipeline>(config, initial_watermark, store);
429+
let setup = setup_test::<DataPipeline>(config, 1, store);
442430

443431
// Send watermark part
444432
let part = create_watermark_part_for_checkpoint(1);
@@ -448,14 +436,14 @@ mod tests {
448436
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
449437

450438
// Verify watermark hasn't progressed
451-
let watermark = setup.store.get_watermark();
452-
assert_eq!(watermark.checkpoint_hi_inclusive, 0);
439+
let watermark = setup.store.watermark();
440+
assert!(watermark.is_none());
453441

454442
// Wait for next polling and processing
455443
tokio::time::sleep(tokio::time::Duration::from_millis(1_200)).await;
456444

457445
// Verify watermark has progressed
458-
let watermark = setup.store.get_watermark();
446+
let watermark = setup.store.watermark().unwrap();
459447
assert_eq!(watermark.checkpoint_hi_inclusive, 1);
460448

461449
// Clean up
@@ -469,11 +457,7 @@ mod tests {
469457
watermark_interval_ms: 1_000, // Long polling interval to test adding complete part
470458
..Default::default()
471459
};
472-
let initial_watermark = Some(CommitterWatermark {
473-
checkpoint_hi_inclusive: 0,
474-
..Default::default()
475-
});
476-
let setup = setup_test::<DataPipeline>(config, initial_watermark, MockStore::default());
460+
let setup = setup_test::<DataPipeline>(config, 1, MockStore::default());
477461

478462
// Send the first incomplete watermark part
479463
let part = WatermarkPart {
@@ -490,8 +474,8 @@ mod tests {
490474
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
491475

492476
// Verify watermark hasn't progressed
493-
let watermark = setup.store.get_watermark();
494-
assert_eq!(watermark.checkpoint_hi_inclusive, 0);
477+
let watermark = setup.store.watermark();
478+
assert!(watermark.is_none());
495479

496480
// Send the other two parts to complete the watermark
497481
setup
@@ -504,7 +488,7 @@ mod tests {
504488
tokio::time::sleep(tokio::time::Duration::from_millis(1_200)).await;
505489

506490
// Verify watermark has progressed
507-
let watermark = setup.store.get_watermark();
491+
let watermark = setup.store.watermark().unwrap();
508492
assert_eq!(watermark.checkpoint_hi_inclusive, 1);
509493

510494
// Clean up
@@ -515,8 +499,7 @@ mod tests {
515499
#[tokio::test]
516500
async fn test_no_initial_watermark() {
517501
let config = CommitterConfig::default();
518-
let initial_watermark = None;
519-
let setup = setup_test::<DataPipeline>(config, initial_watermark, MockStore::default());
502+
let setup = setup_test::<DataPipeline>(config, 0, MockStore::default());
520503

521504
// Send the checkpoint 1 watermark
522505
setup
@@ -529,8 +512,8 @@ mod tests {
529512
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
530513

531514
// Verify watermark hasn't progressed
532-
let watermark = setup.store.get_watermark();
533-
assert_eq!(watermark.checkpoint_hi_inclusive, 0);
515+
let watermark = setup.store.watermark();
516+
assert!(watermark.is_none());
534517

535518
// Send the checkpoint 0 watermark to fill the gap.
536519
setup
@@ -543,7 +526,7 @@ mod tests {
543526
tokio::time::sleep(tokio::time::Duration::from_millis(1200)).await;
544527

545528
// Verify watermark has progressed
546-
let watermark = setup.store.get_watermark();
529+
let watermark = setup.store.watermark().unwrap();
547530
assert_eq!(watermark.checkpoint_hi_inclusive, 1);
548531

549532
// Clean up

0 commit comments

Comments
 (0)