Skip to content

Commit afa9ab6

Browse files
committed
Add support for triggering nested workflows.
1 parent 7a1b6c9 commit afa9ab6

File tree

12 files changed

+267
-69
lines changed

12 files changed

+267
-69
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ ctrlc2 = { version = "3", features = ["termination", "tokio"] }
99
derive_builder = "0.20"
1010
envy = "0.4"
1111
futures-util = "0.3"
12+
http = "1"
1213
jsonwebtoken = "9"
1314
prost = "0.13"
1415
prost-types = "0.13"

examples/fibonacci.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use hatchet_sdk::{Client, StepBuilder, WorkflowBuilder};
1+
use hatchet_sdk::{Client, Context, StepBuilder, WorkflowBuilder};
22

33
fn fibonacci(n: u32) -> u32 {
44
(1..=n)
@@ -33,7 +33,7 @@ struct Output {
3333
result: u32,
3434
}
3535

36-
async fn execute(Input { n }: Input) -> anyhow::Result<Output> {
36+
async fn execute(_context: Context, Input { n }: Input) -> anyhow::Result<Output> {
3737
Ok(Output {
3838
result: fibonacci(n),
3939
})
@@ -51,7 +51,7 @@ async fn main() -> anyhow::Result<()> {
5151
.init();
5252

5353
let client = Client::new()?;
54-
let mut worker = client.worker("example").build();
54+
let mut worker = client.worker("example_fibonacci").build();
5555
worker.register_workflow(
5656
WorkflowBuilder::default()
5757
.name("fibonacci")

examples/spawn_workflow.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
use hatchet_sdk::{Client, Context, StepBuilder, WorkflowBuilder};
2+
3+
async fn execute_hello(
4+
mut context: Context,
5+
_: serde_json::Value,
6+
) -> anyhow::Result<serde_json::Value> {
7+
context
8+
.trigger_workflow(
9+
"world",
10+
serde_json::json!({
11+
"x": 42
12+
}),
13+
)
14+
.await?;
15+
Ok(serde_json::json!({
16+
"message": "Hello"
17+
}))
18+
}
19+
20+
async fn execute_world(
21+
_context: Context,
22+
_: serde_json::Value,
23+
) -> anyhow::Result<serde_json::Value> {
24+
Ok(serde_json::json!({
25+
"message": "World"
26+
}))
27+
}
28+
29+
#[tokio::main]
30+
async fn main() -> anyhow::Result<()> {
31+
dotenv::dotenv().ok();
32+
tracing_subscriber::fmt()
33+
.with_target(false)
34+
.with_env_filter(
35+
tracing_subscriber::EnvFilter::from_default_env()
36+
.add_directive("hatchet_sdk=debug".parse()?),
37+
)
38+
.init();
39+
40+
let client = Client::new()?;
41+
let mut worker = client.worker("example_spawn_workflow").build();
42+
worker.register_workflow(
43+
WorkflowBuilder::default()
44+
.name("hello")
45+
.step(
46+
StepBuilder::default()
47+
.name("execute")
48+
.function(&execute_hello)
49+
.build()?,
50+
)
51+
.build()?,
52+
);
53+
worker.register_workflow(
54+
WorkflowBuilder::default()
55+
.name("world")
56+
.step(
57+
StepBuilder::default()
58+
.name("execute")
59+
.function(&execute_world)
60+
.build()?,
61+
)
62+
.build()?,
63+
);
64+
worker.start().await?;
65+
Ok(())
66+
}

src/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ pub struct Client {
2525
}
2626

2727
impl Client {
28-
pub fn new() -> crate::Result<Self> {
28+
pub fn new() -> crate::InternalResult<Self> {
2929
let environment = envy::prefixed("HATCHET_CLIENT_").from_env::<Environment>()?;
3030
Ok(Self { environment })
3131
}

src/error.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
#[derive(Debug, thiserror::Error)]
2-
pub enum Error {
2+
pub enum InternalError {
33
#[error("failed to load configuration from the environment: {0}")]
44
Environment(#[from] envy::Error),
55
#[error("worker registration request: {0}")]
66
CouldNotRegisterWorker(tonic::Status),
77
#[error("workflow registration request:: {0}")]
88
CouldNotPutWorkflow(tonic::Status),
9+
#[error("workflow schedule request:: {0}")]
10+
CouldNotTriggerWorkflow(tonic::Status),
911
#[error("dispatcher listen error: {0}")]
1012
CouldNotListenToDispatcher(tonic::Status),
1113
#[error("step status send error: {0}")]
@@ -28,4 +30,12 @@ pub enum Error {
2830
CouldNotDecodeActionPayload(serde_json::Error),
2931
}
3032

33+
pub type InternalResult<T> = std::result::Result<T, InternalError>;
34+
35+
#[derive(Debug, thiserror::Error)]
36+
pub enum Error {
37+
#[error("internal error: {0}")]
38+
Internal(#[from] InternalError),
39+
}
40+
3141
pub type Result<T> = std::result::Result<T, Error>;

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
mod client;
22
mod error;
3+
mod step_function;
34
mod worker;
45
mod workflow;
56

67
pub use error::{Error, Result};
8+
pub(crate) use error::{InternalError, InternalResult};
79

810
#[derive(Clone, Copy, Debug, Default, serde::Deserialize)]
911
#[serde(rename_all = "lowercase")]
@@ -15,6 +17,7 @@ enum ClientTlStrategy {
1517
}
1618

1719
pub use client::Client;
20+
pub use step_function::Context;
1821
pub use worker::{Worker, WorkerBuilder};
1922
pub use workflow::{Step, StepBuilder, Workflow, WorkflowBuilder};
2023

src/step_function.rs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
use tracing::info;
2+
3+
use crate::worker::{grpc, ServiceWithAuthorization};
4+
5+
pub struct Context {
6+
spawn_index: u16,
7+
workflow_run_id: String,
8+
workflow_step_run_id: String,
9+
workflow_service_client: grpc::workflow_service_client::WorkflowServiceClient<
10+
tonic::service::interceptor::InterceptedService<
11+
tonic::transport::Channel,
12+
ServiceWithAuthorization,
13+
>,
14+
>,
15+
}
16+
17+
impl Context {
18+
pub(crate) fn new(
19+
workflow_run_id: String,
20+
workflow_step_run_id: String,
21+
workflow_service_client: grpc::workflow_service_client::WorkflowServiceClient<
22+
tonic::service::interceptor::InterceptedService<
23+
tonic::transport::Channel,
24+
ServiceWithAuthorization,
25+
>,
26+
>,
27+
) -> Self {
28+
Self {
29+
spawn_index: 0,
30+
workflow_run_id,
31+
workflow_service_client,
32+
workflow_step_run_id,
33+
}
34+
}
35+
36+
pub async fn trigger_workflow<I: serde::Serialize>(
37+
&mut self,
38+
workflow_name: &str,
39+
input: I,
40+
) -> anyhow::Result<()> {
41+
info!("Scheduling another workflow {workflow_name}");
42+
let response = self
43+
.workflow_service_client
44+
.trigger_workflow(grpc::TriggerWorkflowRequest {
45+
name: workflow_name.to_owned(),
46+
input: serde_json::to_string(&input).expect("must succeed"),
47+
parent_id: Some(self.workflow_run_id.clone()),
48+
parent_step_run_id: Some(self.workflow_step_run_id.clone()),
49+
child_index: Some(self.spawn_index as i32),
50+
child_key: None,
51+
additional_metadata: None, // FIXME: Add support.
52+
desired_worker_id: None, // FIXME: Add support.
53+
priority: Some(1), // FIXME: Add support.
54+
})
55+
.await
56+
.map_err(crate::InternalError::CouldNotTriggerWorkflow)
57+
.map_err(crate::Error::Internal)?
58+
.into_inner();
59+
info!(
60+
"Scheduled another workflow run ID: {}",
61+
response.workflow_run_id
62+
);
63+
self.spawn_index += 1;
64+
Ok(())
65+
}
66+
}
67+
68+
pub(crate) type StepFunction =
69+
dyn Fn(
70+
Context,
71+
serde_json::Value,
72+
) -> futures_util::future::LocalBoxFuture<'static, anyhow::Result<serde_json::Value>>;

src/worker/heartbeat.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ pub(crate) async fn run<F>(
1010
>,
1111
worker_id: &str,
1212
mut interrupt_receiver: tokio::sync::mpsc::Receiver<()>,
13-
) -> crate::Result<()>
13+
) -> crate::InternalResult<()>
1414
where
1515
F: tonic::service::Interceptor + Send + 'static,
1616
{
@@ -24,7 +24,7 @@ where
2424
worker_id: worker_id.clone(),
2525
})
2626
.await
27-
.map_err(crate::Error::CouldNotSendHeartbeat)?;
27+
.map_err(crate::InternalError::CouldNotSendHeartbeat)?;
2828

2929
tokio::select! {
3030
_ = interval.tick() => {
@@ -35,7 +35,7 @@ where
3535
}
3636
}
3737
}
38-
crate::Result::Ok(())
38+
crate::InternalResult::Ok(())
3939
})
4040
.await
4141
.expect("must succeed spawing")?;

0 commit comments

Comments
 (0)