Skip to content

Commit c263bdd

Browse files
authored
#49 Implement Python native write with PyArrow (#51)
1 parent e3b56d8 commit c263bdd

File tree

107 files changed

+5937
-1915
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

107 files changed

+5937
-1915
lines changed

dev/dev-requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ setuptools>=18.0
2121
wheel
2222
py4j==0.10.9.7
2323
pyarrow>=5.0.0
24+
polars>=1.31.0
2425
fastavro>=1.9.0
2526
zstandard>=0.23.0
2627
pandas>=1.3.0

pypaimon/__init__.py

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -15,37 +15,3 @@
1515
# See the License for the specific language governing permissions and
1616
# limitations under the License.
1717
#################################################################################
18-
19-
from .api import Schema
20-
from .py4j import Catalog
21-
from .py4j import CommitMessage
22-
from .py4j import Predicate
23-
from .py4j import PredicateBuilder
24-
from .py4j import ReadBuilder
25-
from .py4j import RowType
26-
from .py4j import Split
27-
from .py4j import Table
28-
from .py4j import BatchTableCommit
29-
from .py4j import TableRead
30-
from .py4j import TableScan
31-
from .py4j import Plan
32-
from .py4j import BatchTableWrite
33-
from .py4j import BatchWriteBuilder
34-
35-
__all__ = [
36-
'Schema',
37-
'Catalog',
38-
'CommitMessage',
39-
'Predicate',
40-
'PredicateBuilder',
41-
'ReadBuilder',
42-
'RowType',
43-
'Split',
44-
'Table',
45-
'BatchTableCommit',
46-
'TableRead',
47-
'TableScan',
48-
'Plan',
49-
'BatchTableWrite',
50-
'BatchWriteBuilder'
51-
]

pypaimon/api/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
from .table_commit import BatchTableCommit
2626
from .table_write import BatchTableWrite
2727
from .write_builder import BatchWriteBuilder
28-
from .table import Table, Schema
28+
from .schema import Schema
29+
from .table import Table
30+
from .database import Database
2931
from .catalog import Catalog
3032

3133
__all__ = [
@@ -40,6 +42,7 @@
4042
'BatchWriteBuilder',
4143
'Table',
4244
'Schema',
45+
'Database',
4346
'Catalog',
4447
'Predicate',
4548
'PredicateBuilder'

pypaimon/api/catalog.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
from abc import ABC, abstractmethod
2020
from typing import Optional
21-
from pypaimon.api import Table, Schema
21+
from pypaimon.api import Table, Schema, Database
2222

2323

2424
class Catalog(ABC):
@@ -27,10 +27,9 @@ class Catalog(ABC):
2727
metadata such as database/table from a paimon catalog.
2828
"""
2929

30-
@staticmethod
3130
@abstractmethod
32-
def create(catalog_options: dict) -> 'Catalog':
33-
"""Create catalog from configuration."""
31+
def get_database(self, name: str) -> 'Database':
32+
"""Get paimon database identified by the given name."""
3433

3534
@abstractmethod
3635
def get_table(self, identifier: str) -> Table:

pypaimon/api/catalog_factory.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
from pypaimon.api.catalog import Catalog
2+
3+
4+
class CatalogFactory:
5+
6+
@staticmethod
7+
def create(catalog_options: dict) -> Catalog:
8+
from pypaimon.pynative.catalog.catalog_option import CatalogOptions
9+
from pypaimon.pynative.catalog.abstract_catalog import AbstractCatalog
10+
from pypaimon.pynative.catalog.filesystem_catalog import FileSystemCatalog # noqa: F401
11+
from pypaimon.pynative.catalog.hive_catalog import HiveCatalog # noqa: F401
12+
13+
identifier = catalog_options.get(CatalogOptions.METASTORE, "filesystem")
14+
subclasses = AbstractCatalog.__subclasses__()
15+
for subclass in subclasses:
16+
if subclass.identifier() == identifier:
17+
return subclass(catalog_options)
18+
raise ValueError(f"Unknown catalog identifier: {identifier}")

pypaimon/api/database.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#################################################################################
18+
19+
from typing import Optional
20+
21+
22+
class Database:
23+
"""Structure of a Database."""
24+
25+
def __init__(self, name: str, properties: dict, comment: Optional[str] = None):
26+
self.name = name
27+
self.properties = properties
28+
self.comment = comment

pypaimon/api/schema.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#################################################################################
18+
19+
import pyarrow as pa
20+
21+
from typing import Optional, List
22+
23+
24+
class Schema:
25+
"""Schema of a table."""
26+
27+
def __init__(self,
28+
pa_schema: pa.Schema,
29+
partition_keys: Optional[List[str]] = None,
30+
primary_keys: Optional[List[str]] = None,
31+
options: Optional[dict] = None,
32+
comment: Optional[str] = None):
33+
self.pa_schema = pa_schema
34+
self.partition_keys = partition_keys
35+
self.primary_keys = primary_keys
36+
self.options = options
37+
self.comment = comment

pypaimon/api/table.py

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,8 @@
1616
# limitations under the License.
1717
#################################################################################
1818

19-
import pyarrow as pa
20-
2119
from abc import ABC, abstractmethod
2220
from pypaimon.api import ReadBuilder, BatchWriteBuilder
23-
from typing import Optional, List
2421

2522

2623
class Table(ABC):
@@ -33,19 +30,3 @@ def new_read_builder(self) -> ReadBuilder:
3330
@abstractmethod
3431
def new_batch_write_builder(self) -> BatchWriteBuilder:
3532
"""Returns a builder for building batch table write and table commit."""
36-
37-
38-
class Schema:
39-
"""Schema of a table."""
40-
41-
def __init__(self,
42-
pa_schema: pa.Schema,
43-
partition_keys: Optional[List[str]] = None,
44-
primary_keys: Optional[List[str]] = None,
45-
options: Optional[dict] = None,
46-
comment: Optional[str] = None):
47-
self.pa_schema = pa_schema
48-
self.partition_keys = partition_keys
49-
self.primary_keys = primary_keys
50-
self.options = options
51-
self.comment = comment

pypaimon/api/table_read.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
from abc import ABC, abstractmethod
2323
from pypaimon.api import Split
24-
from typing import List, Optional, TYPE_CHECKING
24+
from typing import List, Optional, TYPE_CHECKING, Iterator
2525

2626
if TYPE_CHECKING:
2727
import ray
@@ -31,6 +31,10 @@
3131
class TableRead(ABC):
3232
"""To read data from data splits."""
3333

34+
@abstractmethod
35+
def to_iterator(self, splits: List[Split]) -> Iterator[tuple]:
36+
"""Read data from splits and converted to pyarrow.Table format."""
37+
3438
@abstractmethod
3539
def to_arrow(self, splits: List[Split]) -> pa.Table:
3640
"""Read data from splits and converted to pyarrow.Table format."""

pypaimon/api/table_write.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,11 @@ class BatchTableWrite(ABC):
2828
"""A table write for batch processing. Recommended for one-time committing."""
2929

3030
@abstractmethod
31-
def write_arrow(self, table: pa.Table):
31+
def write_arrow(self, table: pa.Table, row_kind: List[int] = None):
3232
""" Write an arrow table to the writer."""
3333

3434
@abstractmethod
35-
def write_arrow_batch(self, record_batch: pa.RecordBatch):
35+
def write_arrow_batch(self, record_batch: pa.RecordBatch, row_kind: List[int] = None):
3636
""" Write an arrow record batch to the writer."""
3737

3838
@abstractmethod

0 commit comments

Comments
 (0)