Skip to content

Deno Pub and Sub with router not working #336

@andreclerigo

Description

@andreclerigo

Using the main branch from zenoh-ts.

zenoh.json5

{
  mode: "router",

  plugins_loading: {
    enabled: true,
    search_dirs: ["./target/debug", "~/.zenoh/lib"]
  },

  plugins: {
    remote_api: {
      websocket_port: 10000
    }
  },

  listen: {
    endpoints: {
      router: ["tcp/0.0.0.0:7447"]
    }
  }
}
andreclerigo in …/zenoh-ts on  main [?] via 🦕 via  v20.18.0 via 🦀 v1.85.0
$ cargo build
   Compiling zenoh-plugin-remote-api v1.6.1 (/Users/andreclerigo/git/zenoh-ts/zenoh-plugin-remote-api)
   Compiling zenoh-bridge-remote-api v1.4.0 (/Users/andreclerigo/git/zenoh-ts/zenoh-bridge-remote-api)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 3.98s

andreclerigo in …/zenoh-ts on  main [?] via 🦕 via  v20.18.0 via 🦀 v1.85.0
$ cargo install cargo-run-bin
    Updating crates.io index
     Ignored package `cargo-run-bin v1.7.5` is already installed, use --force to override

andreclerigo in …/zenoh-ts on  main [?] via 🦕 via  v20.18.0 via 🦀 v1.85.0
$ cargo bin zenohd --config zenoh.json5
...
2025-10-15T10:02:04.489036Z  INFO main ThreadId(01) zenohd: zenohd v3b0cef30 built with rustc 1.85.0 (4d91de4e4 2025-02-17)
2025-10-15T10:02:04.490613Z  INFO main ThreadId(01) zenohd: Initial conf: {"access_control":{"default_permission":"deny","enabled":false,"policies":null,"rules":null,"subjects":null},"adminspace":{"enabled":true,"permissions":{"read":true,"write":false}},"aggregation":{"publishers":[],"subscribers":[]},"connect":{"endpoints":[],"exit_on_failure":null,"retry":null,"timeout_ms":null},"downsampling":[],"id":null,"listen":{"endpoints":{"router":["tcp/0.0.0.0:7447"]},"exit_on_failure":null,"retry":null,"timeout_ms":null},"low_pass_filter":[],"metadata":null,"mode":"router","namespace":null,"open":{"return_conditions":{"connect_scouted":null,"declares":null}},"plugins":{"remote_api":{"websocket_port":10000}},"plugins_loading":{"enabled":true,"search_dirs":["./target/debug","~/.zenoh/lib"]},"qos":{"network":[],"publication":[]},"queries_default_timeout":null,"routing":{"interests":{"timeout":null},"peer":{"linkstate":{"transport_weights":[]},"mode":null},"router":{"linkstate":{"transport_weights":[]},"peers_failover_brokering":null}},"scouting":{"delay":null,"gossip":{"autoconnect":null,"autoconnect_strategy":null,"enabled":null,"multihop":null,"target":null},"multicast":{"address":null,"autoconnect":null,"autoconnect_strategy":null,"enabled":true,"interface":null,"listen":null,"ttl":null},"timeout":null},"timestamping":{"drop_future_timestamp":null,"enabled":null},"transport":{"auth":{"pubkey":{"key_size":null,"known_keys_file":null,"private_key_file":null,"private_key_pem":null,"public_key_file":null,"public_key_pem":null},"usrpwd":{"dictionary_file":null,"password":null,"user":null}},"link":{"protocols":null,"rx":{"buffer_size":65535,"max_message_size":1073741824},"tcp":{"so_rcvbuf":null,"so_sndbuf":null},"tls":{"close_link_on_expiration":null,"connect_certificate":null,"connect_private_key":null,"enable_mtls":null,"listen_certificate":null,"listen_private_key":null,"root_ca_certificate":null,"so_rcvbuf":null,"so_sndbuf":null,"verify_name_on_connect":null},"tx":{"batch_size":65535,"keep_alive":4,"lease":10000,"queue":{"allocation":{"mode":"lazy"},"batching":{"enabled":true,"time_limit":1},"congestion_control":{"block":{"wait_before_close":5000000},"drop":{"max_wait_before_drop_fragments":50000,"wait_before_drop":1000}},"size":{"background":2,"control":2,"data":2,"data_high":2,"data_low":2,"interactive_high":2,"interactive_low":2,"real_time":2}},"sequence_number_resolution":"32bit","threads":2},"unixpipe":{"file_access_mask":null}},"multicast":{"compression":{"enabled":false},"join_interval":2500,"max_sessions":1000,"qos":{"enabled":false}},"shared_memory":{"enabled":true,"mode":"lazy","transport_optimization":{"enabled":true,"message_size_threshold":3072,"pool_size":16777216}},"unicast":{"accept_pending":100,"accept_timeout":10000,"compression":{"enabled":false},"lowlatency":false,"max_links":1,"max_sessions":1000,"open_timeout":10000,"qos":{"enabled":true}}}}
2025-10-15T10:02:04.491282Z  INFO main ThreadId(01) zenoh::net::runtime: Using ZID: 6d25f87ac08a70e8c9732c21eb9587ad
2025-10-15T10:02:04.491771Z  INFO main ThreadId(01) zenoh::api::loader: Loading  plugin "remote_api"
2025-10-15T10:02:05.108588Z  INFO main ThreadId(01) zenoh::api::loader: Starting  plugin "remote_api"
2025-10-15T10:02:05.111380Z  INFO main ThreadId(01) zenoh::api::loader: Successfully started plugin remote_api from "./target/debug/libzenoh_plugin_remote_api.dylib"
2025-10-15T10:02:05.111554Z  INFO main ThreadId(01) zenoh::api::loader: Finished loading plugins
2025-10-15T10:02:05.113169Z  INFO main ThreadId(01) zenoh::net::runtime::orchestrator: Zenoh can be reached at: tcp/10.0.23.36:7447
2025-10-15T10:02:05.113184Z  INFO main ThreadId(01) zenoh::net::runtime::orchestrator: Zenoh can be reached at: tcp/10.0.22.135:7447
2025-10-15T10:02:05.113189Z  INFO main ThreadId(01) zenoh::net::runtime::orchestrator: Zenoh can be reached at: tcp/100.123.153.124:7447
2025-10-15T10:02:05.113779Z  INFO main ThreadId(01) zenoh::net::runtime::orchestrator: zenohd listening scout messages on 224.0.0.224:7446

zenoh-deno-pub-loop.ts

import { Session, Config } from "npm:@eclipse-zenoh/[email protected]";
const key = Deno.args[0] ?? "bsole/sensors/orientation";

function normWsLocator(s?: string | null): string {
    if (!s || s.trim() === "") return "ws/127.0.0.1:10000";
    const t = s.trim();
    if (t.startsWith("ws://")) return "ws/" + t.slice("ws://".length);
    if (t.startsWith("wss://")) return "wss/" + t.slice("wss://".length);
    return t;
}

const WS = normWsLocator(Deno.env.get("ZENOH_WS"));
const enc = new TextEncoder();

const session = await Session.open(new Config(WS));
console.log(`[deno-pub-loop] connected to ${WS}, declaring publisher on ${key}`);

const pubDecl = (session as any).declarePublisher ?? (session as any).declare_publisher;
if (typeof pubDecl !== "function") {
    console.error("[deno-pub-loop] No declarePublisher API on Session");
    Deno.exit(2);
}
const pub = await pubDecl.call(session, key);
console.log("[deno-pub-loop] publisher declared");

let i = 0;
while (true) {
    const payload = enc.encode(JSON.stringify({ i, ts: Date.now() }));
    // Use the publisher's put
    await (pub.put ?? pub.write).call(pub, payload);
    console.log(`[deno-pub-loop] put #${i}`);
    i++;
    await new Promise((r) => setTimeout(r, 1000));
}

zenoh-deno-sub.ts

import { Session, Config } from "npm:@eclipse-zenoh/[email protected]";

function normWsLocator(s?: string | null): string {
    if (!s || s.trim() === "") return "ws/127.0.0.1:10000";
    const t = s.trim();
    if (t.startsWith("ws://")) return "ws/" + t.slice("ws://".length);
    if (t.startsWith("wss://")) return "wss/" + t.slice("wss://".length);
    return t;
}

const WS = normWsLocator(Deno.env.get("ZENOH_WS"));
const KEYEXPR =
    Deno.args[0] ??
    Deno.env.get("ZENOH_SUB") ??
    Deno.env.get("ZENOH_KEYEXPR") ??
    "bsole/sensors/**";

const dec = new TextDecoder();

const session = await Session.open(new Config(WS));
console.log(`[deno-sub] connected to ${WS}`);
console.log(`[deno-sub] subscribing to ${KEYEXPR}`);

let shuttingDown = false;
async function shutdown(code = 0) {
    if (shuttingDown) return;
    shuttingDown = true;
    try {
        await session.close?.();
    } catch {}
    await new Promise((r) => setTimeout(r, 20));
    Deno.exit(code);
}
addEventListener("SIGTERM", () => shutdown(0));
addEventListener("SIGINT", () => shutdown(0));

function bytesToStr(v: unknown): string {
    if (v instanceof Uint8Array) return dec.decode(v);
    if (typeof v === "string") return v;
    try {
        return JSON.stringify(v);
    } catch {
        return String(v);
    }
}
function getKey(s: any): string {
    return s?.key ?? s?.keyexpr ?? s?.keyExpr ?? "<unknown-key>";
}
function getVal(s: any): unknown {
    return s?.value?.payload ?? s?.payload ?? s?.value ?? null;
}
function prettyMaybeJSON(s: string): string {
    try {
        return JSON.stringify(JSON.parse(s), null, 2);
    } catch {
        return s;
    }
}

async function useDeclareSubscriber() {
    const decl = (session as any).declareSubscriber ?? (session as any).declare_subscriber;
    const sub = await decl.call(session, KEYEXPR, (sample: any) => {
        const k = getKey(sample);
        const v = getVal(sample);
        const asString = bytesToStr(v);
        console.log(`[${new Date().toISOString()}] ${k}\n${prettyMaybeJSON(asString)}\n`);
        // Debug: uncomment to see raw sample shape once
        // console.error("[raw sample]", JSON.stringify(sample, null, 2));
    });
    // Keep alive
    await new Promise(() => {});
    await sub?.close?.();
}

async function useSubscribeIterator() {
    const iter = await (session as any).subscribe(KEYEXPR);
    for await (const sample of iter as AsyncIterable<any>) {
        const k = getKey(sample);
        const v = getVal(sample);
        const asString = bytesToStr(v);
        console.log(`[${new Date().toISOString()}] ${k}\n${prettyMaybeJSON(asString)}\n`);
        // console.error("[raw sample]", JSON.stringify(sample, null, 2));
    }
}

if (
    typeof (session as any).declareSubscriber === "function" ||
    typeof (session as any).declare_subscriber === "function"
) {
    await useDeclareSubscriber();
} else if (typeof (session as any).subscribe === "function") {
    await useSubscribeIterator();
} else {
    console.error("[deno-sub] No subscriber API found on zenoh-ts Session.");
    await shutdown(2);
}

The output of my scripts running at the same time:

$ ZENOH_WS=ws://127.0.0.1:10000 \
deno run --quiet --allow-net --allow-env tools/zenoh-deno-sub.ts "bsole/sensors/**"

Connected to ws://127.0.0.1:10000
Successfully opened session with id: 91968f26-5da3-4110-9c0c-3a66a4bf4387
[deno-sub] connected to ws/127.0.0.1:10000
[deno-sub] subscribing to bsole/sensors/**

$ ZENOH_WS=ws://127.0.0.1:10000 \
deno run --quiet --allow-net --allow-env tools/zenoh-deno-pub-loop.ts

Connected to ws://127.0.0.1:10000
Successfully opened session with id: a22e9b0e-d328-498f-91c5-9b7bf5dd76f6
[deno-pub-loop] connected to ws/127.0.0.1:10000, declaring publisher on bsole/sensors/orientation
[deno-pub-loop] publisher declared
[deno-pub-loop] put #0
[deno-pub-loop] put #1
[deno-pub-loop] put #2
[deno-pub-loop] put #3
[deno-pub-loop] put #4
[deno-pub-loop] put #5
[deno-pub-loop] put #6

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions