Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
a15237f
Test Partial Message in interop tester
MarcoPolo Sep 24, 2025
3e7ff0b
add rust implementation for partial messages
jxs Sep 26, 2025
813a6df
remove instructions type field
jxs Sep 26, 2025
130dcc2
fix PublishPartial group_id rename
jxs Sep 29, 2025
98268aa
gossipsub-interop: partial messages, special case 2 nodes
MarcoPolo Sep 29, 2025
41949a4
fixup add partial to gossipsub interop subscript tester
MarcoPolo Sep 29, 2025
25fa6fd
workaround for rust bug
MarcoPolo Sep 29, 2025
f2de541
gossipsub-interop(rust-libp2p): fix instruction parsing
MarcoPolo Sep 29, 2025
b687025
gossipsub-interop(rust-libp2p): Fix cargo warning
MarcoPolo Sep 29, 2025
b656686
gossipsub-interop(rust-libp2p): fix bitmap's available/missing methods
MarcoPolo Sep 29, 2025
d4071af
gossipsub-interop(rust-libp2p): implement republishing logic
MarcoPolo Sep 29, 2025
fc64c48
debug print lines
MarcoPolo Sep 29, 2025
315a1e8
update rust version to latest rust-libp2p
jxs Sep 29, 2025
ddf72bf
update rust-libp2p dep
jxs Sep 30, 2025
eeb6ed1
update for latest rust-libp2p changes
jxs Oct 1, 2025
2a0c229
update rust-libp2p sim to the updated scriptparams
jxs Oct 1, 2025
624bbc4
gossipsub-interop: lint fixes
MarcoPolo Oct 1, 2025
131b03f
gossipsub-intero(go): update to latest partial message impl
MarcoPolo Oct 1, 2025
f4b515d
gossipsub-interop: update go-libp2p to the latest version
sukunrt Oct 12, 2025
abac98b
gossipsub-interop: bump go-libp2p version
MarcoPolo Nov 22, 2025
413ff78
add missing MergeMedata func
MarcoPolo Nov 22, 2025
4e289d6
properly return err on nested instruction
MarcoPolo Nov 22, 2025
be279c7
add partial_messages check
MarcoPolo Nov 22, 2025
c74b76e
gossipsub-interop: add a new scenario for partial messages
sukunrt Oct 12, 2025
959ec03
gossipsub-interop: add partial message fanout scenario test
MarcoPolo Nov 22, 2025
5b5c0e9
log "All parts received" only once per group id
MarcoPolo Nov 22, 2025
07765da
gossipsub-interop: Add a basic `make test` command to run all tests
MarcoPolo Nov 22, 2025
9ccd0cd
gossipsub-interop: Change rust log to match checker
MarcoPolo Nov 22, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions gossipsub-interop/.gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
*.swp
/shadow.data
shadow-outputs/*
latest
synctest*.data
/shadow.yaml
/graph.gml
Expand Down
17 changes: 15 additions & 2 deletions gossipsub-interop/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,20 @@ binaries:

# Clean all generated shadow simulation files
clean:
rm -rf *.data || true
rm -rf shadow-outputs || true
rm plots/* || true

.PHONY: binaries all clean
test:
# Testing partial messages
@echo "Testing partial messages"
@uv run run.py --node_count 8 --composition "all-go" --scenario "partial-messages" && uv run checks/partial_messages.py latest --count 1

@echo "Testing partial messages chain"
@uv run run.py --node_count 8 --composition "all-go" --scenario "partial-messages-chain" && uv run checks/partial_messages.py latest --count 16

@echo "Testing fanout"
@uv run run.py --node_count 2 --composition "all-go" --scenario "partial-messages-fanout" && uv run checks/partial_messages.py latest/



.PHONY: binaries all clean test
18 changes: 18 additions & 0 deletions gossipsub-interop/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,24 @@ After implementing it, make sure to add build commands in the Makefile's `binari

Finally, add it to the `composition` function in `experiment.py`.

## Examples

Minimal test of partial messages

```bash
uv run run.py --node_count 2 --composition "all-go" --scenario "partial-messages" && uv run checks/partial_messages.py latest/
```

That command runs the shadow simulation and then verifies the stdout logs have the expected message.

## Tests

```bash
make test
```

This runs various shadow simulations and checks.

## Future work (contributions welcome)

- Add more scenarios.
Expand Down
93 changes: 93 additions & 0 deletions gossipsub-interop/checks/partial_messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#!/usr/bin/env python3
"""Verify that each node stdout log contains the expected completion message."""

from __future__ import annotations

import argparse
import sys
from pathlib import Path

MESSAGE_SUBSTRING = '"msg":"All parts received"'


def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description=(
"Validate that every node stdout log inside a Shadow output directory "
"contains the expected completion message."
)
)
parser.add_argument(
"shadow_output",
help="Path to the Shadow output directory (the one containing the hosts/ folder).",
)
parser.add_argument(
"--count",
type=int,
default=1,
help="Minimum number of times each stdout log must contain the target message (default: 1).",
)
return parser.parse_args()


def iter_stdout_logs(hosts_dir: Path):
"""Yield all stdout log files under the given hosts directory."""
for stdout_file in sorted(hosts_dir.rglob("*.stdout")):
if stdout_file.is_file():
yield stdout_file


def count_occurrences(path: Path, needle: str) -> int:
"""Count how many times the string appears inside the file."""
with path.open("r", encoding="utf-8", errors="replace") as handle:
total = 0
for chunk in iter(lambda: handle.read(4096), ""):
total += chunk.count(needle)
return total


def main() -> int:
args = parse_args()
base_dir = Path(args.shadow_output).expanduser().resolve()
if not base_dir.exists():
print(f"shadow output directory does not exist: {base_dir}", file=sys.stderr)
return 1

hosts_dir = base_dir / "hosts"
if not hosts_dir.is_dir():
print(f"hosts directory not found under: {base_dir}", file=sys.stderr)
return 1

stdout_logs = list(iter_stdout_logs(hosts_dir))
if not stdout_logs:
print(f"no stdout logs found under: {hosts_dir}", file=sys.stderr)
return 1

missing = []
for log_path in stdout_logs:
occurrences = count_occurrences(log_path, MESSAGE_SUBSTRING)
if occurrences < args.count:
missing.append((log_path, occurrences))

if missing:
print(
"The following stdout logs do not contain the required message:",
file=sys.stderr,
)
for log_path, occurrences in missing:
rel_path = log_path.relative_to(base_dir)
print(
f" - {rel_path}: found {occurrences} occurrences (expected >= {args.count})",
file=sys.stderr,
)
print(f"{len(missing)} / {len(stdout_logs)} logs missing the message.", file=sys.stderr)
return 1

print(
f"All {len(stdout_logs)} stdout logs under {hosts_dir} contain the required message."
)
return 0


if __name__ == "__main__":
sys.exit(main())
222 changes: 219 additions & 3 deletions gossipsub-interop/experiment.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import random
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import timedelta
import random
from typing import List, Dict, Set
from typing import Dict, List, Set

from script_instruction import GossipSubParams, ScriptInstruction, NodeID
import script_instruction
from script_instruction import GossipSubParams, NodeID, ScriptInstruction


@dataclass
Expand Down Expand Up @@ -38,11 +38,227 @@ def spread_heartbeat_delay(
return instructions


def partial_message_scenario(
disable_gossip: bool, node_count: int
) -> List[ScriptInstruction]:
instructions: List[ScriptInstruction] = []
gs_params = GossipSubParams()
if disable_gossip:
gs_params.Dlazy = 0
gs_params.GossipFactor = 0
instructions.extend(spread_heartbeat_delay(node_count, gs_params))

number_of_conns_per_node = min(20, node_count - 1)
instructions.extend(random_network_mesh(node_count, number_of_conns_per_node))

topic = "a-subnet"
instructions.append(
script_instruction.SubscribeToTopic(topicID=topic, partial=True)
)

groupID = random.randint(0, (2**8) - 1)

# Wait for some setup time
elapsed_seconds = 30
instructions.append(script_instruction.WaitUntil(elapsedSeconds=elapsed_seconds))

# Assign random parts to each node
if node_count == 2:
# If just two nodes, make sure we can always generate a full message
part = random.randint(0, 255)
instructions.append(
script_instruction.IfNodeIDEquals(
nodeID=0,
instruction=script_instruction.AddPartialMessage(
topicID=topic, groupID=groupID, parts=part
),
)
)
instructions.append(
script_instruction.IfNodeIDEquals(
nodeID=1,
instruction=script_instruction.AddPartialMessage(
topicID=topic, groupID=groupID, parts=(0xFF ^ part)
),
)
)
else:
for i in range(node_count):
parts = random.randint(0, 255)
instructions.append(
script_instruction.IfNodeIDEquals(
nodeID=i,
instruction=script_instruction.AddPartialMessage(
topicID=topic, groupID=groupID, parts=parts
),
)
)

# Everyone publishes their partial message. This is how nodes learn about
# each others parts and can request them.
instructions.append(
script_instruction.PublishPartial(topicID=topic, groupID=groupID)
)

# Wait for everything to flush
elapsed_seconds += 10
instructions.append(script_instruction.WaitUntil(elapsedSeconds=elapsed_seconds))

return instructions


def partial_message_chain_scenario(
disable_gossip: bool, node_count: int
) -> List[ScriptInstruction]:
instructions: List[ScriptInstruction] = []
gs_params = GossipSubParams()
if disable_gossip:
gs_params.Dlazy = 0
gs_params.GossipFactor = 0
instructions.extend(spread_heartbeat_delay(node_count, gs_params))

# Create a bidirectional chain topology: 0<->1<->2....<->n-1
# Each node connects to both previous and next (except first and last)
for i in range(node_count):
connections = []
if i > 0:
connections.append(i - 1) # Connect to previous
if i < node_count - 1:
connections.append(i + 1) # Connect to next

if connections:
instructions.append(
script_instruction.IfNodeIDEquals(
nodeID=i,
instruction=script_instruction.Connect(connectTo=connections),
)
)

topic = "partial-msg-chain"
instructions.append(
script_instruction.SubscribeToTopic(topicID=topic, partial=True)
)

# Wait for setup time and mesh stabilization
elapsed_seconds = 30
instructions.append(script_instruction.WaitUntil(elapsedSeconds=elapsed_seconds))

# 16 messages with 8 parts each
num_messages = 16
num_parts = 8

# Assign parts to nodes in round-robin fashion
# Each message-part combination goes to exactly one node
for msg_idx in range(num_messages):
groupID = msg_idx # Unique group ID for each message

# Assign each of the 8 parts to nodes in round-robin
for part_idx in range(num_parts):
node_idx = (msg_idx * num_parts + part_idx) % node_count
part_bitmap = 1 << part_idx # Single bit for this part

instructions.append(
script_instruction.IfNodeIDEquals(
nodeID=node_idx,
instruction=script_instruction.AddPartialMessage(
topicID=topic, groupID=groupID, parts=part_bitmap
),
)
)

# Have multiple nodes with parts for each message try to publish
# This creates redundancy and ensures the exchange process starts
for msg_idx in range(num_messages):
groupID = msg_idx

elapsed_seconds += 2 # Delay between message groups
instructions.append(
script_instruction.WaitUntil(elapsedSeconds=elapsed_seconds)
)
instructions.append(
script_instruction.PublishPartial(topicID=topic, groupID=groupID)
)

# Wait for propagation and assembly
elapsed_seconds += 30
instructions.append(script_instruction.WaitUntil(elapsedSeconds=elapsed_seconds))
return instructions


def partial_message_fanout_scenario(
disable_gossip: bool, node_count: int
) -> List[ScriptInstruction]:
instructions: List[ScriptInstruction] = []
gs_params = GossipSubParams()
if disable_gossip:
gs_params.Dlazy = 0
gs_params.GossipFactor = 0
instructions.extend(spread_heartbeat_delay(node_count, gs_params))

number_of_conns_per_node = min(20, node_count - 1)
instructions.extend(random_network_mesh(node_count, number_of_conns_per_node))

topic = "a-subnet"
for i in range(node_count):
# The first node will not subscribe to the topic.
if i == 0:
continue

instructions.append(
script_instruction.IfNodeIDEquals(
nodeID=i,
instruction=script_instruction.SubscribeToTopic(
topicID=topic, partial=True
),
)
)

groupID = random.randint(0, (2**8) - 1)

# Wait for some setup time
elapsed_seconds = 30
instructions.append(script_instruction.WaitUntil(elapsedSeconds=elapsed_seconds))

# First node has everything
instructions.append(
script_instruction.IfNodeIDEquals(
nodeID=0,
instruction=script_instruction.AddPartialMessage(
topicID=topic, groupID=groupID, parts=0xFF
),
)
)

# First node publishes to a fanout set, here we are saying the first 7 nodes after the publisher
instructions.append(
script_instruction.IfNodeIDEquals(
nodeID=0,
instruction=script_instruction.PublishPartial(
topicID=topic,
groupID=groupID,
publishToNodeIDs=list(range(1, min(8, node_count))),
),
)
)

# Wait for everything to flush
elapsed_seconds += 10
instructions.append(script_instruction.WaitUntil(elapsedSeconds=elapsed_seconds))

return instructions


def scenario(
scenario_name: str, node_count: int, disable_gossip: bool
) -> ExperimentParams:
instructions: List[ScriptInstruction] = []
match scenario_name:
case "partial-messages":
instructions = partial_message_scenario(disable_gossip, node_count)
case "partial-messages-chain":
instructions = partial_message_chain_scenario(disable_gossip, node_count)
case "partial-messages-fanout":
instructions = partial_message_fanout_scenario(disable_gossip, node_count)
case "subnet-blob-msg":
gs_params = GossipSubParams()
if disable_gossip:
Expand Down
Loading