diff --git a/Cargo.toml b/Cargo.toml
index 284a836..4155ea8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -33,4 +33,9 @@ fluss = { version = "0.1.0", path = "./crates/fluss" }
 tokio = { version = "1.44.2", features = ["full"] }
 clap = { version = "4.5.37", features = ["derive"] }
 arrow = { version = "57.0.0", features = ["ipc_compression"] }
+chrono = { version = "0.4", features = ["clock", "std", "wasmbind"] }
+
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
+opendal = "0.53"
 jiff = { version = "0.2" }
diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml
index dab85b6..e1fa531 100644
--- a/crates/examples/Cargo.toml
+++ b/crates/examples/Cargo.toml
@@ -26,7 +26,7 @@ version = { workspace = true }
 [dependencies]
 fluss = { workspace = true }
 tokio = { workspace = true }
-clap = { workspace = true}
+clap = { workspace = true }
 [[example]]
 name = "example-table"
 path = "src/example_table.rs"
\ No newline at end of file
diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml
index aa763d5..0cf0364 100644
--- a/crates/fluss/Cargo.toml
+++ b/crates/fluss/Cargo.toml
@@ -22,11 +22,12 @@ version = { workspace = true }
 name = "fluss"
 
 [features]
-default = ["storage-memory", "storage-fs"]
-storage-all = ["storage-memory", "storage-fs"]
+default = ["storage-memory", "storage-fs", "storage-s3"]
+storage-all = ["storage-memory", "storage-fs", "storage-s3"]
 storage-memory = ["opendal/services-memory"]
 storage-fs = ["opendal/services-fs"]
+storage-s3 = ["opendal/services-s3"]
 integration_tests = []
 
 [dependencies]
@@ -39,9 +40,9 @@ crc32c = "0.6.8"
 linked-hash-map = "0.5.6"
 prost = "0.14"
 rand = "0.9.1"
-serde = { version = "1.0.219", features = ["derive", "rc"] }
-serde_json = "1.0.140"
-thiserror = "2"
+serde = { workspace = true, features = ["rc"] }
+serde_json = { workspace = true }
+thiserror = "1.0"
 log = { version = "0.4", features = ["kv_std"] }
 tokio = { workspace = true }
 parking_lot = "0.12"
diff --git a/crates/fluss/src/client/credentials.rs b/crates/fluss/src/client/credentials.rs
new file mode 100644
index 0000000..4078b47
--- /dev/null
+++ b/crates/fluss/src/client/credentials.rs
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::client::metadata::Metadata;
+use crate::error::{Error, Result};
+use crate::rpc::RpcClient;
+use crate::rpc::message::GetSecurityTokenRequest;
+use parking_lot::RwLock;
+use serde::Deserialize;
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+const CACHE_TTL: Duration = Duration::from_secs(3600);
+
+#[derive(Debug, Deserialize)]
+struct Credentials {
+    access_key_id: String,
+    access_key_secret: String,
+    security_token: Option<String>,
+}
+
+struct CachedToken {
+    access_key_id: String,
+    secret_access_key: String,
+    security_token: Option<String>,
+    addition_infos: HashMap<String, String>,
+    cached_at: Instant,
+}
+
+impl CachedToken {
+    fn to_remote_fs_props(&self) -> HashMap<String, String> {
+        let mut props = HashMap::new();
+
+        props.insert("access_key_id".to_string(), self.access_key_id.clone());
+        props.insert(
+            "secret_access_key".to_string(),
+            self.secret_access_key.clone(),
+        );
+
+        if let Some(token) = &self.security_token {
+            props.insert("security_token".to_string(), token.clone());
+        }
+
+        for (key, value) in &self.addition_infos {
+            if let Some((opendal_key, transform)) = convert_hadoop_key_to_opendal(key) {
+                let final_value = if transform {
+                    // Invert boolean value (path_style_access -> enable_virtual_host_style)
+                    if value == "true" {
+                        "false".to_string()
+                    } else {
+                        "true".to_string()
+                    }
+                } else {
+                    value.clone()
+                };
+                props.insert(opendal_key, final_value);
+            }
+        }
+
+        props
+    }
+}
+
+/// Returns (opendal_key, needs_inversion)
+/// needs_inversion is true for path_style_access -> enable_virtual_host_style conversion
+fn convert_hadoop_key_to_opendal(hadoop_key: &str) -> Option<(String, bool)> {
+    match hadoop_key {
+        "fs.s3a.endpoint" => Some(("endpoint".to_string(), false)),
+        "fs.s3a.endpoint.region" => Some(("region".to_string(), false)),
+        "fs.s3a.path.style.access" => Some(("enable_virtual_host_style".to_string(), true)),
+        "fs.s3a.connection.ssl.enabled" => None,
+        _ => None,
+    }
+}
+
+pub struct CredentialsCache {
+    inner: RwLock<Option<CachedToken>>,
+}
+
+impl CredentialsCache {
+    pub fn new() -> Self {
+        Self {
+            inner: RwLock::new(None),
+        }
+    }
+
+    pub async fn get_or_refresh(
+        &self,
+        rpc_client: &Arc<RpcClient>,
+        metadata: &Arc<Metadata>,
+    ) -> Result<HashMap<String, String>> {
+        {
+            let guard = self.inner.read();
+            if let Some(cached) = guard.as_ref() {
+                if cached.cached_at.elapsed() < CACHE_TTL {
+                    return Ok(cached.to_remote_fs_props());
+                }
+            }
+        }
+
+        self.refresh_from_server(rpc_client, metadata).await
+    }
+
+    async fn refresh_from_server(
+        &self,
+        rpc_client: &Arc<RpcClient>,
+        metadata: &Arc<Metadata>,
+    ) -> Result<HashMap<String, String>> {
+        let cluster = metadata.get_cluster();
+        let server_node = cluster
+            .get_coordinator_server()
+            .or_else(|| Some(cluster.get_one_available_server()))
+            .expect("no available server to fetch security token");
+        let conn = rpc_client.get_connection(server_node).await?;
+
+        let request = GetSecurityTokenRequest::new();
+        let response = conn.request(request).await?;
+
+        let credentials: Credentials = serde_json::from_slice(&response.token)
+            .map_err(|e| Error::JsonSerdeError(e.to_string()))?;
+
+        let mut addition_infos = HashMap::new();
+        for kv in &response.addition_info {
+            addition_infos.insert(kv.key.clone(), kv.value.clone());
+        }
+
+        let cached = CachedToken {
+            access_key_id: credentials.access_key_id,
+            secret_access_key: credentials.access_key_secret,
+            security_token: credentials.security_token,
+            addition_infos,
+            cached_at: Instant::now(),
+        };
+
+        let props = cached.to_remote_fs_props();
+        *self.inner.write() = Some(cached);
+
+        Ok(props)
+    }
+}
+
+impl Default for CredentialsCache {
+    fn default() -> Self {
+        Self::new()
+    }
+}
diff --git a/crates/fluss/src/client/mod.rs b/crates/fluss/src/client/mod.rs
index a971439..cff218b 100644
--- a/crates/fluss/src/client/mod.rs
+++ b/crates/fluss/src/client/mod.rs
@@ -17,12 +17,14 @@
 
 mod admin;
 mod connection;
+mod credentials;
 mod metadata;
 mod table;
 mod write;
 
 pub use admin::*;
 pub use connection::*;
+pub use credentials::*;
 pub use metadata::*;
 pub use table::*;
 pub use write::*;
diff --git a/crates/fluss/src/client/table/remote_log.rs b/crates/fluss/src/client/table/remote_log.rs
index 65805d0..aa5615c 100644
--- a/crates/fluss/src/client/table/remote_log.rs
+++ b/crates/fluss/src/client/table/remote_log.rs
@@ -20,6 +20,7 @@ use crate::metadata::TableBucket;
 use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment};
 use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord};
 use crate::util::delete_file;
+use parking_lot::RwLock;
 use std::collections::HashMap;
 use std::io;
 use std::path::{Path, PathBuf};
@@ -115,11 +116,19 @@ impl RemoteLogDownloadFuture {
 /// Downloader for remote log segment files
 pub struct RemoteLogDownloader {
     local_log_dir: TempDir,
+    remote_fs_props: RwLock<HashMap<String, String>>,
 }
 
 impl RemoteLogDownloader {
     pub fn new(local_log_dir: TempDir) -> Result<Self> {
-        Ok(Self { local_log_dir })
+        Ok(Self {
+            local_log_dir,
+            remote_fs_props: RwLock::new(HashMap::new()),
+        })
+    }
+
+    pub fn set_remote_fs_props(&self, props: HashMap<String, String>) {
+        *self.remote_fs_props.write() = props;
     }
 
     /// Request to fetch a remote log segment to local. This method is non-blocking.
@@ -133,10 +142,16 @@ impl RemoteLogDownloader {
         let local_file_path = self.local_log_dir.path().join(&local_file_name);
         let remote_path = self.build_remote_path(remote_log_tablet_dir, segment);
         let remote_log_tablet_dir = remote_log_tablet_dir.to_string();
+        let remote_fs_props = self.remote_fs_props.read().clone();
         // Spawn async download task
         tokio::spawn(async move {
-            let result =
-                Self::download_file(&remote_log_tablet_dir, &remote_path, &local_file_path).await;
+            let result = Self::download_file(
+                &remote_log_tablet_dir,
+                &remote_path,
+                &local_file_path,
+                &remote_fs_props,
+            )
+            .await;
             let _ = sender.send(result);
         });
         Ok(RemoteLogDownloadFuture::new(receiver))
@@ -157,6 +172,7 @@ impl RemoteLogDownloader {
         remote_log_tablet_dir: &str,
         remote_path: &str,
         local_path: &Path,
+        remote_fs_props: &HashMap<String, String>,
    ) -> Result<PathBuf> {
         // Handle both URL (e.g., "s3://bucket/path") and local file paths
         // If the path doesn't contain "://", treat it as a local file path
@@ -169,11 +185,27 @@ impl RemoteLogDownloader {
         // Create FileIO from the remote log tablet dir URL to get the storage
         let file_io_builder = FileIO::from_url(&remote_log_tablet_dir_url)?;
 
+        // For S3/S3A URLs, inject S3 credentials from props
+        let file_io_builder = if remote_log_tablet_dir.starts_with("s3://")
+            || remote_log_tablet_dir.starts_with("s3a://")
+        {
+            file_io_builder.with_props(
+                remote_fs_props
+                    .iter()
+                    .map(|(k, v)| (k.as_str(), v.as_str())),
+            )
+        } else {
+            file_io_builder
+        };
+
         // Build storage and create operator directly
         let storage = Storage::build(file_io_builder)?;
         let (op, relative_path) = storage.create(remote_path)?;
 
-        // Get file metadata to know the size
+        // Timeout for remote storage operations (30 seconds)
+        const REMOTE_OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
+
+        // Get file metadata to know the size with timeout
         let meta = op.stat(relative_path).await?;
         let file_size = meta.content_length();
 
@@ -184,13 +216,36 @@
         // opendal::Reader::read accepts a range, so we read in chunks
         const CHUNK_SIZE: u64 = 8 * 1024 * 1024; // 8MB chunks for efficient reading
         let mut offset = 0u64;
+        let mut chunk_count = 0u64;
+        let total_chunks = file_size.div_ceil(CHUNK_SIZE);
 
         while offset < file_size {
             let end = std::cmp::min(offset + CHUNK_SIZE, file_size);
             let range = offset..end;
-
-            // Read chunk from remote storage
-            let chunk = op.read_with(relative_path).range(range.clone()).await?;
+            chunk_count += 1;
+
+            if chunk_count <= 3 || chunk_count % 10 == 0 {
+                log::debug!(
+                    "Remote log download: reading chunk {}/{} (offset {})",
+                    chunk_count,
+                    total_chunks,
+                    offset
+                );
+            }
+
+            // Read chunk from remote storage with timeout
+            let read_future = op.read_with(relative_path).range(range.clone());
+            let chunk = tokio::time::timeout(REMOTE_OP_TIMEOUT, read_future)
+                .await
+                .map_err(|_| {
+                    Error::Io(io::Error::new(
+                        io::ErrorKind::TimedOut,
+                        format!(
+                            "Timeout reading chunk from remote storage: {} at offset {}",
+                            remote_path, offset
+                        ),
+                    ))
+                })??;
             let bytes = chunk.to_bytes();
 
             // Write chunk to local file
@@ -254,10 +309,10 @@ impl RemotePendingFetch {
         // delete the downloaded local file to free disk
         delete_file(file_path).await;
 
-        // Parse log records
+        // Parse log records (remote log contains full data, need client-side projection)
         let mut fetch_records = vec![];
         for log_record in &mut LogRecordsBatchs::new(data) {
-            fetch_records.extend(log_record.records(&self.read_context)?);
+            fetch_records.extend(log_record.records_for_remote_log(&self.read_context)?);
         }
 
         let mut result = HashMap::new();
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index f6780d7..f66d7d7 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use crate::client::connection::FlussConnection;
+use crate::client::credentials::CredentialsCache;
 use crate::client::metadata::Metadata;
 use crate::error::{Error, Result};
 use crate::metadata::{TableBucket, TableInfo, TablePath};
@@ -194,6 +195,7 @@ struct LogFetcher {
    log_scanner_status: Arc<LogScannerStatus>,
     read_context: ReadContext,
     remote_log_downloader: RemoteLogDownloader,
+    credentials_cache: CredentialsCache,
 }
 
 impl LogFetcher {
@@ -217,6 +219,7 @@ impl LogFetcher {
             log_scanner_status,
             read_context,
             remote_log_downloader: RemoteLogDownloader::new(tmp_dir)?,
+            credentials_cache: CredentialsCache::new(),
         })
     }
 
@@ -256,6 +259,12 @@ impl LogFetcher {
 
                if let Some(ref remote_log_fetch_info) = fetch_log_for_bucket.remote_log_fetch_info
                {
+                    let remote_fs_props = self
+                        .credentials_cache
+                        .get_or_refresh(&self.conns, &self.metadata)
+                        .await?;
+                    self.remote_log_downloader
+                        .set_remote_fs_props(remote_fs_props);
                     let remote_fetch_info = RemoteLogFetchInfo::from_proto(
                         remote_log_fetch_info,
                         table_bucket.clone(),
diff --git a/crates/fluss/src/io/mod.rs b/crates/fluss/src/io/mod.rs
index 3c9a165..a03a394 100644
--- a/crates/fluss/src/io/mod.rs
+++ b/crates/fluss/src/io/mod.rs
@@ -27,8 +27,13 @@ pub use storage::*;
 mod storage_fs;
 #[cfg(feature = "storage-fs")]
 use storage_fs::*;
+
 #[cfg(feature = "storage-memory")]
 mod storage_memory;
-
 #[cfg(feature = "storage-memory")]
 use storage_memory::*;
+
+#[cfg(feature = "storage-s3")]
+mod storage_s3;
+#[cfg(feature = "storage-s3")]
+use storage_s3::*;
diff --git a/crates/fluss/src/io/storage.rs b/crates/fluss/src/io/storage.rs
index 361da7e..089670e 100644
--- a/crates/fluss/src/io/storage.rs
+++ b/crates/fluss/src/io/storage.rs
@@ -19,6 +19,7 @@ use crate::error;
 use crate::error::Result;
 use crate::io::FileIOBuilder;
 use opendal::{Operator, Scheme};
+use std::collections::HashMap;
 
 /// The storage carries all supported storage services in fluss
 #[derive(Debug)]
@@ -27,11 +28,13 @@ pub enum Storage {
     Memory,
     #[cfg(feature = "storage-fs")]
     LocalFs,
+    #[cfg(feature = "storage-s3")]
+    S3 { props: HashMap<String, String> },
 }
 
 impl Storage {
    pub(crate) fn build(file_io_builder: FileIOBuilder) -> Result<Self> {
-        let (scheme_str, _) = file_io_builder.into_parts();
+        let (scheme_str, props) = file_io_builder.into_parts();
         let scheme = Self::parse_scheme(&scheme_str)?;
 
         match scheme {
@@ -39,6 +42,8 @@
             Scheme::Memory => Ok(Self::Memory),
             #[cfg(feature = "storage-fs")]
             Scheme::Fs => Ok(Self::LocalFs),
+            #[cfg(feature = "storage-s3")]
+            Scheme::S3 => Ok(Self::S3 { props }),
             _ => Err(error::Error::IoUnsupported(
                 "Unsupported storage feature".to_string(),
             )),
@@ -66,6 +71,14 @@
                     Ok((op, &path[1..]))
                 }
             }
+            #[cfg(feature = "storage-s3")]
+            Storage::S3 { props } => {
+                let (bucket, key) = super::parse_s3_path(path);
+                let mut s3_props = props.clone();
+                s3_props.insert("bucket".to_string(), bucket.to_string());
+                let op = super::s3_config_build(&s3_props)?;
+                Ok((op, key))
+            }
         }
     }
 
@@ -73,6 +86,7 @@ impl Storage {
         match scheme {
             "memory" => Ok(Scheme::Memory),
             "file" | "" => Ok(Scheme::Fs),
+            "s3" | "s3a" => Ok(Scheme::S3),
            s => Ok(s.parse::<Scheme>()?),
         }
     }
diff --git a/crates/fluss/src/io/storage_s3.rs b/crates/fluss/src/io/storage_s3.rs
new file mode 100644
index 0000000..8000d09
--- /dev/null
+++ b/crates/fluss/src/io/storage_s3.rs
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Result;
+use opendal::Configurator;
+use opendal::Operator;
+use opendal::layers::TimeoutLayer;
+use opendal::services::S3Config;
+use std::collections::HashMap;
+use std::time::Duration;
+
+pub(crate) fn s3_config_build(props: &HashMap<String, String>) -> Result<Operator> {
+    let config = S3Config::from_iter(props.clone())?;
+    let op = Operator::from_config(config)?.finish();
+
+    // Add timeout layer to prevent hanging on S3 operations
+    let timeout_layer = TimeoutLayer::new()
+        .with_timeout(Duration::from_secs(10))
+        .with_io_timeout(Duration::from_secs(30));
+
+    Ok(op.layer(timeout_layer))
+}
+
+pub(crate) fn parse_s3_path(path: &str) -> (&str, &str) {
+    let path = path
+        .strip_prefix("s3a://")
+        .or_else(|| path.strip_prefix("s3://"))
+        .unwrap_or(path);
+
+    match path.find('/') {
+        Some(idx) => (&path[..idx], &path[idx + 1..]),
+        None => (path, ""),
+    }
+}
diff --git a/crates/fluss/src/proto/fluss_api.proto b/crates/fluss/src/proto/fluss_api.proto
index ef460fc..e59c2d9 100644
--- a/crates/fluss/src/proto/fluss_api.proto
+++ b/crates/fluss/src/proto/fluss_api.proto
@@ -297,4 +297,19 @@ message PbLakeSnapshotForBucket {
     optional int64 partition_id = 1;
     required int32 bucket_id = 2;
     optional int64 log_offset = 3;
+}
+
+message PbKeyValue {
+    required string key = 1;
+    required string value = 2;
+}
+
+message GetFileSystemSecurityTokenRequest {
+}
+
+message GetFileSystemSecurityTokenResponse {
+    required string schema = 1;
+    required bytes token = 2;
+    optional int64 expiration_time = 3;
+    repeated PbKeyValue addition_info = 4;
 }
\ No newline at end of file
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 806c9a5..f079f09 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -504,6 +504,30 @@ impl<'a> LogRecordBatch<'a> {
         };
         Ok(log_record_iterator)
     }
+
+    pub fn records_for_remote_log(&self, read_context: &ReadContext) -> Result<LogRecordIterator> {
+        if self.record_count() == 0 {
+            return Ok(LogRecordIterator::empty());
+        }
+
+        let data = &self.data[RECORDS_OFFSET..];
+
+        let record_batch = read_context.record_batch_for_remote_log(data)?;
+        let log_record_iterator = match record_batch {
+            None => LogRecordIterator::empty(),
+            Some(record_batch) => {
+                let arrow_reader = ArrowReader::new(Arc::new(record_batch));
+                LogRecordIterator::Arrow(ArrowLogRecordIterator {
+                    reader: arrow_reader,
+                    base_offset: self.base_log_offset(),
+                    timestamp: self.commit_timestamp(),
+                    row_id: 0,
+                    change_type: ChangeType::AppendOnly,
+                })
+            }
+        };
+        Ok(log_record_iterator)
+    }
 }
 
 /// Parse an Arrow IPC message from a byte slice.
@@ -552,7 +576,8 @@ fn parse_ipc_message(
     let message = root_as_message(metadata_bytes).ok()?;
     let batch_metadata = message.header_as_record_batch()?;
 
-    let body_start = 8 + metadata_size;
+    let metadata_padded_size = (metadata_size + 7) & !7;
+    let body_start = 8 + metadata_padded_size;
     let body_data = &data[body_start..];
     let body_buffer = Buffer::from(body_data);
 
@@ -677,7 +702,7 @@ pub fn to_arrow_type(fluss_type: &DataType) -> ArrowDataType {
 #[derive(Clone)]
 pub struct ReadContext {
     target_schema: SchemaRef,
-
+    full_schema: SchemaRef,
    projection: Option<Projection>,
 }
 
@@ -694,7 +719,8 @@ struct Projection {
 impl ReadContext {
     pub fn new(arrow_schema: SchemaRef) -> ReadContext {
         ReadContext {
-            target_schema: arrow_schema,
+            target_schema: arrow_schema.clone(),
+            full_schema: arrow_schema,
             projection: None,
         }
     }
@@ -730,7 +756,10 @@ impl ReadContext {
             }
         } else {
             Projection {
-                ordered_schema: Self::project_schema(arrow_schema, projected_fields.as_slice()),
+                ordered_schema: Self::project_schema(
+                    arrow_schema.clone(),
+                    projected_fields.as_slice(),
+                ),
                 ordered_fields: projected_fields.clone(),
                 projected_fields,
                 reordering_indexes: vec![],
@@ -741,6 +770,7 @@ impl ReadContext {
         ReadContext {
             target_schema,
+            full_schema: arrow_schema,
             projection: Some(project),
         }
     }
@@ -809,6 +839,35 @@ impl ReadContext {
         };
         Ok(Some(record_batch))
     }
+
+    pub fn record_batch_for_remote_log(&self, data: &[u8]) -> Result<Option<RecordBatch>> {
+        let (batch_metadata, body_buffer, version) = match parse_ipc_message(data) {
+            Some(result) => result,
+            None => return Ok(None),
+        };
+
+        let record_batch = read_record_batch(
+            &body_buffer,
+            batch_metadata,
+            self.full_schema.clone(),
+            &std::collections::HashMap::new(),
+            None,
+            &version,
+        )?;
+
+        let record_batch = match &self.projection {
+            Some(projection) => {
+                let projected_columns: Vec<_> = projection
+                    .projected_fields
+                    .iter()
+                    .map(|&idx| record_batch.column(idx).clone())
+                    .collect();
+                RecordBatch::try_new(self.target_schema.clone(), projected_columns)?
+            }
+            None => record_batch,
+        };
+        Ok(Some(record_batch))
+    }
 }
 
 pub enum LogRecordIterator {
diff --git a/crates/fluss/src/rpc/api_key.rs b/crates/fluss/src/rpc/api_key.rs
index 215bb39..b11647f 100644
--- a/crates/fluss/src/rpc/api_key.rs
+++ b/crates/fluss/src/rpc/api_key.rs
@@ -32,6 +32,7 @@ pub enum ApiKey {
     ProduceLog,
     FetchLog,
     ListOffsets,
+    GetFileSystemSecurityToken,
     GetDatabaseInfo,
     GetLatestLakeSnapshot,
     Unknown(i16),
@@ -53,6 +54,7 @@ impl From<i16> for ApiKey {
             1014 => ApiKey::ProduceLog,
             1015 => ApiKey::FetchLog,
             1021 => ApiKey::ListOffsets,
+            1025 => ApiKey::GetFileSystemSecurityToken,
             1032 => ApiKey::GetLatestLakeSnapshot,
             1035 => ApiKey::GetDatabaseInfo,
             _ => Unknown(key),
@@ -76,6 +78,7 @@ impl From<ApiKey> for i16 {
             ApiKey::ProduceLog => 1014,
             ApiKey::FetchLog => 1015,
             ApiKey::ListOffsets => 1021,
+            ApiKey::GetFileSystemSecurityToken => 1025,
             ApiKey::GetLatestLakeSnapshot => 1032,
             ApiKey::GetDatabaseInfo => 1035,
             Unknown(x) => x,
diff --git a/crates/fluss/src/rpc/message/get_security_token.rs b/crates/fluss/src/rpc/message/get_security_token.rs
new file mode 100644
index 0000000..7995232
--- /dev/null
+++ b/crates/fluss/src/rpc/message/get_security_token.rs
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::proto::{GetFileSystemSecurityTokenRequest, GetFileSystemSecurityTokenResponse};
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::{impl_read_version_type, impl_write_version_type};
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[derive(Debug)]
+pub struct GetSecurityTokenRequest {
+    pub inner_request: GetFileSystemSecurityTokenRequest,
+}
+
+impl GetSecurityTokenRequest {
+    pub fn new() -> Self {
+        Self {
+            inner_request: GetFileSystemSecurityTokenRequest {},
+        }
+    }
+}
+
+impl Default for GetSecurityTokenRequest {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl RequestBody for GetSecurityTokenRequest {
+    type ResponseBody = GetFileSystemSecurityTokenResponse;
+    const API_KEY: ApiKey = ApiKey::GetFileSystemSecurityToken;
+    const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(GetSecurityTokenRequest);
+impl_read_version_type!(GetFileSystemSecurityTokenResponse);
diff --git a/crates/fluss/src/rpc/message/mod.rs b/crates/fluss/src/rpc/message/mod.rs
index 230d971..0ed5b7c 100644
--- a/crates/fluss/src/rpc/message/mod.rs
+++ b/crates/fluss/src/rpc/message/mod.rs
@@ -28,6 +28,7 @@ mod drop_table;
 mod fetch;
 mod get_database_info;
 mod get_latest_lake_snapshot;
+mod get_security_token;
 mod get_table;
 mod header;
 mod list_databases;
@@ -45,6 +46,7 @@ pub use drop_table::*;
 pub use fetch::*;
 pub use get_database_info::*;
 pub use get_latest_lake_snapshot::*;
+pub use get_security_token::*;
 pub use get_table::*;
 pub use header::*;
 pub use list_databases::*;