@@ -1334,6 +1334,86 @@ def test_data_blob_writer_with_shard(self):
         self.assertEqual(result.num_rows, 3, "Should have 3 rows")
         self.assertEqual(result.num_columns, 3, "Should have 3 columns")
 
+    def test_blob_read_row_by_row_iterator(self):
+        """Test reading blob data row by row using to_iterator()."""
+        from pypaimon import Schema
+        from pypaimon.table.row.blob import Blob
+        from pypaimon.table.row.internal_row import RowKind
+
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('blob_data', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.blob_iterator_test', schema, False)
+        table = self.catalog.get_table('test_db.blob_iterator_test')
+
+        expected_data = {
+            1: {'name': 'Alice', 'blob': b'blob_1'},
+            2: {'name': 'Bob', 'blob': b'blob_2_data'},
+            3: {'name': 'Charlie', 'blob': b'blob_3_content'},
+            4: {'name': 'David', 'blob': b'blob_4_large_content'},
+            5: {'name': 'Eve', 'blob': b'blob_5_very_large_content_data'}
+        }
+
+        test_data = pa.Table.from_pydict({
+            'id': list(expected_data.keys()),
+            'name': [expected_data[i]['name'] for i in expected_data.keys()],
+            'blob_data': [expected_data[i]['blob'] for i in expected_data.keys()]
+        }, schema=pa_schema)
+
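+        # Write the test data in a single batch and commit it to the table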
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(test_data)
+        commit_messages = writer.prepare_commit()
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        writer.close()
+
+        # Verify blob files were created
+        file_names = [f.file_name for f in commit_messages[0].new_files]
+        self.assertGreater(
+            len([f for f in file_names if f.endswith('.blob')]), 0,
+            "Should have at least one blob file")
+
+        # Read using to_iterator
+        read_builder = table.new_read_builder()
+        splits = read_builder.new_scan().plan().splits()
+        iterator = read_builder.new_read().to_iterator(splits)
+
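+        # Drain the iterator row by row until it is exhausted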
+        rows = []
+        value = next(iterator, None)
+        while value is not None:
+            rows.append(value)
+            value = next(iterator, None)
+
+        self.assertEqual(len(rows), 5, "Should have 5 rows")
+
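+        # Check each row's fields, blob payload and row kind against the expected data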
+        for row in rows:
+            row_id = row.get_field(0)
+            self.assertIn(row_id, expected_data, f"ID {row_id} should be in expected data")
+
+            expected = expected_data[row_id]
+            self.assertEqual(row.get_field(1), expected['name'], f"Row {row_id}: name should match")
+
+            row_blob = row.get_field(2)
+            blob_bytes = row_blob.to_data() if isinstance(row_blob, Blob) else row_blob
+            self.assertIsInstance(blob_bytes, bytes, f"Row {row_id}: blob should be bytes")
+            self.assertEqual(blob_bytes, expected['blob'], f"Row {row_id}: blob data should match")
+            self.assertEqual(len(blob_bytes), len(expected['blob']), f"Row {row_id}: blob size should match")
+
+            self.assertIn(
+                row.get_row_kind(),
+                [RowKind.INSERT, RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER, RowKind.DELETE],
+                f"Row {row_id}: RowKind should be valid")
+
 
 if __name__ == '__main__':
     unittest.main()