Skip to content

Commit b12197f

Browse files
authored
Merge pull request #66 from golemcloud/fix_worker
Propagate error message and close WS (important for things like worker not found)
2 parents f5225b4 + 99cb4cf commit b12197f

File tree

2 files changed

+55
-23
lines changed

2 files changed

+55
-23
lines changed

Cargo.lock

Lines changed: 35 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

golem-cloud-server-oss/src/api/worker_connect.rs

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::service::worker::WorkerService;
55
use futures_util::{SinkExt, StreamExt};
66
use golem_cloud_server_base::model::WorkerId;
77
use golem_common::model::TemplateId;
8-
use poem::web::websocket::{Message, WebSocket, WebSocketStream};
8+
use poem::web::websocket::{CloseCode, Message, WebSocket, WebSocketStream};
99
use poem::web::Data;
1010
use poem::*;
1111
use tonic::Status;
@@ -29,26 +29,29 @@ pub async fn ws(
2929
) -> Response {
3030
tracing::info!("Connect request: {:?} {:?}", req.uri(), req);
3131

32-
let worker_metadata = match validate_worker_id(req, Data(service)).await {
32+
let worker_id = match get_worker_id(req) {
3333
Ok(worker_id) => worker_id,
3434
Err(err) => return (http::StatusCode::BAD_REQUEST, err.0).into_response(),
3535
};
3636

37-
dbg!("Connecting to the worker_id {}", worker_metadata.clone());
38-
3937
let service = service.clone();
4038

4139
web_socket
4240
.on_upgrade(move |mut socket| async move {
4341
tokio::spawn(async move {
44-
match try_proxy_worker_connection(&service, worker_metadata.worker_id, &mut socket)
45-
.await
46-
{
42+
let result = try_proxy_worker_connection(&service, worker_id, &mut socket).await;
43+
match result {
4744
Ok(()) => {
4845
tracing::info!("Worker connection closed");
4946
}
5047
Err(err) => {
5148
tracing::error!("Error connecting to worker: {}", err);
49+
let close_message = format!("Error connecting to worker: {}", err);
50+
let message = Message::Close(Some((CloseCode::Error, close_message)));
51+
52+
if let Err(e) = socket.send(message).await {
53+
tracing::error!("Failed to send closing frame: {}", e);
54+
}
5255
}
5356
}
5457
})
@@ -120,29 +123,26 @@ impl From<crate::service::worker::WorkerError> for ConnectError {
120123
fn make_worker_id(
121124
template_id: TemplateId,
122125
worker_name: String,
123-
) -> std::result::Result<golem_cloud_server_base::model::WorkerId, ConnectError> {
124-
golem_cloud_server_base::model::WorkerId::new(template_id, worker_name)
126+
) -> std::result::Result<WorkerId, ConnectError> {
127+
WorkerId::new(template_id, worker_name)
125128
.map_err(|error| ConnectError(format!("Invalid worker name: {error}")))
126129
}
127130

128-
async fn validate_worker_id(
129-
req: &Request,
130-
Data(service): Data<&ConnectService>,
131-
) -> Result<golem_cloud_server_base::model::WorkerMetadata, ConnectError> {
131+
fn make_template_id(template_id: String) -> std::result::Result<TemplateId, ConnectError> {
132+
TemplateId::try_from(template_id.as_str())
133+
.map_err(|error| ConnectError(format!("Invalid template id: {error}")))
134+
}
135+
136+
fn get_worker_id(req: &Request) -> Result<golem_cloud_server_base::model::WorkerId, ConnectError> {
132137
let (template_id, worker_name) = req.path_params::<(String, String)>().map_err(|_| {
133138
ConnectError(
134139
"Valid path parameters (template_id and worker_name) are required ".to_string(),
135140
)
136141
})?;
137142

138-
let template_id = TemplateId::try_from(template_id.as_str())
139-
.map_err(|err| ConnectError(format!("Invalid template id: {}", err)))?;
143+
let template_id = make_template_id(template_id)?;
140144

141145
let worker_id = make_worker_id(template_id, worker_name)?;
142146

143-
service
144-
.worker_service
145-
.get_metadata(&worker_id)
146-
.await
147-
.map_err(|err| ConnectError(format!("Invalid worker {}, {}", worker_id, err)))
147+
Ok(worker_id)
148148
}

0 commit comments

Comments
 (0)