Skip to content

Commit fc167de

Browse files
nipunn1313 and Convex, Inc.
authored and committed
Open source log sinks model (#34658)
On the path to open sourcing all the models and unifying the model code once again. GitOrigin-RevId: 65bbb96d6a1ad49649f37816c1cbf1a84647cdf6
1 parent e253741 commit fc167de

File tree

10 files changed

+949
-0
lines changed

10 files changed

+949
-0
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/model/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,14 @@ openidconnect = { workspace = true }
3434
pb = { path = "../pb" }
3535
proptest = { workspace = true, optional = true }
3636
proptest-derive = { workspace = true, optional = true }
37+
proptest-http = { workspace = true, optional = true }
3738
rand = { workspace = true }
39+
reqwest = { workspace = true }
3840
runtime = { path = "../runtime" }
3941
saffron = { workspace = true }
4042
search = { path = "../search" }
4143
semver = { workspace = true }
44+
sentry = { workspace = true }
4245
serde = { workspace = true }
4346
serde_bytes = { workspace = true }
4447
serde_json = { workspace = true }
@@ -59,6 +62,7 @@ keybroker = { path = "../keybroker", features = ["testing"] }
5962
metrics = { path = "../metrics", features = ["testing"] }
6063
proptest = { workspace = true }
6164
proptest-derive = { workspace = true }
65+
proptest-http = { workspace = true }
6266
runtime = { path = "../runtime", features = ["testing"] }
6367
search = { path = "../search", features = ["testing"] }
6468
storage = { path = "../storage", features = ["testing"] }
@@ -77,6 +81,7 @@ testing = [
7781
"metrics/testing",
7882
"proptest",
7983
"proptest-derive",
84+
"proptest-http",
8085
"runtime/testing",
8186
"search/testing",
8287
"storage/testing",

crates/model/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ use environment_variables::{
9696
use exports::EXPORTS_BY_STATE_AND_TS_INDEX;
9797
use file_storage::FILE_STORAGE_ID_INDEX;
9898
use keybroker::Identity;
99+
use log_sinks::LogSinksTable;
99100
use maplit::btreeset;
100101
use modules::{
101102
MODULES_TABLE,
@@ -152,6 +153,7 @@ pub mod exports;
152153
pub mod external_packages;
153154
pub mod file_storage;
154155
pub mod fivetran_import;
156+
pub mod log_sinks;
155157
mod metrics;
156158
pub mod migrations;
157159
pub mod modules;
@@ -184,6 +186,7 @@ enum DefaultTableNumber {
184186
CronJobs = 19,
185187
Schemas = 20,
186188
CronJobLogs = 21,
189+
LogSinks = 23,
187190
BackendState = 24,
188191
ExternalPackages = 25,
189192
ScheduledJobs = 27,
@@ -224,6 +227,7 @@ impl From<DefaultTableNumber> for &'static dyn SystemTable {
224227
DefaultTableNumber::CronJobs => &CronJobsTable,
225228
DefaultTableNumber::Schemas => &SchemasTable,
226229
DefaultTableNumber::CronJobLogs => &CronJobLogsTable,
230+
DefaultTableNumber::LogSinks => &LogSinksTable,
227231
DefaultTableNumber::BackendState => &BackendStateTable,
228232
DefaultTableNumber::ExternalPackages => &ExternalPackagesTable,
229233
DefaultTableNumber::ScheduledJobs => &ScheduledJobsTable,
@@ -435,6 +439,7 @@ pub fn app_system_tables() -> Vec<&'static dyn SystemTable> {
435439
&SnapshotImportsTable,
436440
&FunctionHandlesTable,
437441
&CanonicalUrlsTable,
442+
&LogSinksTable,
438443
];
439444
system_tables.extend(component_system_tables());
440445
system_tables

crates/model/src/log_sinks/mod.rs

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
use std::sync::LazyLock;
2+
3+
use common::{
4+
document::{
5+
ParsedDocument,
6+
ResolvedDocument,
7+
},
8+
query::{
9+
Order,
10+
Query,
11+
},
12+
runtime::Runtime,
13+
};
14+
use database::{
15+
patch_value,
16+
ResolvedQuery,
17+
SystemMetadataModel,
18+
Transaction,
19+
};
20+
use errors::ErrorMetadata;
21+
use value::{
22+
ConvexValue,
23+
ResolvedDocumentId,
24+
TableName,
25+
TableNamespace,
26+
};
27+
28+
use crate::{
29+
SystemIndex,
30+
SystemTable,
31+
};
32+
33+
pub mod types;
34+
use types::{
35+
LogSinksRow,
36+
SinkConfig,
37+
SinkState,
38+
SinkType,
39+
LOG_SINKS_LIMIT,
40+
};
41+
42+
pub static LOG_SINKS_TABLE: LazyLock<TableName> = LazyLock::new(|| {
43+
"_log_sinks"
44+
.parse()
45+
.expect("Invalid built-in _log_sinks table")
46+
});
47+
48+
pub struct LogSinksTable;
49+
impl SystemTable for LogSinksTable {
50+
fn table_name(&self) -> &'static TableName {
51+
&LOG_SINKS_TABLE
52+
}
53+
54+
fn indexes(&self) -> Vec<SystemIndex> {
55+
vec![]
56+
}
57+
58+
fn validate_document(&self, document: ResolvedDocument) -> anyhow::Result<()> {
59+
ParsedDocument::<LogSinksRow>::try_from(document).map(|_| ())
60+
}
61+
}
62+
63+
/// Model for reading and mutating rows of the `_log_sinks` system table
/// within a single open transaction.
pub struct LogSinksModel<'a, RT: Runtime> {
    // Borrowed transaction: all reads/writes below happen in this tx.
    tx: &'a mut Transaction<RT>,
}
66+
67+
impl<'a, RT: Runtime> LogSinksModel<'a, RT> {
    /// Wraps an open transaction for `_log_sinks` reads and writes.
    pub fn new(tx: &'a mut Transaction<RT>) -> Self {
        Self { tx }
    }

    /// Returns the single live (non-tombstoned) sink of the given type, if any.
    ///
    /// # Errors
    /// Fails if more than one live sink of the same type exists — the write
    /// paths below (`add_or_update`) maintain at most one live sink per
    /// provider, so multiples indicate a broken invariant.
    pub async fn get_by_provider(
        &mut self,
        provider: SinkType,
    ) -> anyhow::Result<Option<ParsedDocument<LogSinksRow>>> {
        let mut result: Vec<_> = self
            .get_by_provider_including_tombstoned(provider.clone())
            .await?
            .into_iter()
            .filter(|doc| doc.status != SinkState::Tombstoned)
            .collect();
        anyhow::ensure!(
            result.len() <= 1,
            "Multiple sinks found of the same type: {:?}",
            provider
        );
        Ok(result.pop())
    }

    /// All rows whose config matches `provider`, including tombstoned ones.
    async fn get_by_provider_including_tombstoned(
        &mut self,
        provider: SinkType,
    ) -> anyhow::Result<Vec<ParsedDocument<LogSinksRow>>> {
        let result: Vec<_> = self
            .get_all()
            .await?
            .into_iter()
            .filter(|doc| doc.config.sink_type() == provider)
            .collect();
        Ok(result)
    }

    /// Reads every row of `_log_sinks` (all providers, all states) via a
    /// full table scan in ascending order.
    pub async fn get_all(&mut self) -> anyhow::Result<Vec<ParsedDocument<LogSinksRow>>> {
        let mut result: Vec<_> = vec![];

        let value_query = Query::full_table_scan(LOG_SINKS_TABLE.clone(), Order::Asc);
        let mut query_stream = ResolvedQuery::new(self.tx, TableNamespace::Global, value_query)?;
        while let Some(doc) = query_stream.next(self.tx, None).await? {
            let row: ParsedDocument<LogSinksRow> = doc.try_into()?;
            result.push(row);
        }

        Ok(result)
    }

    /// Overwrites only the `status` field of the row with the given id.
    pub async fn patch_status(
        &mut self,
        id: ResolvedDocumentId,
        status: SinkState,
    ) -> anyhow::Result<()> {
        SystemMetadataModel::new_global(self.tx)
            .patch(
                id,
                patch_value!("status" => Some(ConvexValue::Object(status.try_into()?)))?,
            )
            .await?;
        Ok(())
    }

    /// Soft-deletes a sink by setting its status to `Tombstoned`; the row
    /// itself is kept (see the comment on `add_on_startup` for why deleting
    /// outright is generally unsafe).
    pub async fn mark_for_removal(&mut self, id: ResolvedDocumentId) -> anyhow::Result<()> {
        self.patch_status(id, SinkState::Tombstoned).await?;
        Ok(())
    }

    /// Inserts a new sink in `Pending` state, first tombstoning any live sink
    /// of the same type so at most one live sink per provider exists.
    ///
    /// # Errors
    /// Returns a `LogSinkQuotaExceeded` bad-request error when the count of
    /// live sinks is already at LOG_SINKS_LIMIT.
    pub async fn add_or_update(&mut self, config: SinkConfig) -> anyhow::Result<()> {
        let sink_type = config.sink_type();
        let row = LogSinksRow {
            status: SinkState::Pending,
            config,
        };

        // Filter to non-tombstoned log sinks
        let sinks = self
            .get_all()
            .await?
            .into_iter()
            .filter(|row| row.status != SinkState::Tombstoned)
            .collect::<Vec<_>>();
        // NOTE(review): this count includes any live sink of `sink_type` that
        // is tombstoned and replaced below, so updating an existing sink while
        // exactly at the limit fails — confirm this is intended.
        if sinks.len() >= LOG_SINKS_LIMIT {
            return Err(ErrorMetadata::bad_request(
                "LogSinkQuotaExceeded",
                "Cannot add more LogSinks, the quota for this project has been reached.",
            )
            .into());
        }

        // Replace, don't duplicate: tombstone the existing live sink of this
        // type before inserting the new row.
        if let Some(row) = self.get_by_provider(sink_type.clone()).await? {
            self.mark_for_removal(row.id()).await?;
        }

        SystemMetadataModel::new_global(self.tx)
            .insert(&LOG_SINKS_TABLE, row.try_into()?)
            .await?;
        Ok(())
    }

    // It's generally not safe to delete an existing sink without marking it
    // Tombstoned first since the LogManager will not know to remove the sink.
    // However, we can do this during startup before the LogManager has started
    // (like when adding a local log sink)
    pub async fn add_on_startup(&mut self, config: SinkConfig) -> anyhow::Result<()> {
        // Search for matching provider
        if let Some(sink) = self.get_by_provider(config.sink_type()).await? {
            // Hard delete is safe only here, pre-LogManager (see above).
            SystemMetadataModel::new_global(self.tx)
                .delete(sink.id())
                .await?;
        };
        self.add_or_update(config).await?;
        Ok(())
    }

    /// Tombstones every row in the table, including already-tombstoned ones
    /// (re-patching a tombstoned row to Tombstoned is a no-op status-wise).
    pub async fn clear(&mut self) -> anyhow::Result<()> {
        let providers = self.get_all().await?;

        for sink in providers {
            self.patch_status(sink.id(), SinkState::Tombstoned).await?;
        }
        Ok(())
    }
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
use std::{
2+
fmt,
3+
str::FromStr,
4+
};
5+
6+
use common::{
7+
log_streaming::LogEventFormatVersion,
8+
pii::PII,
9+
};
10+
use serde::{
11+
Deserialize,
12+
Serialize,
13+
};
14+
15+
/// In-memory configuration for an Axiom log sink.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
pub struct AxiomConfig {
    // Wrapped in PII — presumably redacts the key in Debug output; confirm
    // against the PII type. The Display impl below omits it entirely.
    pub api_key: PII<String>,
    pub dataset_name: String,
    // Extra key/value attributes for the sink — TODO confirm how the sink
    // implementation attaches these to emitted events.
    pub attributes: Vec<AxiomAttribute>,
    // Format version of emitted log events.
    pub version: LogEventFormatVersion,
}
23+
24+
/// Persisted/wire form of `AxiomConfig`; fields serialize as camelCase.
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SerializedAxiomConfig {
    pub api_key: String,
    pub dataset_name: String,
    pub attributes: Vec<SerializedAxiomAttribute>,
    // Optional: missing values default to LogEventFormatVersion::V1 in the
    // TryFrom conversion — presumably for rows written before the version
    // field existed; confirm.
    pub version: Option<String>,
}
32+
33+
impl From<AxiomConfig> for SerializedAxiomConfig {
34+
fn from(value: AxiomConfig) -> Self {
35+
Self {
36+
api_key: value.api_key.0,
37+
dataset_name: value.dataset_name,
38+
attributes: value
39+
.attributes
40+
.into_iter()
41+
.map(SerializedAxiomAttribute::from)
42+
.collect(),
43+
version: Some(value.version.to_string()),
44+
}
45+
}
46+
}
47+
48+
impl TryFrom<SerializedAxiomConfig> for AxiomConfig {
49+
type Error = anyhow::Error;
50+
51+
fn try_from(value: SerializedAxiomConfig) -> Result<Self, Self::Error> {
52+
Ok(Self {
53+
api_key: PII(value.api_key),
54+
dataset_name: value.dataset_name,
55+
attributes: value
56+
.attributes
57+
.into_iter()
58+
.map(AxiomAttribute::from)
59+
.collect(),
60+
version: value
61+
.version
62+
.map(|v| LogEventFormatVersion::from_str(v.as_str()))
63+
.transpose()?
64+
.unwrap_or(LogEventFormatVersion::V1),
65+
})
66+
}
67+
}
68+
69+
impl fmt::Display for AxiomConfig {
    // Deliberately prints only the format version; api_key (PII) and
    // attributes are omitted — presumably so a Display'd config is safe to
    // log; confirm before relying on this for redaction.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "AxiomConfig {{ version:{:?} ... }}", self.version)
    }
}
74+
75+
/// A single key/value attribute in the in-memory Axiom config.
// Derives Deserialize but not Serialize: serialization goes through
// SerializedAxiomAttribute instead.
#[derive(Deserialize, Debug, Clone, PartialEq, Eq)]
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
pub struct AxiomAttribute {
    pub key: String,
    pub value: String,
}
81+
82+
/// Wire form of `AxiomAttribute`; fields serialize as camelCase
/// (single-word field names, so the rename is currently a no-op).
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SerializedAxiomAttribute {
    pub key: String,
    pub value: String,
}
88+
89+
impl From<AxiomAttribute> for SerializedAxiomAttribute {
90+
fn from(attribute: AxiomAttribute) -> Self {
91+
Self {
92+
key: attribute.key,
93+
value: attribute.value,
94+
}
95+
}
96+
}
97+
98+
impl From<SerializedAxiomAttribute> for AxiomAttribute {
    /// Field-for-field conversion from the wire form.
    fn from(serialized: SerializedAxiomAttribute) -> Self {
        Self {
            key: serialized.key,
            value: serialized.value,
        }
    }
}

0 commit comments

Comments
 (0)