From b85fea8be112ff44d06fabed2dcf1583bfb9e2c8 Mon Sep 17 00:00:00 2001 From: Guocork Date: Thu, 10 Jul 2025 16:16:59 +0800 Subject: [PATCH] import lz4 to compress --- Cargo.lock | 22 ++++- network/Cargo.toml | 1 + network/src/compress.rs | 162 +++++++++++++++++++++++++++------- network/src/tests/compress.rs | 21 ++++- 4 files changed, 170 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 803e29c86b..3361a2f67c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1366,6 +1366,7 @@ dependencies = [ "hickory-resolver", "idb", "ipnetwork", + "lz4", "num_cpus", "proptest", "rand 0.8.5", @@ -3974,7 +3975,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -4049,6 +4050,25 @@ dependencies = [ "linked-hash-map", ] +[[package]] +name = "lz4" +version = "1.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a20b523e860d03443e98350ceaac5e71c6ba89aea7d960769ec3ce37f4de5af4" +dependencies = [ + "lz4-sys", +] + +[[package]] +name = "lz4-sys" +version = "1.11.1+lz4-1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bd8c0d6c6ed0cd30b3652886bb8711dc4bb01d637a68105a3d5158039b418e6" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "match_cfg" version = "0.1.0" diff --git a/network/Cargo.toml b/network/Cargo.toml index 62248f3b1c..61dc29f9f7 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -35,6 +35,7 @@ bloom-filters.workspace = true ckb-spawn.workspace = true bitflags.workspace = true p2p = { workspace = true, default-features = false } +lz4 = "1.28.1" [target.'cfg(not(target_family = "wasm"))'.dependencies] p2p = { workspace = true, default-features = false, features = [ diff --git a/network/src/compress.rs b/network/src/compress.rs index ed63a2680c..1b33b15f99 100644 --- a/network/src/compress.rs +++ b/network/src/compress.rs @@ -8,7 +8,8 @@ use std::io; pub(crate) const COMPRESSION_SIZE_THRESHOLD: usize = 1024; const UNCOMPRESS_FLAG: u8 = 0b0000_0000; -const COMPRESS_FLAG: u8 = 0b1000_0000; +const SNAPPY_FLAG: u8 = 0b1000_0000; +const LZ4_FLAG: u8 = 0b0100_0000; const MAX_UNCOMPRESSED_LEN: usize = 1 << 23; // 8MB /// Compressed decompression structure @@ -26,6 +27,31 @@ const MAX_UNCOMPRESSED_LEN: usize = 1 << 23; // 8MB /// +-------+------+------------------------------------------------+ /// | 1~ | | Payload (Serialized Data with Compress) | /// +-------+------+------------------------------------------------+ +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum CompressionType { + None, + Snappy, + Lz4, +} + +impl CompressionType { + fn from_flag(flag: u8) -> Self { + match flag & 0b1100_0000 { + SNAPPY_FLAG => CompressionType::Snappy, + LZ4_FLAG => CompressionType::Lz4, + _ => CompressionType::None, + } + } + + fn to_flag(self) -> u8 { + match self { + CompressionType::None => UNCOMPRESS_FLAG, + CompressionType::Snappy => SNAPPY_FLAG, + CompressionType::Lz4 => LZ4_FLAG, + } + } +} + #[derive(Clone, Debug)] pub(crate) struct Message { inner: BytesMut, @@ -45,17 +71,23 @@ impl Message { Self { inner: data } } - /// Compress message - pub(crate) fn compress(mut self) -> Bytes { - if self.inner.len() > COMPRESSION_SIZE_THRESHOLD { + /// Compress message with specified compression type + pub(crate) fn compress_with(mut self, compression_type: CompressionType) -> Bytes { + if self.inner.len() > COMPRESSION_SIZE_THRESHOLD && compression_type != CompressionType::None { let input = self.inner.split_off(1); - match SnapEncoder::new().compress_vec(&input) { + let compress_result = match compression_type { + CompressionType::Snappy => self.compress_snappy(&input), + CompressionType::Lz4 => self.compress_lz4(&input), + CompressionType::None => return self.inner.freeze(), + }; + + match compress_result { Ok(res) => { self.inner.extend_from_slice(&res); - self.set_compress_flag(); + self.set_compression_flag(compression_type); } Err(e) => { - debug!("snappy compress error: {}", e); + debug!("{:?} compress error: {}", compression_type, e); self.inner.unsplit(input); } } @@ -63,47 +95,104 @@ impl Message { self.inner.freeze() } + /// Compress message with default snappy compression + pub(crate) fn compress(self) -> Bytes { + self.compress_with(CompressionType::Snappy) + } + + /// Compress message in snappy format + fn compress_snappy(&self, input: &BytesMut) -> Result, io::Error> { + SnapEncoder::new() + .compress_vec(input) + .map_err(|e| { + io::Error::new(io::ErrorKind::InvalidData, e) + }) + } + + /// Compress message in lz4 format + fn compress_lz4(&self, input: &BytesMut) -> Result, io::Error> { + lz4::block::compress(input, None, false) + .map_err(|e| { + io::Error::new(io::ErrorKind::InvalidData, e) + }) + } + /// Decompress message pub(crate) fn decompress(mut self) -> Result { if self.inner.is_empty() { - Err(io::ErrorKind::InvalidData.into()) - } else if self.compress_flag() { - match decompress_len(&self.inner[1..]) { - Ok(decompressed_bytes_len) => { - if decompressed_bytes_len > MAX_UNCOMPRESSED_LEN { - debug!( - "The limit for uncompressed bytes len is exceeded. limit: {}, len: {}", - MAX_UNCOMPRESSED_LEN, decompressed_bytes_len - ); + return Err(io::ErrorKind::InvalidData.into()); + } + + let compression_type = self.compression_type(); + + match compression_type { + CompressionType::None => { + let _ = self.inner.split_to(1); + Ok(self.inner.freeze()) + }, + CompressionType::Snappy => self.decompress_snappy(), + CompressionType::Lz4 => self.decompress_lz4(), + } + } + + /// Decompress message in snappy format + fn decompress_snappy(&mut self) -> Result { + match decompress_len(&self.inner[1..]) { + Ok(decompressed_bytes_len) => { + if decompressed_bytes_len > MAX_UNCOMPRESSED_LEN { + debug!( + "The limit for uncompressed bytes len is exceeded. limit: {}, len: {}", + MAX_UNCOMPRESSED_LEN, decompressed_bytes_len + ); + return Err(io::ErrorKind::InvalidData.into()); + } + + let mut buf = vec![0; decompressed_bytes_len]; + match SnapDecoder::new().decompress(&self.inner[1..], &mut buf) { + Ok(_) => Ok(buf.into()), + Err(e) => { + debug!("snappy decompress error: {:?}", e); Err(io::ErrorKind::InvalidData.into()) - } else { - let mut buf = vec![0; decompressed_bytes_len]; - match SnapDecoder::new().decompress(&self.inner[1..], &mut buf) { - Ok(_) => Ok(buf.into()), - Err(e) => { - debug!("snappy decompress error: {:?}", e); - Err(io::ErrorKind::InvalidData.into()) - } - } } } - Err(e) => { - debug!("snappy decompress_len error: {:?}", e); - Err(io::ErrorKind::InvalidData.into()) + } + Err(e) => { + debug!("snappy decompress_len error: {:?}", e); + Err(io::ErrorKind::InvalidData.into()) + } + } + } + + /// Decompress message in lz4 format + fn decompress_lz4(&mut self) -> Result { + match lz4::block::decompress(&self.inner[1..], Some(MAX_UNCOMPRESSED_LEN as i32)) { + Ok(decompressed_data) => { + if decompressed_data.len() > MAX_UNCOMPRESSED_LEN { + debug!( + "The limit for uncompressed bytes len is exceeded. limit: {}, len: {}", + MAX_UNCOMPRESSED_LEN, decompressed_data.len() + ); + return Err(io::ErrorKind::InvalidData.into()); } + Ok(decompressed_data.into()) + } + Err(e) => { + debug!("lz4 decompress error: {:?}", e); + Err(io::ErrorKind::InvalidData.into()) } - } else { - let _ = self.inner.split_to(1); - Ok(self.inner.freeze()) } } - pub(crate) fn set_compress_flag(&mut self) { - self.inner[0] = COMPRESS_FLAG; + fn set_compression_flag(&mut self, compression_type: CompressionType) { + self.inner[0] = compression_type.to_flag(); + } + + pub(crate) fn compression_type(&self) -> CompressionType { + CompressionType::from_flag(self.inner[0]) } pub(crate) fn compress_flag(&self) -> bool { - (self.inner[0] & COMPRESS_FLAG) != 0 + self.compression_type() != CompressionType::None } } @@ -116,3 +205,8 @@ pub fn compress(src: Bytes) -> Bytes { pub fn decompress(src: BytesMut) -> Result { Message::from_compressed(src).decompress() } + +/// Compress data with specified compression type +pub fn compress_with(src: Bytes, compression_type: CompressionType) -> Bytes { + Message::from_raw(src).compress_with(compression_type) +} diff --git a/network/src/tests/compress.rs b/network/src/tests/compress.rs index 6166093851..b136c0c547 100644 --- a/network/src/tests/compress.rs +++ b/network/src/tests/compress.rs @@ -1,6 +1,6 @@ use p2p::bytes::{Bytes, BytesMut}; -use crate::compress::{COMPRESSION_SIZE_THRESHOLD, Message, compress, decompress}; +use crate::compress::{COMPRESSION_SIZE_THRESHOLD, Message, compress, decompress, CompressionType, compress_with}; #[test] fn test_no_need_compress() { @@ -46,3 +46,22 @@ fn test_invalid_data() { assert!(decompress(BytesMut::from(&cmp_data.as_ref()[1..])).is_err()); assert!(decompress(BytesMut::new()).is_err()); } + + #[test] +fn test_compression_types() { + let data = Bytes::from(vec![1u8; 2000]); + + let snappy_compressed = compress_with(data.clone(), CompressionType::Snappy); + let snappy_msg = Message::from_compressed(BytesMut::from(snappy_compressed.as_ref())); + assert_eq!(snappy_msg.compression_type(), CompressionType::Snappy); + + let lz4_compressed = compress_with(data.clone(), CompressionType::Lz4); + let lz4_msg = Message::from_compressed(BytesMut::from(lz4_compressed.as_ref())); + assert_eq!(lz4_msg.compression_type(), CompressionType::Lz4); + + let decompressed_snappy = decompress(BytesMut::from(snappy_compressed.as_ref())).unwrap(); + let decompressed_lz4 = decompress(BytesMut::from(lz4_compressed.as_ref())).unwrap(); + + assert_eq!(decompressed_snappy, data); + assert_eq!(decompressed_lz4, data); +}