Skip to content

Conversation

@segfault-magnet
Copy link
Contributor

closes: #198

Couldn't get the desired performance via indexes for production-level loads.

No choice but to denormalize adding a is_bundled to fuel_blocks.

On the upside makes the queries a lot simpler.

Base automatically changed from bug/schemaspy_orhpan_tables_missing to master March 13, 2025 11:37
@hal3e hal3e requested a review from a team as a code owner April 3, 2025 15:04
@fuel-cla-bot
Copy link

fuel-cla-bot bot commented Apr 3, 2025

Thanks for the contribution! Before we can merge this, we need @MujkicA to sign the Fuel Labs Contributor License Agreement.

Copy link
Member

@rymnc rymnc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reviewed on my phone, more to come (left a marker for me to pick up after)

I think the cla is failing because of a couple commits that need to be reauthored -
e74403b & 70ba25e

})
.collect::<Vec<_>>();

sqlx::query!(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, can you move the fragment preparation and query builder before the tx.begin()?

this will help reduce the time spent in the transaction, and therefore reduce waiting elsewhere

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

let mut total_bytes = 0;
async fn take_limited_amount_of_blocks(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
async fn take_limited_amount_of_blocks(
async fn take_limited_contiguous_blocks(

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on lines 1247 to 1281
let mut contiguous_blocks = Vec::new();
let mut cumulative_bytes = 0;
let mut total_blocks_count = 0;
let mut last_height: Option<u32> = None;
while let Some(val) = stream.try_next().await? {
let data_len = val.data.len();
if total_bytes + data_len > max_cumulative_bytes as usize {

while cumulative_bytes < target_cumulative_bytes {
let Some(db_block) = stream.try_next().await? else {
break;
}
};

let block = CompressedFuelBlock::try_from(val)?;
total_blocks_count += 1;
let data_len = db_block.data.len() as u32;
let block = CompressedFuelBlock::try_from(db_block)?;
let height = block.height;
total_bytes += data_len;

blocks.push(block);

match &mut last_height {
Some(last_height) if height != last_height.saturating_add(1) => {
// Check if the block is contiguous.
if let Some(prev_height) = last_height {
if height != prev_height.saturating_add(1) {
// A gap is detected. Break out without adding this block.
break;
}
_ => {
last_height = Some(height);
}
}
last_height = Some(height);
contiguous_blocks.push(block);
cumulative_bytes += data_len;
}

Ok(blocks)
if cumulative_bytes < target_cumulative_bytes {
total_blocks_count += count_remaining_blocks(&mut stream, block_buildup_threshold).await?;
let buildup_detected = total_blocks_count >= block_buildup_threshold;
Ok((contiguous_blocks, Some(buildup_detected)))
} else {
// If target bytes are reached without encountering a gap, no buildup detection is needed.
Ok((contiguous_blocks, None))
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would something like -

let mut last_height: Option<u32> = None;
    let mut cumulative_bytes = 0;
    let mut total_blocks_count = 0;

    let contiguous_blocks: Vec<CompressedFuelBlock> = stream
        .try_filter_map(|db_block| async {
            total_blocks_count += 1;
            let block = CompressedFuelBlock::try_from(db_block)?;
            let data_len = block.data.len() as u32;

            // If there is a height gap, stop processing.
            if let Some(prev_height) = last_height {
                if block.height != prev_height.saturating_add(1) {
                    return Ok(None);
                }
            }

            last_height = Some(block.height);
            cumulative_bytes += data_len;

            Ok(Some(block))
        })
        .take_while(|_| std::future::ready(cumulative_bytes < target_cumulative_bytes))
        .try_collect()
        .await?;

    let buildup_detected = if cumulative_bytes < target_cumulative_bytes {
        total_blocks_count += count_remaining_blocks(&mut stream, block_buildup_threshold).await?;
        Some(total_blocks_count >= block_buildup_threshold)
    } else {
        None
    };

    Ok((contiguous_blocks, buildup_detected))
  }

work?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two issues:

  1. cumulative_bytes is being borrowed mutably by the try_filter_map and immutably by the take_while causing borrowing issues. Can be worked around by using Arc<AtomicU32>.
  2. try_collect will consume the stream, so I cannot count_remaining_blocks after that.

@rymnc rymnc requested review from a team and removed request for MujkicA, Salka1988 and digorithm April 3, 2025 16:53
Comment on lines 3 to 5
-- 1. Add the column without enforcing NOT NULL initially
ALTER TABLE fuel_blocks
ADD COLUMN IF NOT EXISTS is_bundled BOOLEAN;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curios why can't we just set default value to be false from the beginning?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No reason. Will update now.
6ffff3a

pub trait Storage: Sync {
async fn lowest_sequence_of_unbundled_blocks(
&self,
starting_height: u32,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we even need starting_height now? It looks like you can sort blocks based by height and is_bundled = false

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's there to give a hard limit to what blocks we no longer care about.

Defensive programming for the unlikely case the committer has not posted for more than 1w.

The starting height is set so that we don't post 1w+ old blocks even if their is_bundled is false.

#[allow(async_fn_in_trait)]
#[trait_variant::make(Send)]
pub trait Storage: Sync {
async fn lowest_sequence_of_unbundled_blocks(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The naming for me was confusing, now when I see how it works, I think that maybe next_candidates_for_bundling, if we remove dependency on starting_height.

Copy link
Contributor Author

@segfault-magnet segfault-magnet Apr 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will rename it even though removing starting_height is still being discussed. Candidates is more easily understood, can add a comment on the nature of choosing the candidates.

c53ca9f

forgot the comment:
4c6abd3

Comment on lines 380 to 381
target_cumulative_bytes: u32,
block_buildup_detection_threshold: u32,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] It would he nice if you had some BundlingLimits or something like that, where you store all criteria used to limit number of blocks in the bundle.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Bundler service has a config whose fields impact the number of blocks that are going to end up in a bundle. So all of the relevant limits are found there.

A subsection of that config is given to the storage adapter here to fetch blocks for bundling. The remainder of the bundler config will influence how many of the fetched blocks end up in a bundle.

path: /health
port: http
initialDelaySeconds: 10
initialDelaySeconds: 60
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curios is there a way to know how long will be migration on the real database from devnet/testnet/mainnet?


let finished = db
.lowest_sequence_of_unbundled_blocks(0, 1)
.lowest_sequence_of_unbundled_blocks(0, 1, u32::MAX)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] Something like:

Suggested change
.lowest_sequence_of_unbundled_blocks(0, 1, u32::MAX)
.lowest_sequence_of_unbundled_blocks(0, 1, BytesLimit::Unlimited)

Could be more readable=D

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@segfault-magnet
Copy link
Contributor Author

I think the cla is failing because of a couple commits that need to be reauthored - e74403b & 70ba25e

I'll reauthor them as soon as all comments are addressed. So that I don't break commit links I've been leaving in replies to the PR comments.

) -> services::Result<Option<UnbundledBlocks>> {
self.db
._lowest_unbundled_blocks(starting_height, max_cumulative_bytes)
._next_candidates_for_bundling(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] In fuel-core, we try not to use _ for internal methods. Instead, we add _inner suffix.

Comment on lines +1273 to +1274
// If target bytes are reached without encountering a gap, no buildup detection is needed.
Ok((consecutive_blocks, None))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious, why is it not needed? It is not clear from the function name and return types=D

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Test unbundled blocks query for a 7d lookback window (and optimize if needed)

6 participants