Skip to content

Commit 0144f9a

Browse files
jordanhunt22Convex, Inc.
authored andcommitted
[Http Streaming] Fix error streaming response (#30760)
GitOrigin-RevId: 0b53282c14f0defa0ba80ab0d77f9e83f9bcddb1
1 parent cb98010 commit 0144f9a

File tree

5 files changed

+72
-73
lines changed

5 files changed

+72
-73
lines changed

crates/isolate/src/environment/helpers/validation.rs

Lines changed: 25 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,7 @@ use database::{
3232
use errors::ErrorMetadata;
3333
use keybroker::Identity;
3434
use model::{
35-
backend_state::{
36-
types::BackendState,
37-
BackendStateModel,
38-
DISABLED_ERROR_MESSAGE,
39-
PAUSED_ERROR_MESSAGE,
40-
SUSPENDED_ERROR_MESSAGE,
41-
},
35+
backend_state::BackendStateModel,
4236
components::ComponentsModel,
4337
modules::{
4438
function_validators::ReturnsValidator,
@@ -310,23 +304,13 @@ impl ValidatedPathAndArgs {
310304
};
311305
return Ok(result);
312306
}
313-
let mut backend_state_model = BackendStateModel::new(tx);
314-
let backend_state = backend_state_model.get_backend_state().await?;
315-
match backend_state {
316-
BackendState::Running => {},
317-
BackendState::Paused => {
318-
return Ok(Err(JsError::from_message(PAUSED_ERROR_MESSAGE.to_string())));
319-
},
320-
BackendState::Disabled => {
321-
return Ok(Err(JsError::from_message(
322-
DISABLED_ERROR_MESSAGE.to_string(),
323-
)));
324-
},
325-
BackendState::Suspended => {
326-
return Ok(Err(JsError::from_message(
327-
SUSPENDED_ERROR_MESSAGE.to_string(),
328-
)));
307+
308+
match BackendStateModel::new(tx).fail_while_not_running().await {
309+
Ok(Ok(())) => {},
310+
Ok(Err(e)) => {
311+
return Ok(Err(e));
329312
},
313+
Err(e) => return Err(e),
330314
}
331315

332316
let path = match public_path.clone() {
@@ -565,8 +549,7 @@ impl TryFrom<ValidatedPathAndArgs> for pb::common::ValidatedPathAndArgs {
565549
///
566550
/// This should only be constructed via `ValidatedHttpRoute::try_from` to use
567551
/// the type system to enforce that validation is never skipped.
568-
#[derive(Clone, Eq, PartialEq)]
569-
#[cfg_attr(any(test, feature = "testing"), derive(Debug))]
552+
#[derive(Clone, Eq, PartialEq, Debug)]
570553
pub struct ValidatedHttpPath {
571554
path: ResolvedComponentFunctionPath,
572555
npm_version: Option<Version>,
@@ -600,18 +583,24 @@ impl ValidatedHttpPath {
600583
tx: &mut Transaction<RT>,
601584
udf_path: sync_types::CanonicalizedUdfPath,
602585
npm_version: Option<Version>,
603-
) -> anyhow::Result<Self> {
586+
) -> anyhow::Result<Result<Self, JsError>> {
604587
if !udf_path.is_system() {
605-
BackendStateModel::new(tx).fail_while_not_running().await?;
588+
match BackendStateModel::new(tx).fail_while_not_running().await {
589+
Ok(Ok(())) => {},
590+
Ok(Err(e)) => {
591+
return Ok(Err(e));
592+
},
593+
Err(e) => return Err(e),
594+
}
606595
}
607-
Ok(Self {
596+
Ok(Ok(Self {
608597
path: ResolvedComponentFunctionPath {
609598
component: ComponentId::test_user(),
610599
udf_path,
611600
component_path: Some(ComponentPath::test_user()),
612601
},
613602
npm_version,
614-
})
603+
}))
615604
}
616605

617606
pub async fn new<RT: Runtime>(
@@ -625,7 +614,13 @@ impl ValidatedHttpPath {
625614
path.udf_path,
626615
);
627616
if !path.udf_path.is_system() {
628-
BackendStateModel::new(tx).fail_while_not_running().await?;
617+
match BackendStateModel::new(tx).fail_while_not_running().await {
618+
Ok(Ok(())) => {},
619+
Ok(Err(e)) => {
620+
return Ok(Err(e));
621+
},
622+
Err(e) => return Err(e),
623+
}
629624
}
630625
let (_, component) =
631626
BootstrapComponentsModel::new(tx).must_component_path_to_ids(&path.component)?;

crates/isolate/src/test_helpers.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,6 @@ use crate::{
173173
validate_schedule_args,
174174
ActionCallbacks,
175175
BackendIsolateWorker,
176-
HttpActionOutcome,
177176
HttpActionResponse,
178177
HttpActionResult,
179178
IsolateClient,
@@ -915,7 +914,7 @@ impl<RT: Runtime, P: Persistence + Clone> UdfTest<RT, P> {
915914
HttpActionResponsePart::Head(head) => response_head = Some(head),
916915
}
917916
}
918-
let response = match outcome.result {
917+
let response = match outcome {
919918
HttpActionResult::Error(e) => Err(e),
920919
HttpActionResult::Streamed => {
921920
let response_head = response_head.unwrap();
@@ -935,17 +934,22 @@ impl<RT: Runtime, P: Persistence + Clone> UdfTest<RT, P> {
935934
http_request: HttpActionRequest,
936935
identity: Identity,
937936
http_response_streamer: HttpActionResponseStreamer,
938-
) -> anyhow::Result<(HttpActionOutcome, LogLines)> {
937+
) -> anyhow::Result<(HttpActionResult, LogLines)> {
939938
let app = Arc::new(self.clone());
940939
let mut tx = self.database.begin(identity.clone()).await?;
941940
let path: UdfPath = udf_path.parse()?;
941+
let validated_path =
942+
match ValidatedHttpPath::new_for_tests(&mut tx, path.canonicalize(), None).await? {
943+
Ok(validated_path) => validated_path,
944+
Err(e) => return Ok((HttpActionResult::Error(e), vec![].into())),
945+
};
942946

943947
let fetch_client = Arc::new(ProxiedFetchClient::new(None, DEV_INSTANCE_NAME.to_owned()));
944948
let (log_line_sender, mut log_line_receiver) = mpsc::unbounded_channel();
945949
let outcome = self
946950
.isolate
947951
.execute_http_action(
948-
ValidatedHttpPath::new_for_tests(&mut tx, path.canonicalize(), None).await?,
952+
validated_path,
949953
RoutedHttpPath(http_request.head.url.path().to_string()),
950954
http_request,
951955
identity,
@@ -961,7 +965,7 @@ impl<RT: Runtime, P: Persistence + Clone> UdfTest<RT, P> {
961965
while let Some(log_line) = log_line_receiver.recv().await {
962966
log_lines.push(log_line);
963967
}
964-
Ok((outcome, log_lines.into()))
968+
Ok((outcome.result, log_lines.into()))
965969
}
966970

967971
pub async fn action(&self, udf_path: &str, args: ConvexObject) -> anyhow::Result<ConvexValue> {

crates/isolate/src/tests/backend_state.rs

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use common::{
33
testing::assert_contains,
44
};
55
use database::Database;
6-
use errors::ErrorMetadata;
76
use keybroker::Identity;
87
use model::backend_state::{
98
types::BackendState,
@@ -23,6 +22,7 @@ use crate::{
2322
http_post_request,
2423
},
2524
HttpActionResponseStreamer,
25+
HttpActionResult,
2626
};
2727

2828
#[convex_macro::test_runtime]
@@ -42,7 +42,7 @@ async fn test_action_while_paused(rt: TestRuntime) -> anyhow::Result<()> {
4242

4343
#[convex_macro::test_runtime]
4444
async fn test_http_action_while_paused(rt: TestRuntime) -> anyhow::Result<()> {
45-
test_http_action_helper(rt, BackendState::Paused).await
45+
test_http_action_helper(rt, BackendState::Paused, PAUSED_ERROR_MESSAGE).await
4646
}
4747

4848
#[convex_macro::test_runtime]
@@ -62,7 +62,7 @@ async fn test_action_while_disabled(rt: TestRuntime) -> anyhow::Result<()> {
6262

6363
#[convex_macro::test_runtime]
6464
async fn test_http_action_while_disabled(rt: TestRuntime) -> anyhow::Result<()> {
65-
test_http_action_helper(rt, BackendState::Disabled).await
65+
test_http_action_helper(rt, BackendState::Disabled, DISABLED_ERROR_MESSAGE).await
6666
}
6767

6868
#[convex_macro::test_runtime]
@@ -82,7 +82,7 @@ async fn test_action_while_suspended(rt: TestRuntime) -> anyhow::Result<()> {
8282

8383
#[convex_macro::test_runtime]
8484
async fn test_http_action_while_suspended(rt: TestRuntime) -> anyhow::Result<()> {
85-
test_http_action_helper(rt, BackendState::Suspended).await
85+
test_http_action_helper(rt, BackendState::Suspended, SUSPENDED_ERROR_MESSAGE).await
8686
}
8787

8888
async fn test_query_helper(
@@ -128,32 +128,25 @@ async fn test_action_helper(
128128
async fn test_http_action_helper(
129129
rt: TestRuntime,
130130
backend_state: BackendState,
131+
error_message: &str,
131132
) -> anyhow::Result<()> {
132133
let t = http_action_udf_test(rt).await?;
133134
toggle_backend_state(&t.database, backend_state.clone()).await?;
134135
let (http_response_sender, _http_response_receiver) = mpsc::unbounded_channel();
135-
let error = t
136+
let result = t
136137
.raw_http_action(
137138
"http_action",
138139
http_post_request("basic", "hi".as_bytes().to_vec()),
139140
Identity::system(),
140141
HttpActionResponseStreamer::new(http_response_sender),
141142
)
142-
.await
143-
.unwrap_err();
144-
assert_error(error, backend_state);
145-
Ok(())
146-
}
147-
148-
fn assert_error(error: anyhow::Error, backend_state: BackendState) {
149-
let error_message = match backend_state {
150-
BackendState::Paused => "NoRunWhilePaused",
151-
BackendState::Disabled => "NoRunWhileDisabled",
152-
BackendState::Suspended => "NoRunWhileSuspended",
153-
BackendState::Running => return,
143+
.await?;
144+
let (HttpActionResult::Error(e), _) = result else {
145+
anyhow::bail!("Expected error, got {:?}", result);
154146
};
155-
let error_metadata = error.downcast_ref::<ErrorMetadata>().unwrap();
156-
assert_eq!(error_metadata.short_msg, error_message);
147+
assert_contains(&e.message, error_message);
148+
149+
Ok(())
157150
}
158151

159152
async fn toggle_backend_state<RT: Runtime>(

crates/isolate/src/tests/http_action.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -198,19 +198,20 @@ async fn test_http_echo(rt: TestRuntime) -> anyhow::Result<()> {
198198

199199
#[convex_macro::test_runtime]
200200
async fn test_http_scheduler(rt: TestRuntime) -> anyhow::Result<()> {
201-
let t = http_action_udf_test(rt).await?;
201+
let t = http_action_udf_test(rt.clone()).await?;
202202

203203
let (http_response_sender, _http_response_receiver) = mpsc::unbounded_channel();
204-
let (outcome, _log_lines) = t
204+
let (result, _) = t
205205
.raw_http_action(
206206
"http_action",
207207
http_request("schedule"),
208208
Identity::system(),
209209
HttpActionResponseStreamer::new(http_response_sender),
210210
)
211211
.await?;
212+
let http_finish_ts = rt.clone().unix_timestamp();
212213

213-
assert_matches!(outcome.result, HttpActionResult::Streamed);
214+
assert_matches!(result, HttpActionResult::Streamed);
214215

215216
let result = t.query("scheduler:getScheduledJobs", assert_obj!()).await?;
216217
must_let!(let ConvexValue::Array(scheduled_jobs) = result);
@@ -222,7 +223,7 @@ async fn test_http_scheduler(rt: TestRuntime) -> anyhow::Result<()> {
222223

223224
// End time of the HTTP action + 2 seconds, which should be a little after when
224225
// the job was scheduled for
225-
let expected_ts = (outcome.unix_timestamp + Duration::from_secs(2)).as_secs_f64() * 1000.0;
226+
let expected_ts = (http_finish_ts + Duration::from_secs(2)).as_secs_f64() * 1000.0;
226227
assert!((job.scheduled_time - expected_ts).abs() < 500.0);
227228
Ok(())
228229
}

crates/model/src/backend_state/mod.rs

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use common::{
55
ParsedDocument,
66
ResolvedDocument,
77
},
8+
errors::JsError,
89
query::{
910
Order,
1011
Query,
@@ -108,24 +109,29 @@ impl<'a, RT: Runtime> BackendStateModel<'a, RT> {
108109
Ok(backend_state)
109110
}
110111

111-
pub async fn fail_while_not_running(&mut self) -> anyhow::Result<()> {
112+
/// Fails with an error if the backend is not running. We have to return a
113+
/// result of a result of () and a JSError because we use them to
114+
/// differentiate between system and user errors.
115+
pub async fn fail_while_not_running(&mut self) -> anyhow::Result<Result<(), JsError>> {
112116
let backend_state = self.get_backend_state().await?;
113117
match backend_state {
114118
BackendState::Running => {},
115-
BackendState::Paused => anyhow::bail!(ErrorMetadata::bad_request(
116-
"NoRunWhilePaused",
117-
PAUSED_ERROR_MESSAGE,
118-
)),
119-
BackendState::Disabled => anyhow::bail!(ErrorMetadata::bad_request(
120-
"NoRunWhileDisabled",
121-
DISABLED_ERROR_MESSAGE,
122-
)),
123-
BackendState::Suspended => anyhow::bail!(ErrorMetadata::bad_request(
124-
"NoRunWhileSuspended",
125-
SUSPENDED_ERROR_MESSAGE,
126-
)),
127-
};
128-
Ok(())
119+
BackendState::Paused => {
120+
return Ok(Err(JsError::from_message(PAUSED_ERROR_MESSAGE.to_string())));
121+
},
122+
BackendState::Disabled => {
123+
return Ok(Err(JsError::from_message(
124+
DISABLED_ERROR_MESSAGE.to_string(),
125+
)));
126+
},
127+
BackendState::Suspended => {
128+
return Ok(Err(JsError::from_message(
129+
SUSPENDED_ERROR_MESSAGE.to_string(),
130+
)));
131+
},
132+
}
133+
134+
Ok(Ok(()))
129135
}
130136

131137
pub async fn toggle_backend_state(&mut self, new_state: BackendState) -> anyhow::Result<()> {

0 commit comments

Comments
 (0)