diff --git a/.sqlx/query-4f59a2060860fc37728a1d3cb5510f0d3c7476f90fd254645465ffe25cdc86cd.json b/.sqlx/query-4f59a2060860fc37728a1d3cb5510f0d3c7476f90fd254645465ffe25cdc86cd.json new file mode 100644 index 00000000..aeb898ca --- /dev/null +++ b/.sqlx/query-4f59a2060860fc37728a1d3cb5510f0d3c7476f90fd254645465ffe25cdc86cd.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE fuel_blocks SET is_bundled = true WHERE height BETWEEN $1 AND $2", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "4f59a2060860fc37728a1d3cb5510f0d3c7476f90fd254645465ffe25cdc86cd" +} diff --git a/.sqlx/query-51a61136e7db724c9f7ac0df21e4c01167b107c55d0d71861af1373c4a638089.json b/.sqlx/query-51a61136e7db724c9f7ac0df21e4c01167b107c55d0d71861af1373c4a638089.json new file mode 100644 index 00000000..57d6480e --- /dev/null +++ b/.sqlx/query-51a61136e7db724c9f7ac0df21e4c01167b107c55d0d71861af1373c4a638089.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO bundles (start_height, end_height) VALUES ($1, $2)", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "51a61136e7db724c9f7ac0df21e4c01167b107c55d0d71861af1373c4a638089" +} diff --git a/.sqlx/query-6115bcc0ad482ff3ef1c3014a2a6c69f5b7acb65ee9d9f1c025191be09b8f1f4.json b/.sqlx/query-6115bcc0ad482ff3ef1c3014a2a6c69f5b7acb65ee9d9f1c025191be09b8f1f4.json new file mode 100644 index 00000000..46166d2f --- /dev/null +++ b/.sqlx/query-6115bcc0ad482ff3ef1c3014a2a6c69f5b7acb65ee9d9f1c025191be09b8f1f4.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT fb.height, fb.data\n FROM fuel_blocks fb \n WHERE fb.is_bundled = false\n AND fb.height >= $1\n ORDER BY fb.height", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "height", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "data", + "type_info": "Bytea" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false, + false + ] + }, + "hash": "6115bcc0ad482ff3ef1c3014a2a6c69f5b7acb65ee9d9f1c025191be09b8f1f4" +} diff --git a/.sqlx/query-6bf15856cd5f235e0def12bd2597b885851cab3db4fc665374765a36b735097c.json b/.sqlx/query-6bf15856cd5f235e0def12bd2597b885851cab3db4fc665374765a36b735097c.json new file mode 100644 index 00000000..9ea5b382 --- /dev/null +++ b/.sqlx/query-6bf15856cd5f235e0def12bd2597b885851cab3db4fc665374765a36b735097c.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO fuel_blocks (height, data) VALUES ($1, $2)", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Bytea" + ] + }, + "nullable": [] + }, + "hash": "6bf15856cd5f235e0def12bd2597b885851cab3db4fc665374765a36b735097c" +} diff --git a/.sqlx/query-9b0462800c1d54c2f53679747a9adab59b442b31fdcea196fca3bfeaa85903ac.json b/.sqlx/query-9b0462800c1d54c2f53679747a9adab59b442b31fdcea196fca3bfeaa85903ac.json deleted file mode 100644 index b42093ec..00000000 --- a/.sqlx/query-9b0462800c1d54c2f53679747a9adab59b442b31fdcea196fca3bfeaa85903ac.json +++ /dev/null @@ -1,28 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT fb.* \n FROM fuel_blocks fb \n WHERE fb.height >= $1\n AND NOT EXISTS (\n SELECT 1 FROM bundles b \n WHERE fb.height BETWEEN b.start_height AND b.end_height\n AND b.end_height >= $1\n ) \n ORDER BY fb.height", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "height", - "type_info": "Int8" - }, - { - "ordinal": 1, - "name": "data", - "type_info": "Bytea" 
- } - ], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [ - false, - false - ] - }, - "hash": "9b0462800c1d54c2f53679747a9adab59b442b31fdcea196fca3bfeaa85903ac" -} diff --git a/.sqlx/query-a9d6241277f3c0b687ea4854a93dc8336b9f305cd0fd4cca9c0264c520fec142.json b/.sqlx/query-a9d6241277f3c0b687ea4854a93dc8336b9f305cd0fd4cca9c0264c520fec142.json deleted file mode 100644 index c16885ec..00000000 --- a/.sqlx/query-a9d6241277f3c0b687ea4854a93dc8336b9f305cd0fd4cca9c0264c520fec142.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "SELECT COUNT(*)\n FROM fuel_blocks fb \n WHERE fb.height >= $1\n AND NOT EXISTS (\n SELECT 1 FROM bundles b \n WHERE fb.height BETWEEN b.start_height AND b.end_height\n AND b.end_height >= $1\n )", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "count", - "type_info": "Int8" - } - ], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [ - null - ] - }, - "hash": "a9d6241277f3c0b687ea4854a93dc8336b9f305cd0fd4cca9c0264c520fec142" -} diff --git a/.sqlx/query-aca7e6c4e3d6019b9d4db75284ae6a63441c50a4d87b12e2308f9a7ccfe037f1.json b/.sqlx/query-aca7e6c4e3d6019b9d4db75284ae6a63441c50a4d87b12e2308f9a7ccfe037f1.json new file mode 100644 index 00000000..85f1c151 --- /dev/null +++ b/.sqlx/query-aca7e6c4e3d6019b9d4db75284ae6a63441c50a4d87b12e2308f9a7ccfe037f1.json @@ -0,0 +1,26 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT height, is_bundled FROM fuel_blocks ORDER BY height", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "height", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "is_bundled", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false + ] + }, + "hash": "aca7e6c4e3d6019b9d4db75284ae6a63441c50a4d87b12e2308f9a7ccfe037f1" +} diff --git a/.sqlx/query-f3cfded5657439dc92850da9265940398f326d9f3d5175ccb984881d9cfc4c78.json b/.sqlx/query-f3cfded5657439dc92850da9265940398f326d9f3d5175ccb984881d9cfc4c78.json new file mode 100644 index 00000000..42edb257 --- /dev/null +++ b/.sqlx/query-f3cfded5657439dc92850da9265940398f326d9f3d5175ccb984881d9cfc4c78.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT indexname FROM pg_indexes \n WHERE tablename = 'fuel_blocks' \n AND indexname = 'idx_fuel_blocks_is_bundled_height'", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "indexname", + "type_info": "Name" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + true + ] + }, + "hash": "f3cfded5657439dc92850da9265940398f326d9f3d5175ccb984881d9cfc4c78" +} diff --git a/db_preview/relationships.svg b/db_preview/relationships.svg index 065bb29e..ace176d6 100644 --- a/db_preview/relationships.svg +++ b/db_preview/relationships.svg @@ -4,11 +4,11 @@ - - + + largeRelationshipsDiagram - + Generated by SchemaSpy @@ -20,7 +20,7 @@ l1_fuel_block_submission [table] - + id @@ -52,7 +52,7 @@ bundles [table] - + id @@ -82,7 +82,7 @@ l1_blob_transaction [table] - + id @@ -123,10 +123,10 @@ l1_transaction [table] - + id - + submission_id @@ -172,10 +172,10 @@ l1_transaction_fragments [table] - + transaction_id - + fragment_id @@ -206,7 +206,7 @@ l1_fragments [table] - + id idx @@ -217,7 +217,7 @@ unused_bytes - + bundle_id @@ -255,7 +255,7 @@ bundle_cost [table] - + bundle_id da_block_height @@ -295,7 +295,7 @@ _sqlx_migrations [table] - + version int8[19] @@ -336,20 +336,25 @@ fuel_blocks - - - -fuel_blocks -[table] + + + +fuel_blocks +[table] + + +height + +int8[19] - -height +data -int8[19] +bytea[2147483647] + -data +is_bundled 
-bytea[2147483647] +bool[1] < 0 @@ -359,7 +364,7 @@ 0 > - + @@ -367,35 +372,35 @@ l1_submissions - - - -l1_submissions -[table] + + + +l1_submissions +[table] + + +id + +serial[10] - -id +fuel_block_hash -serial[10] +bytea[2147483647] + -fuel_block_hash +fuel_block_height -bytea[2147483647] - - -fuel_block_height - -int8[19] - - -< 0 - - -   - - -0 > - +int8[19] + + +< 0 + + +   + + +0 > + diff --git a/e2e/benches/src/handlers.rs b/e2e/benches/src/handlers.rs index 0442f93d..18e47cab 100644 --- a/e2e/benches/src/handlers.rs +++ b/e2e/benches/src/handlers.rs @@ -1,6 +1,8 @@ use super::*; -use crate::data::{AppData, ConfigForm}; -use crate::template; +use crate::{ + data::{AppData, ConfigForm}, + template, +}; pub async fn serve_control_panel(data: web::Data) -> HttpResponse { let cfg = data.simulation_config.lock().await; diff --git a/e2e/benches/src/main.rs b/e2e/benches/src/main.rs index 7bbb0059..1831314a 100644 --- a/e2e/benches/src/main.rs +++ b/e2e/benches/src/main.rs @@ -1,5 +1,4 @@ -use std::sync::Arc; -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use actix_web::{App, HttpResponse, HttpServer, web}; use anyhow::Result; diff --git a/e2e/tests/tests/harness.rs b/e2e/tests/tests/harness.rs index 93b39ee7..4f66c0f5 100644 --- a/e2e/tests/tests/harness.rs +++ b/e2e/tests/tests/harness.rs @@ -2,6 +2,7 @@ use std::{sync::Arc, time::Duration}; use anyhow::Result; use e2e_helpers::whole_stack::{FuelNodeType, WholeStack}; +use services::block_bundler::port::BytesLimit; use tokio::{sync::Mutex, time::sleep_until}; #[tokio::test(flavor = "multi_thread")] @@ -142,7 +143,7 @@ async fn state_submitting_finished( }; let finished = db - .lowest_sequence_of_unbundled_blocks(0, 1) + .next_candidates_for_bundling(0, BytesLimit(1), u32::MAX) .await? .is_none() && db.oldest_nonfinalized_fragments(0, 1).await?.is_empty() diff --git a/helm/fuel-block-committer/templates/deployment.yaml b/helm/fuel-block-committer/templates/deployment.yaml index faa9a31d..7c4e5a44 100644 --- a/helm/fuel-block-committer/templates/deployment.yaml +++ b/helm/fuel-block-committer/templates/deployment.yaml @@ -49,7 +49,7 @@ spec: httpGet: path: /health port: http - initialDelaySeconds: 10 + initialDelaySeconds: 60 periodSeconds: 5 timeoutSeconds: 10 resources: diff --git a/packages/adapters/storage/migrations/0008_add_is_bundled_column.up.sql b/packages/adapters/storage/migrations/0008_add_is_bundled_column.up.sql new file mode 100644 index 00000000..30ee28db --- /dev/null +++ b/packages/adapters/storage/migrations/0008_add_is_bundled_column.up.sql @@ -0,0 +1,26 @@ +BEGIN; + +-- 1. Add the column without enforcing NOT NULL initially +ALTER TABLE fuel_blocks + ADD COLUMN IF NOT EXISTS is_bundled BOOLEAN DEFAULT FALSE; + +-- 2. Set is_bundled to true for blocks that fall within any bundle's range. +UPDATE fuel_blocks fb +SET is_bundled = true +FROM bundles b +WHERE fb.height BETWEEN b.start_height AND b.end_height; + +-- 3. For blocks not updated above, set is_bundled to false +UPDATE fuel_blocks +SET is_bundled = false +WHERE is_bundled IS NULL; + +-- 4. Make the column NOT NULL and set the default for future inserts. +ALTER TABLE fuel_blocks + ALTER COLUMN is_bundled SET NOT NULL; + +-- Create the composite index. 
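+-- The (is_bundled, height) column order matches the bundler's scan for unbundled blocks
+-- (WHERE is_bundled = false AND height >= $1 ORDER BY height), so that query can be served by an index-ordered scan.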
+CREATE INDEX IF NOT EXISTS idx_fuel_blocks_is_bundled_height + ON fuel_blocks(is_bundled, height); + +COMMIT; diff --git a/packages/adapters/storage/src/lib.rs b/packages/adapters/storage/src/lib.rs index fcb8cedd..eac28ebe 100644 --- a/packages/adapters/storage/src/lib.rs +++ b/packages/adapters/storage/src/lib.rs @@ -12,7 +12,7 @@ mod postgres; pub use postgres::{DbConfig, Postgres}; use services::{ Result, - block_bundler::port::UnbundledBlocks, + block_bundler::port::{BytesLimit, UnbundledBlocks}, types::{ BlockSubmission, BlockSubmissionTx, BundleCost, CompressedFuelBlock, DateTime, Fragment, L1Tx, NonEmpty, NonNegative, TransactionCostUpdate, TransactionState, Utc, @@ -86,14 +86,19 @@ impl services::block_importer::port::Storage for Postgres { } impl services::block_bundler::port::Storage for Postgres { - async fn lowest_sequence_of_unbundled_blocks( + async fn next_candidates_for_bundling( &self, starting_height: u32, - max_cumulative_bytes: u32, + max_cumulative_bytes: BytesLimit, + block_buildup_threshold: u32, ) -> Result> { - self._lowest_unbundled_blocks(starting_height, max_cumulative_bytes) - .await - .map_err(Into::into) + self._next_candidates_for_bundling( + starting_height, + max_cumulative_bytes, + block_buildup_threshold, + ) + .await + .map_err(Into::into) } async fn insert_bundle_and_fragments( &self, @@ -202,6 +207,7 @@ impl services::state_pruner::port::Storage for Postgres { #[cfg(test)] mod tests { + use std::ops::RangeInclusive; use std::time::Duration; use clock::TestClock; @@ -213,12 +219,15 @@ mod tests { cost_reporter::port::Storage as CostStorage, state_committer::port::Storage as CommitterStorage, state_listener::port::Storage as ListenerStorage, - types::{CollectNonEmpty, L1Tx, TransactionCostUpdate, TransactionState, nonempty}, + types::{ + CollectNonEmpty, CompressedFuelBlock, Fragment, L1Tx, TransactionCostUpdate, + TransactionState, nonempty, + }, }; use super::*; - // Helper function to create a storage instance for testing + // Helper function to create a storage instance for testing. async fn start_db() -> DbWithProcess { PostgresProcess::shared() .await @@ -514,7 +523,7 @@ mod tests { storage: impl services::block_importer::port::Storage, range: RangeInclusive, ) { - // Insert blocks in chunks to enable setting up the db for a load test + // Insert blocks in chunks to enable setting up the db for a load test. let chunk_size = 10_000; for chunk in range.chunks(chunk_size).into_iter() { let blocks = chunk @@ -559,13 +568,19 @@ mod tests { .unwrap(); } - async fn lowest_unbundled_sequence( + // Updated helper to accept the new buildup_detection_threshold parameter. + async fn next_candidates_for_bundling( storage: impl services::block_bundler::port::Storage, starting_height: u32, - max_cumulative_bytes: u32, + max_cumulative_bytes: BytesLimit, + buildup_detection_threshold: u32, ) -> RangeInclusive { storage - .lowest_sequence_of_unbundled_blocks(starting_height, max_cumulative_bytes) + .next_candidates_for_bundling( + starting_height, + max_cumulative_bytes, + buildup_detection_threshold, + ) .await .unwrap() .unwrap() @@ -574,15 +589,16 @@ mod tests { } #[tokio::test] - async fn can_get_lowest_sequence_of_unbundled_blocks() { + async fn can_get_next_candidates_for_bundling() { // given let storage = start_db().await; - // Insert blocks 1 to 10 + // Insert blocks 1 to 10. 
insert_sequence_of_unbundled_blocks(storage.clone(), 1..=10).await; - // when - let height_range = lowest_unbundled_sequence(storage.clone(), 0, u32::MAX).await; + // when: use a very high buildup threshold so that buildup detection is not triggered. + let height_range = + next_candidates_for_bundling(storage.clone(), 0, BytesLimit::UNLIMITED, u32::MAX).await; // then assert_eq!(height_range, 1..=10); @@ -596,8 +612,9 @@ mod tests { insert_sequence_of_unbundled_blocks(storage.clone(), 0..=2).await; insert_sequence_of_unbundled_blocks(storage.clone(), 4..=6).await; - // when - let height_range = lowest_unbundled_sequence(storage.clone(), 0, u32::MAX).await; + // when: use a high buildup threshold so that buildup detection is not triggered. + let height_range = + next_candidates_for_bundling(storage.clone(), 0, BytesLimit::UNLIMITED, u32::MAX).await; // then assert_eq!(height_range, 0..=2); @@ -610,8 +627,9 @@ mod tests { insert_sequence_of_unbundled_blocks(storage.clone(), 0..=10).await; - // when - let height_range = lowest_unbundled_sequence(storage.clone(), 2, u32::MAX).await; + // when: pass starting height = 2. + let height_range = + next_candidates_for_bundling(storage.clone(), 2, BytesLimit::UNLIMITED, u32::MAX).await; // then assert_eq!(height_range, 2..=10); @@ -624,10 +642,11 @@ mod tests { insert_sequence_of_unbundled_blocks(storage.clone(), 0..=10).await; - // when - let height_range = lowest_unbundled_sequence(storage.clone(), 0, 2).await; + // when: use cumulative limit 2 and a high buildup threshold. + let height_range = + next_candidates_for_bundling(storage.clone(), 0, BytesLimit(2), u32::MAX).await; - // then + // then: expect only blocks 0 and 1 (2 blocks total). assert_eq!(height_range, 0..=1); } @@ -649,13 +668,14 @@ mod tests { insert_sequence_of_unbundled_blocks(storage.clone(), 3..=4).await; // when - let height_range = lowest_unbundled_sequence(storage.clone(), 0, u32::MAX).await; + let height_range = + next_candidates_for_bundling(storage.clone(), 0, BytesLimit::UNLIMITED, u32::MAX).await; // then assert_eq!(height_range, 3..=4); } - /// This can happen if we change the lookback config a couple of times in a short period of time + /// This can happen if we change the lookback config a couple of times in a short period of time. #[tokio::test] async fn can_handle_bundled_blocks_appearing_after_unbundled_ones() { // given @@ -666,9 +686,10 @@ mod tests { insert_sequence_of_unbundled_blocks(storage.clone(), 11..=15).await; // when - let height_range = lowest_unbundled_sequence(storage.clone(), 0, u32::MAX).await; + let height_range = + next_candidates_for_bundling(storage.clone(), 0, BytesLimit::UNLIMITED, u32::MAX).await; - // then + // then: the first unbundled sequence (0..=2) is returned. 
assert_eq!(height_range, 0..=2); } @@ -688,10 +709,15 @@ mod tests { let blocks_to_retrieve = 3500; let start_time = std::time::Instant::now(); - // each block has only 1 B of data + // each block has only 1 byte of data let max_cumulative_bytes = blocks_to_retrieve; - let height_range = - lowest_unbundled_sequence(storage.clone(), start_height, max_cumulative_bytes).await; + let height_range = next_candidates_for_bundling( + storage.clone(), + start_height, + BytesLimit(max_cumulative_bytes), + u32::MAX, + ) + .await; let elapsed_time = start_time.elapsed(); let expected_range = unbundled_start..=(unbundled_start + blocks_to_retrieve - 1); @@ -701,12 +727,12 @@ mod tests { assert!(elapsed_time.as_secs_f64() <= 2.0); } - // Important because sqlx panics if the bundle is too big + // Important because sqlx panics if the bundle is too big. #[tokio::test] async fn can_insert_big_batches() { let storage = start_db().await; - // u16::MAX because of implementation details + // u16::MAX because of implementation details. insert_sequence_of_bundled_blocks( storage.clone(), 0..=u16::MAX as u32 * 2, @@ -721,12 +747,12 @@ mod tests { let storage = start_db().await; let starting_height = 10; - // Insert a bundle that ends before the starting_height + // Insert a bundle that ends before the starting_height. let next_id = storage.next_bundle_id().await.unwrap(); storage .insert_bundle_and_fragments( next_id, - 1..=5, // Bundle ends at 5 + 1..=5, // Bundle ends at 5. nonempty!(Fragment { data: nonempty![0], unused_bytes: 1000, @@ -736,7 +762,7 @@ mod tests { .await .unwrap(); - // Insert a bundle that ends after the starting_height + // Insert a bundle that ends after the starting_height. let fragment = Fragment { data: nonempty![1], unused_bytes: 1000, @@ -747,7 +773,7 @@ mod tests { storage .insert_bundle_and_fragments( next_id, - 10..=15, // Bundle ends at 15 + 10..=15, // Bundle ends at 15. nonempty!(fragment.clone()), ) .await @@ -770,7 +796,7 @@ mod tests { let storage = start_db().await; let starting_height = 10; - // Insert a bundle that ends exactly at the starting_height + // Insert a bundle that ends exactly at the starting_height. let fragment = Fragment { data: nonempty![2], unused_bytes: 1000, @@ -780,7 +806,7 @@ mod tests { storage .insert_bundle_and_fragments( next_id, - 5..=10, // Bundle ends at 10 + 5..=10, // Bundle ends at 10. nonempty!(fragment.clone()), ) .await @@ -803,7 +829,7 @@ mod tests { let storage = start_db().await; let starting_height = 10; - // Insert a bundle that ends exactly at the starting_height + // Insert a bundle that ends exactly at the starting_height. let fragment = Fragment { data: nonempty![2], unused_bytes: 1000, @@ -813,7 +839,7 @@ mod tests { storage .insert_bundle_and_fragments( next_id, - 5..=10, // Bundle ends at 10 + 5..=10, // Bundle ends at 10. nonempty!(fragment.clone()), ) .await @@ -1357,56 +1383,55 @@ mod tests { storage.insert_blocks(blocks).await.unwrap(); - let lowest_unbundled_heights = |starting_height: u32, max_cumulative_bytes: u32| { + let next_candidates_for_bundling = |starting_height: u32, max_cumulative_bytes: u32| { let storage = storage.clone(); async move { storage - .lowest_sequence_of_unbundled_blocks(starting_height, max_cumulative_bytes) + .next_candidates_for_bundling( + starting_height, + BytesLimit(max_cumulative_bytes), + u32::MAX, + ) .await .unwrap() .map(|seq| seq.oldest.height_range()) } }; - // Case A: With max_cumulative_bytes = 7, we can fit blocks 0..=2 (sizes: 2+4+1=7). 
- // Block 3 (size 10) would push us to 17 total, exceeding 7, so we must stop before height 3. + // Case A: With max_cumulative_bytes = 7, we can fit blocks 0..=2 (sizes: 2+4+1=7). + // Block 3 (size 10) would push total to 17, exceeding 7, so we must stop before height 3. assert_eq!( - lowest_unbundled_heights(0, 7).await, + next_candidates_for_bundling(0, 7).await, Some(0..=2), "We should get blocks 0..=2 under a 7-byte cumulative limit" ); - // Case B: If the first block alone exceeds the limit, we should get no blocks. - // Try max_cumulative_bytes = 1 => even block 0 has 2 bytes, so we skip everything. + // Case B: If the first block alone exceeds the limit, we should get one block. assert_eq!( - lowest_unbundled_heights(0, 1).await, - None, - "If the first block is bigger than the limit, we get none" + next_candidates_for_bundling(0, 1).await, + Some(0..=0), + "If the first block is bigger than the limit, we get at least one" ); - // Case C: If we increase the cumulative limit to 25, we can include all blocks 0..=4. - // Summing their sizes = 2+4+1+10+2 = 19 <= 25 + // Case C: Increase cumulative limit to 25, we can include all blocks 0..=4. + // Total sizes = 2+4+1+10+2 = 19 <= 25. assert_eq!( - lowest_unbundled_heights(0, 25).await, + next_candidates_for_bundling(0, 25).await, Some(0..=4), "We should be able to include all blocks if the limit is large enough" ); - // Case D: Verify starting_height is respected. If we start from height=2 and have a - // large limit (25), then we only pick blocks >= 2: i.e. heights 2..=4 => sizes 1+10+2=13. - // That is still <= 25, so we get 2..=4. + // Case D: Verify starting_height is respected. + // If starting from height 2 with limit 25, then blocks 2..=4 (sizes 1+10+2=13) are included. assert_eq!( - lowest_unbundled_heights(2, 25).await, + next_candidates_for_bundling(2, 25).await, Some(2..=4), "Should start counting from height=2 and pick blocks 2..=4" ); - // Case E: Ensure bundled blocks are indeed excluded from the selection. - // Let's 'bundle' block 0..=1, then retest. We'll confirm that - // the function no longer returns them. + // Case E: Ensure bundled blocks are excluded. let bundle_id = storage.next_bundle_id().await.unwrap(); let fragments = nonempty!(Fragment { - // the data inside a Fragment is unrelated to the block data; we just need any non-empty data data: nonempty![123], unused_bytes: 0, total_bytes: 1.try_into().unwrap(), @@ -1416,12 +1441,109 @@ mod tests { .await .unwrap(); - // Now blocks 0 and 1 are considered 'bundled', so we only have 2..=4 as unbundled - // if we query from 0 with a large limit + // Now blocks 0 and 1 are bundled; only blocks 2..=4 remain unbundled. assert_eq!( - lowest_unbundled_heights(0, 25).await, + next_candidates_for_bundling(0, 25).await, Some(2..=4), "Blocks 0..=1 are excluded after bundling" ); } + + /// Test that when the cumulative bytes limit is reached, + /// we return only as many blocks as fit and the buildup (buildup_detected) indicator is None. + #[tokio::test] + async fn test_buildup_detected_due_to_limit() { + let storage = start_db().await; + + // Insert blocks 1 through 10, each with 1 byte of data. + let blocks: Vec = (1u32..=10) + .map(|h| CompressedFuelBlock { + height: h, + data: NonEmpty::from_vec(vec![0u8]).expect("Non-empty block data"), + }) + .collect(); + let blocks = NonEmpty::from_vec(blocks).expect("Should have at least one block"); + storage.insert_blocks(blocks).await.unwrap(); + + // Use a cumulative byte limit of 5 bytes. 
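+        // Each inserted block holds exactly 1 byte, so only 5 of the 10 blocks fit under this limit.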
+ // When the target cumulative bytes is reached without a gap, buildup detection should return None. + let unbundled = storage + .next_candidates_for_bundling(1, BytesLimit(5), u32::MAX) + .await + .unwrap() + .expect("Expected some unbundled blocks"); + let height_range = unbundled.oldest.height_range(); + let count = *height_range.end() - *height_range.start() + 1; + assert_eq!(count, 5, "Expected only 5 blocks due to cumulative limit"); + assert!( + unbundled.buildup_detected.is_none(), + "Expected 'buildup_detected' to be None when target cumulative bytes are reached" + ); + } + + /// Test that when there is a gap in the block sequence, + /// the function returns blocks up to the gap and sets the buildup (buildup_detected) indicator to true. + #[tokio::test] + async fn test_buildup_detected_due_to_gap() { + let storage = start_db().await; + + // Insert blocks with a gap: 1,2,3 then (no 4), then 5 and 6. + let blocks = [1, 2, 3, 5, 6] + .map(|height| CompressedFuelBlock { + height, + data: NonEmpty::from_vec(vec![0]).expect("Non-empty"), + }) + .into_iter() + .collect_nonempty() + .expect("not empty"); + + storage.insert_blocks(blocks).await.unwrap(); + + // Query starting from height 1 with a generous cumulative limit. + let unbundled = storage + .next_candidates_for_bundling(1, BytesLimit(100), 5) + .await + .unwrap() + .expect("Expected some unbundled blocks"); + // The sequence should stop at height 3 (gap after 3). + assert_eq!( + unbundled.oldest.height_range(), + 1..=3, + "Sequence should stop at the gap" + ); + assert!( + unbundled.buildup_detected.unwrap(), + "Expected 'buildup_detected' to be true when a gap causes early termination and a buildup happens" + ); + } + + /// Test that when all available blocks form a complete sequence and the cumulative limit isn't exceeded, + /// the function returns all blocks and the buildup (buildup_detected) indicator is false since + /// the buildup threshold is high + #[tokio::test] + async fn test_no_more_when_all_fetched() { + let storage = start_db().await; + + // Insert consecutive blocks from heights 10 to 15, each with 1 byte of data. 
+ let blocks: Vec = (10u32..=15) + .map(|h| CompressedFuelBlock { + height: h, + data: NonEmpty::from_vec(vec![0u8]).expect("Non-empty"), + }) + .collect(); + let blocks = NonEmpty::from_vec(blocks).expect("Should have blocks"); + storage.insert_blocks(blocks).await.unwrap(); + + let unbundled = storage + .next_candidates_for_bundling(10, BytesLimit(100), u32::MAX) + .await + .unwrap() + .expect("Expected unbundled blocks"); + assert_eq!( + unbundled.oldest.height_range(), + 10..=15, + "Expected full consecutive sequence" + ); + assert!(!unbundled.buildup_detected.unwrap()); + } } diff --git a/packages/adapters/storage/src/postgres.rs b/packages/adapters/storage/src/postgres.rs index f134e82d..ccaaf2e1 100644 --- a/packages/adapters/storage/src/postgres.rs +++ b/packages/adapters/storage/src/postgres.rs @@ -4,7 +4,7 @@ use futures::{TryStreamExt, stream::BoxStream}; use itertools::Itertools; use metrics::{RegistersMetrics, prometheus::IntGauge}; use services::{ - block_bundler::port::UnbundledBlocks, + block_bundler::port::{BytesLimit, UnbundledBlocks}, types::{ BlockSubmission, BlockSubmissionTx, BundleCost, CompressedFuelBlock, DateTime, Fragment, NonEmpty, NonNegative, TransactionCostUpdate, TransactionState, Utc, @@ -479,47 +479,16 @@ impl Postgres { Ok(response) } - pub(crate) async fn total_unbundled_blocks(&self, starting_height: u32) -> Result { - let count = sqlx::query!( - r#"SELECT COUNT(*) - FROM fuel_blocks fb - WHERE fb.height >= $1 - AND NOT EXISTS ( - SELECT 1 FROM bundles b - WHERE fb.height BETWEEN b.start_height AND b.end_height - AND b.end_height >= $1 - )"#, - i64::from(starting_height) - ) - .fetch_one(&self.connection_pool) - .await? - .count - .unwrap_or_default(); - - let count = u64::try_from(count) - .map_err(|_| crate::error::Error::Conversion("invalid block count".to_string()))?; - - Ok(count) - } - - pub(crate) async fn _lowest_unbundled_blocks( + pub(crate) async fn _next_candidates_for_bundling( &self, starting_height: u32, - max_cumulative_bytes: u32, + max_cumulative_bytes: BytesLimit, + block_buildup_threshold: u32, ) -> Result> { - // We're not using snapshot isolation here, so the count may change by the time - // we retrieve the blocks. If the pruner is configured aggressively to track the - // lookback window closely, the worst-case scenario is a temporary over- or - // underestimation of the block count. This discrepancy should resolve quickly, - // making the overhead of a snapshot-isolated transaction unnecessary. 
- let total_unbundled_blocks = self.total_unbundled_blocks(starting_height).await?; - - if total_unbundled_blocks == 0 { - return Ok(None); - } - let stream = self.stream_unbundled_blocks(starting_height); - let blocks = take_blocks_until_limit(stream, max_cumulative_bytes).await?; + let (blocks, buildup_detected) = + take_consecutive_blocks(stream, max_cumulative_bytes.0, block_buildup_threshold) + .await?; let sequential_blocks = { let Some(nonempty_blocks) = NonEmpty::from_vec(blocks) else { @@ -535,9 +504,7 @@ impl Postgres { Ok(Some(UnbundledBlocks { oldest: sequential_blocks, - total_unbundled: (total_unbundled_blocks as usize) - .try_into() - .expect("checked already"), + buildup_detected, })) } @@ -548,15 +515,11 @@ impl Postgres { sqlx::query_as!( tables::DBCompressedFuelBlock, r#" - SELECT fb.* - FROM fuel_blocks fb - WHERE fb.height >= $1 - AND NOT EXISTS ( - SELECT 1 FROM bundles b - WHERE fb.height BETWEEN b.start_height AND b.end_height - AND b.end_height >= $1 - ) - ORDER BY fb.height"#, + SELECT fb.height, fb.data + FROM fuel_blocks fb + WHERE fb.is_bundled = false + AND fb.height >= $1 + ORDER BY fb.height"#, i64::from(starting_height), ) .fetch(&self.connection_pool) @@ -1028,21 +991,23 @@ impl Postgres { block_range: RangeInclusive, fragments: NonEmpty, ) -> Result<()> { - let mut tx = self.connection_pool.begin().await?; - - let start = *block_range.start(); - let end = *block_range.end(); - - // Insert a new bundle and retrieve its ID - let bundle_id = sqlx::query!( + let start_height = *block_range.start(); + let end_height = *block_range.end(); + let update_bundled_status = sqlx::query!( + "UPDATE fuel_blocks SET is_bundled = true WHERE height BETWEEN $1 AND $2", + i64::from(start_height), + i64::from(end_height) + ); + let insert_bundles_query = sqlx::query!( "INSERT INTO bundles(id, start_height, end_height) VALUES ($1, $2, $3) RETURNING id", bundle_id.get(), - i64::from(start), - i64::from(end) - ) - .fetch_one(&mut *tx) - .await? - .id; + i64::from(start_height), + i64::from(end_height) + ); + + let mut tx = self.connection_pool.begin().await?; + + let bundle_id = insert_bundles_query.fetch_one(&mut *tx).await?.id; let bundle_id: NonNegative = bundle_id.try_into().map_err(|e| { crate::error::Error::Conversion(format!("invalid bundle id received from db: {}", e)) @@ -1071,7 +1036,7 @@ impl Postgres { .collect::>>()?; // Batch insert fragments - let queries = fragment_rows + let fragment_insertion_queries = fragment_rows .into_iter() .chunks(MAX_FRAGMENTS_PER_QUERY) .into_iter() @@ -1092,12 +1057,12 @@ impl Postgres { }) .collect::>(); - // Execute all fragment insertion queries - for mut query in queries { + update_bundled_status.execute(&mut *tx).await?; + + for mut query in fragment_insertion_queries { query.build().execute(&mut *tx).await?; } - // Commit the transaction tx.commit().await?; Ok(()) @@ -1252,37 +1217,62 @@ impl Postgres { } } -async fn take_blocks_until_limit( - mut stream: BoxStream<'_, std::result::Result>, - max_cumulative_bytes: u32, -) -> Result> { - let mut blocks = vec![]; +// Helper function to count additional blocks until the buildup threshold is reached or the stream is exhausted. 
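+// This bounds the extra work: at most `buildup_detection_threshold` additional rows are pulled
+// from the stream before we can tell whether a buildup exists.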
+async fn count_remaining_blocks( + stream: &mut BoxStream<'_, std::result::Result>, + buildup_detection_threshold: u32, +) -> Result { + let mut count = 0u32; + while count < buildup_detection_threshold { + if stream.try_next().await?.is_some() { + count += 1; + } else { + break; + } + } + Ok(count) +} - let mut total_bytes = 0; +async fn take_consecutive_blocks( + mut stream: BoxStream<'_, std::result::Result>, + target_cumulative_bytes: u32, + block_buildup_threshold: u32, +) -> Result<(Vec, Option)> { + let mut consecutive_blocks = Vec::new(); + let mut cumulative_bytes = 0; + let mut total_blocks_count = 0; let mut last_height: Option = None; - while let Some(val) = stream.try_next().await? { - let data_len = val.data.len(); - if total_bytes + data_len > max_cumulative_bytes as usize { + + while cumulative_bytes < target_cumulative_bytes { + let Some(db_block) = stream.try_next().await? else { break; - } + }; - let block = CompressedFuelBlock::try_from(val)?; + total_blocks_count += 1; + let data_len = db_block.data.len() as u32; + let block = CompressedFuelBlock::try_from(db_block)?; let height = block.height; - total_bytes += data_len; - - blocks.push(block); - match &mut last_height { - Some(last_height) if height != last_height.saturating_add(1) => { + // Check if the block is consecutive + if let Some(prev_height) = last_height { + if height != prev_height.saturating_add(1) { + // A gap is detected. Break out without adding this block. break; } - _ => { - last_height = Some(height); - } } + last_height = Some(height); + consecutive_blocks.push(block); + cumulative_bytes += data_len; } - Ok(blocks) + if cumulative_bytes < target_cumulative_bytes { + total_blocks_count += count_remaining_blocks(&mut stream, block_buildup_threshold).await?; + let buildup_detected = total_blocks_count >= block_buildup_threshold; + Ok((consecutive_blocks, Some(buildup_detected))) + } else { + // If target bytes are reached without encountering a gap, no buildup detection is needed. + Ok((consecutive_blocks, None)) + } } fn create_ranges(heights: Vec) -> Vec> { @@ -1312,368 +1302,343 @@ fn create_ranges(heights: Vec) -> Vec> { #[cfg(test)] mod tests { - use std::{env, fs, path::Path}; - - use rand::Rng; - use services::types::{CollectNonEmpty, Fragment, L1Tx, TransactionState}; - use sqlx::{Executor, PgPool, Row}; - use tokio::time::Instant; - - use super::*; - use crate::test_instance; - - #[tokio::test] - async fn test_second_migration_applies_successfully() { - let db = test_instance::PostgresProcess::shared() - .await - .expect("Failed to initialize PostgresProcess") - .create_noschema_random_db() - .await - .expect("Failed to create random test database"); - - let manifest_dir = env!("CARGO_MANIFEST_DIR"); - let migrations_path = Path::new(manifest_dir).join("migrations"); + mod migrations { + use sqlx::Executor; + + use crate::Result; + use std::path::PathBuf; + + use crate::DbWithProcess; + + #[tokio::test] + async fn test_migration_8_populates_is_bundled_correctly() { + let process = get_test_pool().await.unwrap(); + let db = process.db.pool(); + + // --- Apply migrations 1 through 7 --- + // These migrations create the necessary tables. 
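+            // Migration 0008 (the one under test) is applied separately further down, after the seed data has been inserted.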
+ let mig_files = [ + "0001_initial.up.sql", + "0002_better_fragmentation.up.sql", + "0003_block_submission_tx_id.up.sql", + "0004_blob_gas_bumping.sql", + "0005_tx_state_added.up.sql", + "0006_fuel_block_drop_hash_and_set_height_as_pkey.up.sql", + "0007_cost_tracking.sql", + ]; + + for file in &mig_files { + let sql = load_migration_file(file); + + db.execute(sqlx::raw_sql(&sql)).await.unwrap(); + } - async fn apply_migration(pool: &sqlx::Pool, path: &Path) { - let sql = fs::read_to_string(path) - .map_err(|e| format!("Failed to read migration file {:?}: {}", path, e)) + // At this point, the fuel_blocks table (created in migration 0002 and then altered in 0006) + // now has only "height" and "data" columns. + // Insert some sample fuel blocks with various heights. + let blocks = vec![ + // Block not bundled (height 50) + (50i64, b"block50".as_ref()), + // Blocks that should be bundled via first bundle (range 100-150) + (100i64, b"block100".as_ref()), + (125i64, b"block125".as_ref()), + (150i64, b"block150".as_ref()), + // Blocks not bundled (height 175, 200) + (175i64, b"block175".as_ref()), + (200i64, b"block200".as_ref()), + // Block bundled via second bundle (range 300-350) + (320i64, b"block320".as_ref()), + ]; + for (height, data) in blocks { + db.execute(sqlx::query!( + "INSERT INTO fuel_blocks (height, data) VALUES ($1, $2)", + height, + data + )) + .await .unwrap(); - pool.execute(sqlx::raw_sql(&sql)).await.unwrap(); - } + } - // ----------------------- - // Apply Initial Migration - // ----------------------- - let initial_migration_path = migrations_path.join("0001_initial.up.sql"); - apply_migration(&db.db.pool(), &initial_migration_path).await; - - // Insert sample data into initial tables - - let fuel_block_hash = vec![0u8; 32]; - let insert_l1_submissions = r#" - INSERT INTO l1_submissions (fuel_block_hash, fuel_block_height) - VALUES ($1, $2) - RETURNING id - "#; - let row = sqlx::query(insert_l1_submissions) - .bind(&fuel_block_hash) - .bind(1000i64) - .fetch_one(&db.db.pool()) + // Insert sample bundles. + // First bundle covers heights 100 to 150. + db.execute(sqlx::query!( + "INSERT INTO bundles (start_height, end_height) VALUES ($1, $2)", + 100i64, + 150i64 + )) .await .unwrap(); - let submission_id: i32 = row.try_get("id").unwrap(); - - let insert_l1_fuel_block_submission = r#" - INSERT INTO l1_fuel_block_submission (fuel_block_hash, fuel_block_height, completed, submittal_height) - VALUES ($1, $2, $3, $4) - "#; - sqlx::query(insert_l1_fuel_block_submission) - .bind(&fuel_block_hash) - .bind(1000i64) - .bind(true) - .bind(950i64) - .execute(&db.db.pool()) + // Second bundle covers heights 300 to 350. 
+ db.execute(sqlx::query!( + "INSERT INTO bundles (start_height, end_height) VALUES ($1, $2)", + 300i64, + 350i64 + )) .await .unwrap(); - // Insert into l1_transactions - let tx_hash = vec![1u8; 32]; - let insert_l1_transactions = r#" - INSERT INTO l1_transactions (hash, state) - VALUES ($1, $2) - RETURNING id - "#; - let row = sqlx::query(insert_l1_transactions) - .bind(&tx_hash) - .bind(0i16) - .fetch_one(&db.db.pool()) - .await - .unwrap(); - let transaction_id: i32 = row.try_get("id").unwrap(); - - // Insert into l1_fragments - let fragment_data = vec![2u8; 10]; - let insert_l1_fragments = r#" - INSERT INTO l1_fragments (fragment_idx, submission_id, data) - VALUES ($1, $2, $3) - RETURNING id - "#; - let row = sqlx::query(insert_l1_fragments) - .bind(0i64) - .bind(submission_id) - .bind(&fragment_data) - .fetch_one(&db.db.pool()) - .await - .unwrap(); - let fragment_id: i32 = row.try_get("id").unwrap(); - - // Insert into l1_transaction_fragments - let insert_l1_transaction_fragments = r#" - INSERT INTO l1_transaction_fragments (transaction_id, fragment_id) - VALUES ($1, $2) - "#; - sqlx::query(insert_l1_transaction_fragments) - .bind(transaction_id) - .bind(fragment_id) - .execute(&db.db.pool()) - .await - .unwrap(); + // --- Apply Migration 8 --- + // Load migration 8 from its file. + let mig8_sql = load_migration_file("0008_add_is_bundled_column.up.sql"); + db.execute(sqlx::raw_sql(&mig8_sql)).await.unwrap(); + + // --- Verification --- + // Expected logic: + // Blocks with heights 100, 125, 150 (first bundle) and 320 (second bundle) should be marked as bundled. + // Blocks with heights 50, 175, and 200 should remain not bundled. + let rows = sqlx::query!("SELECT height, is_bundled FROM fuel_blocks ORDER BY height") + .fetch_all(&db) + .await + .unwrap(); + for row in rows { + match row.height { + 50 => assert!(!row.is_bundled, "Block at height 50 should not be bundled"), + 100 => assert!(row.is_bundled, "Block at height 100 should be bundled"), + 125 => assert!(row.is_bundled, "Block at height 125 should be bundled"), + 150 => assert!(row.is_bundled, "Block at height 150 should be bundled"), + 175 => assert!(!row.is_bundled, "Block at height 175 should not be bundled"), + 200 => assert!(!row.is_bundled, "Block at height 200 should not be bundled"), + 320 => assert!(row.is_bundled, "Block at height 320 should be bundled"), + other => panic!("Unexpected block height: {}", other), + } + } - // ------------------------ - // Apply Second Migration - // ------------------------ - let second_migration_path = migrations_path.join("0002_better_fragmentation.up.sql"); - apply_migration(&db.db.pool(), &second_migration_path).await; - - // ------------------------ - // Verification Steps - // ------------------------ - - // Function to check table existence - async fn table_exists(pool: &PgPool, table_name: &str) -> bool { - let query = r#" - SELECT EXISTS ( - SELECT FROM information_schema.tables - WHERE table_schema = 'public' - AND table_name = $1 + // Verify that the composite index exists. 
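+            // pg_indexes exposes one row per index, so filtering by tablename and indexname is enough to detect it.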
+ let index = sqlx::query!( + "SELECT indexname FROM pg_indexes + WHERE tablename = 'fuel_blocks' + AND indexname = 'idx_fuel_blocks_is_bundled_height'" ) - "#; - let row = sqlx::query(query) - .bind(table_name) - .fetch_one(pool) - .await - .expect("Failed to execute table_exists query"); - row.try_get::(0).unwrap_or(false) + .fetch_optional(&db) + .await + .unwrap(); + assert!( + index.is_some(), + "Index 'idx_fuel_blocks_is_bundled_height' should exist" + ); } - // Function to check column existence and type - async fn column_info(pool: &PgPool, table_name: &str, column_name: &str) -> Option { - let query = r#" - SELECT data_type - FROM information_schema.columns - WHERE table_name = $1 AND column_name = $2 - "#; - let row = sqlx::query(query) - .bind(table_name) - .bind(column_name) - .fetch_optional(pool) - .await - .expect("Failed to execute column_info query"); - row.map(|row| row.try_get("data_type").unwrap_or_default()) + fn load_migration_file(file: &str) -> String { + let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + path.push("migrations"); + path.push(file); + std::fs::read_to_string(path) + .unwrap_or_else(|_| panic!("failed to read migration file {file}")) } - let fuel_blocks_exists = table_exists(&db.db.pool(), "fuel_blocks").await; - assert!(fuel_blocks_exists, "fuel_blocks table does not exist"); - - let bundles_exists = table_exists(&db.db.pool(), "bundles").await; - assert!(bundles_exists, "bundles table does not exist"); - - async fn check_columns(pool: &PgPool, table: &str, column: &str, expected_type: &str) { - let info = column_info(pool, table, column).await; - assert!( - info.is_some(), - "Column '{}' does not exist in table '{}'", - column, - table - ); - let data_type = info.unwrap(); - assert_eq!( - data_type, expected_type, - "Column '{}' in table '{}' has type '{}', expected '{}'", - column, table, data_type, expected_type - ); + async fn get_test_pool() -> Result { + crate::test_instance::PostgresProcess::shared() + .await? 
+ .create_noschema_random_db() + .await } + } - // Check that 'l1_fragments' has new columns - check_columns(&db.db.pool(), "l1_fragments", "idx", "integer").await; - check_columns(&db.db.pool(), "l1_fragments", "total_bytes", "bigint").await; - check_columns(&db.db.pool(), "l1_fragments", "unused_bytes", "bigint").await; - check_columns(&db.db.pool(), "l1_fragments", "bundle_id", "integer").await; - - // Verify 'l1_transactions' has 'finalized_at' column - check_columns( - &db.db.pool(), - "l1_transactions", - "finalized_at", - "timestamp with time zone", - ) - .await; + mod performance { + use crate::postgres::Postgres; + use crate::test_instance::{self, PostgresProcess}; + use itertools::Itertools; + use rand::Rng; + use services::block_bundler::port::BytesLimit; + use services::types::{ + CollectNonEmpty, CompressedFuelBlock, Fragment, L1Tx, NonEmpty, TransactionCostUpdate, + TransactionState, Utc, + }; + use std::cmp; + use std::time::{Duration, Instant}; + + #[tokio::test] + async fn stress_test_update_costs() -> crate::Result<()> { + use services::{ + block_bundler::port::Storage, state_committer::port::Storage as CommitterStorage, + state_listener::port::Storage as ListenerStorage, + }; - // Verify that l1_fragments and l1_transaction_fragments are empty after migration - let count_l1_fragments = sqlx::query_scalar::<_, i64>( - r#" - SELECT COUNT(*) FROM l1_fragments - "#, - ) - .fetch_one(&db.db.pool()) - .await - .unwrap(); - assert_eq!( - count_l1_fragments, 0, - "l1_fragments table is not empty after migration" - ); + let mut rng = rand::thread_rng(); - let count_l1_transaction_fragments = sqlx::query_scalar::<_, i64>( - r#" - SELECT COUNT(*) FROM l1_transaction_fragments - "#, - ) - .fetch_one(&db.db.pool()) - .await - .unwrap(); - assert_eq!( - count_l1_transaction_fragments, 0, - "l1_transaction_fragments table is not empty after migration" - ); - - // Insert a default bundle to satisfy the foreign key constraint for future inserts - let insert_default_bundle = r#" - INSERT INTO bundles (start_height, end_height) - VALUES ($1, $2) - RETURNING id - "#; - let row = sqlx::query(insert_default_bundle) - .bind(0i64) - .bind(0i64) - .fetch_one(&db.db.pool()) - .await - .unwrap(); - let bundle_id: i32 = row.try_get("id").unwrap(); - assert_eq!(bundle_id, 1, "Default bundle ID is not 1"); - - // Attempt to insert a fragment with empty data - let insert_invalid_fragment = r#" - INSERT INTO l1_fragments (idx, data, total_bytes, unused_bytes, bundle_id) - VALUES ($1, $2, $3, $4, $5) - "#; - let result = sqlx::query(insert_invalid_fragment) - .bind(1i32) - .bind::<&[u8]>(&[]) // Empty data should fail due to check constraint - .bind(10i64) - .bind(5i64) - .bind(1i32) // Valid bundle_id - .execute(&db.db.pool()) - .await; - - assert!( - result.is_err(), - "Inserting empty data should fail due to check constraint" - ); + let storage = test_instance::PostgresProcess::shared() + .await + .expect("Failed to initialize PostgresProcess") + .create_random_db() + .await + .expect("Failed to create random test database"); + + let fragments_per_bundle = 1_000_000; + let txs_per_fragment = 100; + + // insert the bundle and fragments + let bundle_id = storage.next_bundle_id().await.unwrap(); + let end_height = rng.gen_range(1..5000); + let range = 0..=end_height; + + // create fragments for the bundle + let fragments = (0..fragments_per_bundle) + .map(|_| Fragment { + data: NonEmpty::from_vec(vec![rng.r#gen()]).unwrap(), + unused_bytes: rng.gen_range(0..1000), + total_bytes: 
rng.gen_range(1000..5000).try_into().unwrap(), + }) + .collect::>(); + let fragments = NonEmpty::from_vec(fragments).unwrap(); - // Insert a valid fragment - let fragment_data_valid = vec![3u8; 15]; - let insert_valid_fragment = r#" - INSERT INTO l1_fragments (idx, data, total_bytes, unused_bytes, bundle_id) - VALUES ($1, $2, $3, $4, $5) - RETURNING id - "#; - let row = sqlx::query(insert_valid_fragment) - .bind(1i32) - .bind(&fragment_data_valid) - .bind(15i64) - .bind(0i64) - .bind(1i32) - .fetch_one(&db.db.pool()) - .await - .unwrap(); + storage + .insert_bundle_and_fragments(bundle_id, range, fragments.clone()) + .await + .unwrap(); - let new_fragment_id: i32 = row.try_get("id").unwrap(); - assert!(new_fragment_id > 0, "Failed to insert a valid fragment"); - } + let fragment_ids = storage + .oldest_nonfinalized_fragments(0, 2) + .await + .unwrap() + .into_iter() + .map(|f| f.id) + .collect_nonempty() + .unwrap(); - #[tokio::test] - async fn stress_test_update_costs() -> Result<()> { - use services::{ - block_bundler::port::Storage, state_committer::port::Storage as CommitterStorage, - state_listener::port::Storage as ListenerStorage, - }; + let mut tx_changes = vec![]; + let mut cost_updates = vec![]; + + // for each fragment, create multiple transactions + for _id in fragment_ids.iter() { + for _ in 0..txs_per_fragment { + let tx_hash = rng.r#gen::<[u8; 32]>(); + let tx = L1Tx { + hash: tx_hash, + nonce: rng.r#gen(), + ..Default::default() + }; + + storage + .record_pending_tx(tx.clone(), fragment_ids.clone(), Utc::now()) + .await + .unwrap(); + + // update transaction state to simulate finalized transactions + let finalization_time = Utc::now(); + tx_changes.push((tx.hash, TransactionState::Finalized(finalization_time))); + + // cost updates + let total_fee = rng.gen_range(1_000_000u128..10_000_000u128); + let da_block_height = rng.gen_range(1_000_000u64..10_000_000u64); + cost_updates.push(TransactionCostUpdate { + tx_hash, + total_fee, + da_block_height, + }); + } + } - let mut rng = rand::thread_rng(); + // update transaction states and costs + let start_time = Instant::now(); - let storage = test_instance::PostgresProcess::shared() - .await - .expect("Failed to initialize PostgresProcess") - .create_random_db() - .await - .expect("Failed to create random test database"); - - let fragments_per_bundle = 1_000_000; - let txs_per_fragment = 100; - - // insert the bundle and fragments - let bundle_id = storage.next_bundle_id().await.unwrap(); - let end_height = rng.gen_range(1..5000); - let range = 0..=end_height; - - // create fragments for the bundle - let fragments = (0..fragments_per_bundle) - .map(|_| Fragment { - data: NonEmpty::from_vec(vec![rng.r#gen()]).unwrap(), - unused_bytes: rng.gen_range(0..1000), - total_bytes: rng.gen_range(1000..5000).try_into().unwrap(), - }) - .collect::>(); - let fragments = NonEmpty::from_vec(fragments).unwrap(); + storage + .update_tx_states_and_costs(tx_changes, vec![], cost_updates) + .await + .unwrap(); - storage - .insert_bundle_and_fragments(bundle_id, range, fragments.clone()) - .await - .unwrap(); + let duration = start_time.elapsed(); - let fragment_ids = storage - .oldest_nonfinalized_fragments(0, 2) - .await - .unwrap() - .into_iter() - .map(|f| f.id) - .collect_nonempty() - .unwrap(); + assert!(duration.as_secs() < 60); - let mut tx_changes = vec![]; - let mut cost_updates = vec![]; - - // for each fragment, create multiple transactions - for _id in fragment_ids.iter() { - for _ in 0..txs_per_fragment { - let tx_hash = rng.r#gen::<[u8; 
32]>(); - let tx = L1Tx { - hash: tx_hash, - nonce: rng.r#gen(), - ..Default::default() - }; + Ok(()) + } - storage - .record_pending_tx(tx.clone(), fragment_ids.clone(), Utc::now()) + // Helper function to insert fuel blocks in batches. + // Each block's data is 344 bytes (mimicking production). + async fn insert_fuel_blocks(db: &Postgres, start: u32, end: u32, batch_size: usize) { + // Create a payload of 344 bytes (using a constant value). + let payload = vec![1u8; 344]; + for chunk in (start..=end).chunks(batch_size).into_iter() { + let blocks: Vec = chunk + .into_iter() + .map(|height| CompressedFuelBlock { + height, + data: NonEmpty::from_vec(payload.clone()).expect("NonEmpty data"), + }) + .collect(); + let nonempty_blocks = + NonEmpty::from_vec(blocks).expect("Batch should be non-empty"); + db._insert_blocks(nonempty_blocks) .await - .unwrap(); - - // update transaction state to simulate finalized transactions - let finalization_time = Utc::now(); - tx_changes.push((tx.hash, TransactionState::Finalized(finalization_time))); - - // cost updates - let total_fee = rng.gen_range(1_000_000u128..10_000_000u128); - let da_block_height = rng.gen_range(1_000_000u64..10_000_000u64); - cost_updates.push(TransactionCostUpdate { - tx_hash, - total_fee, - da_block_height, - }); + .expect("Insertion should succeed"); } } - // update transaction states and costs - let start_time = Instant::now(); + #[tokio::test] + async fn test_next_candidates_for_bundling_performance_4m_blocks() { + // Set total number of blocks to insert (around 4 million) + let total_blocks = 7 * 24 * 3600 + 3600; + // We'll leave the last 2500 blocks unbundled. + let bundled_end = total_blocks - 2500; - storage - .update_tx_states_and_costs(tx_changes, vec![], cost_updates) - .await - .unwrap(); + // Set up the test database. + let process = PostgresProcess::shared() + .await + .expect("Failed to start test PostgresProcess"); + let db_with_process = process + .create_random_db() + .await + .expect("Failed to create random test database"); + let db = &db_with_process.db; + + // Insert fuel blocks from 1 to total_blocks with 344-byte payloads. + // Using a batch size of 1000. + insert_fuel_blocks(db, 1, total_blocks, 62000).await; + + // Bundle blocks from 1 to bundled_end in small bundles of at most 3600 blocks. + let bundle_max_size = 3600; + // Create a dummy fragment with a 344-byte payload. + let fragment_payload = vec![1u8; 344]; + let dummy_fragment = Fragment { + data: NonEmpty::from_vec(fragment_payload).expect("NonEmpty data"), + unused_bytes: 0, + total_bytes: 344u32.try_into().unwrap(), + }; - let duration = start_time.elapsed(); + let mut current_start = 1u32; + while current_start <= bundled_end { + let current_end = cmp::min(current_start + bundle_max_size - 1, bundled_end); + let range = current_start..=current_end; + let next_bundle_id = db + ._next_bundle_id() + .await + .expect("Should be able to get a bundle id"); + db._insert_bundle_and_fragments( + next_bundle_id, + range, + NonEmpty::from_vec(vec![dummy_fragment.clone()]) + .expect("Non-empty fragment list"), + ) + .await + .expect("Bundle insertion failed"); + current_start = current_end + 1; + } - assert!(duration.as_secs() < 60); + // Now run the unbundled blocks query. + // Since blocks 1 to bundled_end (3,997,500) are bundled, only blocks 3,997,501 to 4,000,000 (2500 blocks) + // remain unbundled. 
+ let start_height = total_blocks - (7 * 24 * 3600); + let start_time = Instant::now(); + let result = db + ._next_candidates_for_bundling(start_height, BytesLimit::UNLIMITED, u32::MAX) + .await + .expect("Query should execute correctly"); + let duration = start_time.elapsed(); + + // Determine the count of unbundled blocks returned. + let unbundled_count = result + .as_ref() + .map(|seq| { + let range = seq.oldest.height_range(); + range.end() - range.start() + 1 + }) + .unwrap_or(0); - Ok(()) + assert_eq!( + unbundled_count, 2500, + "Expected exactly 2500 unbundled blocks" + ); + assert!(duration < Duration::from_secs(1)); + } } } diff --git a/packages/adapters/storage/src/test_instance.rs b/packages/adapters/storage/src/test_instance.rs index 1e75b4fe..11faca43 100644 --- a/packages/adapters/storage/src/test_instance.rs +++ b/packages/adapters/storage/src/test_instance.rs @@ -6,7 +6,10 @@ use std::{ use delegate::delegate; use services::{ - block_bundler::{self, port::UnbundledBlocks}, + block_bundler::{ + self, + port::{BytesLimit, UnbundledBlocks}, + }, block_committer, block_importer, types::{ BlockSubmission, BlockSubmissionTx, BundleCost, CompressedFuelBlock, DateTime, Fragment, @@ -232,13 +235,18 @@ impl block_importer::port::Storage for DbWithProcess { } impl block_bundler::port::Storage for DbWithProcess { - async fn lowest_sequence_of_unbundled_blocks( + async fn next_candidates_for_bundling( &self, starting_height: u32, - max_cumulative_bytes: u32, + max_cumulative_bytes: BytesLimit, + block_buildup_threshold: u32, ) -> services::Result> { self.db - ._lowest_unbundled_blocks(starting_height, max_cumulative_bytes) + ._next_candidates_for_bundling( + starting_height, + max_cumulative_bytes, + block_buildup_threshold, + ) .await .map_err(Into::into) } diff --git a/packages/services/src/block_bundler.rs b/packages/services/src/block_bundler.rs index acfe575d..15168cff 100644 --- a/packages/services/src/block_bundler.rs +++ b/packages/services/src/block_bundler.rs @@ -16,7 +16,7 @@ pub mod service { }; use crate::{ Error, Result, Runner, - types::{DateTime, Utc, storage::SequentialFuelBlocks}, + types::{DateTime, Utc}, }; #[derive(Debug, Clone, Copy)] @@ -40,7 +40,7 @@ pub mod service { max_fragments_per_bundle: NonZeroUsize::MAX, lookback_window: 1000, max_bundles_per_optimization_run: 1.try_into().unwrap(), - blocks_to_accumulate: NonZeroUsize::new(10).unwrap(), + blocks_to_accumulate: 10.try_into().unwrap(), } } } @@ -159,23 +159,24 @@ pub mod service { async fn bundle_and_fragment_blocks(&mut self) -> Result<()> { let starting_height = self.get_starting_height().await?; - while let Some(UnbundledBlocks { - oldest, - total_unbundled, - }) = self + while let Some(unbundled_blocks) = self .storage - .lowest_sequence_of_unbundled_blocks( + .next_candidates_for_bundling( starting_height, - self.config.bytes_to_accumulate.get() as u32, + super::port::BytesLimit::from(self.config.bytes_to_accumulate.get() as u32), + self.config.blocks_to_accumulate.get() as u32, ) .await? { - if self.should_wait(&oldest, total_unbundled)? { + if self.should_wait(&unbundled_blocks)? 
{ return Ok(()); } let next_id = self.storage.next_bundle_id().await?; - let bundler = self.bundler_factory.build(oldest, next_id).await; + let bundler = self + .bundler_factory + .build(unbundled_blocks.oldest, next_id) + .await; let optimization_start = self.clock.now(); let BundleProposal { @@ -205,34 +206,36 @@ pub mod service { fn should_wait( &self, - blocks: &SequentialFuelBlocks, - total_available: NonZeroUsize, + UnbundledBlocks { + oldest, + buildup_detected, + }: &UnbundledBlocks, ) -> Result { - let cum_size = blocks.cumulative_size(); - let has_more = total_available > blocks.len(); + let cum_size = oldest.cumulative_size(); let still_time_to_accumulate_more = self.still_time_to_accumulate_more()?; - // We use `total_available` because we previously encountered a scenario with: - // - A few very large blocks, + // We use `buildup_detected` because we previously encountered a scenario with: + // - A few unbundled blocks, // - Followed by a long sequence of bundled blocks, // - And then additional unbundled blocks. - // Since bundling required a fixed number of sequential blocks, the process skipped over + // Since bundling required a fixed number of block bytes to be available, the process skipped over // the blocks before the gap until a timeout occurred. Even after timing out, only one bundle // was created, and then the system waited for another timeout. To avoid this, we ignore the // total count of unbundled blocks when deciding whether to wait. The trade-off is that if there // is a gap of unimported blocks followed by many unbundled blocks, processing of the newer // blocks is deferred until the older ones are bundled. This can lead to the creation of small // bundles if the import process cannot supply blocks quickly enough. + let buildup_detected = buildup_detected.unwrap_or(false); + + let enough_bytes = cum_size >= self.config.bytes_to_accumulate; - let should_wait = cum_size < self.config.bytes_to_accumulate - && total_available < self.config.blocks_to_accumulate - && !has_more - && still_time_to_accumulate_more; + let should_wait = !enough_bytes && !buildup_detected && still_time_to_accumulate_more; let available_data = human_readable_size(cum_size); if should_wait { let needed_data = human_readable_size(self.config.bytes_to_accumulate); + let num_blocks = oldest.len(); let until_timeout = humantime::format_duration( self.config @@ -242,13 +245,13 @@ pub mod service { ); tracing::info!( - "Not bundling yet (accumulated {available_data} of required {needed_data}, {total_available}/{} blocks accumulated, timeout in {until_timeout}); waiting for more.", + "Not bundling yet (accumulated {available_data}/{needed_data}, {num_blocks}/{} blocks, timeout in {until_timeout}); waiting for more.", self.config.blocks_to_accumulate ); } else { tracing::info!( "Proceeding to bundle with {} blocks (accumulated {available_data}).", - blocks.len() + oldest.len() ); } @@ -323,7 +326,7 @@ pub mod service { } pub mod port { - use std::{num::NonZeroUsize, ops::RangeInclusive}; + use std::ops::RangeInclusive; use nonempty::NonEmpty; @@ -332,6 +335,25 @@ pub mod port { types::{DateTime, Fragment, NonNegative, Utc, storage::SequentialFuelBlocks}, }; + #[derive(Debug, Clone, Copy, PartialEq, Eq)] + pub struct BytesLimit(pub u32); + + impl BytesLimit { + pub const UNLIMITED: Self = Self(u32::MAX); + } + + impl From for BytesLimit { + fn from(value: u32) -> Self { + Self(value) + } + } + + impl From for u32 { + fn from(limit: BytesLimit) -> Self { + limit.0 + } + } + pub mod fuel { 
#[allow(async_fn_in_trait)] #[trait_variant::make(Send)] @@ -365,16 +387,20 @@ pub mod port { #[derive(Debug, Clone)] pub struct UnbundledBlocks { pub oldest: SequentialFuelBlocks, - pub total_unbundled: NonZeroUsize, + pub buildup_detected: Option, } #[allow(async_fn_in_trait)] #[trait_variant::make(Send)] pub trait Storage: Sync { - async fn lowest_sequence_of_unbundled_blocks( + /// Candidates are chosen so that: + /// * the first block's height is the lowest available + /// * all blocks are consecutive in height + async fn next_candidates_for_bundling( &self, starting_height: u32, - max_cumulative_bytes: u32, + target_cumulative_bytes: BytesLimit, + block_buildup_detection_threshold: u32, ) -> Result>; async fn insert_bundle_and_fragments( &self, diff --git a/packages/services/tests/block_bundler.rs b/packages/services/tests/block_bundler.rs index bf0ce167..2897bb9e 100644 --- a/packages/services/tests/block_bundler.rs +++ b/packages/services/tests/block_bundler.rs @@ -8,7 +8,7 @@ use metrics::RegistersMetrics; use services::{ BlockBundler, BlockBundlerConfig, Bundle, BundleProposal, Bundler, BundlerFactory, ControllableBundlerFactory, Metadata, Result, Runner, - block_bundler::port::l1::FragmentEncoder, + block_bundler::port::{BytesLimit, l1::FragmentEncoder}, types::{ CollectNonEmpty, CompressedFuelBlock, Fragment, NonEmpty, nonempty, storage::SequentialFuelBlocks, @@ -199,7 +199,7 @@ async fn does_nothing_if_not_enough_blocks() -> Result<()> { }) .await; - // The chain’s latest height is 0 + // The chain's latest height is 0 let mock_fuel_api = test_helpers::mocks::fuel::block_bundler_latest_height_is(0); // We require 2 * block_size in bytes AND we also require 2 blocks @@ -292,7 +292,7 @@ async fn stops_accumulating_blocks_if_time_runs_out_measured_from_component_crea assert!( setup .db() - .lowest_sequence_of_unbundled_blocks(blocks.last().height, 1) + .next_candidates_for_bundling(blocks.last().height, BytesLimit(1), u32::MAX) .await? .is_none() ); @@ -660,7 +660,7 @@ async fn skips_blocks_outside_lookback_window() -> Result<()> { // Ensure that blocks outside the lookback window are still unbundled let unbundled_blocks = setup .db() - .lowest_sequence_of_unbundled_blocks(0, u32::MAX) + .next_candidates_for_bundling(0, BytesLimit::UNLIMITED, u32::MAX) .await? .unwrap(); diff --git a/packages/services/tests/block_importer.rs b/packages/services/tests/block_importer.rs index 45338a9d..ad8ce37f 100644 --- a/packages/services/tests/block_importer.rs +++ b/packages/services/tests/block_importer.rs @@ -3,7 +3,7 @@ use itertools::Itertools; use mockall::{Sequence, predicate::eq}; use services::{ Result, Runner, - block_bundler::port::Storage, + block_bundler::port::{BytesLimit, Storage}, block_importer::service::BlockImporter, types::{CollectNonEmpty, nonempty}, }; @@ -26,7 +26,7 @@ async fn imports_first_block_when_db_is_empty() -> Result<()> { // then let all_blocks = setup .db() - .lowest_sequence_of_unbundled_blocks(0, u32::MAX) + .next_candidates_for_bundling(0, BytesLimit::UNLIMITED, u32::MAX) .await? .unwrap() .oldest; @@ -71,7 +71,7 @@ async fn does_not_request_or_import_blocks_already_in_db() -> Result<()> { // then let stored_blocks = setup .db() - .lowest_sequence_of_unbundled_blocks(0, u32::MAX) + .next_candidates_for_bundling(0, BytesLimit::UNLIMITED, u32::MAX) .await? 
.unwrap() .oldest; @@ -110,7 +110,7 @@ async fn respects_height_even_if_blocks_before_are_missing() -> Result<()> { // then let stored_new_blocks = setup .db() - .lowest_sequence_of_unbundled_blocks(starting_height, u32::MAX) + .next_candidates_for_bundling(starting_height, BytesLimit::UNLIMITED, u32::MAX) .await? .unwrap() .oldest; @@ -143,7 +143,7 @@ async fn handles_chain_with_no_new_blocks() -> Result<()> { // Database should remain unchanged let stored_blocks = setup .db() - .lowest_sequence_of_unbundled_blocks(0, u32::MAX) + .next_candidates_for_bundling(0, BytesLimit::UNLIMITED, u32::MAX) .await? .unwrap() .oldest; @@ -178,7 +178,7 @@ async fn skips_blocks_outside_lookback_window() -> Result<()> { // then let unbundled_blocks = setup .db() - .lowest_sequence_of_unbundled_blocks(0, u32::MAX) + .next_candidates_for_bundling(0, BytesLimit::UNLIMITED, u32::MAX) .await? .unwrap() .oldest; @@ -243,7 +243,7 @@ async fn fills_in_missing_blocks_inside_lookback_window() -> Result<()> { // then let unbundled_blocks = setup .db() - .lowest_sequence_of_unbundled_blocks(0, u32::MAX) + .next_candidates_for_bundling(0, BytesLimit::UNLIMITED, u32::MAX) .await? .unwrap() .oldest; @@ -282,7 +282,7 @@ async fn chunks_blocks_correctly_by_count() -> Result<()> { // then let stored_blocks = setup .db() - .lowest_sequence_of_unbundled_blocks(0, u32::MAX) + .next_candidates_for_bundling(0, BytesLimit::UNLIMITED, u32::MAX) .await? .unwrap() .oldest; @@ -313,7 +313,7 @@ async fn chunks_blocks_correctly_by_size() -> Result<()> { // then let stored_blocks = setup .db() - .lowest_sequence_of_unbundled_blocks(0, u32::MAX) + .next_candidates_for_bundling(0, BytesLimit::UNLIMITED, u32::MAX) .await? .unwrap() .oldest;
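
Reviewer note, not part of the change: the diff replaces the old "has more blocks than the current run" check with three independent signals in `should_wait` (byte target met, buildup detected by storage, accumulation window still open). The standalone Rust sketch below mirrors only that decision under simplified stand-in types: `Candidates`, its `cumulative_size` field, and the `still_time` flag are illustrative names, not the crate's real `UnbundledBlocks`/clock API, and the sketch is an approximation of the logic rather than code taken from the repository.

// Simplified newtype mirroring the one added in block_bundler::port.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct BytesLimit(u32);

impl BytesLimit {
    const UNLIMITED: Self = Self(u32::MAX);
}

impl From<u32> for BytesLimit {
    fn from(value: u32) -> Self {
        Self(value)
    }
}

// Stand-in for the candidate set returned by storage: the cumulative byte size
// of the oldest consecutive run of unbundled blocks, plus the flag storage sets
// when the number of unbundled blocks crosses the buildup threshold.
struct Candidates {
    cumulative_size: usize,
    buildup_detected: Option<bool>,
}

// Wait only while all three hold: the byte target is not met, no buildup was
// detected, and there is still time left to accumulate more blocks.
fn should_wait(c: &Candidates, bytes_to_accumulate: usize, still_time: bool) -> bool {
    let enough_bytes = c.cumulative_size >= bytes_to_accumulate;
    let buildup_detected = c.buildup_detected.unwrap_or(false);
    !enough_bytes && !buildup_detected && still_time
}

fn main() {
    // Too few bytes, no buildup, time remaining -> keep accumulating.
    let few = Candidates { cumulative_size: 10, buildup_detected: Some(false) };
    assert!(should_wait(&few, 100, true));

    // A buildup of unbundled blocks overrides the byte target -> bundle now.
    let backlog = Candidates { cumulative_size: 10, buildup_detected: Some(true) };
    assert!(!should_wait(&backlog, 100, true));

    // Accumulation window elapsed -> bundle whatever is available.
    assert!(!should_wait(&few, 100, false));

    // Callers that want no byte cap (as the tests do) pass the sentinel limit.
    assert_eq!(BytesLimit::UNLIMITED, BytesLimit::from(u32::MAX));

    println!("wait-decision sketch ok");
}

One design detail worth calling out: `buildup_detected` is an `Option<bool>` and the service treats a missing value as "no buildup" via `unwrap_or(false)`, so a storage backend that does not compute the threshold simply falls back to the byte-target and timeout checks.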