Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions .github/PULL_REQUEST_TEMPLATE/hybrid_table_materialization.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
## Summary

- Add hybrid_table materialization and supporting macros
- Relation config + changeset; adapter describe method
- Tests for basic, incremental, schema changes, constraints
- Remove local-only docs from repo (PR will include usage/summary)

resolves #
[docs](https://github.com/dbt-labs/docs.getdbt.com/issues/new/choose)

<!---
Include the number of the issue addressed by this PR above if applicable.
PRs for code changes without an associated issue *will not be merged*.
See CONTRIBUTING.md for more information.

Include the number of the docs issue that was opened for this PR. If
this change has no user-facing implications, "N/A" suffices instead. New
docs tickets can be created by clicking the link above.
-->

### Problem

<!---
Describe the problem this PR is solving. What is the application state
before this PR is merged?
-->

### Solution

<!---
Describe the way this PR solves the above problem. Add as much detail as you
can to help reviewers understand your changes. Include any alternatives and
tradeoffs you considered.
-->

### Checklist

- [ ] I have read the contributing guide and understand what's expected of me
- [ ] I have run this code in development and it appears to resolve the stated issue
- [ ] This PR includes tests, or tests are not required/relevant for this PR
- [ ] This PR has no interface changes (e.g. macros, cli, logs, json artifacts, config files, adapter interface, etc) or this PR has already received feedback and approval from Product or DX
68 changes: 68 additions & 0 deletions dbt-snowflake/src/dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,3 +537,71 @@ def describe_dynamic_table(self, relation: SnowflakeRelation) -> Dict[str, Any]:
]
)
}

@available
def describe_hybrid_table(self, relation: SnowflakeRelation) -> Dict[str, Any]:
"""
Get all relevant metadata about a hybrid table to return as a dict to Agate Table row

Args:
relation (SnowflakeRelation): the relation to describe
"""
quoting = relation.quote_policy
schema = f'"{relation.schema}"' if quoting.schema else relation.schema
database = f'"{relation.database}"' if quoting.database else relation.database

# Get hybrid table metadata
show_sql = f"show hybrid tables like '{relation.identifier}' in schema {database}.{schema}"
res, ht_table = self.execute(show_sql, fetch=True)
if res.code != "SUCCESS":
raise DbtRuntimeError(f"Could not get hybrid table metadata: {show_sql} failed")

# Get index information
show_indexes_sql = f"show indexes in {database}.{schema}.{relation.identifier}"
idx_res, idx_table = self.execute(show_indexes_sql, fetch=True)

# Get column information
desc_sql = f"describe table {database}.{schema}.{relation.identifier}"
col_res, col_table = self.execute(desc_sql, fetch=True)

# Get primary key information
show_pk_sql = f"show primary keys in table {database}.{schema}.{relation.identifier}"
pk_res, pk_table = self.execute(show_pk_sql, fetch=True)

# normalize column names to lower case
ht_table = ht_table.rename(column_names=[name.lower() for name in ht_table.column_names])

result = {
"hybrid_table": ht_table.select(
[
"name",
"schema_name",
"database_name",
]
)
}

# Add indexes if available
if idx_res.code == "SUCCESS" and len(idx_table.rows) > 0:
idx_table = idx_table.rename(
column_names=[name.lower() for name in idx_table.column_names]
)
result["indexes"] = idx_table

# Add columns if available
if col_res.code == "SUCCESS" and len(col_table.rows) > 0:
col_table = col_table.rename(
column_names=[name.lower() for name in col_table.column_names]
)
select_cols = [c for c in col_table.column_names if c in ("name", "type")]
result["columns"] = col_table.select(select_cols)

# Add primary key info if available
if pk_res.code == "SUCCESS" and len(pk_table.rows) > 0:
pk_table = pk_table.rename(
column_names=[name.lower() for name in pk_table.column_names]
)
select_pk = [c for c in pk_table.column_names if c == "column_name"]
result["primary_keys"] = pk_table.select(select_pk)

return result
58 changes: 55 additions & 3 deletions dbt-snowflake/src/dbt/adapters/snowflake/relation.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import textwrap
from dataclasses import dataclass, field
from typing import FrozenSet, Optional, Type, Iterator, Tuple
from typing import FrozenSet, Optional, Type, Iterator, Tuple, Dict


from dbt.adapters.base.relation import BaseRelation
Expand All @@ -23,9 +23,15 @@
SnowflakeDynamicTableRefreshModeConfigChange,
SnowflakeDynamicTableTargetLagConfigChange,
SnowflakeDynamicTableWarehouseConfigChange,
SnowflakeHybridTableConfig,
SnowflakeHybridTableConfigChangeset,
SnowflakeHybridTablePrimaryKeyConfigChange,
SnowflakeHybridTableIndexConfigChange,
SnowflakeHybridTableColumnConfigChange,
SnowflakeQuotePolicy,
SnowflakeRelationType,
)
from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase


@dataclass(frozen=True, eq=False, repr=False)
Expand All @@ -34,15 +40,18 @@ class SnowflakeRelation(BaseRelation):
table_format: str = constants.INFO_SCHEMA_TABLE_FORMAT
quote_policy: SnowflakeQuotePolicy = field(default_factory=lambda: SnowflakeQuotePolicy())
require_alias: bool = False
relation_configs = {
SnowflakeRelationType.DynamicTable: SnowflakeDynamicTableConfig,
# Map materialization names to their RelationConfig classes
relation_configs: Dict[str, Type[SnowflakeRelationConfigBase]] = {
str(SnowflakeRelationType.DynamicTable): SnowflakeDynamicTableConfig,
str(SnowflakeRelationType.HybridTable): SnowflakeHybridTableConfig,
}
renameable_relations: FrozenSet[SnowflakeRelationType] = field(
default_factory=lambda: frozenset(
{
SnowflakeRelationType.Table, # type: ignore
SnowflakeRelationType.View, # type: ignore
SnowflakeRelationType.DynamicTable, # type: ignore
SnowflakeRelationType.HybridTable, # type: ignore
}
)
)
Expand All @@ -51,6 +60,7 @@ class SnowflakeRelation(BaseRelation):
default_factory=lambda: frozenset(
{
SnowflakeRelationType.DynamicTable, # type: ignore
SnowflakeRelationType.HybridTable, # type: ignore
SnowflakeRelationType.Table, # type: ignore
SnowflakeRelationType.View, # type: ignore
}
Expand All @@ -61,6 +71,10 @@ class SnowflakeRelation(BaseRelation):
def is_dynamic_table(self) -> bool:
return self.type == SnowflakeRelationType.DynamicTable

@property
def is_hybrid_table(self) -> bool:
return self.type == SnowflakeRelationType.HybridTable

@property
def is_materialized_view(self) -> bool:
return self.type == SnowflakeRelationType.DynamicTable
Expand All @@ -73,6 +87,10 @@ def is_iceberg_format(self) -> bool:
def DynamicTable(cls) -> str:
return str(SnowflakeRelationType.DynamicTable)

@classproperty
def HybridTable(cls) -> str:
return str(SnowflakeRelationType.HybridTable)

@classproperty
def get_relation_type(cls) -> Type[SnowflakeRelationType]:
return SnowflakeRelationType
Expand Down Expand Up @@ -126,6 +144,40 @@ def dynamic_table_config_changeset(
return config_change_collection
return None

@classmethod
def hybrid_table_config_changeset(
cls, relation_results: RelationResults, relation_config: RelationConfig
) -> Optional[SnowflakeHybridTableConfigChangeset]:
existing_hybrid_table = SnowflakeHybridTableConfig.from_relation_results(relation_results)
new_hybrid_table = SnowflakeHybridTableConfig.from_relation_config(relation_config)

config_change_collection = SnowflakeHybridTableConfigChangeset()

# Check primary key changes
if new_hybrid_table.primary_key != existing_hybrid_table.primary_key:
config_change_collection.primary_key = SnowflakeHybridTablePrimaryKeyConfigChange(
action=RelationConfigChangeAction.create, # type:ignore
context=new_hybrid_table.primary_key,
)

# Check column changes
if new_hybrid_table.columns != existing_hybrid_table.columns:
config_change_collection.columns = SnowflakeHybridTableColumnConfigChange(
action=RelationConfigChangeAction.create, # type:ignore
context="columns modified",
)

# Check index changes
if new_hybrid_table.indexes != existing_hybrid_table.indexes:
config_change_collection.indexes = SnowflakeHybridTableIndexConfigChange(
action=RelationConfigChangeAction.create, # type:ignore
context="indexes modified",
)

if config_change_collection.has_changes:
return config_change_collection
return None

def as_case_sensitive(self) -> "SnowflakeRelation":
path_part_map = {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@
SnowflakeDynamicTableWarehouseConfigChange,
SnowflakeDynamicTableTargetLagConfigChange,
)
from dbt.adapters.snowflake.relation_configs.hybrid_table import (
SnowflakeHybridTableConfig,
SnowflakeHybridTableConfigChangeset,
SnowflakeHybridTablePrimaryKeyConfigChange,
SnowflakeHybridTableIndexConfigChange,
SnowflakeHybridTableColumnConfigChange,
)
from dbt.adapters.snowflake.relation_configs.policies import (
SnowflakeIncludePolicy,
SnowflakeQuotePolicy,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
from dataclasses import dataclass
from typing import Optional, Dict, Any, List, TYPE_CHECKING

from dbt.adapters.relation_configs import RelationConfigChange, RelationResults
from dbt.adapters.contracts.relation import RelationConfig
from dbt.adapters.contracts.relation import ComponentName
from typing_extensions import Self

from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase

if TYPE_CHECKING:
import agate


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeHybridTableConfig(SnowflakeRelationConfigBase):
"""
This config follows the specs found here:
https://docs.snowflake.com/en/sql-reference/sql/create-hybrid-table

The following parameters are configurable by dbt:
- name: name of the hybrid table
- schema_name: schema containing the table
- database_name: database containing the table
- columns: dictionary of column definitions with types
- primary_key: list of columns forming the primary key (required)
- indexes: list of secondary index definitions
- unique_key: list of columns with unique constraints
- foreign_keys: list of foreign key constraint definitions
- query: the SQL query for CTAS
"""

name: str
schema_name: str
database_name: str
columns: Dict[str, str] # column_name: data_type
primary_key: List[str]
query: Optional[str] = None
indexes: Optional[List[Dict[str, Any]]] = None
unique_key: Optional[List[str]] = None
foreign_keys: Optional[List[Dict[str, Any]]] = None

@classmethod
def from_dict(cls, config_dict: Dict[str, Any]) -> Self:
kwargs_dict = {
"name": cls._render_part(
ComponentName.Identifier, config_dict.get("name") # type:ignore
),
"schema_name": cls._render_part(
ComponentName.Schema, config_dict.get("schema_name") # type:ignore
),
"database_name": cls._render_part(
ComponentName.Database, config_dict.get("database_name") # type:ignore
),
"columns": config_dict.get("columns", {}),
"primary_key": config_dict.get("primary_key", []),
"query": config_dict.get("query"),
"indexes": config_dict.get("indexes"),
"unique_key": config_dict.get("unique_key"),
"foreign_keys": config_dict.get("foreign_keys"),
}

return super().from_dict(kwargs_dict) # type:ignore

@classmethod
def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any]:
"""Parse a RelationConfig into a dictionary for SnowflakeHybridTableConfig"""
config_dict = {
"name": relation_config.identifier,
"schema_name": relation_config.schema,
"database_name": relation_config.database,
"query": relation_config.compiled_code,
"columns": relation_config.config.extra.get("columns", {}), # type:ignore
"primary_key": relation_config.config.extra.get("primary_key", []), # type:ignore
"indexes": relation_config.config.extra.get("indexes"), # type:ignore
"unique_key": relation_config.config.extra.get("unique_key"), # type:ignore
"foreign_keys": relation_config.config.extra.get("foreign_keys"), # type:ignore
}

# Handle primary_key as string or list
if isinstance(config_dict["primary_key"], str):
config_dict["primary_key"] = [config_dict["primary_key"]]

# Handle unique_key as string or list
if isinstance(config_dict.get("unique_key"), str):
config_dict["unique_key"] = [config_dict["unique_key"]]

return config_dict

@classmethod
def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]:
"""Parse results from SHOW HYBRID TABLES and SHOW INDEXES"""
hybrid_table: "agate.Row" = relation_results["hybrid_table"].rows[0]

config_dict = {
"name": hybrid_table.get("name"),
"schema_name": hybrid_table.get("schema_name"),
"database_name": hybrid_table.get("database_name"),
"query": hybrid_table.get("text"),
"columns": hybrid_table.get("columns", {}),
"primary_key": hybrid_table.get("primary_key", []),
"indexes": relation_results.get("indexes"),
"unique_key": hybrid_table.get("unique_key"),
"foreign_keys": hybrid_table.get("foreign_keys"),
}

return config_dict


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeHybridTablePrimaryKeyConfigChange(RelationConfigChange):
context: Optional[List[str]] = None

@property
def requires_full_refresh(self) -> bool:
return True


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeHybridTableIndexConfigChange(RelationConfigChange):
context: Optional[str] = None

@property
def requires_full_refresh(self) -> bool:
return True


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeHybridTableColumnConfigChange(RelationConfigChange):
context: Optional[str] = None

@property
def requires_full_refresh(self) -> bool:
return True


@dataclass
class SnowflakeHybridTableConfigChangeset:
primary_key: Optional[SnowflakeHybridTablePrimaryKeyConfigChange] = None
indexes: Optional[SnowflakeHybridTableIndexConfigChange] = None
columns: Optional[SnowflakeHybridTableColumnConfigChange] = None

@property
def requires_full_refresh(self) -> bool:
return any(
[
self.primary_key.requires_full_refresh if self.primary_key else False,
self.indexes.requires_full_refresh if self.indexes else False,
self.columns.requires_full_refresh if self.columns else False,
]
)

@property
def has_changes(self) -> bool:
return any([self.primary_key, self.indexes, self.columns])
Loading
Loading