Skip to content

Commit 9362637

Browse files
nipunn1313Convex, Inc.
authored andcommitted
Fivetran support destination connector schemaless (#35959)
Support fivetran destination connector schemaless. This PR adds the indexes, but not the hidden tables. Has get_schema return NotFound in schemaless mode. Seems to work with the destination connector tester. If a schema is provided, the old behavior continues. Haven't bothered to clean up all the error messages/suggestions yet. Want to see if we can push this out to play with it, and then loop back to polishing error messages. GitOrigin-RevId: b071cf1ecb5aac96b94721c1f21cc3072f7f2938
1 parent 772e13a commit 9362637

File tree

16 files changed

+568
-403
lines changed

16 files changed

+568
-403
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/application/src/lib.rs

Lines changed: 56 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,10 @@ use common::{
9696
SpawnHandle,
9797
UnixTimestamp,
9898
},
99-
schemas::DatabaseSchema,
99+
schemas::{
100+
DatabaseSchema,
101+
TableDefinition,
102+
},
100103
types::{
101104
env_var_limit_met,
102105
env_var_name_not_unique,
@@ -121,9 +124,12 @@ use common::{
121124
},
122125
RequestId,
123126
};
124-
use convex_fivetran_destination::api_types::{
125-
BatchWriteRow,
126-
DeleteType,
127+
use convex_fivetran_destination::{
128+
api_types::{
129+
BatchWriteRow,
130+
DeleteType,
131+
},
132+
constants::FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR,
127133
};
128134
use cron_jobs::CronJobExecutor;
129135
use database::{
@@ -183,7 +189,10 @@ use keybroker::{
183189
Identity,
184190
KeyBroker,
185191
};
186-
use maplit::btreemap;
192+
use maplit::{
193+
btreemap,
194+
btreeset,
195+
};
187196
use model::{
188197
airbyte_import::{
189198
AirbyteImportModel,
@@ -3373,13 +3382,50 @@ impl<RT: Runtime> Application<RT> {
33733382
&self,
33743383
namespace: TableNamespace,
33753384
identity: &Identity,
3376-
) -> anyhow::Result<JsonValue> {
3385+
) -> anyhow::Result<Option<DatabaseSchema>> {
33773386
let mut tx = self.begin(identity.clone()).await?;
33783387
let mut model = SchemaModel::new(&mut tx, namespace);
3379-
Ok(match model.get_by_state(SchemaState::Active).await? {
3380-
None => JsonValue::Null,
3381-
Some((_id, schema)) => JsonValue::try_from(schema)?,
3382-
})
3388+
Ok(model
3389+
.get_by_state(SchemaState::Active)
3390+
.await?
3391+
.map(|(_id, schema)| schema))
3392+
}
3393+
3394+
pub async fn fivetran_create_table(
3395+
&self,
3396+
identity: &Identity,
3397+
table_definition: TableDefinition,
3398+
) -> anyhow::Result<()> {
3399+
let table_name = table_definition.table_name;
3400+
3401+
// Add the indexes to the table.
3402+
let indexes: BTreeMap<IndexName, IndexedFields> = table_definition
3403+
.indexes
3404+
.into_iter()
3405+
.map(|(descriptor, fields)| {
3406+
let index_name = IndexName::new_reserved(table_name.clone(), descriptor.clone())?;
3407+
let index_fields = fields.fields;
3408+
Ok((index_name, index_fields))
3409+
})
3410+
.collect::<anyhow::Result<_>>()?;
3411+
self._add_system_indexes(identity, indexes).await?;
3412+
3413+
// Wait for the indexes to be ready.
3414+
loop {
3415+
let mut tx = self.begin(identity.clone()).await?;
3416+
if IndexModel::new(&mut tx)
3417+
.indexes_ready(
3418+
&FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR,
3419+
&btreeset! { table_name.clone() },
3420+
)
3421+
.await?
3422+
{
3423+
return Ok(());
3424+
}
3425+
let token = tx.into_token()?;
3426+
let subscription = self.database.subscribe(token).await?;
3427+
subscription.wait_for_invalidation().await;
3428+
}
33833429
}
33843430

33853431
pub async fn shutdown(&self) -> anyhow::Result<()> {

crates/application/src/tests/fivetran_import.rs

Lines changed: 84 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
11
use chrono::DateTime;
22
use common::{
33
bootstrap_model::index::IndexMetadata,
4-
types::{
5-
IndexDescriptor,
6-
IndexName,
7-
},
4+
types::IndexName,
85
};
9-
use convex_fivetran_destination::api_types::{
10-
BatchWriteOperation,
11-
BatchWriteRow,
12-
DeleteType,
6+
use convex_fivetran_destination::{
7+
api_types::{
8+
BatchWriteOperation,
9+
BatchWriteRow,
10+
DeleteType,
11+
},
12+
constants::{
13+
FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR,
14+
FIVETRAN_SYNCED_INDEX_DESCRIPTOR,
15+
},
1316
};
1417
use database::{
1518
IndexModel,
@@ -39,11 +42,14 @@ async fn test_create_new_row(rt: TestRuntime) -> anyhow::Result<()> {
3942
let mut tx = application.begin(Identity::system()).await?;
4043
let table: TableName = "users".parse()?;
4144
IndexModel::new(&mut tx)
42-
.add_application_index(
45+
.add_system_index(
4346
TableNamespace::test_user(),
4447
IndexMetadata::new_enabled(
45-
IndexName::new(table.clone(), IndexDescriptor::new("by_primary_key")?)?,
46-
vec![str::parse("id")?, str::parse("_creationTime")?].try_into()?,
48+
IndexName::new_reserved(
49+
table.clone(),
50+
FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR.clone(),
51+
)?,
52+
vec!["id".parse()?].try_into()?,
4753
),
4854
)
4955
.await?;
@@ -81,16 +87,14 @@ async fn test_update_row(rt: TestRuntime) -> anyhow::Result<()> {
8187
let mut tx = application.begin(Identity::system()).await?;
8288
let table: TableName = "posts".parse()?;
8389
IndexModel::new(&mut tx)
84-
.add_application_index(
90+
.add_system_index(
8591
TableNamespace::test_user(),
8692
IndexMetadata::new_enabled(
87-
IndexName::new(table.clone(), IndexDescriptor::new("by_primary_key")?)?,
88-
vec![
89-
str::parse("fivetran.deleted")?,
90-
str::parse("fivetran.id")?,
91-
str::parse("_creationTime")?,
92-
]
93-
.try_into()?,
93+
IndexName::new_reserved(
94+
table.clone(),
95+
FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR.clone(),
96+
)?,
97+
vec!["fivetran.deleted".parse()?, "fivetran.id".parse()?].try_into()?,
9498
),
9599
)
96100
.await?;
@@ -171,16 +175,14 @@ async fn test_soft_delete_row(rt: TestRuntime) -> anyhow::Result<()> {
171175
let mut tx = application.begin(Identity::system()).await?;
172176
let table: TableName = "posts".parse()?;
173177
IndexModel::new(&mut tx)
174-
.add_application_index(
178+
.add_system_index(
175179
TableNamespace::test_user(),
176180
IndexMetadata::new_enabled(
177-
IndexName::new(table.clone(), IndexDescriptor::new("by_primary_key")?)?,
178-
vec![
179-
str::parse("fivetran.deleted")?,
180-
str::parse("fivetran.id")?,
181-
str::parse("_creationTime")?,
182-
]
183-
.try_into()?,
181+
IndexName::new_reserved(
182+
table.clone(),
183+
FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR.clone(),
184+
)?,
185+
vec!["fivetran.deleted".parse()?, "fivetran.id".parse()?].try_into()?,
184186
),
185187
)
186188
.await?;
@@ -262,11 +264,14 @@ async fn test_update_missing_row(rt: TestRuntime) -> anyhow::Result<()> {
262264
let mut tx = application.begin(Identity::system()).await?;
263265
let table: TableName = "posts".parse()?;
264266
IndexModel::new(&mut tx)
265-
.add_application_index(
267+
.add_system_index(
266268
TableNamespace::test_user(),
267269
IndexMetadata::new_enabled(
268-
IndexName::new(table.clone(), IndexDescriptor::new("by_primary_key")?)?,
269-
vec![str::parse("fivetran.id")?, str::parse("_creationTime")?].try_into()?,
270+
IndexName::new_reserved(
271+
table.clone(),
272+
FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR.clone(),
273+
)?,
274+
vec!["fivetran.id".parse()?].try_into()?,
270275
),
271276
)
272277
.await?;
@@ -301,11 +306,14 @@ async fn test_replace_row(rt: TestRuntime) -> anyhow::Result<()> {
301306
let mut tx = application.begin(Identity::system()).await?;
302307
let table: TableName = "posts".parse()?;
303308
IndexModel::new(&mut tx)
304-
.add_application_index(
309+
.add_system_index(
305310
TableNamespace::test_user(),
306311
IndexMetadata::new_enabled(
307-
IndexName::new(table.clone(), IndexDescriptor::new("by_primary_key")?)?,
308-
vec![str::parse("id")?, str::parse("_creationTime")?].try_into()?,
312+
IndexName::new_reserved(
313+
table.clone(),
314+
FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR.clone(),
315+
)?,
316+
vec!["id".parse()?].try_into()?,
309317
),
310318
)
311319
.await?;
@@ -372,11 +380,14 @@ async fn test_hard_delete_row(rt: TestRuntime) -> anyhow::Result<()> {
372380
let mut tx = application.begin(Identity::system()).await?;
373381
let table: TableName = "posts".parse()?;
374382
IndexModel::new(&mut tx)
375-
.add_application_index(
383+
.add_system_index(
376384
TableNamespace::test_user(),
377385
IndexMetadata::new_enabled(
378-
IndexName::new(table.clone(), IndexDescriptor::new("by_primary_key")?)?,
379-
vec![str::parse("id")?, str::parse("_creationTime")?].try_into()?,
386+
IndexName::new_reserved(
387+
table.clone(),
388+
FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR.clone(),
389+
)?,
390+
vec!["id".parse()?].try_into()?,
380391
),
381392
)
382393
.await?;
@@ -426,16 +437,14 @@ async fn test_ignores_soft_deleted_rows(rt: TestRuntime) -> anyhow::Result<()> {
426437
let mut tx = application.begin(Identity::system()).await?;
427438
let table: TableName = "posts".parse()?;
428439
IndexModel::new(&mut tx)
429-
.add_application_index(
440+
.add_system_index(
430441
TableNamespace::test_user(),
431442
IndexMetadata::new_enabled(
432-
IndexName::new(table.clone(), IndexDescriptor::new("by_primary_key")?)?,
433-
vec![
434-
str::parse("fivetran.synced")?,
435-
str::parse("id")?,
436-
str::parse("_creationTime")?,
437-
]
438-
.try_into()?,
443+
IndexName::new_reserved(
444+
table.clone(),
445+
FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR.clone(),
446+
)?,
447+
vec!["fivetran.synced".parse()?, "id".parse()?].try_into()?,
439448
),
440449
)
441450
.await?;
@@ -495,11 +504,14 @@ async fn test_batch_of_operations_taking_more_than_one_transaction(
495504
let mut tx = application.begin(Identity::system()).await?;
496505
let table: TableName = "items".parse()?;
497506
IndexModel::new(&mut tx)
498-
.add_application_index(
507+
.add_system_index(
499508
TableNamespace::test_user(),
500509
IndexMetadata::new_enabled(
501-
IndexName::new(table.clone(), IndexDescriptor::new("by_primary_key")?)?,
502-
vec![str::parse("id")?, str::parse("_creationTime")?].try_into()?,
510+
IndexName::new_reserved(
511+
table.clone(),
512+
FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR.clone(),
513+
)?,
514+
vec!["id".parse()?].try_into()?,
503515
),
504516
)
505517
.await?;
@@ -555,14 +567,14 @@ async fn test_soft_truncate_all(rt: TestRuntime) -> anyhow::Result<()> {
555567
let mut tx = application.begin(Identity::system()).await?;
556568
let table: TableName = "table".parse()?;
557569
IndexModel::new(&mut tx)
558-
.add_application_index(
570+
.add_system_index(
559571
TableNamespace::test_user(),
560572
IndexMetadata::new_enabled(
561-
IndexName::new(table.clone(), IndexDescriptor::new("my_sync_index")?)?,
573+
IndexName::new_reserved(table.clone(), FIVETRAN_SYNCED_INDEX_DESCRIPTOR.clone())?,
562574
vec![
563-
str::parse("fivetran.deleted")?,
564-
str::parse("fivetran.synced")?,
565-
str::parse("_creationTime")?,
575+
"fivetran.deleted".parse()?,
576+
"fivetran.synced".parse()?,
577+
"_creationTime".parse()?,
566578
]
567579
.try_into()?,
568580
),
@@ -621,14 +633,14 @@ async fn test_hard_truncate_since_timestamp(rt: TestRuntime) -> anyhow::Result<(
621633
let mut tx = application.begin(Identity::system()).await?;
622634
let table: TableName = "table".parse()?;
623635
IndexModel::new(&mut tx)
624-
.add_application_index(
636+
.add_system_index(
625637
TableNamespace::test_user(),
626638
IndexMetadata::new_enabled(
627-
IndexName::new(table.clone(), IndexDescriptor::new("my_sync_index")?)?,
639+
IndexName::new_reserved(table.clone(), FIVETRAN_SYNCED_INDEX_DESCRIPTOR.clone())?,
628640
vec![
629-
str::parse("fivetran.deleted")?,
630-
str::parse("fivetran.synced")?,
631-
str::parse("_creationTime")?,
641+
"fivetran.deleted".parse()?,
642+
"fivetran.synced".parse()?,
643+
"_creationTime".parse()?,
632644
]
633645
.try_into()?,
634646
),
@@ -680,14 +692,14 @@ async fn test_soft_truncate_since_timestamp(rt: TestRuntime) -> anyhow::Result<(
680692
let mut tx = application.begin(Identity::system()).await?;
681693
let table: TableName = "table".parse()?;
682694
IndexModel::new(&mut tx)
683-
.add_application_index(
695+
.add_system_index(
684696
TableNamespace::test_user(),
685697
IndexMetadata::new_enabled(
686-
IndexName::new(table.clone(), IndexDescriptor::new("my_sync_index")?)?,
698+
IndexName::new_reserved(table.clone(), FIVETRAN_SYNCED_INDEX_DESCRIPTOR.clone())?,
687699
vec![
688-
str::parse("fivetran.deleted")?,
689-
str::parse("fivetran.synced")?,
690-
str::parse("_creationTime")?,
700+
"fivetran.deleted".parse()?,
701+
"fivetran.synced".parse()?,
702+
"_creationTime".parse()?,
691703
]
692704
.try_into()?,
693705
),
@@ -783,14 +795,14 @@ async fn test_soft_truncate_larger_than_one_transaction(rt: TestRuntime) -> anyh
783795
let mut tx = application.begin(Identity::system()).await?;
784796
let table: TableName = "table".parse()?;
785797
IndexModel::new(&mut tx)
786-
.add_application_index(
798+
.add_system_index(
787799
TableNamespace::test_user(),
788800
IndexMetadata::new_enabled(
789-
IndexName::new(table.clone(), IndexDescriptor::new("my_sync_index")?)?,
801+
IndexName::new_reserved(table.clone(), FIVETRAN_SYNCED_INDEX_DESCRIPTOR.clone())?,
790802
vec![
791-
str::parse("fivetran.deleted")?,
792-
str::parse("fivetran.synced")?,
793-
str::parse("_creationTime")?,
803+
"fivetran.deleted".parse()?,
804+
"fivetran.synced".parse()?,
805+
"_creationTime".parse()?,
794806
]
795807
.try_into()?,
796808
),
@@ -846,14 +858,14 @@ async fn test_hard_truncate_larger_than_one_transaction(rt: TestRuntime) -> anyh
846858
let mut tx = application.begin(Identity::system()).await?;
847859
let table: TableName = "table".parse()?;
848860
IndexModel::new(&mut tx)
849-
.add_application_index(
861+
.add_system_index(
850862
TableNamespace::test_user(),
851863
IndexMetadata::new_enabled(
852-
IndexName::new(table.clone(), IndexDescriptor::new("my_sync_index")?)?,
864+
IndexName::new_reserved(table.clone(), FIVETRAN_SYNCED_INDEX_DESCRIPTOR.clone())?,
853865
vec![
854-
str::parse("fivetran.deleted")?,
855-
str::parse("fivetran.synced")?,
856-
str::parse("_creationTime")?,
866+
"fivetran.deleted".parse()?,
867+
"fivetran.synced".parse()?,
868+
"_creationTime".parse()?,
857869
]
858870
.try_into()?,
859871
),

0 commit comments

Comments
 (0)