Skip to content

Commit 9cb11cb

Browse files
Merge pull request #69 from liquibase/snapshot_support_update
Add Fixes for Snapshotting Databricks Tables
2 parents d9166bc + 979a21e commit 9cb11cb

File tree

16 files changed

+433
-77
lines changed

16 files changed

+433
-77
lines changed

README.md

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,17 +67,21 @@ If hive_metastore is used, this is not tested and may not provide all the below
6767
2. [x] CLUSTER BY (DDL) - createClusteredTable - createTable with clusterColumns as additional option for liquid - <b> SUPPORTED </b> in Contributed Harness
6868
3. [x] ANALYZE TABLE - analyzeTable - change type with compute stats column options - <b> SUPPORTED </b> in Contributed Harness
6969
4. [x] VACUUM - vacuumTable - change type with retentionHours parameter (default is 168) - <b> SUPPORTED </b> in Contributed Harness
70-
5. [ ] ALTER CLUSTER KEY - changeClusterColumns - change type that will be used until index change types are mapped with CLUSTER BY columns for snapshot purposes
70+
5. [ ] ALTER CLUSTER KEY - changeClusterColumns - change type that will be used until index change types are mapped with CLUSTER BY columns for snapshot purposes - TO DO
7171

7272

7373
## Remaining Required Change Types to Finish in Base/Contributed
7474
1. [ ] (nice to have, not required) createFunction/dropFunction - in Liquibase Pro, should work in Databricks, but change type not accessible from Liquibase Core
7575
2. [x] (nice to have, not required) addCheckConstraint/dropCheckConstraint - in Liquibase Pro, should work in Databricks, but change type not accessible from Liquibase Core
7676
3. [ ] addDefaultValue (of various types). Databricks/Delta tables support this, but does not get populated by databricks in the JDBC Driver (COLUMN_DEF property always None even with default)
77-
7877
The remaining other change types are not relevant to Databricks and have been marked with INVALID TEST
7978

8079

80+
## General TO DO:
81+
1. [ ] Add support for Snapshotting complex types like STRUCT/MAP
82+
2. [ ] Add support for snapshotting IDENTITY KEYs
83+
3. [ ] Add TIMESTAMP_NTZ Data Type
84+
8185
## Aspirational Roadmap - Databricks Specific Additional Change Types to Add:
8286

8387
1. COPY INTO
@@ -108,7 +112,10 @@ Then put this driver jar under the liquibase/lib directory.
108112
3. Build this project or retrieve the jar from the latest release.
109113
Then put this extension jar under the liquibase/lib directory.
110114

111-
4. Edit the connection parameters to your Databricks catlaog/database under the liquibase.properties file. The format will look like this:
115+
4. IMPORTANT: If using Linux/macOS - run the following command in your terminal before continuing (you can add this to the bash/zsh profile):
116+
export JAVA_OPTS=--add-opens=java.base/java.nio=ALL-UNNAMED
117+
118+
5. Edit the connection parameters to your Databricks catalog/database under the liquibase.properties file. The format will look like this:
112119

113120
```
114121
url: jdbc:databricks://<workspace_url>:443/default;transportMode=http;ssl=1;httpPath=<http_path>;AuthMech=3;ConnCatalog=<catalog>;ConnSchema=<database>;

lib/DatabricksJDBC42.jar

19.6 MB
Binary file not shown.

liquibase.properties

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
url: jdbc:databricks://<workspace_url>:443/default;transportMode=http;ssl=1;httpPath=sql/<warehouse_id>;AuthMech=3;ConnCatalog=main;ConnSchema=liquibase_harness_test_ds;
2-
username: token
3-
password: <dbx_token>
1+
url= jdbc:databricks://<workspace_url>:443/default;transportMode=http;ssl=1;httpPath=sql/<warehouse_id>;AuthMech=3;ConnCatalog=main;ConnSchema=liquibase_harness_test_ds;
2+
username= token
3+
password= <dbx_token>

pom.xml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
<groupId>org.liquibase.ext</groupId>
1212
<artifactId>liquibase-databricks</artifactId>
13-
<version>1.1.0-SNAPSHOT</version>
13+
<version>1.1.0</version>
1414

1515
<name>Liquibase Extension Databricks support</name>
1616
<description>Liquibase Extension for Databricks.</description>
@@ -52,7 +52,7 @@
5252
<dependency>
5353
<groupId>com.databricks</groupId>
5454
<artifactId>databricks-jdbc</artifactId>
55-
<version>2.6.33</version>
55+
<version>2.6.34</version>
5656
<scope>test</scope>
5757
</dependency>
5858
<dependency>
@@ -85,10 +85,10 @@
8585
<version>${liquibase.version}</version>
8686
<configuration>
8787
<propertyFileWillOverride>true</propertyFileWillOverride>
88-
<propertyFile>target/classes/liquibase.properties</propertyFile>
88+
<propertyFile>dev_db.properties</propertyFile>
8989
<logging>DEBUG</logging>
90-
<changeLogFile>target/classes/changelog.sql</changeLogFile>
91-
<outputChangeLogFile>target/classes/generatedChangelog.databricks.sql</outputChangeLogFile>
90+
<changeLogFile>dev_setup.xml</changeLogFile>
91+
<outputChangeLogFile>generatedChangelog.databricks.sql</outputChangeLogFile>
9292
<diffTypes>tables,views</diffTypes>
9393
<diffIncludeCatalog>true</diffIncludeCatalog>
9494
<diffIncludeSchema>true</diffIncludeSchema>

src/main/java/liquibase/ext/databricks/database/DatabricksDatabase.java

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
package liquibase.ext.databricks.database;
22

3+
import liquibase.Scope;
34
import liquibase.database.AbstractJdbcDatabase;
45
import liquibase.database.DatabaseConnection;
56
import liquibase.database.jvm.JdbcConnection;
67
import liquibase.exception.DatabaseException;
78
import liquibase.structure.DatabaseObject;
89
import liquibase.statement.SqlStatement;
910
import liquibase.statement.core.RawCallStatement;
11+
import liquibase.structure.core.Schema;
1012
import liquibase.util.StringUtil;
1113
import java.math.BigInteger;
14+
import java.sql.ResultSet;
1215
import java.util.Arrays;
1316
import java.util.HashSet;
1417
import java.util.Set;
@@ -22,6 +25,8 @@ public class DatabricksDatabase extends AbstractJdbcDatabase {
2225
public static final String PRODUCT_NAME = "databricks";
2326
// Set default catalog - must be unity Catalog Enabled
2427

28+
public String systemSchema = "information_schema";
29+
2530
// This is from the new INFORMATION_SCHEMA() database
2631
private Set<String> systemTablesAndViews = new HashSet<>();
2732

@@ -65,6 +70,12 @@ public Set<String> getSystemViews() {
6570
return systemTablesAndViews;
6671
}
6772

73+
// Static Value
74+
@Override
75+
public String getSystemSchema() {
76+
return this.systemSchema;
77+
}
78+
6879
@Override
6980
public Integer getDefaultPort() {
7081
return 443;
@@ -82,12 +93,14 @@ public boolean isCorrectDatabaseImplementation(DatabaseConnection conn) throws D
8293

8394
@Override
8495
public String getDefaultDriver(String url) {
85-
if (url.startsWith("jdbc:databricks:") || url.startsWith("jdbc:spark:")) {
96+
if (url.startsWith("jdbc:databricks") || url.startsWith("jdbc:spark")) {
97+
8698
return "com.databricks.client.jdbc.Driver";
8799
}
88100
return null;
89101
}
90102

103+
91104
@Override
92105
public boolean supportsInitiallyDeferrableColumns() {
93106
return false;
@@ -174,6 +187,56 @@ protected SqlStatement getConnectionSchemaNameCallStatement() {
174187
return new RawCallStatement("select current_schema()");
175188
}
176189

190+
@Override
191+
protected String getConnectionSchemaName() {
192+
DatabaseConnection connection = getConnection();
193+
194+
if (connection == null) {
195+
return null;
196+
}
197+
try (ResultSet resultSet = ((JdbcConnection) connection).createStatement().executeQuery("SELECT CURRENT_SCHEMA()")) {
198+
resultSet.next();
199+
return resultSet.getString(1);
200+
} catch (Exception e) {
201+
Scope.getCurrentScope().getLog(getClass()).info("Error getting default schema", e);
202+
}
203+
204+
String foundSchema = parseUrlForSchema(connection.getURL());
205+
System.out.println("SCHEMA IDENFIED: "+ foundSchema);
206+
207+
return foundSchema;
208+
}
209+
210+
private String parseUrlForSchema(String url) {
211+
212+
String schemaToken = "ConnSchema=";
213+
214+
int startIndex = url.indexOf(schemaToken);
215+
216+
// If ConnSchema not found, find the default value
217+
if (startIndex == -1) {
218+
219+
return "default";
220+
}
221+
222+
startIndex += schemaToken.length();
223+
int endIndex = url.indexOf(";", startIndex);
224+
225+
if (endIndex == -1) {
226+
return url.substring(startIndex);
227+
}
228+
229+
return url.substring(startIndex, endIndex);
230+
}
231+
232+
@Override
233+
public void setDefaultSchemaName(final String schemaName) {
234+
this.defaultSchemaName = correctObjectName(schemaName, Schema.class);
235+
}
236+
237+
/**
 * Overrides the schema used for system tables and views
 * (defaults to "information_schema").
 *
 * @param systemSchema the system schema name to use
 */
public void setSystemSchema(String systemSchema) {
    this.systemSchema = systemSchema;
}
238+
239+
177240
private Set<String> getDatabricksReservedWords() {
178241

179242
// Get Reserved words from: https://docs.databricks.com/sql/language-manual/sql-ref-reserved-words.html
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package liquibase.ext.databricks.datatype;
2+
3+
import liquibase.datatype.core.BigIntType;
4+
import liquibase.change.core.LoadDataChange;
5+
import liquibase.database.Database;
6+
import liquibase.datatype.DataTypeInfo;
7+
import liquibase.datatype.DatabaseDataType;
8+
import liquibase.datatype.LiquibaseDataType;
9+
import liquibase.ext.databricks.database.DatabricksDatabase;
10+
11+
12+
13+
@DataTypeInfo(name = "bigint", aliases = {"java.sql.Types.BIGINT", "java.math.BigInteger", "java.lang.Long", "integer8", "bigserial", "serial8", "int8"}, minParameters = 0, maxParameters = 0, priority = DatabricksDatabase.PRIORITY_DATABASE)
14+
public class BigintDatatypeDatabricks extends BigIntType {
15+
16+
private boolean autoIncrement;
17+
18+
@Override
19+
public boolean isAutoIncrement() {
20+
return autoIncrement;
21+
}
22+
23+
public void setAutoIncrement(boolean autoIncrement) {
24+
this.autoIncrement = autoIncrement;
25+
}
26+
27+
@Override
28+
public DatabaseDataType toDatabaseDataType(Database database) {
29+
if (database instanceof DatabricksDatabase) {
30+
return new DatabaseDataType("BIGINT");
31+
}
32+
33+
return super.toDatabaseDataType(database);
34+
}
35+
36+
@Override
37+
public boolean supports(Database database) {
38+
return database instanceof DatabricksDatabase;
39+
}
40+
41+
@Override
42+
public int getPriority() {
43+
return PRIORITY_DATABASE;
44+
}
45+
46+
47+
@Override
48+
public LoadDataChange.LOAD_DATA_TYPE getLoadTypeName() {
49+
return LoadDataChange.LOAD_DATA_TYPE.NUMERIC;
50+
}
51+
}

src/main/java/liquibase/ext/databricks/datatype/DatetimeDatatypeDatabricks.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
name = "timestamp",
1414
aliases = {"java.sql.Types.DATETIME", "datetime"},
1515
minParameters = 0,
16-
maxParameters = 2,
16+
maxParameters = 0,
1717
priority = PRIORITY_DATABASE
1818
)
1919
public class DatetimeDatatypeDatabricks extends LiquibaseDataType {
@@ -23,7 +23,7 @@ public class DatetimeDatatypeDatabricks extends LiquibaseDataType {
2323
public DatabaseDataType toDatabaseDataType(Database database) {
2424

2525
if (database instanceof DatabricksDatabase) {
26-
return new DatabaseDataType("TIMESTAMP", getParameters());
26+
return new DatabaseDataType("TIMESTAMP");
2727
}
2828

2929
return super.toDatabaseDataType(database);
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package liquibase.ext.databricks.datatype;
2+
3+
import liquibase.change.core.LoadDataChange;
4+
import liquibase.database.Database;
5+
import liquibase.datatype.DataTypeInfo;
6+
import liquibase.datatype.DatabaseDataType;
7+
import liquibase.datatype.LiquibaseDataType;
8+
import liquibase.ext.databricks.database.DatabricksDatabase;
9+
10+
import static liquibase.ext.databricks.database.DatabricksDatabase.PRIORITY_DATABASE;
11+
12+
13+
@DataTypeInfo(
14+
name = "int",
15+
minParameters = 0,
16+
maxParameters = 0,
17+
priority = PRIORITY_DATABASE
18+
)
19+
public class IntegerDatatypeDatabricks extends LiquibaseDataType {
20+
public IntegerDatatypeDatabricks() {
21+
}
22+
23+
public boolean supports(Database database) {
24+
return database instanceof DatabricksDatabase;
25+
}
26+
27+
public DatabaseDataType toDatabaseDataType(Database database) {
28+
if (database instanceof DatabricksDatabase) {
29+
30+
DatabaseDataType type = new DatabaseDataType("INT", this.getParameters());
31+
type.setType("INT");
32+
return type;
33+
} else {
34+
return super.toDatabaseDataType(database);
35+
}
36+
37+
}
38+
39+
public LoadDataChange.LOAD_DATA_TYPE getLoadTypeName() {
40+
return LoadDataChange.LOAD_DATA_TYPE.NUMERIC;
41+
}
42+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package liquibase.ext.databricks.datatype;
2+
3+
import liquibase.change.core.LoadDataChange;
4+
import liquibase.database.Database;
5+
import liquibase.datatype.DataTypeInfo;
6+
import liquibase.datatype.DatabaseDataType;
7+
import liquibase.datatype.LiquibaseDataType;
8+
import liquibase.ext.databricks.database.DatabricksDatabase;
9+
10+
import static liquibase.ext.databricks.database.DatabricksDatabase.PRIORITY_DATABASE;
11+
12+
13+
@DataTypeInfo(
14+
name = "string",
15+
minParameters = 0,
16+
maxParameters = 0,
17+
priority = PRIORITY_DATABASE
18+
)
19+
public class StringDatatypeDatabricks extends LiquibaseDataType {
20+
public StringDatatypeDatabricks() {
21+
}
22+
23+
public boolean supports(Database database) {
24+
return database instanceof DatabricksDatabase;
25+
}
26+
27+
public DatabaseDataType toDatabaseDataType(Database database) {
28+
if (database instanceof DatabricksDatabase) {
29+
30+
DatabaseDataType type = new DatabaseDataType("STRING");
31+
32+
type.setType("STRING");
33+
34+
return type;
35+
} else {
36+
return super.toDatabaseDataType(database);
37+
}
38+
39+
}
40+
41+
public LoadDataChange.LOAD_DATA_TYPE getLoadTypeName() {
42+
return LoadDataChange.LOAD_DATA_TYPE.STRING;
43+
}
44+
}

0 commit comments

Comments
 (0)