
Commit 860b493

Having a lib target for golem-shard-manager so it can be depended on (#267)
1 parent: 3b7420e

File tree

4 files changed: +344 -272 lines


Cargo.lock

Lines changed: 52 additions & 0 deletions
Some generated files are not rendered by default.

golem-shard-manager/Cargo.toml

Lines changed: 3 additions & 0 deletions
@@ -3,6 +3,9 @@ name = "golem-shard-manager"
 version = "0.1.0"
 edition = "2021"
 
+[lib]
+path = "src/lib.rs"
+
 [[bin]]
 name = "golem-shard-manager"
 path = "src/server.rs"

golem-shard-manager/src/lib.rs

Lines changed: 288 additions & 0 deletions
@@ -0,0 +1,288 @@
// Copyright 2024 Golem Cloud
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod error;
mod healthcheck;
mod http_server;
mod model;
mod persistence;
mod rebalancing;
mod shard_management;
mod shard_manager_config;
mod worker_executor;

use std::env;
use std::net::{Ipv4Addr, SocketAddrV4};
use std::sync::Arc;

use crate::healthcheck::{get_unhealthy_pods, GrpcHealthCheck, HealthCheck};
use error::ShardManagerError;
use golem_api_grpc::proto;
use golem_api_grpc::proto::golem;
use golem_api_grpc::proto::golem::shardmanager::shard_manager_service_server::{
    ShardManagerService, ShardManagerServiceServer,
};
use model::{Pod, RoutingTable};
use persistence::{PersistenceService, PersistenceServiceDefault};
use prometheus::{default_registry, Registry};
use shard_management::ShardManagement;
use shard_manager_config::ShardManagerConfig;
use tonic::transport::Server;
use tonic::Response;
use tracing::{debug, info, warn};
use tracing_subscriber::EnvFilter;
use worker_executor::{WorkerExecutorService, WorkerExecutorServiceDefault};

use crate::http_server::HttpServerImpl;
use crate::shard_manager_config::HealthCheckMode;

pub struct ShardManagerServiceImpl {
    shard_management: ShardManagement,
    shard_manager_config: Arc<ShardManagerConfig>,
    health_check: Arc<dyn HealthCheck + Send + Sync>,
}

impl ShardManagerServiceImpl {
    async fn new(
        persistence_service: Arc<dyn PersistenceService + Send + Sync>,
        worker_executor_service: Arc<dyn WorkerExecutorService + Send + Sync>,
        shard_manager_config: Arc<ShardManagerConfig>,
        health_check: Arc<dyn HealthCheck + Send + Sync>,
    ) -> Result<ShardManagerServiceImpl, ShardManagerError> {
        let shard_management = ShardManagement::new(
            persistence_service.clone(),
            worker_executor_service,
            health_check.clone(),
            shard_manager_config.rebalance_threshold,
        )
        .await?;

        let shard_manager_service = ShardManagerServiceImpl {
            shard_management,
            shard_manager_config,
            health_check,
        };

        info!("Starting health check process...");
        shard_manager_service.start_health_check();
        info!("Shard Manager is fully operational.");

        Ok(shard_manager_service)
    }

    async fn get_routing_table_internal(&self) -> RoutingTable {
        let routing_table = self.shard_management.current_snapshot().await;
        info!("Shard Manager providing routing table: {}", routing_table);
        routing_table
    }

    async fn register_internal(
        &self,
        request: tonic::Request<golem::shardmanager::RegisterRequest>,
    ) -> Result<(), ShardManagerError> {
        let pod = Pod::from_register_request(request)?;
        info!("Shard Manager received request to register pod: {}", pod);
        self.shard_management.register_pod(pod).await;
        Ok(())
    }

    fn start_health_check(&self) {
        let delay = self.shard_manager_config.health_check.delay;
        let shard_management = self.shard_management.clone();
        let health_check = self.health_check.clone();

        tokio::spawn(async move {
            loop {
                tokio::time::sleep(delay).await;
                Self::health_check(shard_management.clone(), health_check.clone()).await
            }
        });
    }

    async fn health_check(
        shard_management: ShardManagement,
        health_check: Arc<dyn HealthCheck + Send + Sync>,
    ) {
        debug!("Shard Manager scheduled to conduct health check");
        let routing_table = shard_management.current_snapshot().await;
        debug!("Shard Manager checking health of registered pods...");
        let failed_pods = get_unhealthy_pods(health_check, &routing_table.get_pods()).await;
        if failed_pods.is_empty() {
            debug!("All registered pods are healthy")
        } else {
            warn!(
                "The following pods were found to be unhealthy: {:?}",
                failed_pods
            );
            for failed_pod in failed_pods {
                shard_management.unregister_pod(failed_pod).await;
            }
        }

        debug!("Golem Shard Manager finished checking health of registered pods");
    }
}

#[tonic::async_trait]
impl ShardManagerService for ShardManagerServiceImpl {
    async fn get_routing_table(
        &self,
        _request: tonic::Request<golem::shardmanager::GetRoutingTableRequest>,
    ) -> Result<tonic::Response<golem::shardmanager::GetRoutingTableResponse>, tonic::Status> {
        Ok(Response::new(
            golem::shardmanager::GetRoutingTableResponse {
                result: Some(
                    golem::shardmanager::get_routing_table_response::Result::Success(
                        self.get_routing_table_internal().await.into(),
                    ),
                ),
            },
        ))
    }

    async fn register(
        &self,
        request: tonic::Request<golem::shardmanager::RegisterRequest>,
    ) -> Result<tonic::Response<golem::shardmanager::RegisterResponse>, tonic::Status> {
        match self.register_internal(request).await {
            Ok(_) => Ok(Response::new(golem::shardmanager::RegisterResponse {
                result: Some(golem::shardmanager::register_response::Result::Success(
                    golem::shardmanager::RegisterSuccess {
                        number_of_shards: self.shard_manager_config.number_of_shards as u32,
                    },
                )),
            })),
            Err(error) => Ok(Response::new(golem::shardmanager::RegisterResponse {
                result: Some(golem::shardmanager::register_response::Result::Failure(
                    error.into(),
                )),
            })),
        }
    }
}

pub fn server_main() -> Result<(), Box<dyn std::error::Error>> {
    let config = ShardManagerConfig::new();
    let registry = default_registry().clone();

    if config.enable_json_log {
        tracing_subscriber::fmt()
            .json()
            .flatten_event(true)
            // .with_span_events(FmtSpan::FULL) // NOTE: enable to see span events
            .with_env_filter(EnvFilter::from_default_env())
            .init();
    } else {
        tracing_subscriber::fmt()
            .with_env_filter(EnvFilter::from_default_env())
            .with_ansi(true)
            .init();
    }

    // NOTE: to enable tokio-console, comment the lines above and uncomment the lines below,
    // and compile with RUSTFLAGS="--cfg tokio_unstable" cargo build
    // TODO: make tracing subscription configurable
    // console_subscriber::init();

    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async_main(&config, registry))
}

async fn async_main(
    shard_manager_config: &ShardManagerConfig,
    registry: Registry,
) -> Result<(), Box<dyn std::error::Error>> {
    let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
    health_reporter
        .set_serving::<ShardManagerServiceServer<ShardManagerServiceImpl>>()
        .await;

    let reflection_service = tonic_reflection::server::Builder::configure()
        .register_encoded_file_descriptor_set(proto::FILE_DESCRIPTOR_SET)
        .build()
        .unwrap();

    info!("Golem Shard Manager starting up...");

    let _ = HttpServerImpl::new(
        SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), shard_manager_config.http_port),
        registry,
    );

    info!("Using Redis at {}", shard_manager_config.redis.url());
    let pool = golem_common::redis::RedisPool::configured(&shard_manager_config.redis).await?;

    let shard_manager_config = Arc::new(shard_manager_config.clone());

    let persistence_service = Arc::new(PersistenceServiceDefault::new(
        &pool,
        &shard_manager_config.number_of_shards,
    ));
    let worker_executors = Arc::new(WorkerExecutorServiceDefault::new(
        shard_manager_config.worker_executors.clone(),
    ));

    let shard_manager_port_str = env::var("GOLEM_SHARD_MANAGER_PORT")?;
    info!("The port read from env is {}", shard_manager_port_str);
    let shard_manager_port = shard_manager_port_str.parse::<u16>()?;
    let shard_manager_addr = format!("0.0.0.0:{}", shard_manager_port);

    info!("Listening on port {}", shard_manager_port);

    let addr = shard_manager_addr.parse()?;

    let health_check: Arc<dyn HealthCheck + Send + Sync> =
        match &shard_manager_config.health_check.mode {
            HealthCheckMode::Grpc => Arc::new(GrpcHealthCheck::new(
                worker_executors.clone(),
                shard_manager_config.worker_executors.retries.clone(),
            )),
            #[cfg(feature = "kubernetes")]
            HealthCheckMode::K8s { namespace } => Arc::new(
                crate::healthcheck::kubernetes::KubernetesHealthCheck::new(
                    namespace.clone(),
                    shard_manager_config.worker_executors.retries.clone(),
                )
                .await
                .expect("Failed to initialize K8s health checker"),
            ),
        };

    let shard_manager = ShardManagerServiceImpl::new(
        persistence_service,
        worker_executors,
        shard_manager_config,
        health_check,
    )
    .await?;

    let service = ShardManagerServiceServer::new(shard_manager);

    // TODO: configurable limits
    Server::builder()
        .concurrency_limit_per_connection(1024)
        .max_concurrent_streams(Some(1024))
        .add_service(reflection_service)
        .add_service(service)
        .add_service(health_service)
        .serve(addr)
        .await?;

    info!("Server started on port {}", shard_manager_port);

    Ok(())
}
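
The fourth changed file, golem-shard-manager/src/server.rs, is not rendered in this view. Since Cargo.toml above keeps it as the [[bin]] target and lib.rs now exposes pub fn server_main(), the binary plausibly reduces to a thin delegating wrapper along these lines (an assumption about the unshown diff, not its actual content):

// Assumed shape of golem-shard-manager/src/server.rs after this commit:
// the binary target simply forwards to the library's public entry point.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    golem_shard_manager::server_main()
}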
