
Commit 95bc0f0

Release/2.56.2 (#648)
- Fix fallback when SSO is enabled and other auth methods are used
- Add multithreading support for BigQuery Storage
- Introduce in-memory mode for PyCarol
1 parent 291f3fd commit 95bc0f0

File tree

6 files changed (+272, -14 lines)


pycarol/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -79,3 +79,4 @@
 from .storage import Storage # noqa
 from .subscription import Subscription # noqa
 from .tasks import Tasks # noqa
+from .memory import Memory # noqa

pycarol/bigquery.py

Lines changed: 28 additions & 13 deletions
@@ -14,6 +14,8 @@
 from google.api_core import retry as retries
 from google.auth.transport.requests import Request
 
+from concurrent.futures import ThreadPoolExecutor, as_completed
+
 try:
     import pandas
 except ImportError:
@@ -407,14 +409,15 @@ def _get_read_session(
         self,
         client: bigquery_storage.BigQueryReadClient,
         table_name: str,
-        columns_names: T.Optional[T.List[str]] = None,
+        column_names: T.Optional[T.List[str]] = None,
         row_restriction: T.Optional[str] = None,
         sample_percentage: T.Optional[float] = None,
+        max_stream_count: T.Optional[int] = 1,
     ) -> bigquery_storage_v1.types.ReadSession:
         read_options = None
-        if columns_names is not None:
+        if column_names is not None:
             read_options = types.ReadSession.TableReadOptions( # type:ignore # noqa:E501 pylint:disable=no-member
-                selected_fields=columns_names,
+                selected_fields=column_names,
                 row_restriction=row_restriction,
                 sample_percentage=sample_percentage,
             )
@@ -429,17 +432,18 @@ def _get_read_session(
         read_session = client.create_read_session(
             parent=parent,
             read_session=requested_session,
-            max_stream_count=1,
+            max_stream_count=max_stream_count,
         )
         return read_session
 
     def query(
         self,
         table_name: str,
-        columns_names: T.Optional[T.List[str]] = None,
+        column_names: T.Optional[T.List[str]] = None,
         return_dataframe: bool = True,
         row_restriction: T.Optional[str] = None,
         sample_percentage: T.Optional[float] = None,
+        max_stream_count: T.Optional[int] = 1
     ) -> T.Union["pandas.DataFrame", T.List[bigquery_storage_v1.reader.ReadRowsPage]]:
         """Read from BigQuery Storage API.
 
@@ -473,24 +477,35 @@ def query
         read_session = self._get_read_session(
             client,
             table_name,
-            columns_names,
+            column_names,
             row_restriction,
             sample_percentage,
+            max_stream_count
         )
 
-        stream = read_session.streams[0]
-        reader = client.read_rows(stream.name)
+        all_frames = []
+
+        def _read_stream(stream):
+            frames = []
+            reader = client.read_rows(stream.name)
+            for frame in reader.rows().pages:
+                frames.append(frame)
+            return frames
+
+        with ThreadPoolExecutor(max_workers=len(read_session.streams)) as executor:
+            futures = {executor.submit(_read_stream, s): s for s in read_session.streams}
 
-        frames = []
-        for frame in reader.rows().pages:
-            frames.append(frame)
+            for future in as_completed(futures):
+                df = future.result()
+                all_frames.extend(df)
 
         if return_dataframe is False:
-            return frames
+            return all_frames
 
         if "pandas" not in sys.modules and return_dataframe is True:
             raise exceptions.PandasNotFoundException
 
-        dataframe = pandas.concat([frame.to_dataframe() for frame in frames])
+        dataframe = pandas.concat([frame.to_dataframe() for frame in all_frames])
         dataframe = dataframe.reset_index(drop=True)
         return dataframe
+

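The multithreaded read path above fans a Storage API read session out over several streams and collects their pages concurrently. A minimal usage sketch, assuming the enclosing class is the Storage API reader that pycarol exposes as BQStorage and that credentials resolve from the environment; the table and column names are illustrative:

from pycarol import Carol
from pycarol.bigquery import BQStorage  # assumed location of the Storage API reader

carol = Carol()  # assumes tenant/app/credentials come from the usual environment variables
bq_storage = BQStorage(carol)

# max_stream_count > 1 asks BigQuery for several read streams, which the new
# ThreadPoolExecutor-based loop consumes in parallel before concatenating pages.
df = bq_storage.query(
    "my_staging_table",                        # illustrative table name
    column_names=["mdmId", "mdmLastUpdated"],  # note the renamed parameter
    row_restriction="mdmLastUpdated > '2024-01-01'",
    max_stream_count=4,
)

Passing return_dataframe=False still returns the raw ReadRowsPage list, now gathered from all streams.
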
pycarol/carol.py

Lines changed: 7 additions & 0 deletions
@@ -329,10 +329,17 @@ def call_api(
                     return {}
                 return json.loads(response.text)
 
+            if ('Single Sign On enabled' in response.text) and isinstance(self.auth, PwdAuth):
+                raise exceptions.InvalidToken(
+                    response.text
+                    + ". Please use PwdFluig or ApiKeyAuth to authenticate.")
+
             if (response.reason == "Unauthorized") and isinstance(self.auth, PwdAuth):
                 if response.json().get("possibleResponsibleField") in [
                     "password",
                     "userLogin",
+                    "userLogin/password",
+                    "mfaCode",
                 ]:
                     raise exceptions.InvalidToken(response.text)
                 self.auth.get_access_token()  # It will refresh token if Unauthorized

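The new check makes call_api fail fast with InvalidToken when the tenant reports "Single Sign On enabled" and the client is authenticating with PwdAuth, instead of falling through to the generic Unauthorized handling. A hedged sketch of reacting to that on the caller side; import paths follow pycarol's usual top-level exports, and the constructor arguments and endpoint are illustrative:

from pycarol import Carol, PwdAuth, ApiKeyAuth
from pycarol import exceptions

try:
    carol = Carol(domain="mytenant", app_name="myapp",
                  auth=PwdAuth("user@example.com", "password"))
    carol.call_api("v1/users/current")  # illustrative endpoint
except exceptions.InvalidToken:
    # SSO tenants reject password logins; retry with an API key instead
    # (connector_id is a placeholder for whatever the key was issued against).
    carol = Carol(domain="mytenant", app_name="myapp",
                  auth=ApiKeyAuth("my-connector-api-key"),
                  connector_id="0a1b2c3d")
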
pycarol/exceptions.py

Lines changed: 14 additions & 0 deletions
@@ -100,3 +100,17 @@ def __init__(self):
             " False."
         )
         super().__init__(msg)
+
+
+class TableNotFoundError(Exception):
+
+    """Custom exception for when trying to append to a table that doesn't exist."""
+
+    pass
+
+
+class InsertOperationError(Exception):
+
+    """Custom exception for when an insert operation fails."""
+
+    pass

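Both new exceptions back the Memory.append flow introduced in pycarol/memory.py below. A small sketch of catching them; the table contents are made up:

import pandas as pd
from pycarol import Memory, exceptions

memory = Memory()
rows = pd.DataFrame({"id": [1], "name": ["Alice"]})

try:
    memory.append("users", rows)  # "users" was never created with add()
except exceptions.TableNotFoundError:
    memory.add("users", rows)     # create the table; later appends succeed
except exceptions.InsertOperationError as err:
    print(f"Insert failed: {err}")
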
pycarol/memory.py

Lines changed: 220 additions & 0 deletions
@@ -0,0 +1,220 @@
+import duckdb
+import pandas as pd
+from typing import List, Optional, Union, Tuple
+from . import exceptions
+
+
+class Memory:
+    def __init__(self, dfs: Optional[List[Tuple[pd.DataFrame, str]]] = None, database_path: Optional[str] = None) -> None:
+        """
+        Initialize Memory class with optional database file.
+
+        Args:
+            dfs: List of (dataframe, table_name) tuples to cache
+            database_path: Path to database file. If None, uses in-memory database.
+        """
+        self.database_path = database_path
+        self._is_memory_mode = database_path is None
+
+        # Create read-write connection for data loading
+        if self._is_memory_mode:
+            self.conn = duckdb.connect(database=':memory:', read_only=False)
+        else:
+            self.conn = duckdb.connect(database=database_path, read_only=False)
+
+        if dfs:
+            self.cache_dataframes(dfs)
+
+    def cache_dataframes(self, dfs: List[Tuple[pd.DataFrame, str]]) -> None:
+        """Cache multiple dataframes to the database.
+
+        This method takes a list of (dataframe, table_name) tuples and adds each
+        dataframe to the database with the specified table name.
+
+        Args:
+            dfs: List of tuples containing (dataframe, table_name) pairs to cache.
+
+        Example:
+            >>> memory = Memory()
+            >>> df1 = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
+            >>> df2 = pd.DataFrame({"x": [5, 6], "y": [7, 8]})
+            >>> memory.cache_dataframes([(df1, "table1"), (df2, "table2")])
+        """
+        for df, table_name in dfs:
+            self.add(table_name, df)
+
+    def add(self, table_name: str, df: pd.DataFrame) -> None:
+        """Add or replace a table in the database.
+
+        This method creates a new table or replaces an existing one with the
+        provided dataframe. If a table with the same name already exists, it
+        will be dropped and recreated with the new data.
+
+        Args:
+            table_name: Name of the table to create or replace.
+            df: Pandas DataFrame containing the data to store.
+
+        Example:
+            >>> memory = Memory()
+            >>> df = pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})
+            >>> memory.add("users", df)
+        """
+        self.conn.execute(f"DROP TABLE IF EXISTS {table_name}")
+        self.conn.execute(f"CREATE TABLE {table_name} AS SELECT * FROM df")
+
+    def delete(self, table_name: str) -> None:
+        """Remove a table from the database.
+
+        This method removes a table from the database if the table exists.
+
+        Args:
+            table_name: Name of the table to remove.
+
+        Example:
+            >>> memory = Memory()
+            >>> df = pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})
+            >>> memory.add("users", df)
+            >>> memory.delete("users")
+        """
+        self.conn.execute(f"DROP TABLE IF EXISTS {table_name}")
+
+    def append(self, table_name: str, df: pd.DataFrame) -> None:
+        """Append data to an existing table in the database.
+
+        This method adds new data to an existing table. The incoming dataframe
+        will be automatically reordered to match the existing table's column
+        order to ensure compatibility. The table must already exist.
+
+        Args:
+            table_name: Name of the existing table to append data to.
+            df: Pandas DataFrame containing the data to append.
+
+        Raises:
+            exceptions.TableNotFoundError: If the specified table does not exist.
+            exceptions.InsertOperationError: If the insert operation fails.
+
+        Example:
+            >>> memory = Memory()
+            >>> # First create a table
+            >>> initial_df = pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})
+            >>> memory.add("users", initial_df)
+            >>> # Then append more data
+            >>> new_df = pd.DataFrame({"id": [3, 4], "name": ["Charlie", "Diana"]})
+            >>> memory.append("users", new_df)
+        """
+        # Check if table exists
+        try:
+            table_info = self.conn.execute(f"DESCRIBE {table_name}").fetchdf()
+        except Exception as e:
+            raise exceptions.TableNotFoundError(f"Table '{table_name}' does not exist on Memory. Error: {str(e)}")
+
+        existing_columns = table_info['column_name'].tolist()
+
+        # Reorder dataframe columns to match existing table
+        df_reordered = df[existing_columns]
+
+        # Attempt to insert data
+        try:
+            self.conn.execute(f"INSERT INTO {table_name} SELECT * FROM df_reordered")
+        except Exception as e:
+            raise exceptions.InsertOperationError(f"Failed to insert data into table '{table_name}' on Memory. Error: {str(e)}")
+
+    def query(self, query: str) -> pd.DataFrame:
+        """Execute a SELECT query on the database.
+
+        This method executes SQL SELECT queries on the database. Only SELECT
+        statements and Common Table Expressions (CTEs) are allowed for security.
+        All queries are validated to prevent SQL injection attacks.
+
+        Args:
+            query: SQL SELECT query string to execute.
+
+        Returns:
+            pd.DataFrame: Query results as a pandas DataFrame.
+
+        Raises:
+            ValueError: If the query is not a valid SELECT statement or contains
+                multiple statements with non-SELECT operations.
+
+        Example:
+            >>> memory = Memory()
+            >>> memory.add("users", df)
+            >>> result = memory.query("SELECT * FROM users WHERE id > 1")
+            >>> result = memory.query("WITH cte AS (SELECT * FROM users) SELECT * FROM cte")
+        """
+        if not self._is_select_query(query):
+            raise ValueError("Only SELECT queries are allowed. Other operations are not permitted.")
+        return self.conn.execute(query).fetchdf()
+
+    def _is_select_query(self, query: str) -> bool:
+        """Check if the query contains only SELECT statements, preventing SQL injection.
+
+        This private method validates that the provided query contains only SELECT
+        statements or Common Table Expressions (CTEs). It prevents SQL injection
+        by ensuring no other SQL operations (INSERT, UPDATE, DELETE, DROP, etc.)
+        are present in the query.
+
+        Args:
+            query: SQL query string to validate.
+
+        Returns:
+            bool: True if the query contains only SELECT/WITH statements,
+                False otherwise.
+
+        """
+        normalized_query = query.strip()
+
+        statements = [stmt.strip() for stmt in normalized_query.split(';') if stmt.strip()]
+
+        for statement in statements:
+            statement_upper = statement.upper()
+            if not statement_upper.startswith(('SELECT', 'WITH')):
+                return False
+
+        return len(statements) > 0
+
+    def is_memory_mode(self) -> bool:
+        """Check if the database is running in memory mode.
+
+        Returns:
+            bool: True if using in-memory database, False if using file database.
+        """
+        return self._is_memory_mode
+
+    def get_database_path(self) -> Optional[str]:
+        """Get the database file path.
+
+        Returns:
+            Optional[str]: Path to the database file if using file mode,
+                None if using memory mode.
+        """
+        return self.database_path
+
+    def close(self) -> None:
+        """Close database connections.
+
+        This method properly closes all database connections. It should be
+        called when the Memory instance is no longer needed to free up
+        resources.
+        """
+        if hasattr(self, 'conn'):
+            self.conn.close()
+
+    def __enter__(self) -> 'Memory':
+        """Context manager entry.
+
+        Returns:
+            Memory: The Memory instance for use in a with statement.
+        """
+        return self
+
+    def __exit__(self, exc_type: Optional[type], exc_val: Optional[Exception], exc_tb: Optional[object]) -> None:
+        """Context manager exit.
+
+        Automatically closes database connections when exiting the with block.
+
+        Args:
+            exc_type: Exception type if an exception occurred.
+            exc_val: Exception value if an exception occurred.
+            exc_tb: Exception traceback if an exception occurred.
+        """
+        self.close()

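Putting the new module together, a short end-to-end sketch of the in-memory mode using only the methods shown above; data and table names are illustrative:

import pandas as pd
from pycarol import Memory

users = pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})
orders = pd.DataFrame({"order_id": [10, 11, 12], "user_id": [1, 2, 2]})

# No database_path, so DuckDB runs fully in memory; pass a path to persist instead.
with Memory(dfs=[(users, "users"), (orders, "orders")]) as memory:
    summary = memory.query(
        """
        SELECT u.name, COUNT(*) AS n_orders
        FROM users u
        JOIN orders o ON o.user_id = u.id
        GROUP BY u.name
        """
    )
    print(summary)
# __exit__ closes the DuckDB connection automatically.

Only SELECT/WITH statements pass the _is_select_query guard; anything else raises ValueError.
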
setup.py

Lines changed: 2 additions & 1 deletion
@@ -24,7 +24,8 @@
     "urllib3",
     "pandas>=0.23.4,!=1.0.4",
     "numpy>=1.16.3",
-    "pip-system-certs"
+    "pip-system-certs",
+    "duckdb==1.4.0"
 ]
 
 dataframe_requires = [
