Skip to content

Commit 53dc022

Browse files
committed
[to] recover transactions through TransactionDriver (#24385)
## Description

Recover transactions through Transaction Driver instead of Quorum Driver, and clear recovered transactions explicitly. Also remove more usages of Quorum Driver via Transaction Orchestrator.

## Test plan

CI
1 parent 8402917 commit 53dc022

File tree

7 files changed

+243
-235
lines changed

7 files changed

+243
-235
lines changed

crates/sui-core/src/transaction_driver/mod.rs

Lines changed: 112 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ impl<A> TransactionDriver<A>
8787
where
8888
A: AuthorityAPI + Send + Sync + 'static + Clone,
8989
{
90+
// TODO: accept a TransactionDriverConfig to set default allowed & blocked validators.
9091
pub fn new(
9192
authority_aggregator: Arc<AuthorityAggregator<A>>,
9293
reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
@@ -120,109 +121,17 @@ where
120121
driver
121122
}
122123

123-
// Runs a background task to send ping transactions to all validators to perform latency checks to test both the fast path and the consensus path.
124-
async fn run_latency_checks(self: Arc<Self>) {
125-
const INTERVAL_BETWEEN_RUNS: Duration = Duration::from_secs(15);
126-
const MAX_JITTER: Duration = Duration::from_secs(10);
127-
const PING_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
128-
129-
let mut interval = interval(INTERVAL_BETWEEN_RUNS);
130-
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
131-
132-
loop {
133-
interval.tick().await;
134-
135-
let mut tasks = JoinSet::new();
136-
137-
for tx_type in [TxType::SingleWriter, TxType::SharedObject] {
138-
Self::ping_for_tx_type(
139-
self.clone(),
140-
&mut tasks,
141-
tx_type,
142-
MAX_JITTER,
143-
PING_REQUEST_TIMEOUT,
144-
);
145-
}
146-
147-
while let Some(result) = tasks.join_next().await {
148-
if let Err(e) = result {
149-
tracing::debug!("Error while driving ping transaction: {}", e);
150-
}
151-
}
152-
}
124+
/// Returns the authority aggregator wrapper which upgrades on epoch changes.
125+
pub fn authority_aggregator(&self) -> &Arc<ArcSwap<AuthorityAggregator<A>>> {
126+
&self.authority_aggregator
153127
}
154128

155-
/// Pings all validators for e2e latency with the provided transaction type.
156-
fn ping_for_tx_type(
157-
self: Arc<Self>,
158-
tasks: &mut JoinSet<()>,
159-
tx_type: TxType,
160-
max_jitter: Duration,
161-
ping_timeout: Duration,
162-
) {
163-
// We are iterating over the single writer and shared object transaction types to test both the fast path and the consensus path.
164-
let auth_agg = self.authority_aggregator.load().clone();
165-
let validators = auth_agg.committee.names().cloned().collect::<Vec<_>>();
166-
167-
self.metrics
168-
.latency_check_runs
169-
.with_label_values(&[tx_type.as_str()])
170-
.inc();
171-
172-
for name in validators {
173-
let display_name = auth_agg.get_display_name(&name);
174-
let delay_ms = rand::thread_rng().gen_range(0..max_jitter.as_millis()) as u64;
175-
let self_clone = self.clone();
176-
177-
let task = async move {
178-
// Add some random delay to the task to avoid all tasks running at the same time
179-
if delay_ms > 0 {
180-
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
181-
}
182-
let start_time = Instant::now();
183-
184-
let ping_type = if tx_type == TxType::SingleWriter {
185-
PingType::FastPath
186-
} else {
187-
PingType::Consensus
188-
};
189-
190-
// Now send a ping transaction to the chosen validator for the provided tx type
191-
match self_clone
192-
.drive_transaction(
193-
SubmitTxRequest::new_ping(ping_type),
194-
SubmitTransactionOptions {
195-
allowed_validators: vec![display_name.clone()],
196-
..Default::default()
197-
},
198-
Some(ping_timeout),
199-
)
200-
.await
201-
{
202-
Ok(_) => {
203-
tracing::debug!(
204-
"Ping transaction to validator {} for tx type {} completed end to end in {} seconds",
205-
display_name,
206-
tx_type.as_str(),
207-
start_time.elapsed().as_secs_f64()
208-
);
209-
}
210-
Err(err) => {
211-
tracing::debug!(
212-
"Failed to get certified finalized effects for tx type {}, for ping transaction to validator {}: {}",
213-
tx_type.as_str(),
214-
display_name,
215-
err
216-
);
217-
}
218-
}
219-
};
220-
221-
tasks.spawn(task);
222-
}
223-
}
224-
225-
/// Drives transaction to submission and effects certification. Ping requests are derived from the submitted payload.
129+
/// Drives transaction to finalization.
130+
///
131+
/// Internally, retries the attempt to finalize a transaction until:
132+
/// - The transaction is finalized.
133+
/// - The transaction observes a non-retriable error.
134+
/// - Timeout is reached.
226135
#[instrument(level = "error", skip_all, fields(tx_digest = ?request.transaction.as_ref().map(|t| t.digest()), ping = %request.ping_type.is_some()))]
227136
pub async fn drive_transaction(
228137
&self,
@@ -438,6 +347,108 @@ where
438347
result
439348
}
440349

350+
// Runs a background task to send ping transactions to all validators to perform latency checks to test both the fast path and the consensus path.
351+
async fn run_latency_checks(self: Arc<Self>) {
352+
const INTERVAL_BETWEEN_RUNS: Duration = Duration::from_secs(15);
353+
const MAX_JITTER: Duration = Duration::from_secs(10);
354+
const PING_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
355+
356+
let mut interval = interval(INTERVAL_BETWEEN_RUNS);
357+
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
358+
359+
loop {
360+
interval.tick().await;
361+
362+
let mut tasks = JoinSet::new();
363+
364+
for tx_type in [TxType::SingleWriter, TxType::SharedObject] {
365+
Self::ping_for_tx_type(
366+
self.clone(),
367+
&mut tasks,
368+
tx_type,
369+
MAX_JITTER,
370+
PING_REQUEST_TIMEOUT,
371+
);
372+
}
373+
374+
while let Some(result) = tasks.join_next().await {
375+
if let Err(e) = result {
376+
tracing::debug!("Error while driving ping transaction: {}", e);
377+
}
378+
}
379+
}
380+
}
381+
382+
/// Pings all validators for e2e latency with the provided transaction type.
383+
fn ping_for_tx_type(
384+
self: Arc<Self>,
385+
tasks: &mut JoinSet<()>,
386+
tx_type: TxType,
387+
max_jitter: Duration,
388+
ping_timeout: Duration,
389+
) {
390+
// We are iterating over the single writer and shared object transaction types to test both the fast path and the consensus path.
391+
let auth_agg = self.authority_aggregator.load().clone();
392+
let validators = auth_agg.committee.names().cloned().collect::<Vec<_>>();
393+
394+
self.metrics
395+
.latency_check_runs
396+
.with_label_values(&[tx_type.as_str()])
397+
.inc();
398+
399+
for name in validators {
400+
let display_name = auth_agg.get_display_name(&name);
401+
let delay_ms = rand::thread_rng().gen_range(0..max_jitter.as_millis()) as u64;
402+
let self_clone = self.clone();
403+
404+
let task = async move {
405+
// Add some random delay to the task to avoid all tasks running at the same time
406+
if delay_ms > 0 {
407+
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
408+
}
409+
let start_time = Instant::now();
410+
411+
let ping_type = if tx_type == TxType::SingleWriter {
412+
PingType::FastPath
413+
} else {
414+
PingType::Consensus
415+
};
416+
417+
// Now send a ping transaction to the chosen validator for the provided tx type
418+
match self_clone
419+
.drive_transaction(
420+
SubmitTxRequest::new_ping(ping_type),
421+
SubmitTransactionOptions {
422+
allowed_validators: vec![display_name.clone()],
423+
..Default::default()
424+
},
425+
Some(ping_timeout),
426+
)
427+
.await
428+
{
429+
Ok(_) => {
430+
tracing::debug!(
431+
"Ping transaction to validator {} for tx type {} completed end to end in {} seconds",
432+
display_name,
433+
tx_type.as_str(),
434+
start_time.elapsed().as_secs_f64()
435+
);
436+
}
437+
Err(err) => {
438+
tracing::debug!(
439+
"Failed to get certified finalized effects for tx type {}, for ping transaction to validator {}: {}",
440+
tx_type.as_str(),
441+
display_name,
442+
err
443+
);
444+
}
445+
}
446+
};
447+
448+
tasks.spawn(task);
449+
}
450+
}
451+
441452
fn enable_reconfig(
442453
self: &Arc<Self>,
443454
reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,

0 commit comments

Comments
 (0)