def list_upstream_dataflows(
    dataflow: str,
    workspace: Optional[str] = None,
    df: Optional[pd.DataFrame] = None,
) -> pd.DataFrame:
    """
    Shows a list of upstream dataflows for the specified dataflow.

    The lineage is walked recursively: every upstream dataflow returned by the
    ``upstreamDataflows`` endpoint is recorded as one row and is then itself
    queried for its own upstream dataflows.

    Parameters
    ----------
    dataflow : str
        The dataflow name.
    workspace : str, default=None
        The Fabric workspace name.
        Defaults to None which resolves to the workspace of the attached lakehouse
        or if no lakehouse attached, resolves to the workspace of the notebook.
    df : pandas.DataFrame, default=None
        The DataFrame to append the results to. Defaults to None which creates a new DataFrame.
        This parameter is also how the recursion threads the accumulated rows
        through nested calls.

    Returns
    -------
    pandas.DataFrame
        A pandas dataframe showing the upstream dataflows which exist.
        Each row is one (dataflow -> upstream dataflow) relationship.

    Notes
    -----
    A dataflow whose id already appears in the "Dataflow Id" column of the
    accumulated frame is not expanded again. This keeps diamond-shaped
    lineage from producing duplicate rows and prevents unbounded recursion
    should the service ever report a cyclic lineage.
    """

    workspace, workspace_id = resolve_workspace_name_and_id(workspace)
    dataflow, dataflow_id = resolve_dataflow_name_and_id(dataflow, workspace)

    client = fabric.PowerBIRestClient()
    response = client.get(
        f"/v1.0/myorg/groups/{workspace_id}/dataflows/{dataflow_id}/upstreamDataflows"
    )
    # Surface HTTP failures to the caller instead of parsing an error payload.
    response.raise_for_status()

    if df is None:
        df = pd.DataFrame(
            columns=[
                "Dataflow Name",
                "Dataflow Id",
                "Workspace Name",
                "Workspace Id",
                "Upstream Dataflow Name",
                "Upstream Dataflow Id",
                "Upstream Workspace Name",
                "Upstream Workspace Id",
            ]
        )

    for v in response.json().get("value", []):
        upstream_ref = v.get("targetDataflowId")
        upstream_group = v.get("groupId")

        target_dataflow, target_dataflow_id = resolve_dataflow_name_and_id(
            upstream_ref, upstream_group
        )
        target_workspace, target_workspace_id = resolve_workspace_name_and_id(
            upstream_group
        )

        new_data = {
            "Dataflow Name": dataflow,
            "Dataflow Id": dataflow_id,
            "Workspace Name": workspace,
            "Workspace Id": workspace_id,
            "Upstream Dataflow Name": target_dataflow,
            "Upstream Dataflow Id": target_dataflow_id,
            "Upstream Workspace Name": target_workspace,
            "Upstream Workspace Id": target_workspace_id,
        }

        df = pd.concat([df, pd.DataFrame(new_data, index=[0])], ignore_index=True)

        # The check runs after the append so the column is guaranteed to exist
        # and a self-referencing dataflow is caught as well. A match means the
        # target's own upstream edges are already recorded (or are being
        # recorded higher up the call stack), so expanding it again would only
        # add duplicates or recurse forever.
        already_expanded = (df["Dataflow Id"] == target_dataflow_id).any()
        if already_expanded:
            continue

        # Recursively call the function for each target dataflow
        df = list_upstream_dataflows(upstream_ref, upstream_group, df)

    return df