-
Notifications
You must be signed in to change notification settings - Fork 7
Open
Description
Using the main branch from zenoh-ts.
zenoh.json5
{
mode: "router",
plugins_loading: {
enabled: true,
search_dirs: ["./target/debug", "~/.zenoh/lib"]
},
plugins: {
remote_api: {
websocket_port: 10000
}
},
listen: {
endpoints: {
router: ["tcp/0.0.0.0:7447"]
}
}
}andreclerigo in …/zenoh-ts on main [?] via 🦕 via v20.18.0 via 🦀 v1.85.0
$ cargo build
Compiling zenoh-plugin-remote-api v1.6.1 (/Users/andreclerigo/git/zenoh-ts/zenoh-plugin-remote-api)
Compiling zenoh-bridge-remote-api v1.4.0 (/Users/andreclerigo/git/zenoh-ts/zenoh-bridge-remote-api)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 3.98s
andreclerigo in …/zenoh-ts on main [?] via 🦕 via v20.18.0 via 🦀 v1.85.0
$ cargo install cargo-run-bin
Updating crates.io index
Ignored package `cargo-run-bin v1.7.5` is already installed, use --force to override
andreclerigo in …/zenoh-ts on main [?] via 🦕 via v20.18.0 via 🦀 v1.85.0
$ cargo bin zenohd --config zenoh.json5
...
2025-10-15T10:02:04.489036Z INFO main ThreadId(01) zenohd: zenohd v3b0cef30 built with rustc 1.85.0 (4d91de4e4 2025-02-17)
2025-10-15T10:02:04.490613Z INFO main ThreadId(01) zenohd: Initial conf: {"access_control":{"default_permission":"deny","enabled":false,"policies":null,"rules":null,"subjects":null},"adminspace":{"enabled":true,"permissions":{"read":true,"write":false}},"aggregation":{"publishers":[],"subscribers":[]},"connect":{"endpoints":[],"exit_on_failure":null,"retry":null,"timeout_ms":null},"downsampling":[],"id":null,"listen":{"endpoints":{"router":["tcp/0.0.0.0:7447"]},"exit_on_failure":null,"retry":null,"timeout_ms":null},"low_pass_filter":[],"metadata":null,"mode":"router","namespace":null,"open":{"return_conditions":{"connect_scouted":null,"declares":null}},"plugins":{"remote_api":{"websocket_port":10000}},"plugins_loading":{"enabled":true,"search_dirs":["./target/debug","~/.zenoh/lib"]},"qos":{"network":[],"publication":[]},"queries_default_timeout":null,"routing":{"interests":{"timeout":null},"peer":{"linkstate":{"transport_weights":[]},"mode":null},"router":{"linkstate":{"transport_weights":[]},"peers_failover_brokering":null}},"scouting":{"delay":null,"gossip":{"autoconnect":null,"autoconnect_strategy":null,"enabled":null,"multihop":null,"target":null},"multicast":{"address":null,"autoconnect":null,"autoconnect_strategy":null,"enabled":true,"interface":null,"listen":null,"ttl":null},"timeout":null},"timestamping":{"drop_future_timestamp":null,"enabled":null},"transport":{"auth":{"pubkey":{"key_size":null,"known_keys_file":null,"private_key_file":null,"private_key_pem":null,"public_key_file":null,"public_key_pem":null},"usrpwd":{"dictionary_file":null,"password":null,"user":null}},"link":{"protocols":null,"rx":{"buffer_size":65535,"max_message_size":1073741824},"tcp":{"so_rcvbuf":null,"so_sndbuf":null},"tls":{"close_link_on_expiration":null,"connect_certificate":null,"connect_private_key":null,"enable_mtls":null,"listen_certificate":null,"listen_private_key":null,"root_ca_certificate":null,"so_rcvbuf":null,"so_sndbuf":null,"verify_name_on_connect":null},"tx":{"batch_size":65535,"keep_alive":4,"lease":10000,"queue":{"allocation":{"mode":"lazy"},"batching":{"enabled":true,"time_limit":1},"congestion_control":{"block":{"wait_before_close":5000000},"drop":{"max_wait_before_drop_fragments":50000,"wait_before_drop":1000}},"size":{"background":2,"control":2,"data":2,"data_high":2,"data_low":2,"interactive_high":2,"interactive_low":2,"real_time":2}},"sequence_number_resolution":"32bit","threads":2},"unixpipe":{"file_access_mask":null}},"multicast":{"compression":{"enabled":false},"join_interval":2500,"max_sessions":1000,"qos":{"enabled":false}},"shared_memory":{"enabled":true,"mode":"lazy","transport_optimization":{"enabled":true,"message_size_threshold":3072,"pool_size":16777216}},"unicast":{"accept_pending":100,"accept_timeout":10000,"compression":{"enabled":false},"lowlatency":false,"max_links":1,"max_sessions":1000,"open_timeout":10000,"qos":{"enabled":true}}}}
2025-10-15T10:02:04.491282Z INFO main ThreadId(01) zenoh::net::runtime: Using ZID: 6d25f87ac08a70e8c9732c21eb9587ad
2025-10-15T10:02:04.491771Z INFO main ThreadId(01) zenoh::api::loader: Loading plugin "remote_api"
2025-10-15T10:02:05.108588Z INFO main ThreadId(01) zenoh::api::loader: Starting plugin "remote_api"
2025-10-15T10:02:05.111380Z INFO main ThreadId(01) zenoh::api::loader: Successfully started plugin remote_api from "./target/debug/libzenoh_plugin_remote_api.dylib"
2025-10-15T10:02:05.111554Z INFO main ThreadId(01) zenoh::api::loader: Finished loading plugins
2025-10-15T10:02:05.113169Z INFO main ThreadId(01) zenoh::net::runtime::orchestrator: Zenoh can be reached at: tcp/10.0.23.36:7447
2025-10-15T10:02:05.113184Z INFO main ThreadId(01) zenoh::net::runtime::orchestrator: Zenoh can be reached at: tcp/10.0.22.135:7447
2025-10-15T10:02:05.113189Z INFO main ThreadId(01) zenoh::net::runtime::orchestrator: Zenoh can be reached at: tcp/100.123.153.124:7447
2025-10-15T10:02:05.113779Z INFO main ThreadId(01) zenoh::net::runtime::orchestrator: zenohd listening scout messages on 224.0.0.224:7446zenoh-deno-pub-loop.ts
import { Session, Config } from "npm:@eclipse-zenoh/[email protected]";
const key = Deno.args[0] ?? "bsole/sensors/orientation";
function normWsLocator(s?: string | null): string {
if (!s || s.trim() === "") return "ws/127.0.0.1:10000";
const t = s.trim();
if (t.startsWith("ws://")) return "ws/" + t.slice("ws://".length);
if (t.startsWith("wss://")) return "wss/" + t.slice("wss://".length);
return t;
}
const WS = normWsLocator(Deno.env.get("ZENOH_WS"));
const enc = new TextEncoder();
const session = await Session.open(new Config(WS));
console.log(`[deno-pub-loop] connected to ${WS}, declaring publisher on ${key}`);
const pubDecl = (session as any).declarePublisher ?? (session as any).declare_publisher;
if (typeof pubDecl !== "function") {
console.error("[deno-pub-loop] No declarePublisher API on Session");
Deno.exit(2);
}
const pub = await pubDecl.call(session, key);
console.log("[deno-pub-loop] publisher declared");
let i = 0;
while (true) {
const payload = enc.encode(JSON.stringify({ i, ts: Date.now() }));
// Use the publisher's put
await (pub.put ?? pub.write).call(pub, payload);
console.log(`[deno-pub-loop] put #${i}`);
i++;
await new Promise((r) => setTimeout(r, 1000));
}zenoh-deno-sub.ts
import { Session, Config } from "npm:@eclipse-zenoh/[email protected]";
function normWsLocator(s?: string | null): string {
if (!s || s.trim() === "") return "ws/127.0.0.1:10000";
const t = s.trim();
if (t.startsWith("ws://")) return "ws/" + t.slice("ws://".length);
if (t.startsWith("wss://")) return "wss/" + t.slice("wss://".length);
return t;
}
const WS = normWsLocator(Deno.env.get("ZENOH_WS"));
const KEYEXPR =
Deno.args[0] ??
Deno.env.get("ZENOH_SUB") ??
Deno.env.get("ZENOH_KEYEXPR") ??
"bsole/sensors/**";
const dec = new TextDecoder();
const session = await Session.open(new Config(WS));
console.log(`[deno-sub] connected to ${WS}`);
console.log(`[deno-sub] subscribing to ${KEYEXPR}`);
let shuttingDown = false;
async function shutdown(code = 0) {
if (shuttingDown) return;
shuttingDown = true;
try {
await session.close?.();
} catch {}
await new Promise((r) => setTimeout(r, 20));
Deno.exit(code);
}
addEventListener("SIGTERM", () => shutdown(0));
addEventListener("SIGINT", () => shutdown(0));
function bytesToStr(v: unknown): string {
if (v instanceof Uint8Array) return dec.decode(v);
if (typeof v === "string") return v;
try {
return JSON.stringify(v);
} catch {
return String(v);
}
}
function getKey(s: any): string {
return s?.key ?? s?.keyexpr ?? s?.keyExpr ?? "<unknown-key>";
}
function getVal(s: any): unknown {
return s?.value?.payload ?? s?.payload ?? s?.value ?? null;
}
function prettyMaybeJSON(s: string): string {
try {
return JSON.stringify(JSON.parse(s), null, 2);
} catch {
return s;
}
}
async function useDeclareSubscriber() {
const decl = (session as any).declareSubscriber ?? (session as any).declare_subscriber;
const sub = await decl.call(session, KEYEXPR, (sample: any) => {
const k = getKey(sample);
const v = getVal(sample);
const asString = bytesToStr(v);
console.log(`[${new Date().toISOString()}] ${k}\n${prettyMaybeJSON(asString)}\n`);
// Debug: uncomment to see raw sample shape once
// console.error("[raw sample]", JSON.stringify(sample, null, 2));
});
// Keep alive
await new Promise(() => {});
await sub?.close?.();
}
async function useSubscribeIterator() {
const iter = await (session as any).subscribe(KEYEXPR);
for await (const sample of iter as AsyncIterable<any>) {
const k = getKey(sample);
const v = getVal(sample);
const asString = bytesToStr(v);
console.log(`[${new Date().toISOString()}] ${k}\n${prettyMaybeJSON(asString)}\n`);
// console.error("[raw sample]", JSON.stringify(sample, null, 2));
}
}
if (
typeof (session as any).declareSubscriber === "function" ||
typeof (session as any).declare_subscriber === "function"
) {
await useDeclareSubscriber();
} else if (typeof (session as any).subscribe === "function") {
await useSubscribeIterator();
} else {
console.error("[deno-sub] No subscriber API found on zenoh-ts Session.");
await shutdown(2);
}The output of my scripts running at the same time:
$ ZENOH_WS=ws://127.0.0.1:10000 \
deno run --quiet --allow-net --allow-env tools/zenoh-deno-sub.ts "bsole/sensors/**"
Connected to ws://127.0.0.1:10000
Successfully opened session with id: 91968f26-5da3-4110-9c0c-3a66a4bf4387
[deno-sub] connected to ws/127.0.0.1:10000
[deno-sub] subscribing to bsole/sensors/**
$ ZENOH_WS=ws://127.0.0.1:10000 \
deno run --quiet --allow-net --allow-env tools/zenoh-deno-pub-loop.ts
Connected to ws://127.0.0.1:10000
Successfully opened session with id: a22e9b0e-d328-498f-91c5-9b7bf5dd76f6
[deno-pub-loop] connected to ws/127.0.0.1:10000, declaring publisher on bsole/sensors/orientation
[deno-pub-loop] publisher declared
[deno-pub-loop] put #0
[deno-pub-loop] put #1
[deno-pub-loop] put #2
[deno-pub-loop] put #3
[deno-pub-loop] put #4
[deno-pub-loop] put #5
[deno-pub-loop] put #6Metadata
Metadata
Assignees
Labels
No labels