Skip to content

Commit 6e9dab2

Browse files
authored
ENG-3200: Centralize OAuth logic to a single repository (#7)
1 parent 2c492d2 commit 6e9dab2

File tree

24 files changed

+163
-167
lines changed

24 files changed

+163
-167
lines changed

Cargo.lock

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ actix-governor = "0.5.0"
1313
actix-web = "4.5.1"
1414
actix-web-lab = "0.20.2"
1515
anyhow = "1.0.79"
16+
async-trait = "0.1.80"
1617
chrono = { version = "0.4.33", features = ["serde"] }
1718
dotenvy = "0.15.7"
1819
envconfig = "0.10.0"

src/algebra/mod.rs

Lines changed: 2 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,11 @@
11
mod parameter;
22
mod refresh;
3+
mod storage;
34
mod token;
45
mod trigger;
56

67
pub use parameter::*;
78
pub use refresh::*;
9+
pub use storage::*;
810
pub use token::*;
911
pub use trigger::*;
10-
11-
use chrono::{DateTime, Utc};
12-
use integrationos_domain::{
13-
algebra::{MongoStore, StoreExt},
14-
Connection, Id, IntegrationOSError,
15-
};
16-
use mongodb::bson::doc;
17-
18-
pub async fn get_connections_to_refresh(
19-
collection: &MongoStore<Connection>,
20-
refresh_before: &DateTime<Utc>,
21-
refresh_after: &DateTime<Utc>,
22-
) -> Result<Vec<Connection>, IntegrationOSError> {
23-
collection
24-
.get_many(
25-
Some(doc! {
26-
"oauth.enabled.expires_at": doc! {
27-
"$gt": refresh_before.timestamp(),
28-
"$lte": refresh_after.timestamp(),
29-
},
30-
}),
31-
None,
32-
None,
33-
None,
34-
None,
35-
)
36-
.await
37-
}
38-
39-
pub async fn get_connection_to_trigger(
40-
collection: &MongoStore<Connection>,
41-
id: Id,
42-
) -> Result<Option<Connection>, IntegrationOSError> {
43-
collection
44-
.get_one(doc! {
45-
"_id": id.to_string(),
46-
})
47-
.await
48-
}

src/algebra/parameter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@ use std::{
1313
};
1414
use tracing::warn;
1515

16-
pub trait Parameter {
16+
pub trait ParameterExt {
1717
fn headers(&self, computation: Option<&Computation>) -> Result<Option<HeaderMap>, Error>;
1818
fn body(&self, secret: &OAuthSecret) -> Result<Option<Value>, Error>;
1919
fn query(&self, computation: Option<&Computation>) -> Result<Option<Value>, Error>;
2020
}
2121

22-
impl Parameter for ConnectionOAuthDefinition {
22+
impl ParameterExt for ConnectionOAuthDefinition {
2323
fn headers(&self, computation: Option<&Computation>) -> Result<Option<HeaderMap>, Error> {
2424
headers(self, computation)
2525
}

src/algebra/refresh.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
use crate::prelude::{
2-
get_connections_to_refresh, Query, Refresh, StatefulActor, Trigger, TriggerActor, Unit,
1+
use crate::{
2+
algebra::{StorageExt, TriggerActor},
3+
domain::{Query, Refresh, StatefulActor, Trigger, Unit},
34
};
45
use actix::prelude::*;
56
use chrono::{Duration, Utc};
@@ -66,9 +67,9 @@ impl Handler<Refresh> for RefreshActor {
6667
let state = self.state.clone();
6768

6869
Box::pin(async move {
69-
let connections =
70-
get_connections_to_refresh(&connections_store, &refresh_before, &refresh_after)
71-
.await?;
70+
let connections = connections_store
71+
.get_by(&refresh_before, &refresh_after)
72+
.await?;
7273

7374
tracing::info!("Found {} connections to refresh", connections.len());
7475

src/algebra/storage.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
use async_trait::async_trait;
2+
use chrono::{DateTime, Utc};
3+
use integrationos_domain::{Connection, Id, IntegrationOSError, MongoStore, StoreExt};
4+
use mongodb::bson::doc;
5+
6+
#[async_trait]
7+
pub trait StorageExt {
8+
async fn get_by(
9+
&self,
10+
refresh_before: &DateTime<Utc>,
11+
refresh_after: &DateTime<Utc>,
12+
) -> Result<Vec<Connection>, IntegrationOSError>;
13+
14+
async fn get(&self, id: Id) -> Result<Option<Connection>, IntegrationOSError>;
15+
}
16+
17+
#[async_trait]
18+
impl StorageExt for MongoStore<Connection> {
19+
async fn get_by(
20+
&self,
21+
refresh_before: &DateTime<Utc>,
22+
refresh_after: &DateTime<Utc>,
23+
) -> Result<Vec<Connection>, IntegrationOSError> {
24+
self.get_many(
25+
Some(doc! {
26+
"oauth.enabled.expires_at": doc! {
27+
"$gt": refresh_before.timestamp(),
28+
"$lte": refresh_after.timestamp(),
29+
},
30+
}),
31+
None,
32+
None,
33+
None,
34+
None,
35+
)
36+
.await
37+
}
38+
39+
async fn get(&self, id: Id) -> Result<Option<Connection>, IntegrationOSError> {
40+
self.get_one(doc! {
41+
"_id": id.to_string(),
42+
})
43+
.await
44+
}
45+
}

src/algebra/token.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
1-
use crate::prelude::Config;
1+
use crate::service::Configuration;
22
use chrono::{Duration, Utc};
33
use integrationos_domain::{Claims, IntegrationOSError as Error, InternalError};
44
use jsonwebtoken::{encode, EncodingKey, Header};
55

6-
pub trait TokenGenerator {
7-
fn generate(&self, configuration: Config, expiration: i64) -> Result<String, Error>;
6+
pub trait TokenExt {
7+
fn generate(&self, configuration: Configuration, expiration: i64) -> Result<String, Error>;
88
}
99

1010
#[derive(Debug, Default)]
11-
pub struct JwtTokenGenerator;
11+
pub struct Token;
1212

13-
impl TokenGenerator for JwtTokenGenerator {
14-
fn generate(&self, configuration: Config, expiration: i64) -> Result<String, Error> {
13+
impl TokenExt for Token {
14+
fn generate(&self, configuration: Configuration, expiration: i64) -> Result<String, Error> {
1515
let key = configuration.server().admin_secret();
1616
let key = key.as_bytes();
1717
let key = EncodingKey::from_secret(key);

src/algebra/trigger.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
use super::Parameter;
2-
use crate::prelude::{Outcome, Trigger};
1+
use crate::domain::{Outcome, Trigger};
2+
3+
use super::ParameterExt;
34
use actix::prelude::*;
45
use chrono::Duration;
56
use integrationos_domain::{
@@ -256,8 +257,8 @@ impl Handler<Trigger> for TriggerActor {
256257
Outcome::success(id.to_string().as_str(), json!({ "id": id.to_string() }))
257258
}
258259
Err(e) => Outcome::failure(
259-
msg.connection().id.to_string().as_str(),
260-
json!({ "error": e.to_string() }),
260+
e,
261+
json!({ "connectionId": msg.connection().id.to_string() }),
261262
),
262263
}
263264
};

src/domain/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,9 @@ pub use query::*;
99
pub use refresh::*;
1010
pub use state::*;
1111
pub use trigger::*;
12+
13+
use futures::Future;
14+
use std::pin::Pin;
15+
16+
pub type Unit = ();
17+
pub type Task = Pin<Box<dyn Future<Output = Unit> + Send + Sync>>;

src/domain/outcome.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,19 @@
1-
use serde::{Deserialize, Serialize};
1+
use integrationos_domain::IntegrationOSError;
2+
use serde::Serialize;
23
use serde_json::Value;
34

4-
#[derive(Debug, Clone, Serialize, Deserialize)]
5+
#[derive(Debug, Clone, Serialize)]
56
#[serde(rename_all = "camelCase")]
7+
#[serde(tag = "type")]
68
pub enum Outcome {
7-
Success { message: String, metadata: Value },
8-
Failure { message: String, metadata: Value },
9+
Success {
10+
message: String,
11+
metadata: Value,
12+
},
13+
Failure {
14+
error: IntegrationOSError,
15+
metadata: Value,
16+
},
917
}
1018

1119
impl Outcome {
@@ -16,10 +24,7 @@ impl Outcome {
1624
}
1725
}
1826

19-
pub fn failure(message: &str, metadata: Value) -> Self {
20-
Self::Failure {
21-
message: message.to_string(),
22-
metadata,
23-
}
27+
pub fn failure(error: IntegrationOSError, metadata: Value) -> Self {
28+
Self::Failure { error, metadata }
2429
}
2530
}

0 commit comments

Comments
 (0)