Skip to content

Commit bd6a72d

Browse files
committed
Merge branch 'm-kovalsky/_get_column_value'
2 parents c1896e0 + 7d5c66c commit bd6a72d

File tree

5 files changed

+26
-19
lines changed

5 files changed

+26
-19
lines changed

src/sempy_labs/_helper_functions.py

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1188,16 +1188,24 @@ def generate_guid():
11881188
return str(uuid.uuid4())
11891189

11901190

1191-
def _get_max_run_id(lakehouse: str, table_name: str) -> int:
1191+
def _get_column_aggregate(
    lakehouse: str,
    table_name: str,
    column_name: str = "RunId",
    function: str = "max",
    default_value: int = 0,
) -> int:
    """Return a single-column aggregate from a lakehouse delta table.

    Generalizes the former ``_get_max_run_id``: by default it computes
    ``MAX(RunId)``, but any Spark SQL aggregate (``max``, ``min``, ``count``,
    ``countdistinct``, ...) over any column can be requested.

    Parameters
    ----------
    lakehouse : str
        Name of the lakehouse (used as the SQL database qualifier).
    table_name : str
        Name of the delta table within the lakehouse.
    column_name : str, default "RunId"
        Column to aggregate.
    function : str, default "max"
        Aggregate function name; case-insensitive. If it contains both
        "count" and "distinct" (e.g. "countdistinct"), the query is issued
        as ``COUNT(DISTINCT(column))``.
    default_value : int, default 0
        Value returned when the aggregate yields NULL (e.g. empty table).

    Returns
    -------
    int
        The aggregate value, or ``default_value`` when the result is NULL.
    """

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    function = function.upper()
    # NOTE(review): identifiers are interpolated directly into the SQL text;
    # callers must supply trusted lakehouse/table/column names (internal use).
    query = f"SELECT {function}({column_name}) FROM {lakehouse}.{table_name}"
    if "COUNT" in function and "DISTINCT" in function:
        # "COUNTDISTINCT" is not a valid SQL function name; rebuild the
        # query in the valid COUNT(DISTINCT(...)) form instead.
        query = f"SELECT COUNT(DISTINCT({column_name})) FROM {lakehouse}.{table_name}"
    dfSpark = spark.sql(query)

    result = dfSpark.collect()[0][0]
    # Explicit None check: `result or default_value` would wrongly replace a
    # legitimate aggregate of 0 (e.g. COUNT over an empty table) with
    # default_value; only a NULL result should fall back.
    return default_value if result is None else result
12011209

12021210

12031211
def _make_list_unique(my_list):
@@ -1209,20 +1217,17 @@ def _get_partition_map(
12091217
dataset: str, workspace: Optional[str | UUID] = None
12101218
) -> pd.DataFrame:
12111219

1212-
(workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
1213-
(dataset_name, dataset_id) = resolve_dataset_name_and_id(dataset, workspace_id)
1214-
12151220
partitions = fabric.evaluate_dax(
1216-
dataset=dataset_id,
1217-
workspace=workspace_id,
1221+
dataset=dataset,
1222+
workspace=workspace,
12181223
dax_string="""
12191224
select [ID] AS [PartitionID], [TableID], [Name] AS [PartitionName] from $system.tmschema_partitions
12201225
""",
12211226
)
12221227

12231228
tables = fabric.evaluate_dax(
1224-
dataset=dataset_id,
1225-
workspace=workspace_id,
1229+
dataset=dataset,
1230+
workspace=workspace,
12261231
dax_string="""
12271232
select [ID] AS [TableID], [Name] AS [TableName] from $system.tmschema_tables
12281233
""",

src/sempy_labs/_model_bpa.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
resolve_workspace_capacity,
1313
resolve_dataset_name_and_id,
1414
get_language_codes,
15-
_get_max_run_id,
15+
_get_column_aggregate,
1616
resolve_workspace_name_and_id,
1717
)
1818
from sempy_labs.lakehouse import get_lakehouse_tables, lakehouse_attached
@@ -386,7 +386,7 @@ def translate_using_spark(rule_file):
386386
if len(lakeT_filt) == 0:
387387
runId = 1
388388
else:
389-
max_run_id = _get_max_run_id(
389+
max_run_id = _get_column_aggregate(
390390
lakehouse=lakehouse, table_name=delta_table_name
391391
)
392392
runId = max_run_id + 1

src/sempy_labs/_model_bpa_bulk.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
save_as_delta_table,
77
resolve_workspace_capacity,
88
retry,
9-
_get_max_run_id,
9+
_get_column_aggregate,
1010
)
1111
from sempy_labs.lakehouse import (
1212
get_lakehouse_tables,
@@ -76,7 +76,7 @@ def run_model_bpa_bulk(
7676
if len(lakeT_filt) == 0:
7777
runId = 1
7878
else:
79-
max_run_id = _get_max_run_id(lakehouse=lakehouse, table_name=output_table)
79+
max_run_id = _get_column_aggregate(lakehouse=lakehouse, table_name=output_table)
8080
runId = max_run_id + 1
8181

8282
if isinstance(workspace, str):

src/sempy_labs/_vertipaq.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
resolve_lakehouse_name,
1313
save_as_delta_table,
1414
resolve_workspace_capacity,
15-
_get_max_run_id,
15+
_get_column_aggregate,
1616
resolve_workspace_name_and_id,
1717
resolve_dataset_name_and_id,
1818
)
@@ -519,7 +519,9 @@ def _style_columns_based_on_types(dataframe: pd.DataFrame, column_type_mapping):
519519
if len(lakeT_filt) == 0:
520520
runId = 1
521521
else:
522-
max_run_id = _get_max_run_id(lakehouse=lakehouse, table_name=lakeTName)
522+
max_run_id = _get_column_aggregate(
523+
lakehouse=lakehouse, table_name=lakeTName
524+
)
523525
runId = max_run_id + 1
524526

525527
dfMap = {

src/sempy_labs/report/_report_bpa.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
resolve_report_id,
1111
resolve_lakehouse_name,
1212
resolve_workspace_capacity,
13-
_get_max_run_id,
13+
_get_column_aggregate,
1414
resolve_workspace_name_and_id,
1515
)
1616
from sempy_labs.lakehouse import get_lakehouse_tables, lakehouse_attached
@@ -217,7 +217,7 @@ def execute_rule(row):
217217
if len(lakeT_filt) == 0:
218218
runId = 1
219219
else:
220-
max_run_id = _get_max_run_id(
220+
max_run_id = _get_column_aggregate(
221221
lakehouse=lakehouse, table_name=delta_table_name
222222
)
223223
runId = max_run_id + 1

0 commit comments

Comments (0)