@@ -33,7 +33,10 @@ static SHARED_FLUSS_CLUSTER: Lazy<Arc<RwLock<Option<FlussTestingCluster>>>> =
3333mod admin_test {
3434 use super :: SHARED_FLUSS_CLUSTER ;
3535 use crate :: integration:: fluss_cluster:: { FlussTestingCluster , FlussTestingClusterBuilder } ;
36- use fluss:: metadata:: DatabaseDescriptorBuilder ;
36+ use fluss:: metadata:: {
37+ DataTypes , DatabaseDescriptorBuilder , KvFormat , LogFormat , Schema , TableDescriptor ,
38+ TablePath ,
39+ } ;
3740 use std:: sync:: Arc ;
3841
3942 fn before_all ( ) {
@@ -126,6 +129,121 @@ mod admin_test {
126129
127130 #[ tokio:: test]
128131 async fn test_create_table ( ) {
129- // todo
132+ let cluster = get_fluss_cluster ( ) ;
133+ let connection = cluster. get_fluss_connection ( ) . await ;
134+ let admin = connection
135+ . get_admin ( )
136+ . await
137+ . expect ( "Failed to get admin client" ) ;
138+
139+ let test_db_name = "test_create_table_db" ;
140+ let db_descriptor = DatabaseDescriptorBuilder :: default ( )
141+ . comment ( "Database for test_create_table" )
142+ . build ( ) ;
143+
144+ assert_eq ! ( admin. database_exists( test_db_name) . await . unwrap( ) , false ) ;
145+ admin
146+ . create_database ( test_db_name, false , Some ( & db_descriptor) )
147+ . await
148+ . expect ( "Failed to create test database" ) ;
149+
150+ let test_table_name = "test_user_table" ;
151+ let table_path = TablePath :: new ( test_db_name. to_string ( ) , test_table_name. to_string ( ) ) ;
152+
153+ // build table schema
154+ let table_schema = Schema :: builder ( )
155+ . column ( "id" , DataTypes :: int ( ) )
156+ . column ( "name" , DataTypes :: string ( ) )
157+ . column ( "age" , DataTypes :: int ( ) )
158+ . with_comment ( "User's age (optional)" )
159+ . column ( "email" , DataTypes :: string ( ) )
160+ . primary_key ( vec ! [ "id" . to_string( ) ] )
161+ . build ( )
162+ . expect ( "Failed to build table schema" ) ;
163+
164+ // build table descriptor
165+ let table_descriptor = TableDescriptor :: builder ( )
166+ . schema ( table_schema. clone ( ) )
167+ . comment ( "Test table for user data (id, name, age, email)" )
168+ . distributed_by ( Some ( 3 ) , vec ! [ "id" . to_string( ) ] )
169+ . property ( "table.replication.factor" , "1" )
170+ . log_format ( LogFormat :: ARROW )
171+ . kv_format ( KvFormat :: INDEXED )
172+ . build ( )
173+ . expect ( "Failed to build table descriptor" ) ;
174+
175+ // create test table
176+ admin
177+ . create_table ( & table_path, & table_descriptor, false )
178+ . await
179+ . expect ( "Failed to create test table" ) ;
180+
181+ assert ! (
182+ admin. table_exists( & table_path) . await . unwrap( ) ,
183+ "Table {:?} should exist after creation" ,
184+ table_path
185+ ) ;
186+
187+ let tables = admin. list_tables ( test_db_name) . await . unwrap ( ) ;
188+ assert_eq ! (
189+ tables. len( ) ,
190+ 1 ,
191+ "There should be exactly one table in the database"
192+ ) ;
193+ assert ! (
194+ tables. contains( & test_table_name. to_string( ) ) ,
195+ "Table list should contain the created table"
196+ ) ;
197+
198+ let table_info = admin
199+ . get_table ( & table_path)
200+ . await
201+ . expect ( "Failed to get table info" ) ;
202+
203+ // verify table comment
204+ assert_eq ! (
205+ table_info. get_comment( ) ,
206+ Some ( "Test table for user data (id, name, age, email)" ) ,
207+ "Table comment mismatch"
208+ ) ;
209+
210+ // verify schema columns
211+ let actual_schema = table_info. get_schema ( ) ;
212+ assert_eq ! ( actual_schema, table_descriptor. schema( ) , "Schema mismatch" ) ;
213+
214+ // verify primary key
215+ assert_eq ! (
216+ table_info. get_primary_keys( ) ,
217+ & vec![ "id" . to_string( ) ] ,
218+ "Primary key columns mismatch"
219+ ) ;
220+
221+ // verify distribution and properties
222+ assert_eq ! ( table_info. get_num_buckets( ) , 3 , "Bucket count mismatch" ) ;
223+ assert_eq ! (
224+ table_info. get_bucket_keys( ) ,
225+ & vec![ "id" . to_string( ) ] ,
226+ "Bucket keys mismatch"
227+ ) ;
228+
229+ assert_eq ! (
230+ table_info. get_properties( ) ,
231+ table_descriptor. properties( ) ,
232+ "Properties mismatch"
233+ ) ;
234+
235+ // drop table
236+ admin
237+ . drop_table ( & table_path, false )
238+ . await
239+ . expect ( "Failed to drop table" ) ;
240+ // table shouldn't exist now
241+ assert_eq ! ( admin. table_exists( & table_path) . await . unwrap( ) , false ) ;
242+
243+ // drop database
244+ admin. drop_database ( test_db_name, false , true ) . await ;
245+
246+ // database shouldn't exist now
247+ assert_eq ! ( admin. database_exists( test_db_name) . await . unwrap( ) , false ) ;
130248 }
131249}
0 commit comments