
Commit 1ea138b

Merge branch 'm-kovalsky/onelakeapis'

2 parents 749c5db + 4048a27

File tree

4 files changed: +209 -85 lines changed

    src/sempy_labs/_helper_functions.py
    src/sempy_labs/lakehouse/__init__.py
    src/sempy_labs/lakehouse/_get_lakehouse_tables.py
    src/sempy_labs/lakehouse/_schemas.py

src/sempy_labs/_helper_functions.py

Lines changed: 13 additions & 8 deletions
@@ -2249,12 +2249,12 @@ def get_token(self, *scopes, **kwargs) -> AccessToken:
     elif client == "fabric_sp":
         token = auth.token_provider.get() or FabricDefaultCredential()
         c = fabric.FabricRestClient(credential=token)
-    elif client in ["azure", "graph"]:
+    elif client in ["azure", "graph", "onelake"]:
         pass
     else:
         raise ValueError(f"{icons.red_dot} The '{client}' client is not supported.")

-    if client not in ["azure", "graph"]:
+    if client not in ["azure", "graph", "onelake"]:
         if method == "get":
             response = c.get(request)
         elif method == "delete":
@@ -2268,13 +2268,18 @@ def get_token(self, *scopes, **kwargs) -> AccessToken:
         else:
             raise NotImplementedError
     else:
-        headers = _get_headers(auth.token_provider.get(), audience=client)
-        if client == "graph":
-            url = f"https://graph.microsoft.com/v1.0/{request}"
-        elif client == "azure":
-            url = request
+        if client == "onelake":
+            import notebookutils
+
+            token = notebookutils.credentials.getToken("storage")
+            headers = {"Authorization": f"Bearer {token}"}
+            url = f"https://onelake.table.fabric.microsoft.com/delta/{request}"
         else:
-            raise NotImplementedError
+            headers = _get_headers(auth.token_provider.get(), audience=client)
+            if client == "graph":
+                url = f"https://graph.microsoft.com/v1.0/{request}"
+            elif client == "azure":
+                url = request
     response = requests.request(
         method.upper(),
         url,
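The effect of the new branch is that `_base_api(..., client="onelake")` signs the call with a storage-audience token and targets the OneLake table endpoint instead of the Fabric or Graph hosts. Below is a minimal standalone sketch of that request pattern, assuming a Fabric notebook runtime where `notebookutils` is available; the `request` path mirrors the Unity Catalog route that the new `_schemas.py` module passes, with placeholder IDs.

```python
# Minimal sketch of the request pattern the new "onelake" branch implements.
# Assumes a Fabric notebook runtime (notebookutils available); <workspace_id>
# and <lakehouse_id> are placeholders for real item GUIDs.
import requests
import notebookutils

request = "<workspace_id>/<lakehouse_id>/api/2.1/unity-catalog/schemas?catalog_name=<lakehouse_id>"

# OneLake table APIs are authorized with a storage-audience token rather than
# the Fabric REST token used by the other clients.
token = notebookutils.credentials.getToken("storage")
headers = {"Authorization": f"Bearer {token}"}
url = f"https://onelake.table.fabric.microsoft.com/delta/{request}"

response = requests.request("GET", url, headers=headers)
response.raise_for_status()
print(response.json())
```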

src/sempy_labs/lakehouse/__init__.py

Lines changed: 6 additions & 0 deletions
@@ -34,6 +34,10 @@
 from ._materialized_lake_views import (
     refresh_materialized_lake_views,
 )
+from ._schemas import (
+    list_schemas,
+    schema_exists,
+)

 __all__ = [
     "get_lakehouse_columns",
@@ -56,4 +60,6 @@
     "load_table",
     "refresh_materialized_lake_views",
     "list_lakehouses",
+    "list_schemas",
+    "schema_exists",
 ]
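With these exports in place, both helpers become importable from the `sempy_labs.lakehouse` namespace. A hedged usage sketch; the lakehouse and workspace names are placeholders.

```python
# Usage sketch for the newly exported helpers; "MyLakehouse" and "MyWorkspace"
# are placeholder names, not items that necessarily exist.
from sempy_labs.lakehouse import list_schemas, schema_exists

schemas_df = list_schemas(lakehouse="MyLakehouse", workspace="MyWorkspace")
print(schemas_df)

if schema_exists("dbo", lakehouse="MyLakehouse", workspace="MyWorkspace"):
    print("Schema 'dbo' exists in MyLakehouse.")
```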

src/sempy_labs/lakehouse/_get_lakehouse_tables.py

Lines changed: 5 additions & 77 deletions
@@ -4,11 +4,9 @@
 from datetime import datetime
 from sempy_labs._helper_functions import (
     _get_column_aggregate,
-    resolve_workspace_name_and_id,
     resolve_lakehouse_name_and_id,
     save_as_delta_table,
-    _base_api,
-    _create_dataframe,
+    resolve_workspace_id,
     _read_delta_table,
     _get_delta_table,
     _mount,
@@ -24,6 +22,7 @@
 import sempy_labs._icons as icons
 from sempy._utils._log import log
 from uuid import UUID
+from sempy_labs.lakehouse._schemas import list_tables


 @log
@@ -70,84 +69,14 @@ def get_lakehouse_tables(
     Shows the tables/columns within a lakehouse and their properties.
     """

-    columns = {
-        "Workspace Name": "string",
-        "Lakehouse Name": "string",
-        "Schema Name": "string",
-        "Table Name": "string",
-        "Format": "string",
-        "Type": "string",
-        "Location": "string",
-    }
-    df = _create_dataframe(columns=columns)
-
-    (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
+    workspace_id = resolve_workspace_id(workspace)
     (lakehouse_name, lakehouse_id) = resolve_lakehouse_name_and_id(
         lakehouse=lakehouse, workspace=workspace_id
     )

-    # Test if valid lakehouse:
-    x = _base_api(f"v1/workspaces/{workspace_id}/lakehouses/{lakehouse_id}")
+    df = list_tables(lakehouse=lakehouse, workspace=workspace)

-    if count_rows:  # Setting countrows defaults to extended=True
-        extended = True
-
-    API_called = True
-    try:
-        responses = _base_api(
-            request=f"v1/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/tables",
-            uses_pagination=True,
-            client="fabric_sp",
-        )
-
-    except Exception:
-        API_called = False
-
-    rows = []
-    local_path = None
-    if API_called:
-        if not responses[0].get("data"):
-            return df
-
-        for r in responses:
-            for i in r.get("data", []):
-                rows.append(
-                    {
-                        "Workspace Name": workspace_name,
-                        "Lakehouse Name": lakehouse_name,
-                        "Schema Name": "",
-                        "Table Name": i.get("name"),
-                        "Format": i.get("format"),
-                        "Type": i.get("type"),
-                        "Location": i.get("location"),
-                    }
-                )
-    else:
-        local_path = _mount(lakehouse=lakehouse_id, workspace=workspace_id)
-        tables_path = os.path.join(local_path, "Tables")
-        list_schema = os.listdir(tables_path)
-
-        for schema_name in list_schema:
-            schema_table_path = os.path.join(local_path, "Tables", schema_name)
-            list_tables = os.listdir(schema_table_path)
-            for table_name in list_tables:
-                location_path = create_abfss_path(
-                    lakehouse_id, workspace_id, table_name, schema_name
-                )
-                rows.append(
-                    {
-                        "Workspace Name": workspace_name,
-                        "Lakehouse Name": lakehouse_name,
-                        "Schema Name": schema_name,
-                        "Table Name": table_name,
-                        "Format": "delta",
-                        "Type": "Managed",
-                        "Location": location_path,
-                    }
-                )
-
-    if rows:
-        df = pd.DataFrame(rows, columns=list(columns.keys()))
+    local_path = _mount(lakehouse=lakehouse_id, workspace=workspace_id)

     if extended:
         sku_value = get_sku_size(workspace_id)
@@ -161,7 +90,6 @@ def get_lakehouse_tables(
         df["Row Count"] = None

         for i, r in df.iterrows():
-            use_schema = True
             schema_name = r["Schema Name"]
             table_name = r["Table Name"]
             if r["Type"] == "Managed" and r["Format"] == "delta":
src/sempy_labs/lakehouse/_schemas.py

Lines changed: 185 additions & 0 deletions
@@ -0,0 +1,185 @@
+from typing import Optional, List
+from uuid import UUID
+from sempy._utils._log import log
+import pandas as pd
+from sempy_labs._helper_functions import (
+    resolve_lakehouse_name_and_id,
+    resolve_workspace_id,
+    resolve_lakehouse_id,
+    _create_dataframe,
+    _base_api,
+    resolve_workspace_name_and_id,
+)
+import sempy_labs._icons as icons
+
+
+@log
+def list_schemas(
+    lakehouse: Optional[str | UUID] = None, workspace: Optional[str | UUID] = None
+) -> pd.DataFrame:
+    """
+    Lists the schemas within a Fabric lakehouse.
+
+    Parameters
+    ----------
+    lakehouse : str | uuid.UUID, default=None
+        The Fabric lakehouse name or ID.
+        Defaults to None which resolves to the lakehouse attached to the notebook.
+    workspace : str | uuid.UUID, default=None
+        The Fabric workspace name or ID used by the lakehouse.
+        Defaults to None which resolves to the workspace of the attached lakehouse
+        or if no lakehouse attached, resolves to the workspace of the notebook.
+
+    Returns
+    -------
+    pandas.DataFrame
+        Shows the schemas within a lakehouse.
+    """
+
+    columns = {
+        "Schema Name": "str",
+    }
+    df = _create_dataframe(columns=columns)
+    workspace_id = resolve_workspace_id(workspace)
+    item_id = resolve_lakehouse_id(lakehouse, workspace)
+    response = _base_api(
+        request=f"{workspace_id}/{item_id}/api/2.1/unity-catalog/schemas?catalog_name={item_id}",
+        client="onelake",
+    )
+
+    rows = []
+    for s in response.json().get("schemas", []):
+        rows.append(
+            {
+                "Schema Name": s.get("name", None),
+            }
+        )
+
+    if rows:
+        df = pd.DataFrame(rows, columns=list(columns.keys()))
+
+    return df
+
+
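`list_schemas` issues one Unity Catalog call per lakehouse (with the lakehouse ID doubling as the `catalog_name`) and keeps only each schema's `name`. The sketch below replays that parsing step on a payload shaped the way the code reads it; the example schema names are made up.

```python
# Standalone replay of the parsing step in list_schemas. The payload shape
# (a "schemas" array whose items carry "name") is exactly what the function
# reads above; the schema names here are illustrative only.
import pandas as pd

payload = {"schemas": [{"name": "dbo"}, {"name": "sales"}]}

rows = [{"Schema Name": s.get("name", None)} for s in payload.get("schemas", [])]
df = pd.DataFrame(rows, columns=["Schema Name"])
print(df)
```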
+def list_tables(
+    lakehouse: Optional[str | UUID] = None,
+    workspace: Optional[str | UUID] = None,
+    schema: Optional[str | List[str]] = None,
+) -> pd.DataFrame:
+
+    (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
+    (item_name, item_id) = resolve_lakehouse_name_and_id(lakehouse, workspace)
+
+    response = _base_api(f"/v1/workspaces/{workspace_id}/lakehouses/{item_id}")
+    default_schema = response.json().get("properties", {}).get("defaultSchema", None)
+    schema_enabled = True if default_schema else False
+
+    columns = {
+        "Workspace Name": "str",
+        "Lakehouse Name": "str",
+        "Table Name": "str",
+        "Schema Name": "str",
+        "Format": "str",
+        "Type": "str",
+        "Location": "str",
+    }
+    df = _create_dataframe(columns=columns)
+
+    rows = []
+    if schema_enabled:
+        schemas = list_schemas(lakehouse=lakehouse, workspace=workspace)
+        if schema:
+            if isinstance(schema, str):
+                schema = [schema]
+            schemas = schemas[schemas["Schema Name"].isin(schema)]
+
+        # Loop through schemas
+        for _, r in schemas.iterrows():
+            schema_name = r["Schema Name"]
+            response = _base_api(
+                request=f"{workspace_id}/{item_id}/api/2.1/unity-catalog/tables?catalog_name={item_id}&schema_name={schema_name}",
+                client="onelake",
+            )
+            # Loop through tables
+            for t in response.json().get("tables", []):
+                location = t.get("storage_location", {})
+                location = f'abfss://{location.split(".microsoft.com/")[1]}'
+                rows.append(
+                    {
+                        "Workspace Name": workspace_name,
+                        "Lakehouse Name": item_name,
+                        "Table Name": t.get("name", {}),
+                        "Schema Name": schema_name,
+                        "Format": t.get("data_source_format", {}).capitalize(),
+                        "Type": "Managed",
+                        "Location": location,
+                    }
+                )
+    else:
+        if schema:
+            print(
+                f"{icons.info} The schema parameter has been ignored as the '{item_name}' lakehouse within the '{workspace_name}' workspace has schemas disabled."
+            )
+        responses = _base_api(
+            request=f"v1/workspaces/{workspace_id}/lakehouses/{item_id}/tables",
+            uses_pagination=True,
+            client="fabric_sp",
+        )
+        for r in responses:
+            for i in r.get("data", []):
+                rows.append(
+                    {
+                        "Workspace Name": workspace_name,
+                        "Lakehouse Name": item_name,
+                        "Schema Name": None,
+                        "Table Name": i.get("name"),
+                        "Format": i.get("format"),
+                        "Type": i.get("type"),
+                        "Location": i.get("location"),
+                    }
+                )
+
+    if rows:
+        df = pd.DataFrame(rows, columns=list(columns.keys()))
+
+    return df
+
+
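`list_tables` first checks the lakehouse's `defaultSchema` property to decide between the schema-aware Unity Catalog route and the plain Fabric REST route, and it accepts the `schema` filter as either a single name or a list. A hedged usage sketch with placeholder names:

```python
# Hedged usage sketch for list_tables; item and schema names are placeholders.
# On a schema-enabled lakehouse the filter narrows the Unity Catalog calls;
# on a schema-disabled one it is ignored with an informational message.
from sempy_labs.lakehouse._schemas import list_tables

# All tables across every schema.
all_tables = list_tables(lakehouse="MyLakehouse", workspace="MyWorkspace")

# Only the "dbo" and "sales" schemas; a single string such as "dbo" also works.
filtered = list_tables(
    lakehouse="MyLakehouse",
    workspace="MyWorkspace",
    schema=["dbo", "sales"],
)
print(filtered[["Schema Name", "Table Name", "Location"]])
```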
+def schema_exists(
+    schema: str,
+    lakehouse: Optional[str | UUID] = None,
+    workspace: Optional[str | UUID] = None,
+) -> bool:
+    """
+    Indicates whether the specified schema exists within a Fabric lakehouse.
+
+    Parameters
+    ----------
+    schema : str
+        The name of the schema.
+    lakehouse : str | uuid.UUID, default=None
+        The Fabric lakehouse name or ID.
+        Defaults to None which resolves to the lakehouse attached to the notebook.
+    workspace : str | uuid.UUID, default=None
+        The Fabric workspace name or ID used by the lakehouse.
+        Defaults to None which resolves to the workspace of the attached lakehouse
+        or if no lakehouse attached, resolves to the workspace of the notebook.
+
+    Returns
+    -------
+    bool
+        Indicates whether the specified schema exists within the lakehouse.
+    """
+
+    df = list_schemas(lakehouse=lakehouse, workspace=workspace)
+    return schema in df["Schema Name"].values
+
+    # (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
+    # (item_name, item_id) = resolve_lakehouse_name_and_id(lakehouse, workspace)
+    # response = _base_api(
+    #     request=f"{workspace_id}/{item_id}/api/2.1/unity-catalog/schemas/{schema}",
+    #     client="onelake",
+    #     method="head",
+    # )
+
+    # response.json()
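`schema_exists` is a thin membership test over `list_schemas`; the commented-out block preserves an alternative that would probe the Unity Catalog schema endpoint directly with a HEAD request. A hedged usage sketch with placeholder names:

```python
# Hedged usage sketch for schema_exists; names are placeholders.
from sempy_labs.lakehouse import schema_exists

if not schema_exists("staging", lakehouse="MyLakehouse", workspace="MyWorkspace"):
    print("Schema 'staging' not found.")
```

Listing and testing membership costs one GET and sidesteps mapping HTTP status codes to a boolean, which the commented-out HEAD variant would have to do.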
