Skip to content

Commit 40ffcd6

Browse files
committed
Merge branch 'main' into 1.7.latest
2 parents b497fa9 + 984983e commit 40ffcd6

File tree

15 files changed

+551
-89
lines changed

15 files changed

+551
-89
lines changed

CHANGELOG.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,16 @@
1+
## dbt-databricks 1.7.15 (TBD)
2+
3+
## dbt-databricks 1.7.14 (May 1, 2024)
4+
5+
### Fixes
6+
7+
- Auth headers should now evaluate at call time ([648](https://github.com/databricks/dbt-databricks/pull/648))
8+
- User-configurable OAuth Scopes (currently limited to AWS) (thanks @stevenayers!) ([641](https://github.com/databricks/dbt-databricks/pull/641))
9+
10+
### Under the hood
11+
12+
- Reduce default idle limit for connection reuse to 60s and start organizing event logging ([648](https://github.com/databricks/dbt-databricks/pull/648))
13+
114
## dbt-databricks 1.7.13 (April 8, 2024)
215

316
### Features
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version: str = "1.7.13"
1+
version: str = "1.7.14"

dbt/adapters/databricks/connections.py

Lines changed: 86 additions & 82 deletions
Large diffs are not rendered by default.
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from abc import ABC
2+
3+
from databricks.sql.exc import Error
4+
5+
6+
class ErrorEvent(ABC):
    """Base event pairing a human-readable message with the exception behind it."""

    def __init__(self, exception: Exception, message: str):
        self.exception = exception
        self.message = message

    def __str__(self) -> str:
        """Render the event as ``<message>: <exception>``."""
        return "{}: {}".format(self.message, self.exception)
13+
14+
15+
class SQLErrorEvent:
    """Error event that also reports the context of databricks-sql ``Error`` exceptions."""

    def __init__(self, exception: Exception, message: str):
        self.exception = exception
        self.message = message

    def __str__(self) -> str:
        """Render ``<message>: <exception>``, plus context pairs for ``Error`` instances.

        When the exception is an ``Error``, its ``context`` items are appended on a
        new line as comma-separated ``key=value`` pairs in sorted order.
        """
        suffix = ""
        if isinstance(self.exception, Error):
            pairs = [f"{key}={value}" for key, value in sorted(self.exception.context.items())]
            suffix = "\nError properties: " + ", ".join(pairs)

        return f"{self.message}: {self.exception}{suffix}"
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
from abc import ABC
2+
from typing import Optional
3+
from typing import Tuple
4+
5+
from databricks.sql.client import Connection
6+
from dbt.adapters.databricks.events.base import SQLErrorEvent
7+
from dbt.contracts.graph.nodes import ResultNode
8+
9+
10+
class ConnectionEvent(ABC):
    """Base for events tied to a connection, labelled with its session id."""

    def __init__(self, connection: Connection, message: str):
        self.message = message
        # A falsy connection, or a falsy session id, is reported as "Unknown".
        session_id = connection.get_session_id_hex() if connection else None
        self.session_id = session_id or "Unknown"

    def __str__(self) -> str:
        """Render as ``Connection(session-id=<id>) - <message>``."""
        prefix = f"Connection(session-id={self.session_id})"
        return f"{prefix} - {self.message}"
19+
20+
21+
class ConnectionCancel(ConnectionEvent):
    """Connection event with the fixed message "Cancelling connection"."""

    def __init__(self, connection: Connection):
        message = "Cancelling connection"
        super().__init__(connection, message)
24+
25+
26+
class ConnectionClose(ConnectionEvent):
    """Connection event with the fixed message "Closing connection"."""

    def __init__(self, connection: Connection):
        message = "Closing connection"
        super().__init__(connection, message)
29+
30+
31+
class ConnectionCancelError(ConnectionEvent):
    """Connection event describing an exception raised while cancelling a connection."""

    def __init__(self, connection: Connection, exception: Exception):
        # SQLErrorEvent formats the exception (and any error context) into the message.
        details = SQLErrorEvent(exception, "Exception while trying to cancel connection")
        super().__init__(connection, str(details))
36+
37+
38+
class ConnectionCloseError(ConnectionEvent):
    """Connection event describing an exception raised while closing a connection."""

    def __init__(self, connection: Connection, exception: Exception):
        # SQLErrorEvent formats the exception (and any error context) into the message.
        details = SQLErrorEvent(exception, "Exception while trying to close connection")
        super().__init__(connection, str(details))
43+
44+
45+
class ConnectionCreateError(ConnectionEvent):
    """Connection event describing an exception raised while creating a connection."""

    def __init__(self, connection: Connection, exception: Exception):
        # SQLErrorEvent formats the exception (and any error context) into the message.
        details = SQLErrorEvent(exception, "Exception while trying to create connection")
        super().__init__(connection, str(details))
50+
51+
52+
class ConnectionWrapperEvent(ABC):
    """Base for events that are labelled with a caller-supplied description."""

    def __init__(self, description: str, message: str):
        self.description = description
        self.message = message

    def __str__(self) -> str:
        """Render as ``<description> - <message>``."""
        return "{} - {}".format(self.description, self.message)
59+
60+
61+
class ConnectionAcquire(ConnectionWrapperEvent):
    """Event describing which thread acquired a connection, and for what.

    The message names the acquiring thread, the compute resource in use (or the
    default one when ``compute_name`` is falsy) and, when a model is given, the
    model's relation name.
    """

    def __init__(
        self,
        description: str,
        model: Optional[ResultNode],
        compute_name: Optional[str],
        thread_identifier: Tuple[int, int],
    ):
        if compute_name:
            compute = f"compute resource '{compute_name}'"
        else:
            compute = "default compute resource"
        message = f"Acquired connection on thread {thread_identifier}, using {compute}"

        if model:
            # ResultNode *should* have relation_name attr, but we work around a core
            # issue by checking. getattr's default only covers a missing attribute,
            # so a present-but-falsy value (e.g. None) is mapped to "[Unknown]" too
            # instead of being logged as 'None'.
            relation_name = getattr(model, "relation_name", None) or "[Unknown]"
            message += f" for model '{relation_name}'"

        super().__init__(description, message)
82+
83+
84+
class ConnectionRelease(ConnectionWrapperEvent):
    """Wrapper event with the fixed message "Released connection"."""

    def __init__(self, description: str):
        message = "Released connection"
        super().__init__(description, message)
87+
88+
89+
class ConnectionReset(ConnectionWrapperEvent):
    """Wrapper event with the fixed message "Reset connection handle"."""

    def __init__(self, description: str):
        message = "Reset connection handle"
        super().__init__(description, message)
92+
93+
94+
class ConnectionReuse(ConnectionWrapperEvent):
    """Wrapper event reporting that a connection is reused, naming its prior name."""

    def __init__(self, description: str, prior_name: str):
        message = f"Reusing connection previously named {prior_name}"
        super().__init__(description, message)
97+
98+
99+
class ConnectionCreate(ConnectionWrapperEvent):
    """Wrapper event with the fixed message "Creating connection"."""

    def __init__(self, description: str):
        message = "Creating connection"
        super().__init__(description, message)
102+
103+
104+
class ConnectionIdleCheck(ConnectionWrapperEvent):
    """Wrapper event with the fixed message "Checking idleness"."""

    def __init__(self, description: str):
        message = "Checking idleness"
        super().__init__(description, message)
107+
108+
109+
class ConnectionIdleClose(ConnectionWrapperEvent):
    """Wrapper event with the fixed message "Closing for idleness"."""

    def __init__(self, description: str):
        message = "Closing for idleness"
        super().__init__(description, message)
112+
113+
114+
class ConnectionRetrieve(ConnectionWrapperEvent):
    """Wrapper event with the fixed message "Retrieving connection"."""

    def __init__(self, description: str):
        message = "Retrieving connection"
        super().__init__(description, message)
117+
118+
119+
class ConnectionCreated(ConnectionWrapperEvent):
    """Wrapper event with the fixed message "Connection created"."""

    def __init__(self, description: str):
        message = "Connection created"
        super().__init__(description, message)
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
from dbt.adapters.databricks.events.base import ErrorEvent
2+
3+
4+
class CredentialLoadError(ErrorEvent):
    """Error event for an exception raised while loading credentials."""

    def __init__(self, exception: Exception):
        message = "Exception while trying to load credentials"
        super().__init__(exception, message)
7+
8+
9+
class CredentialSaveError(ErrorEvent):
    """Error event for an exception raised while saving credentials."""

    def __init__(self, exception: Exception):
        message = "Exception while trying to save credentials"
        super().__init__(exception, message)
12+
13+
14+
class CredentialShardEvent:
    """Event reporting that a password of the given length is being sharded."""

    def __init__(self, password_len: int):
        # Only the length is kept; the password itself never reaches this event.
        self.password_len = password_len

    def __str__(self) -> str:
        return "Password is {} characters, sharding it".format(self.password_len)
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
from abc import ABC
2+
from uuid import UUID
3+
4+
from databricks.sql.client import Cursor
5+
from dbt.adapters.databricks.events.base import SQLErrorEvent
6+
7+
8+
class CursorEvent(ABC):
    """Base for cursor events, labelled with the session id and command id.

    Both ids default to "Unknown" and are filled in only when the cursor exposes
    them: the session id comes from the cursor's connection, the command id from
    the operation id of the cursor's active result set.
    """

    def __init__(self, cursor: "Cursor", message: str):
        self.message = message
        self.session_id = "Unknown"
        self.command_id = "Unknown"
        if not cursor:
            return

        if cursor.connection:
            # Keep the "Unknown" placeholder when the connection reports no session
            # id, so the rendered event never shows "session-id=None".
            self.session_id = cursor.connection.get_session_id_hex() or "Unknown"

        # Walk the result set -> command id -> operation id chain, stopping at the
        # first missing link.
        result_set = cursor.active_result_set
        command = result_set.command_id if result_set else None
        operation = command.operationId if command else None
        if operation:
            # The guid is raw bytes; format it as a canonical UUID string.
            self.command_id = str(UUID(bytes=operation.guid))

    def __str__(self) -> str:
        """Render as ``Cursor(session-id=<id>, command-id=<id>) - <message>``."""
        return (
            f"Cursor(session-id={self.session_id}, command-id={self.command_id}) - {self.message}"
        )
30+
31+
32+
class CursorCloseError(CursorEvent):
    """Cursor event describing an exception raised while closing a cursor."""

    def __init__(self, cursor: Cursor, exception: Exception):
        # SQLErrorEvent formats the exception (and any error context) into the message.
        details = SQLErrorEvent(exception, "Exception while trying to close cursor")
        super().__init__(cursor, str(details))
37+
38+
39+
class CursorCancelError(CursorEvent):
    """Cursor event describing an exception raised while cancelling a cursor."""

    def __init__(self, cursor: Cursor, exception: Exception):
        # SQLErrorEvent formats the exception (and any error context) into the message.
        details = SQLErrorEvent(exception, "Exception while trying to cancel cursor")
        super().__init__(cursor, str(details))
44+
45+
46+
class CursorCreate(CursorEvent):
    """Cursor event with the fixed message "Created cursor"."""

    def __init__(self, cursor: Cursor):
        message = "Created cursor"
        super().__init__(cursor, message)
49+
50+
51+
class CursorClose(CursorEvent):
    """Cursor event with the fixed message "Closing cursor"."""

    def __init__(self, cursor: Cursor):
        message = "Closing cursor"
        super().__init__(cursor, message)
54+
55+
56+
class CursorCancel(CursorEvent):
    """Cursor event with the fixed message "Cancelling cursor"."""

    def __init__(self, cursor: Cursor):
        message = "Cancelling cursor"
        super().__init__(cursor, message)
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from dbt.adapters.databricks.events.base import SQLErrorEvent
2+
3+
4+
class QueryError(SQLErrorEvent):
    """SQL error event for a failed query; the message embeds the logged SQL text."""

    def __init__(self, log_sql: str, exception: Exception):
        # The SQL is placed on its own line(s) between the headline and the exception.
        message = f"Exception while trying to execute query\n{log_sql}\n"
        super().__init__(exception, message)
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
from abc import ABC
2+
3+
4+
class PipelineEvent(ABC):
    """Base for pipeline events, labelled with the pipeline id and update id."""

    def __init__(self, pipeline_id: str, update_id: str, message: str):
        self.pipeline_id = pipeline_id
        self.update_id = update_id
        self.message = message

    def __str__(self) -> str:
        """Render as ``Pipeline(pipeline-id=<id>, update-id=<id>) - <message>``."""
        prefix = f"Pipeline(pipeline-id={self.pipeline_id}, update-id={self.update_id})"
        return f"{prefix} - {self.message}"
14+
15+
16+
class PipelineRefresh(PipelineEvent):
    """Pipeline event reporting the state of a model that is being refreshed."""

    def __init__(self, pipeline_id: str, update_id: str, model_name: str, state: str):
        message = f"Refreshing model {model_name} with state {state}"
        super().__init__(pipeline_id, update_id, message)
21+
22+
23+
class PipelineRefreshError(PipelineEvent):
    """Pipeline event for a failed refresh; prefixes the supplied error message."""

    def __init__(self, pipeline_id: str, update_id: str, message: str):
        full_message = f"Error refreshing pipeline: {message}"
        super().__init__(pipeline_id, update_id, full_message)

docs/oauth.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,33 @@ jaffle_shop:
2222
target: dev
2323
```
2424
25+
## Troubleshooting
2526
27+
By default, dbt expects the OAuth application to have the "All APIs" scope and the redirect URL `http://localhost:8020`.
28+
29+
If the OAuth application has only been configured with SQL access scopes or a custom redirect URL, you may need to update your profile accordingly:
30+
31+
``` YAML
32+
jaffle_shop:
33+
outputs:
34+
dev:
35+
host: <databricks host name>
36+
http_path: <http path for warehouse or cluster>
37+
catalog: <UC catalog name>
38+
schema: <schema name>
39+
auth_type: oauth # new
40+
client_id: <azure application ID> # only necessary for Azure
41+
oauth_redirect_url: https://example.com
42+
oauth_scopes:
43+
- sql
44+
- offline_access
45+
type: databricks
46+
target: dev
47+
```
48+
49+
You can find these settings in the [Account Console](https://accounts.cloud.databricks.com) > [Settings](https://accounts.cloud.databricks.com/settings) > [App Connections](https://accounts.cloud.databricks.com/settings/app-integrations) > dbt adapter for Databricks.
50+
51+
52+
53+
If you encounter any issues, please refer to the [OAuth user-to-machine (U2M) authentication guide](https://docs.databricks.com/en/dev-tools/auth/oauth-u2m.html).
2654

0 commit comments

Comments
 (0)