-
Notifications
You must be signed in to change notification settings - Fork 120
Open
Description
Change `send` in src/router.rs as follows (the added sleep simulates a transmission in progress):
#[async_trait]
impl SocketSend for RouterSocket {
/// Sends `message` to the peer named by its first frame.
///
/// The first frame is popped off and converted into a `PeerIdentity`; what is left of
/// `message` is pushed onto that peer's `send_queue`.
///
/// # Errors
/// Propagates a failed identity conversion or a failed queue send, and returns
/// `ZmqError::Other` when no peer is registered under the identity.
///
/// # Panics
/// Panics if `message.len()` is not greater than 1.
async fn send(&mut self, mut message: ZmqMessage) -> ZmqResult<()> {
    // A routed message needs the identity frame plus at least one more frame.
    assert!(message.len() > 1);
    let peer_id: PeerIdentity = message.pop_front().unwrap().try_into()?;
    // NOTE(review): whatever `get_mut` returns is bound to `peer` and stays alive across
    // both `.await` points below. If `peers` is a concurrent map and this is a lock guard,
    // other tasks touching the same entry would block for that whole time — presumably
    // what the reported deadlock hinges on; confirm against the map type.
    match self.backend.peers.get_mut(&peer_id) {
        Some(mut peer) => {
            // Reproduction aid only: widens the window in which `peer` is held.
            println!("sleep 5");
            tokio::time::sleep(std::time::Duration::from_secs(5)).await; // pause for 5 seconds to simulate transmission in progress
            peer.send_queue.send(Message::Message(message)).await?;
            Ok(())
        }
        None => Err(ZmqError::Other("Destination client not found by identity")),
    }
}
}
server.rs
use bytes::Bytes;
use tokio;
use zeromq::prelude::*;
use zeromq::ZmqMessage;
/// Reproduction server: a ROUTER socket together with the identity it sends to.
struct Server {
    /// ROUTER socket used for both receiving and sending.
    router: zeromq::RouterSocket,
    /// Identity frame placed first in every outgoing message.
    identity: Bytes,
}
impl Server {
    /// Creates a server whose ROUTER socket is bound to `url`.
    ///
    /// `identity` is stored and used as the first frame of every message sent by `run`.
    ///
    /// # Panics
    /// Panics if binding to `url` fails.
    async fn new(url: &str, identity: Bytes) -> Self {
        let mut socket = zeromq::RouterSocket::new();
        socket.bind(url).await.unwrap();
        Self {
            router: socket,
            identity,
        }
    }

    /// Runs forever, printing every received message and, once per tick of a
    /// one-second interval, sending `server-<n>` to the stored identity.
    ///
    /// Send failures are printed and otherwise ignored; the counter advances either way.
    async fn run(&mut self) {
        let period = std::time::Duration::from_secs(1);
        let mut ticker = tokio::time::interval(period);
        let mut seq = 0;
        loop {
            tokio::select! {
                incoming = self.router.recv() => {
                    println!("Server recv {:?}", incoming);
                }
                _ = ticker.tick() => {
                    // Frame 0 routes the message, frame 1 carries the payload.
                    let frames = vec![
                        self.identity.clone(),
                        Bytes::from(format!("server-{}", seq)),
                    ];
                    let outgoing = ZmqMessage::try_from(frames).unwrap();
                    if let Err(e) = self.router.send(outgoing).await {
                        println!("Server send error {:?}", e);
                    }
                    seq += 1;
                }
            }
        }
    }
}
/// Binds the server on `tcp://0.0.0.0:5559`, runs it on a spawned task, and prints a
/// liveness line from the main task once per tick of a one-second interval.
#[tokio::main]
async fn main() {
    let mut server = Server::new("tcp://0.0.0.0:5559", Bytes::from("identity")).await;

    // The join handle is dropped, so the server task is detached.
    tokio::spawn(async move {
        server.run().await;
        println!("server exit");
    });

    let mut heartbeat = tokio::time::interval(std::time::Duration::from_secs(1));
    loop {
        heartbeat.tick().await;
        println!("I'm alive");
    }
}
client.py
import os
import time
import zmq
def get_socket():
    """Return a DEALER socket with identity "identity", connected to the local server."""
    ctx = zmq.Context()
    sock = ctx.socket(zmq.DEALER)
    # The identity is set before connect().
    sock.setsockopt_string(zmq.IDENTITY, "identity")
    sock.connect("tcp://127.0.0.1:5559")
    return sock
def main():
    """Send one message per second, then hard-exit after the message numbered 10."""
    sock = get_socket()
    counter = 0
    while True:
        text = f"Client send {counter}"
        print(text)
        sock.send(text.encode(), zmq.NOBLOCK)
        if counter == 10:
            # Exits the process immediately; the socket is never explicitly closed.
            os._exit(0)
        counter += 1
        time.sleep(1)
if __name__ == "__main__":
    main()
Reproduce steps:
- Run server.rs and client.py.
- Restart client.py after it shuts down.
- server.rs now prints nothing because of a deadlock.
Metadata
Metadata
Assignees
Labels
No labels