# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import asyncio

import pandas as pd
import pyarrow as pa

import fluss


async def main():
    # Create connection configuration
    config_spec = {
        "bootstrap.servers": "127.0.0.1:9123",
        # Add other configuration options as needed
        "request.max.size": "10485760",  # 10 MB
        "writer.acks": "all",  # Wait for all replicas to acknowledge
        "writer.retries": "3",  # Retry up to 3 times on failure
        "writer.batch.size": "1000",  # Batch size for writes
    }
    config = fluss.Config(config_spec)

    # Create connection using the static connect method
    conn = await fluss.FlussConnection.connect(config)

    # Define fields for PyArrow
    fields = [
        pa.field("id", pa.int32()),
        pa.field("name", pa.string()),
        pa.field("score", pa.float32()),
        pa.field("age", pa.int32()),
    ]

    # Create a PyArrow schema
    schema = pa.schema(fields)

    # Create a Fluss Schema first (this is what TableDescriptor expects)
    fluss_schema = fluss.Schema(schema)
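    # The Fluss schema is built from the PyArrow schema, so each Arrow column
    # type is assumed to map to a corresponding Fluss data type.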

    # Create a Fluss TableDescriptor
    table_descriptor = fluss.TableDescriptor(fluss_schema)
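    # No primary key is declared, so this descriptor is assumed to define a
    # log (append-only) table; that is why an append writer is used below.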

    # Get the admin for Fluss
    admin = await conn.get_admin()

    # Create a Fluss table
    table_path = fluss.TablePath("fluss", "sample_table")

    try:
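        # The trailing True is taken here to mean "ignore if the table
        # already exists" (an assumption about the create_table signature).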
        await admin.create_table(table_path, table_descriptor, True)
        print(f"Created table: {table_path}")
    except Exception as e:
        print(f"Table creation failed: {e}")

    # Get table information via admin
    try:
        table_info = await admin.get_table(table_path)
        print(f"Table info: {table_info}")
        print(f"Table ID: {table_info.table_id}")
        print(f"Schema ID: {table_info.schema_id}")
        print(f"Created time: {table_info.created_time}")
        print(f"Primary keys: {table_info.get_primary_keys()}")
    except Exception as e:
        print(f"Failed to get table info: {e}")

    # Get the table instance
    table = await conn.get_table(table_path)
    print(f"Got table: {table}")

    # Create a writer for the table
    append_writer = await table.new_append_writer()
    print(f"Created append writer: {append_writer}")
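    # The append writer is assumed to buffer rows client-side and send them
    # in batches (see writer.batch.size above); the flush() call further down
    # forces out anything still buffered.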

    try:
        # Test 1: Write PyArrow Table
        print("\n--- Testing PyArrow Table write ---")
        pa_table = pa.Table.from_arrays(
            [
                pa.array([1, 2, 3], type=pa.int32()),
                pa.array(["Alice", "Bob", "Charlie"], type=pa.string()),
                pa.array([95.2, 87.2, 92.1], type=pa.float32()),
                pa.array([25, 30, 35], type=pa.int32()),
            ],
            schema=schema,
        )

        append_writer.write_arrow(pa_table)
        print("Successfully wrote PyArrow Table")

        # Test 2: Write PyArrow RecordBatch
        print("\n--- Testing PyArrow RecordBatch write ---")
        pa_record_batch = pa.RecordBatch.from_arrays(
            [
                pa.array([4, 5], type=pa.int32()),
                pa.array(["David", "Eve"], type=pa.string()),
                pa.array([88.5, 91.0], type=pa.float32()),
                pa.array([28, 32], type=pa.int32()),
            ],
            schema=schema,
        )

        append_writer.write_arrow_batch(pa_record_batch)
        print("Successfully wrote PyArrow RecordBatch")

        # Test 3: Write Pandas DataFrame
        print("\n--- Testing Pandas DataFrame write ---")
        df = pd.DataFrame(
            {
                "id": [6, 7],
                "name": ["Frank", "Grace"],
                "score": [89.3, 94.7],
                "age": [29, 27],
            }
        )

        append_writer.write_pandas(df)
        print("Successfully wrote Pandas DataFrame")

        # Flush all pending data
        print("\n--- Flushing data ---")
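        # flush() is expected to block until buffered writes are sent and,
        # with writer.acks set to "all" above, acknowledged by every replica.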
        append_writer.flush()
        print("Successfully flushed data")

    except Exception as e:
        print(f"Error during writing: {e}")

    # Now scan the table to verify data was written
    print("\n--- Scanning table ---")
    try:
        log_scanner = await table.new_log_scanner()
        print(f"Created log scanner: {log_scanner}")

        # Subscribe to scan from earliest to latest
        # start_timestamp=None (earliest), end_timestamp=None (latest)
        log_scanner.subscribe(None, None)

        print("Scanning results using to_arrow():")

        # Try to get as PyArrow Table
        try:
            pa_table_result = log_scanner.to_arrow()
            print(f"\nAs PyArrow Table: {pa_table_result}")
        except Exception as e:
            print(f"Could not convert to PyArrow: {e}")

        # Re-subscribe from the earliest offset so to_pandas() reads the
        # same records that to_arrow() already consumed
        log_scanner.subscribe(None, None)

        # Try to get as Pandas DataFrame
        try:
            df_result = log_scanner.to_pandas()
            print(f"\nAs Pandas DataFrame:\n{df_result}")
        except Exception as e:
            print(f"Could not convert to Pandas: {e}")

        # TODO: support to_arrow_batch_reader(),
        # which is reserved for streaming use cases

        # TODO: support to_duckdb()

    except Exception as e:
        print(f"Error during scanning: {e}")

    # Close connection
    conn.close()
    print("\nConnection closed")


if __name__ == "__main__":
    # Run the async main function
    asyncio.run(main())