Skip to content

Commit af5330e

Browse files
committed
Merge branch 'm-kovalsky/eventstreams'
2 parents 6f2b94a + 5244210 commit af5330e

File tree

4 files changed

+952
-8
lines changed

4 files changed

+952
-8
lines changed

src/sempy_labs/__init__.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,6 @@
115115
delete_eventhouse,
116116
get_eventhouse_definition,
117117
)
118-
from ._eventstreams import (
119-
list_eventstreams,
120-
create_eventstream,
121-
delete_eventstream,
122-
)
123118
from ._kql_querysets import (
124119
list_kql_querysets,
125120
create_kql_queryset,
@@ -492,9 +487,6 @@
492487
"list_data_pipelines",
493488
"create_data_pipeline",
494489
"delete_data_pipeline",
495-
"list_eventstreams",
496-
"create_eventstream",
497-
"delete_eventstream",
498490
"list_kql_querysets",
499491
"create_kql_queryset",
500492
"delete_kql_queryset",
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# Re-export the eventstream item (CRUD/definition) helpers.
from ._items import (
    list_eventstreams,
    create_eventstream,
    delete_eventstream,
    get_eventstream_definition,
)
# Re-export the eventstream topology helpers (sources, destinations,
# and pause/resume controls) — defined in the sibling _topology module.
from ._topology import (
    get_eventstream_destination,
    get_eventstream_destination_connection,
    get_eventstream_source,
    get_eventstream_source_connection,
    get_eventstream_topology,
    pause_eventstream,
    pause_eventstream_destination,
    pause_eventstream_source,
    resume_eventstream,
    resume_eventstream_destination,
    resume_eventstream_source,
)

# Explicit public API of this subpackage; mirrors the two import groups above.
__all__ = [
    "list_eventstreams",
    "create_eventstream",
    "delete_eventstream",
    "get_eventstream_definition",
    "get_eventstream_destination",
    "get_eventstream_destination_connection",
    "get_eventstream_source",
    "get_eventstream_source_connection",
    "get_eventstream_topology",
    "pause_eventstream",
    "pause_eventstream_destination",
    "pause_eventstream_source",
    "resume_eventstream",
    "resume_eventstream_destination",
    "resume_eventstream_source",
]
Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
import pandas as pd
2+
from typing import Optional
3+
from sempy_labs._helper_functions import (
4+
_base_api,
5+
delete_item,
6+
_create_dataframe,
7+
create_item,
8+
resolve_workspace_id,
9+
resolve_item_id,
10+
_decode_b64,
11+
)
12+
from uuid import UUID
13+
import sempy_labs._icons as icons
14+
from sempy._utils._log import log
15+
import json
16+
17+
18+
@log
def list_eventstreams(workspace: Optional[str | UUID] = None) -> pd.DataFrame:
    """
    Shows the eventstreams within a workspace.

    This is a wrapper function for the following API: `Items - List Eventstreams <https://learn.microsoft.com/rest/api/fabric/environment/items/list-eventstreams>`_.

    Parameters
    ----------
    workspace : str | uuid.UUID, default=None
        The Fabric workspace name or ID.
        Defaults to None which resolves to the workspace of the attached lakehouse
        or if no lakehouse attached, resolves to the workspace of the notebook.

    Returns
    -------
    pandas.DataFrame
        A pandas dataframe showing the eventstreams within a workspace.
    """

    columns = {
        "Eventstream Name": "string",
        "Eventstream Id": "string",
        "Description": "string",
    }
    df = _create_dataframe(columns=columns)

    workspace_id = resolve_workspace_id(workspace)
    responses = _base_api(
        request=f"/v1/workspaces/{workspace_id}/eventstreams", uses_pagination=True
    )

    # Flatten the paginated responses into one record per eventstream.
    records = [
        {
            "Eventstream Name": item.get("displayName"),
            "Eventstream Id": item.get("id"),
            "Description": item.get("description"),
        }
        for page in responses
        for item in page.get("value", [])
    ]

    if records:
        df = pd.DataFrame(records, columns=list(columns.keys()))

    return df
65+
66+
67+
@log
def create_eventstream(
    name: str, description: Optional[str] = None, workspace: Optional[str | UUID] = None
):
    """
    Creates a Fabric eventstream.

    This is a wrapper function for the following API: `Items - Create Eventstream <https://learn.microsoft.com/rest/api/fabric/environment/items/create-eventstream>`_.

    Parameters
    ----------
    name: str
        Name of the eventstream.
    description : str, default=None
        A description of the eventstream.
    workspace : str | uuid.UUID, default=None
        The Fabric workspace name or ID.
        Defaults to None which resolves to the workspace of the attached lakehouse
        or if no lakehouse attached, resolves to the workspace of the notebook.
    """

    # Delegates to the generic item-creation helper with the Eventstream type.
    create_item(
        name=name, description=description, type="Eventstream", workspace=workspace
    )
91+
92+
93+
@log
def delete_eventstream(
    eventstream: str | UUID, workspace: Optional[str | UUID] = None, **kwargs
):
    """
    Deletes a Fabric eventstream.

    This is a wrapper function for the following API: `Items - Delete Eventstream <https://learn.microsoft.com/rest/api/fabric/environment/items/delete-eventstream>`_.

    Parameters
    ----------
    eventstream: str | uuid.UUID
        Name or ID of the eventstream.
    workspace : str | uuid.UUID, default=None
        The Fabric workspace name or ID.
        Defaults to None which resolves to the workspace of the attached lakehouse
        or if no lakehouse attached, resolves to the workspace of the notebook.
    """

    # Backward compatibility: honor the deprecated 'name' keyword, warning the
    # caller that 'eventstream' is the supported parameter.
    if "name" in kwargs:
        print(
            f"{icons.warning} The 'name' parameter is deprecated. Please use 'eventstream' instead."
        )
        eventstream = kwargs["name"]

    delete_item(item=eventstream, type="Eventstream", workspace=workspace)
119+
120+
121+
@log
def get_eventstream_definition(
    eventstream: str | UUID,
    workspace: Optional[str | UUID] = None,
    decode: bool = True,
    return_dataframe: bool = False,
) -> dict:
    """
    Obtains the definition of the specified eventstream.

    This is a wrapper function for the following API: `Items - Get Eventstream Definition <https://learn.microsoft.com/rest/api/fabric/environment/items/get-eventstream-definition>`_.

    Parameters
    ----------
    eventstream: str | uuid.UUID
        Name or ID of the eventstream.
    workspace : str | uuid.UUID, default=None
        The Fabric workspace name or ID.
        Defaults to None which resolves to the workspace of the attached lakehouse
        or if no lakehouse attached, resolves to the workspace of the notebook.
    decode : bool, default=True
        If True, decodes each base64-encoded definition part payload into JSON.
        If False, returns the raw API response payloads.
    return_dataframe : bool, default=False
        If True, returns the definition parts as a pandas dataframe instead of
        a dictionary.

    Returns
    -------
    dict | pandas.DataFrame
        The eventstream definition (or its parts as a dataframe when
        ``return_dataframe=True``).
    """

    workspace_id = resolve_workspace_id(workspace)
    item_id = resolve_item_id(item=eventstream, type="Eventstream", workspace=workspace)

    result = _base_api(
        request=f"/v1/workspaces/{workspace_id}/eventstreams/{item_id}/getDefinition",
        method="post",
        client="fabric_sp",
        status_codes=None,
        lro_return_json=True,
    )

    if decode:
        definition = {"definition": {"parts": []}}

        for part in result.get("definition", {}).get("parts", []):
            path = part.get("path")
            payload = json.loads(_decode_b64(part.get("payload")))
            definition["definition"]["parts"].append({"path": path, "payload": payload})
    else:
        definition = result.copy()

    if return_dataframe:
        df = pd.DataFrame(definition["definition"]["parts"])
        # Rename only the columns that actually exist: decoded parts carry two
        # keys ("path", "payload") while the raw response carries three
        # ("path", "payload", "payloadType"). The previous fixed assignment
        # `df.columns = ["Path", "Payload", "Payload Type"]` raised a
        # ValueError whenever decode=True (2 columns vs. 3 names).
        df = df.rename(
            columns={
                "path": "Path",
                "payload": "Payload",
                "payloadType": "Payload Type",
            }
        )
        return df
    else:
        return definition
156+
157+
158+
@log
def list_eventstream_destinations(
    eventstream: str | UUID, workspace: Optional[str | UUID] = None
) -> pd.DataFrame:
    """
    Lists the destinations of the specified eventstream.

    Parameters
    ----------
    eventstream : str | uuid.UUID
        The name or ID of the eventstream.
    workspace : str | uuid.UUID, default=None
        The Fabric workspace name or ID.
        Defaults to None which resolves to the workspace of the attached lakehouse
        or if no lakehouse attached, resolves to the workspace of the notebook.

    Returns
    -------
    pandas.DataFrame
        A pandas dataframe showing the destinations of the eventstream.
    """

    definition = get_eventstream_definition(
        eventstream=eventstream, workspace=workspace
    )

    columns = {
        "Destination Id": "string",
        "Destination Name": "string",
        "Destination Type": "string",
    }

    df = _create_dataframe(columns=columns)

    rows = []
    # Guard every lookup with a default: a definition missing "parts", a part
    # missing its payload, or a payload without "destinations" previously
    # raised AttributeError/TypeError instead of yielding an empty result.
    for part in definition.get("definition", {}).get("parts", []):
        if part.get("path") != "eventstream.json":
            continue
        payload = part.get("payload") or {}
        for d in payload.get("destinations") or []:
            rows.append(
                {
                    "Destination Id": d.get("id"),
                    "Destination Name": d.get("name"),
                    "Destination Type": d.get("type"),
                }
            )

    if rows:
        df = pd.DataFrame(rows, columns=list(columns.keys()))

    return df
210+
211+
212+
@log
def list_eventstream_sources(
    eventstream: str | UUID, workspace: Optional[str | UUID] = None
) -> pd.DataFrame:
    """
    Lists the sources of the specified eventstream.

    Parameters
    ----------
    eventstream : str | uuid.UUID
        The name or ID of the eventstream.
    workspace : str | uuid.UUID, default=None
        The Fabric workspace name or ID.
        Defaults to None which resolves to the workspace of the attached lakehouse
        or if no lakehouse attached, resolves to the workspace of the notebook.

    Returns
    -------
    pandas.DataFrame
        A pandas dataframe showing the sources of the eventstream.
    """

    definition = get_eventstream_definition(
        eventstream=eventstream, workspace=workspace
    )

    columns = {
        "Source Id": "string",
        "Source Name": "string",
        "Source Type": "string",
    }

    df = _create_dataframe(columns=columns)

    rows = []
    # Guard every lookup with a default: a definition missing "parts", a part
    # missing its payload, or a payload without "sources" previously raised
    # AttributeError/TypeError instead of yielding an empty result.
    for part in definition.get("definition", {}).get("parts", []):
        if part.get("path") != "eventstream.json":
            continue
        payload = part.get("payload") or {}
        for s in payload.get("sources") or []:
            rows.append(
                {
                    "Source Id": s.get("id"),
                    "Source Name": s.get("name"),
                    "Source Type": s.get("type"),
                }
            )

    if rows:
        df = pd.DataFrame(rows, columns=list(columns.keys()))

    return df

0 commit comments

Comments
 (0)