Skip to content

ROUTER/DEALER mode may cause deadlock when using the same identity #210

@Remalloc

Description

@Remalloc

change src/router.rs

#[async_trait]
impl SocketSend for RouterSocket {
    async fn send(&mut self, mut message: ZmqMessage) -> ZmqResult<()> {
        assert!(message.len() > 1);
        let peer_id: PeerIdentity = message.pop_front().unwrap().try_into()?;
        match self.backend.peers.get_mut(&peer_id) {
            Some(mut peer) => {
                println!("sleep 5"); 
                tokio::time::sleep(std::time::Duration::from_secs(5)).await; // pause for 5 seconds to simulate transmission in progress
                peer.send_queue.send(Message::Message(message)).await?;
                Ok(())
            }
            None => Err(ZmqError::Other("Destination client not found by identity")),
        }
    }
}

server.rs

use bytes::Bytes;
use tokio;
use zeromq::prelude::*;
use zeromq::ZmqMessage;

struct Server {
    router: zeromq::RouterSocket,
    identity: Bytes,
}

impl Server {
    async fn new(url: &str, identity: Bytes) -> Self {
        let mut router = zeromq::RouterSocket::new();
        router.bind(url).await.unwrap();
        Self { router, identity }
    }

    async fn run(&mut self) {
        let mut ticker = tokio::time::interval(std::time::Duration::from_secs(1));
        let mut cnt = 0;
        loop {
            tokio::select! {
                msg = self.router.recv() => {
                    println!("Server recv {:?}", msg);
                }
                _ = ticker.tick() => {
                    let content = Bytes::from(format!("server-{}",cnt));
                    let msg = ZmqMessage::try_from(vec![self.identity.clone(), content]).unwrap();
                    match self.router.send(msg).await{
                        Ok(_) => {}
                        Err(e) => {
                            println!("Server send error {:?}", e);
                        }
                    }
                    cnt += 1;

                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let url = "tcp://0.0.0.0:5559";
    let identity = Bytes::from("identity");
    let mut server = Server::new(url, identity.clone()).await;

    tokio::spawn(async move {
        server.run().await;
        println!("server exit");
    });
    let mut ticker = tokio::time::interval(std::time::Duration::from_secs(1));
    loop {
        ticker.tick().await;
        println!("I'm alive");
    }
}

client.py

import os
import time

import zmq


def get_socket():
    context = zmq.Context()
    s = context.socket(zmq.DEALER)
    s.setsockopt_string(zmq.IDENTITY, "identity")
    s.connect("tcp://127.0.0.1:5559")
    return s


def main():
    s = get_socket()
    cnt = 0
    while True:
        msg = f"Client send {cnt}"
        print(msg)
        s.send(msg.encode(), zmq.NOBLOCK)
        if cnt == 10:
            os._exit(0)
        cnt += 1
        time.sleep(1)


if __name__ == "__main__":
    main()

Reproduce steps:

  1. run server.rs and client.py
  2. restart client.py when it shutdown
  3. the server.rs is currently not printing anything due to a deadlock

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions