perf: db unbundled blocks query optimization #204
base: master
simulated fuel node
…ing' into feat/db_query_optimization
Thanks for the contribution! Before we can merge this, we need @MujkicA to sign the Fuel Labs Contributor License Agreement.
| }) | ||
| .collect::<Vec<_>>(); | ||
|
|
||
| sqlx::query!( |
hmm, can you move the fragment preparation and query builder before the tx.begin()?
this will help reduce the time spent in the transaction, and therefore reduce waiting elsewhere
| } | ||
|
|
||
| let mut total_bytes = 0; | ||
| async fn take_limited_amount_of_blocks( |
| async fn take_limited_amount_of_blocks( | |
| async fn take_limited_contiguous_blocks( |
| let mut contiguous_blocks = Vec::new(); | ||
| let mut cumulative_bytes = 0; | ||
| let mut total_blocks_count = 0; | ||
| let mut last_height: Option<u32> = None; | ||
| while let Some(val) = stream.try_next().await? { | ||
| let data_len = val.data.len(); | ||
| if total_bytes + data_len > max_cumulative_bytes as usize { | ||
|
|
||
| while cumulative_bytes < target_cumulative_bytes { | ||
| let Some(db_block) = stream.try_next().await? else { | ||
| break; | ||
| } | ||
| }; | ||
|
|
||
| let block = CompressedFuelBlock::try_from(val)?; | ||
| total_blocks_count += 1; | ||
| let data_len = db_block.data.len() as u32; | ||
| let block = CompressedFuelBlock::try_from(db_block)?; | ||
| let height = block.height; | ||
| total_bytes += data_len; | ||
|
|
||
| blocks.push(block); | ||
|
|
||
| match &mut last_height { | ||
| Some(last_height) if height != last_height.saturating_add(1) => { | ||
| // Check if the block is contiguous. | ||
| if let Some(prev_height) = last_height { | ||
| if height != prev_height.saturating_add(1) { | ||
| // A gap is detected. Break out without adding this block. | ||
| break; | ||
| } | ||
| _ => { | ||
| last_height = Some(height); | ||
| } | ||
| } | ||
| last_height = Some(height); | ||
| contiguous_blocks.push(block); | ||
| cumulative_bytes += data_len; | ||
| } | ||
|
|
||
| Ok(blocks) | ||
| if cumulative_bytes < target_cumulative_bytes { | ||
| total_blocks_count += count_remaining_blocks(&mut stream, block_buildup_threshold).await?; | ||
| let buildup_detected = total_blocks_count >= block_buildup_threshold; | ||
| Ok((contiguous_blocks, Some(buildup_detected))) | ||
| } else { | ||
| // If target bytes are reached without encountering a gap, no buildup detection is needed. | ||
| Ok((contiguous_blocks, None)) | ||
| } |
would something like -
let mut last_height: Option<u32> = None;
let mut cumulative_bytes = 0;
let mut total_blocks_count = 0;
let contiguous_blocks: Vec<CompressedFuelBlock> = stream
.try_filter_map(|db_block| async {
total_blocks_count += 1;
let block = CompressedFuelBlock::try_from(db_block)?;
let data_len = block.data.len() as u32;
// If there is a height gap, stop processing.
if let Some(prev_height) = last_height {
if block.height != prev_height.saturating_add(1) {
return Ok(None);
}
}
last_height = Some(block.height);
cumulative_bytes += data_len;
Ok(Some(block))
})
.take_while(|_| std::future::ready(cumulative_bytes < target_cumulative_bytes))
.try_collect()
.await?;
let buildup_detected = if cumulative_bytes < target_cumulative_bytes {
total_blocks_count += count_remaining_blocks(&mut stream, block_buildup_threshold).await?;
Some(total_blocks_count >= block_buildup_threshold)
} else {
None
};
Ok((contiguous_blocks, buildup_detected))
}
work?
Two issues:
1. cumulative_bytes is being borrowed mutably by the try_filter_map and immutably by the take_while, causing borrowing issues. Can be worked around by using Arc<AtomicU32>.
2. try_collect will consume the stream, so I cannot count_remaining_blocks after that.
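For illustration, the loop-based approach can be sketched with a plain synchronous iterator standing in for the database stream. This is only an analogy under stated assumptions: `Block`, `take_contiguous`, and the capped counting of the remainder are hypothetical names and choices, not the code in this PR. It shows why pulling items by hand sidesteps both issues: a single scope owns the counters, and the iterator is still available after the loop.

```rust
/// Hypothetical stand-in for a row streamed from the database.
#[derive(Debug, Clone, PartialEq)]
pub struct Block {
    pub height: u32,
    pub data_len: u32,
}

/// Takes contiguous blocks until `target_bytes` is reached or a height gap
/// appears. When the target was not reached, also reports whether at least
/// `buildup_threshold` blocks were seen in total.
pub fn take_contiguous<I>(
    blocks: I,
    target_bytes: u32,
    buildup_threshold: u32,
) -> (Vec<Block>, Option<bool>)
where
    I: IntoIterator<Item = Block>,
{
    let mut iter = blocks.into_iter();
    let mut taken = Vec::new();
    let mut cumulative_bytes: u32 = 0;
    let mut seen: u32 = 0;
    let mut last_height: Option<u32> = None;

    while cumulative_bytes < target_bytes {
        // Pulling by hand keeps `iter` usable after the loop.
        let Some(block) = iter.next() else { break };
        seen += 1;
        if let Some(prev) = last_height {
            if block.height != prev.saturating_add(1) {
                break; // gap detected: stop without adding this block
            }
        }
        last_height = Some(block.height);
        cumulative_bytes = cumulative_bytes.saturating_add(block.data_len);
        taken.push(block);
    }

    if cumulative_bytes < target_bytes {
        // Assumption: count the remainder, but no further than the threshold.
        let cap = buildup_threshold.saturating_sub(seen) as usize;
        seen += iter.take(cap).count() as u32;
        (taken, Some(seen >= buildup_threshold))
    } else {
        (taken, None)
    }
}
```

The same shape carries over to an async stream by replacing `iter.next()` with `stream.try_next().await?`, as the diff above does.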
| -- 1. Add the column without enforcing NOT NULL initially | ||
| ALTER TABLE fuel_blocks | ||
| ADD COLUMN IF NOT EXISTS is_bundled BOOLEAN; |
Curious: why can't we just set the default value to false from the beginning?
No reason. Will update now.
6ffff3a
| pub trait Storage: Sync { | ||
| async fn lowest_sequence_of_unbundled_blocks( | ||
| &self, | ||
| starting_height: u32, |
Do we even need starting_height now? It looks like you can sort blocks by height and filter on is_bundled = false.
It's there to give a hard limit to what blocks we no longer care about.
Defensive programming for the unlikely case the committer has not posted for more than 1w.
The starting height is set so that we don't post 1w+ old blocks even if their is_bundled is false.
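As a rough in-memory sketch of that cutoff (not the actual SQL query), the selection can be modeled as a filter on both conditions followed by a sort. `BlockRow` and `candidate_heights` are hypothetical names, and treating `starting_height` as an inclusive lower bound is an assumption.

```rust
/// Hypothetical in-memory model of a `fuel_blocks` row.
#[derive(Debug, Clone, PartialEq)]
pub struct BlockRow {
    pub height: u32,
    pub is_bundled: bool,
}

/// Returns the heights of unbundled blocks at or above `starting_height`,
/// in ascending order. Older blocks are ignored even if still unbundled.
pub fn candidate_heights(rows: &[BlockRow], starting_height: u32) -> Vec<u32> {
    let mut heights: Vec<u32> = rows
        .iter()
        // Assumption: the cutoff is inclusive.
        .filter(|row| !row.is_bundled && row.height >= starting_height)
        .map(|row| row.height)
        .collect();
    heights.sort_unstable();
    heights
}
```

With rows at heights 5 (unbundled), 100 (bundled), 101 and 102 (both unbundled) and a starting height of 100, only 101 and 102 are returned; the stale block at height 5 is skipped.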
| #[allow(async_fn_in_trait)] | ||
| #[trait_variant::make(Send)] | ||
| pub trait Storage: Sync { | ||
| async fn lowest_sequence_of_unbundled_blocks( |
The naming was confusing to me. Now that I see how it works, I think maybe next_candidates_for_bundling would fit better, if we remove the dependency on starting_height.
I will rename it even though removing starting_height is still being discussed. Candidates is more easily understood, can add a comment on the nature of choosing the candidates.
forgot the comment:
4c6abd3
| target_cumulative_bytes: u32, | ||
| block_buildup_detection_threshold: u32, |
[nit] It would be nice if you had some BundlingLimits or something like that, where you store all the criteria used to limit the number of blocks in the bundle.
The Bundler service has a config whose fields impact the number of blocks that are going to end up in a bundle. So all of the relevant limits are found there.
A subsection of that config is given to the storage adapter here to fetch blocks for bundling. The remainder of the bundler config will influence how many of the fetched blocks end up in a bundle.
| path: /health | ||
| port: http | ||
| initialDelaySeconds: 10 | ||
| initialDelaySeconds: 60 |
I am curious: is there a way to know how long the migration will take on the real database for devnet/testnet/mainnet?
e2e/tests/tests/harness.rs
Outdated
|
|
||
| let finished = db | ||
| .lowest_sequence_of_unbundled_blocks(0, 1) | ||
| .lowest_sequence_of_unbundled_blocks(0, 1, u32::MAX) |
[nit] Something like:
| .lowest_sequence_of_unbundled_blocks(0, 1, u32::MAX) | |
| .lowest_sequence_of_unbundled_blocks(0, 1, BytesLimit::Unlimited) |
Could be more readable =D
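A minimal sketch of what such a type might look like. `BytesLimit` is only the name proposed in the suggestion above; the `Limited` variant and the `as_u32` helper are assumptions added for illustration.

```rust
/// Hypothetical limit on cumulative bytes, replacing a bare `u32::MAX`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BytesLimit {
    /// Stop once this many bytes have been accumulated.
    Limited(u32),
    /// No byte limit.
    Unlimited,
}

impl BytesLimit {
    /// Converts to the raw value a `u32`-based signature expects.
    pub fn as_u32(self) -> u32 {
        match self {
            BytesLimit::Limited(bytes) => bytes,
            BytesLimit::Unlimited => u32::MAX,
        }
    }
}
```

The call site then states its intent (`BytesLimit::Unlimited`) instead of relying on the reader to interpret `u32::MAX`.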
rather than at the end
| ) -> services::Result<Option<UnbundledBlocks>> { | ||
| self.db | ||
| ._lowest_unbundled_blocks(starting_height, max_cumulative_bytes) | ||
| ._next_candidates_for_bundling( |
[nit] In fuel-core, we try not to use _ for internal methods. Instead, we add _inner suffix.
| // If target bytes are reached without encountering a gap, no buildup detection is needed. | ||
| Ok((consecutive_blocks, None)) |
Curious, why is it not needed? It is not clear from the function name and return types =D
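One possible way to make the return type speak for itself is to name the two cases rather than encode them in an Option<bool>. This is a hypothetical sketch, not something proposed in the thread: `BuildupCheck` and `from_option` are invented names, and the variant docs only restate the code comment quoted above.

```rust
/// Hypothetical, more explicit replacement for `Option<bool>`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BuildupCheck {
    /// Target bytes were reached without a gap, so the check was skipped.
    NotNeeded,
    /// Target bytes were not reached; `detected` says whether the number of
    /// blocks seen met the buildup threshold.
    Checked { detected: bool },
}

impl BuildupCheck {
    /// Maps the existing `Option<bool>` encoding onto the named cases.
    pub fn from_option(value: Option<bool>) -> Self {
        match value {
            None => BuildupCheck::NotNeeded,
            Some(detected) => BuildupCheck::Checked { detected },
        }
    }
}
```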
closes: #198
Couldn't get the desired performance via indexes for production-level loads.
No choice but to denormalize by adding an is_bundled column to fuel_blocks. On the upside, this makes the queries a lot simpler.