Skip to content

Commit 07e3e1d

Browse files
nipunn1313 (Convex, Inc.)
authored and committed
Open source streaming import endpoints. (#34584)
Allows streaming import from airbyte (and soon fivetran) to work. GitOrigin-RevId: 6e645b238700ab8d10b1c55b630836d17e33463c
1 parent 08cb7a3 commit 07e3e1d

File tree

16 files changed

+2875
-17
lines changed

16 files changed

+2875
-17
lines changed

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/application/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ async_zip = { workspace = true }
1717
async_zip_reader = { version = "0.1.0", path = "../async_zip_reader" }
1818
authentication = { path = "../../crates/authentication" }
1919
bytes = { workspace = true }
20+
chrono = { workspace = true }
2021
cmd_util = { path = "../cmd_util" }
2122
common = { path = "../common" }
23+
convex_fivetran_destination = { path = "../fivetran_destination" }
2224
convex_macro = { path = "../convex_macro" }
2325
csv-async = { workspace = true }
2426
database = { path = "../database" }
Lines changed: 340 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,340 @@
1+
use std::{
    collections::{
        BTreeMap,
        BTreeSet,
    },
    sync::LazyLock,
};
5+
6+
use common::bootstrap_model::index::database_index::IndexedFields;
7+
use convex_fivetran_destination::constants::{
8+
METADATA_CONVEX_FIELD_NAME,
9+
SOFT_DELETE_CONVEX_FIELD_NAME,
10+
};
11+
use errors::ErrorMetadata;
12+
use serde::Deserialize;
13+
use serde_json::Value as JsonValue;
14+
use value::{
15+
ConvexObject,
16+
ConvexValue,
17+
FieldName,
18+
FieldPath,
19+
IdentifierFieldName,
20+
TableName,
21+
};
22+
23+
use crate::valid_identifier::{
24+
prefix_field,
25+
ValidIdentifier,
26+
IDENTIFIER_PREFIX,
27+
};
28+
29+
/// Maximum number of times a sanitized field name is re-prefixed when it
/// collides with an existing field before validation gives up with an error.
pub const DUPLICATE_FIELD_LIMIT: usize = 3;
30+
31+
/// Field name for CDC deletes. See Airbyte docs: https://docs.airbyte.com/understanding-airbyte/cdc#syncing
32+
/// When this field is present, it represents a deleted record.
33+
static CDC_DELETED_FIELD: LazyLock<FieldName> = LazyLock::new(|| {
34+
format!("{IDENTIFIER_PREFIX}ab_cdc_deleted_at")
35+
.parse()
36+
.unwrap()
37+
});
38+
39+
/// Airbyte fields that are related to CDC are prefixed with `_ab_cdc`
40+
static CDC_PREFIX: LazyLock<String> = LazyLock::new(|| format!("{IDENTIFIER_PREFIX}ab_cdc"));
41+
42+
#[derive(Clone, Debug, PartialEq)]
/// A single Airbyte stream record after validation: the sanitized destination
/// table, whether the record is a CDC deletion, and the cleaned payload.
pub struct AirbyteRecord {
    // Destination table, already validated as a Convex identifier.
    table_name: TableName,
    // True when the CDC deleted-at field was present and non-null.
    deleted: bool,
    // Record payload with CDC-internal (`_ab_cdc`-prefixed) fields stripped.
    record: ConvexObject,
}
48+
49+
impl AirbyteRecord {
50+
#[cfg(any(test, feature = "testing"))]
51+
pub fn new(table_name: TableName, deleted: bool, record: ConvexObject) -> Self {
52+
Self {
53+
table_name,
54+
deleted,
55+
record,
56+
}
57+
}
58+
59+
pub fn table_name(&self) -> &TableName {
60+
&self.table_name
61+
}
62+
63+
pub fn deleted(&self) -> bool {
64+
self.deleted
65+
}
66+
67+
pub fn into_object(self) -> ConvexObject {
68+
self.record
69+
}
70+
}
71+
72+
#[derive(Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
/// Message interface for Airbyte streaming import records. Do not modify
/// without considering backwards compatibility.
pub struct AirbyteRecordMessage {
    // Raw table name as sent by Airbyte; sanitized during `TryFrom`.
    table_name: String,
    // Raw record payload; field names are sanitized during `TryFrom`.
    data: JsonValue,
}
80+
81+
/// Change field names in a JSON object to be valid identifiers
82+
fn valid_json(v: JsonValue) -> anyhow::Result<JsonValue> {
83+
let r = match v {
84+
JsonValue::Null | JsonValue::Bool(_) | JsonValue::Number(_) | JsonValue::String(_) => v,
85+
JsonValue::Array(arr) => arr
86+
.into_iter()
87+
.map(valid_json)
88+
.collect::<anyhow::Result<_>>()?,
89+
JsonValue::Object(map) => {
90+
let map_clone = map.clone();
91+
let map = map
92+
.into_iter()
93+
.map(|(mut field, value)| {
94+
let valid_identifier = field.parse::<ValidIdentifier<FieldName>>()?;
95+
let new_field = valid_identifier.0.to_string();
96+
let mut modified = new_field != field;
97+
field = new_field;
98+
for _ in 0..DUPLICATE_FIELD_LIMIT {
99+
if modified != map_clone.get(&field).is_some() {
100+
return Ok((field, valid_json(value)?));
101+
}
102+
field = prefix_field(&field);
103+
modified = true;
104+
}
105+
Err(anyhow::anyhow!(
106+
"Too many duplicate field names found for modified field {field}"
107+
))
108+
})
109+
.collect::<anyhow::Result<_>>()?;
110+
JsonValue::Object(map)
111+
},
112+
};
113+
Ok(r)
114+
}
115+
116+
impl TryFrom<AirbyteRecordMessage> for AirbyteRecord {
117+
type Error = anyhow::Error;
118+
119+
fn try_from(msg: AirbyteRecordMessage) -> anyhow::Result<AirbyteRecord> {
120+
let table_name = msg.table_name.parse::<ValidIdentifier<TableName>>()?.0;
121+
let object: ConvexObject = valid_json(msg.data)?.try_into()?;
122+
let deleted = match object.get(&*CDC_DELETED_FIELD) {
123+
Some(ts) => ts != &ConvexValue::Null,
124+
None => false,
125+
};
126+
// Filter out CDC prefixed fields because they should not be exposed to
127+
// developers and collide with system field space (fields prefixed with
128+
// underscore are system fields in Convex).
129+
let fields_and_values: BTreeMap<FieldName, ConvexValue> = object
130+
.into_iter()
131+
.filter(|(field_name, _value)| !field_name.starts_with(&CDC_PREFIX.clone()))
132+
.collect();
133+
let record: ConvexObject = fields_and_values.try_into()?;
134+
Ok(Self {
135+
table_name,
136+
deleted,
137+
record,
138+
})
139+
}
140+
}
141+
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
/// Stream configuration sent by the Airbyte connector.
pub struct AirbyteStream {
    // Each primary key is a path given as a list of nested field names;
    // `None` means the stream has no primary key (append mode).
    primary_key: Option<Vec<Vec<String>>>,
    // Present in the wire format but not validated yet (see TODO below).
    #[expect(dead_code)]
    json_schema: JsonValue,
}
148+
149+
/// Primary key of an Airbyte stream, expressed as the fields of a Convex
/// index; used to deduplicate records in `ValidatedAirbyteStream::Dedup`.
#[derive(Clone, Debug)]
pub struct PrimaryKey(IndexedFields);
151+
152+
impl TryFrom<Vec<Vec<String>>> for PrimaryKey {
153+
type Error = anyhow::Error;
154+
155+
fn try_from(v: Vec<Vec<String>>) -> anyhow::Result<PrimaryKey> {
156+
let field_paths = v
157+
.into_iter()
158+
.map(|fields| {
159+
let field_names = fields
160+
.into_iter()
161+
.map(|f| f.parse::<IdentifierFieldName>())
162+
.collect::<anyhow::Result<_>>()?;
163+
let field_path = FieldPath::new(field_names)?;
164+
Ok(field_path)
165+
})
166+
.collect::<anyhow::Result<Vec<FieldPath>>>()?;
167+
let index_fields = field_paths.try_into()?;
168+
Ok(PrimaryKey(index_fields))
169+
}
170+
}
171+
172+
impl PrimaryKey {
    /// Consume the primary key, yielding the underlying indexed fields.
    pub fn into_indexed_fields(self) -> IndexedFields {
        self.0
    }
}
177+
178+
#[derive(Debug)]
/// An Airbyte stream whose configuration has been validated.
pub enum ValidatedAirbyteStream {
    // No primary key: records are simply appended.
    Append,
    // Primary key present: records are deduplicated on it.
    Dedup(PrimaryKey),
}
183+
184+
impl TryFrom<AirbyteStream> for ValidatedAirbyteStream {
185+
type Error = anyhow::Error;
186+
187+
fn try_from(
188+
AirbyteStream {
189+
primary_key,
190+
json_schema: _,
191+
}: AirbyteStream,
192+
) -> anyhow::Result<Self> {
193+
// TODO(emma): Validate schema
194+
match primary_key {
195+
None => Ok(ValidatedAirbyteStream::Append),
196+
Some(p) => {
197+
anyhow::ensure!(
198+
!p.is_empty(),
199+
ErrorMetadata::bad_request("EmptyPrimaryKey", "Primary keys cannot be empty")
200+
);
201+
Ok(ValidatedAirbyteStream::Dedup(p.try_into()?))
202+
},
203+
}
204+
}
205+
}
206+
207+
pub fn mark_as_soft_deleted(object: ConvexObject) -> anyhow::Result<ConvexObject> {
208+
let metadata_key = FieldName::from(METADATA_CONVEX_FIELD_NAME.clone());
209+
210+
let mut new_value: BTreeMap<FieldName, ConvexValue> = object.into();
211+
let metadata_object = match new_value.remove(&metadata_key) {
212+
Some(ConvexValue::Object(object)) => object,
213+
_ => ConvexObject::empty(),
214+
};
215+
216+
new_value.insert(
217+
metadata_key,
218+
ConvexValue::Object(metadata_object.shallow_merge(ConvexObject::for_value(
219+
FieldName::from(SOFT_DELETE_CONVEX_FIELD_NAME.clone()),
220+
ConvexValue::Boolean(true),
221+
)?)?),
222+
);
223+
new_value.try_into()
224+
}
225+
226+
#[cfg(test)]
mod tests {
    use serde_json::json;
    use value::{
        assert_obj,
        FieldName,
        TableName,
    };

    use super::{
        valid_json,
        AirbyteRecord,
        AirbyteRecordMessage,
    };
    use crate::{
        airbyte_import::CDC_DELETED_FIELD,
        valid_identifier::ValidIdentifier,
    };

    // Sanitization of single identifiers: names with a leading underscore or
    // invalid punctuation are rewritten (per the expectations, with a
    // `source_` prefix and `_` substitutions); valid names pass through.
    #[test]
    fn test_valid_identifier() -> anyhow::Result<()> {
        let bad_identifier = "_id";
        let valid_identifier = bad_identifier.parse::<ValidIdentifier<FieldName>>()?;
        let expected_identifier: FieldName = "source_id".parse()?;
        assert_eq!(expected_identifier, valid_identifier.0);

        let bad_identifier = "*name!of.table";
        let valid_identifier = bad_identifier.parse::<ValidIdentifier<TableName>>()?;
        let expected_identifier: TableName = "source_name_of_table".parse()?;
        assert_eq!(expected_identifier, valid_identifier.0);

        // Already-valid identifiers are unchanged.
        let good_identifier = "ok_table";
        let valid_identifier = good_identifier.parse::<ValidIdentifier<TableName>>()?;
        let expected_identifier: TableName = "ok_table".parse()?;
        assert_eq!(expected_identifier, valid_identifier.0);

        let bad_identifier = "_system_table";
        let valid_identifier = bad_identifier.parse::<ValidIdentifier<TableName>>()?;
        let expected_identifier: TableName = "source_system_table".parse()?;
        assert_eq!(expected_identifier, valid_identifier.0);
        Ok(())
    }

    // Sanitization of whole JSON values: flat objects, nested objects,
    // arrays, rename collisions, and the DUPLICATE_FIELD_LIMIT error path.
    #[test]
    fn test_valid_object() -> anyhow::Result<()> {
        let data = json!({"_bad_field_name": "hello", "good_field_name": "goodbye"});
        let valid_data = valid_json(data)?;
        assert_eq!(
            json!({"source_bad_field_name": "hello", "good_field_name": "goodbye"}),
            valid_data
        );

        // Nested object case
        let data = json!({"good_field_name": {"_bad_field_name": "hello"}});
        let valid_data = valid_json(data)?;
        assert_eq!(
            json!({"good_field_name": {"source_bad_field_name": "hello"}}),
            valid_data
        );

        // Nested array case
        let data = json!({"good_field_name": [{"_bad_field_name": "hello"}]});
        let valid_data = valid_json(data)?;
        assert_eq!(
            json!({"good_field_name": [{"source_bad_field_name": "hello"}]}),
            valid_data
        );

        // Edge case: prefixed field collides with existing field
        let data = json!({"source_id": 1, "_id": 2});
        let valid_data = valid_json(data)?;
        assert_eq!(json!({"source_id": 1, "source_source_id": 2}), valid_data);

        // Duplicate limit reached
        let data =
            json!({"_id": 0, "source_id": 1, "source_source_id": 2, "source_source_source_id": 3 });
        assert!(valid_json(data).is_err());
        Ok(())
    }

    // End-to-end message validation: table-name parsing, CDC-delete
    // detection, and stripping of CDC-internal fields from the payload.
    #[test]
    fn test_record_validation() -> anyhow::Result<()> {
        let table_name = "stream_table".to_string();
        let airbyte_record_message = AirbyteRecordMessage {
            table_name: table_name.clone(),
            data: json!({"field": "value"}),
        };
        let expected_record = AirbyteRecord {
            table_name: table_name.parse()?,
            deleted: false,
            record: assert_obj!("field" => "value"),
        };
        assert_eq!(
            AirbyteRecord::try_from(airbyte_record_message)?,
            expected_record
        );

        // CDC fields
        let table_name = "stream_table".to_string();
        let airbyte_record_message = AirbyteRecordMessage {
            table_name: table_name.clone(),
            data: json!({"field": "value", "_ab_cdc_lsn": "lsn", CDC_DELETED_FIELD.clone(): "timestamp"}),
        };
        let expected_record = AirbyteRecord {
            table_name: table_name.parse()?,
            deleted: true,
            record: assert_obj!("field" => "value"),
        };
        assert_eq!(
            AirbyteRecord::try_from(airbyte_record_message)?,
            expected_record
        );
        Ok(())
    }
}

0 commit comments

Comments
 (0)