Writing API Programs

Thomas Mangin edited this page Nov 13, 2025 · 5 revisions

Complete guide to building robust ExaBGP API programs


Introduction

ExaBGP API programs are external processes that ExaBGP launches and talks to over STDIN/STDOUT pipes. Because the interface is plain lines of text on standard streams, a program can be written in any language.

What Makes a Good API Program?

Essential characteristics:

  • ✅ Robust error handling - Don't crash on unexpected input
  • ✅ Proper buffering - Always flush STDOUT
  • ✅ State tracking - Avoid redundant announcements
  • ✅ Logging - Use STDERR for diagnostics
  • ✅ Signal handling - Graceful shutdown on SIGTERM
  • ✅ Health awareness - React to service state changes

Important principle:

🔴 ExaBGP does NOT manipulate RIB/FIB - Your program controls WHEN routes are announced. ExaBGP handles HOW they're sent via BGP. Route installation happens on the router, not in ExaBGP.


Language-Agnostic Patterns

ExaBGP works with any language that can read/write streams.

Universal Pattern

All languages follow this pattern:

1. Wait for ExaBGP to be ready (sleep 2-5 seconds)
2. Initialize state (track what's announced)
3. Enter main loop:
   a. Check service health
   b. Compare with current state
   c. Send commands if state changed
   d. FLUSH output (critical!)
   e. Sleep/wait for next check
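
As a minimal sketch of steps 2 and 3, assuming a single prefix, the compare-then-send decision can be isolated in a pure function so the loop body stays small. The names `next_command` and `run` are hypothetical helpers for illustration, not part of ExaBGP:

```python
import sys
import time


def next_command(healthy, announced, prefix):
    """Return the command to send, or None when nothing changed.

    Hypothetical helper for illustration; not part of ExaBGP.
    """
    if healthy and not announced:
        return f"announce route {prefix} next-hop self"
    if not healthy and announced:
        return f"withdraw route {prefix}"
    return None


def run(check_health, prefix, interval=5, startup_delay=2, out=sys.stdout):
    """Universal loop: check, compare, send, flush, sleep."""
    time.sleep(startup_delay)            # 1. wait for ExaBGP
    announced = False                    # 2. initialize state
    while True:                          # 3. main loop
        command = next_command(check_health(), announced, prefix)
        if command is not None:
            out.write(command + "\n")
            out.flush()                  # flush after every command
            announced = command.startswith("announce")
        time.sleep(interval)
```

Keeping the decision separate from the I/O means it can be checked without ExaBGP or a running service.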

Python

Advantages:

  • ✅ Standard library (no dependencies needed)
  • ✅ Easy JSON parsing
  • ✅ Good string handling
  • ✅ Rich libraries (socket, subprocess, etc.)

Basic template:

#!/usr/bin/env python3
import sys
import time

# Wait for ExaBGP
time.sleep(2)

# Main loop
while True:
    # Your logic here
    command = "announce route 100.10.0.0/24 next-hop self"
    sys.stdout.write(command + "\n")
    sys.stdout.flush()  # CRITICAL
    time.sleep(5)

Bash

Advantages:

  • ✅ No dependencies (available everywhere)
  • ✅ Simple for basic tasks
  • ✅ Easy to wrap existing tools

Disadvantages:

  • ❌ Limited JSON parsing
  • ❌ Harder to manage state
  • ❌ Less robust error handling

Basic template:

#!/bin/bash

# Wait for ExaBGP
sleep 2

# Main loop
while true; do
    # Your logic here
    echo "announce route 100.10.0.0/24 next-hop self"
    sleep 5
done

Go

Advantages:

  • ✅ Fast and efficient
  • ✅ Static binary (no dependencies)
  • ✅ Excellent concurrency
  • ✅ Strong typing

Basic template:

package main

import (
    "fmt"
    "time"
)

func main() {
    // Wait for ExaBGP
    time.Sleep(2 * time.Second)

    // Main loop
    for {
        // Your logic here
        fmt.Println("announce route 100.10.0.0/24 next-hop self")
        time.Sleep(5 * time.Second)
    }
}

Ruby

Basic template:

#!/usr/bin/env ruby

# Wait for ExaBGP
sleep 2

# Main loop
loop do
  # Your logic here
  puts "announce route 100.10.0.0/24 next-hop self"
  STDOUT.flush  # CRITICAL
  sleep 5
end

Node.js

Basic template:

#!/usr/bin/env node

// Wait for ExaBGP
setTimeout(() => {
    // Main loop
    setInterval(() => {
        // Your logic here
        console.log("announce route 100.10.0.0/24 next-hop self");
    }, 5000);
}, 2000);

Choosing Text vs JSON API

Text API

Use text encoder when:

  • ✅ Only sending commands (announce/withdraw)
  • ✅ Simple use cases (static routes, basic health checks)
  • ✅ Want human-readable format
  • ✅ Testing manually

Configuration:

process announce {
    run /etc/exabgp/api/announce.py;
    encoder text;
}

Example:

import sys

print("announce route 100.10.0.0/24 next-hop self")
sys.stdout.flush()

JSON API

Use JSON encoder when:

  • ✅ Receiving BGP updates from router
  • ✅ Need structured data parsing
  • ✅ Building complex integrations
  • ✅ Processing routes from peers

Configuration:

process receive {
    run /etc/exabgp/api/receive.py;
    encoder json;
    receive {
        parsed;
        updates;
        neighbor-changes;
    }
}

Example:

import json
import sys

for line in sys.stdin:
    msg = json.loads(line)
    if msg['type'] == 'update':
        # Process BGP updates
        pass

Using Both

Best practice for complex scenarios:

# Process 1: Send commands (text)
process announce {
    run /etc/exabgp/api/announce.py;
    encoder text;
}

# Process 2: Receive updates (JSON)
process receive {
    run /etc/exabgp/api/receive.py;
    encoder json;
    receive {
        parsed;
        updates;
    }
}

neighbor 192.168.1.1 {
    router-id 192.168.1.2;
    local-address 192.168.1.2;
    local-as 65001;
    peer-as 65000;

    api {
        processes [ announce, receive ];
    }
}

Program Structure

Basic Structure

Every API program should have:

#!/usr/bin/env python3
"""
program.py - Description of what this does
"""
import sys
import time
import signal

# Configuration
CONFIG = {
    'check_interval': 5,
    'service_ip': '100.10.0.100',
    'service_port': 80
}

# Global state
announced = False

def log(message):
    """Log to STDERR (goes to ExaBGP log)"""
    sys.stderr.write(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {message}\n")
    sys.stderr.flush()

def signal_handler(signum, frame):
    """Handle SIGTERM gracefully"""
    log(f"Received signal {signum}, shutting down")
    sys.exit(0)

def check_health():
    """Check if service is healthy"""
    # Your health check logic
    return True

def announce_route(prefix):
    """Announce route to ExaBGP"""
    global announced
    sys.stdout.write(f"announce route {prefix} next-hop self\n")
    sys.stdout.flush()
    announced = True
    log(f"Announced {prefix}")

def withdraw_route(prefix):
    """Withdraw route from ExaBGP"""
    global announced
    sys.stdout.write(f"withdraw route {prefix}\n")
    sys.stdout.flush()
    announced = False
    log(f"Withdrawn {prefix}")

def main():
    """Main program loop"""
    global announced

    # Register signal handler
    signal.signal(signal.SIGTERM, signal_handler)

    # Wait for ExaBGP
    log("Starting, waiting for ExaBGP...")
    time.sleep(2)
    log("Ready")

    # Main loop
    prefix = f"{CONFIG['service_ip']}/32"

    while True:
        try:
            healthy = check_health()

            if healthy and not announced:
                announce_route(prefix)
            elif not healthy and announced:
                withdraw_route(prefix)

            time.sleep(CONFIG['check_interval'])

        except Exception as e:
            log(f"Error in main loop: {e}")
            time.sleep(CONFIG['check_interval'])

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        log("Interrupted by user")
        sys.exit(0)
    except Exception as e:
        log(f"Fatal error: {e}")
        sys.exit(1)

Receiving BGP Updates Structure

For programs that process BGP updates:

#!/usr/bin/env python3
"""
receive.py - Process incoming BGP updates
"""
import sys
import json

def log(message):
    """Log to STDERR"""
    sys.stderr.write(f"{message}\n")
    sys.stderr.flush()

def handle_update(msg):
    """Process UPDATE messages"""
    try:
        update = msg['neighbor']['message']['update']

        # Process announcements
        if 'announce' in update:
            if 'ipv4 unicast' in update['announce']:
                routes = update['announce']['ipv4 unicast']
                for prefix, attrs_list in routes.items():
                    for attrs in attrs_list:
                        nexthop = attrs.get('next-hop', 'unknown')
                        log(f"[ANNOUNCE] {prefix} via {nexthop}")

        # Process withdrawals
        if 'withdraw' in update:
            if 'ipv4 unicast' in update['withdraw']:
                for prefix in update['withdraw']['ipv4 unicast'].keys():
                    log(f"[WITHDRAW] {prefix}")

    except Exception as e:
        log(f"[ERROR] Failed to process update: {e}")

def handle_state(msg):
    """Process STATE messages"""
    try:
        peer = msg['neighbor']['address']['peer']
        state = msg['neighbor']['state']
        log(f"[STATE] BGP session with {peer}: {state}")
    except Exception as e:
        log(f"[ERROR] Failed to process state: {e}")

def handle_notification(msg):
    """Process NOTIFICATION messages"""
    try:
        peer = msg['neighbor']['address']['peer']
        notification = msg['neighbor']['message']['notification']
        code = notification.get('code', 'unknown')
        log(f"[NOTIFICATION] From {peer}: code={code}")
    except Exception as e:
        log(f"[ERROR] Failed to process notification: {e}")

def main():
    """Main message processing loop"""
    log("[INFO] Starting BGP message processor")

    while True:
        line = sys.stdin.readline()
        if not line:
            break  # EOF

        try:
            msg = json.loads(line.strip())

            msg_type = msg.get('type', 'unknown')

            if msg_type == 'update':
                handle_update(msg)
            elif msg_type == 'state':
                handle_state(msg)
            elif msg_type == 'notification':
                handle_notification(msg)
            elif msg_type == 'keepalive':
                # Usually don't need to process
                pass
            else:
                log(f"[WARN] Unknown message type: {msg_type}")

        except json.JSONDecodeError as e:
            log(f"[ERROR] JSON parse error: {e}")
        except Exception as e:
            log(f"[ERROR] Processing error: {e}")

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        log("[INFO] Interrupted")
        sys.exit(0)

STDIN/STDOUT Communication

Critical Rules

ALWAYS flush STDOUT:

# WRONG - buffered, ExaBGP won't see it immediately
print("announce route 100.10.0.0/24 next-hop self")

# CORRECT - flushed
print("announce route 100.10.0.0/24 next-hop self")
sys.stdout.flush()

# ALSO CORRECT - flush parameter
print("announce route 100.10.0.0/24 next-hop self", flush=True)

# ALSO CORRECT - write + flush
sys.stdout.write("announce route 100.10.0.0/24 next-hop self\n")
sys.stdout.flush()

Unbuffered Mode

Python unbuffered mode (recommended):

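#!/usr/bin/python3 -u
# The -u flag disables buffering. Adjust the interpreter path for your system.
# On Linux, "#!/usr/bin/env python3 -u" fails because everything after env
# is passed to it as a single argument. With GNU coreutils 8.30 or later,
# "#!/usr/bin/env -S python3 -u" also works.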

Or in configuration:

process announce {
    run python3 -u /etc/exabgp/api/announce.py;
    encoder text;
}

Environment variable:

export PYTHONUNBUFFERED=1
python3 /etc/exabgp/api/announce.py
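
A further option, if you would rather not depend on the shebang or the environment, is to switch STDOUT to line buffering from inside the program. This is a sketch assuming Python 3.7 or later, where text streams expose `reconfigure()`; the helper name `enable_line_buffering` is made up for illustration:

```python
import sys


def enable_line_buffering(stream=None):
    """Make `stream` flush after every newline, if it supports it.

    Returns True when line buffering was enabled, False otherwise
    (for example when the stream is not a standard text wrapper).
    """
    stream = sys.stdout if stream is None else stream
    if hasattr(stream, "reconfigure"):
        stream.reconfigure(line_buffering=True)
        return True
    return False
```

Calling `enable_line_buffering()` once at startup is enough. Keeping the explicit `flush()` calls as well costs little and covers the case where the function returns False.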

Reading from STDIN

Blocking read (simple):

while True:
    line = sys.stdin.readline()
    if not line:
        break  # EOF
    process(line)

Non-blocking read with select() (advanced):

import select

# Check if data available with timeout
ready, _, _ = select.select([sys.stdin], [], [], 1.0)
if ready:
    line = sys.stdin.readline()
    process(line)
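
One caveat with mixing `select()` and `sys.stdin.readline()`: Python's buffered reader may pull several lines from the pipe in one read, after which `select()` can report nothing ready even though lines are still waiting in Python's own buffer. A possible workaround, sketched here for POSIX systems, is to read raw bytes from the file descriptor with `os.read()` and split lines yourself. `LineBuffer` and `read_lines` are hypothetical names:

```python
import os
import select
import sys


class LineBuffer:
    """Accumulate raw bytes and hand back complete lines (hypothetical helper)."""

    def __init__(self):
        self._pending = b""

    def feed(self, data):
        """Add bytes; return complete decoded lines, keeping any partial tail."""
        self._pending += data
        *lines, self._pending = self._pending.split(b"\n")
        return [line.decode("utf-8", errors="replace") for line in lines]


def read_lines(buffer, timeout=1.0, fd=None):
    """Wait up to `timeout` seconds; return new lines, [] on timeout, None on EOF."""
    fd = sys.stdin.fileno() if fd is None else fd
    ready, _, _ = select.select([fd], [], [], timeout)
    if not ready:
        return []
    data = os.read(fd, 4096)
    if not data:
        return None  # EOF: the writing end of the pipe was closed
    return buffer.feed(data)
```

Because no line is ever held in a hidden buffer, every call to `select()` reflects what is actually left to read.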

Using Robust ACK Handling

ExaBGP 4.x and 5.x support ACK responses (enabled by default):

import sys
import select
import time

def wait_for_ack(expected_count=1, timeout=30):
    """
    Wait for ACK responses with polling loop.
    ExaBGP may not respond immediately, so we poll with sleep.

    Handles both text and JSON encoder formats:
    - Text: "done", "error", "shutdown"
    - JSON: {"answer": "done|error|shutdown", "message": "..."}
    """
    import json
    received = 0
    start_time = time.time()

    while received < expected_count:
        if time.time() - start_time >= timeout:
            return False

        ready, _, _ = select.select([sys.stdin], [], [], 0.1)
        if ready:
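            line = sys.stdin.readline()
            if not line:
                return False  # EOF: ExaBGP closed the pipe
            line = line.strip()

            # Parse response (could be text or JSON)
            answer = None
            if line.startswith('{'):
                try:
                    data = json.loads(line)
                    answer = data.get('answer')
                except json.JSONDecodeError:
                    pass  # Not valid JSON; ignore this line
            else:
                answer = line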

            if answer == "done":
                received += 1
            elif answer == "error":
                return False
            elif answer == "shutdown":
                raise SystemExit(0)
        else:
            time.sleep(0.1)

    return True

def send_with_ack(command):
    """Send command and wait for ACK"""
    sys.stdout.write(command + "\n")
    sys.stdout.flush()
    return wait_for_ack(expected_count=1)

# Use it
if send_with_ack("announce route 100.10.0.0/24 next-hop self"):
    # Command succeeded
    pass
else:
    # Command failed - handle error
    sys.exit(1)

Error Handling Strategies

Defensive Programming

Always use try/except:

import socket

def check_health():
    """Check service health with error handling"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(2)
        result = sock.connect_ex(('100.10.0.100', 80))
        sock.close()
        return result == 0
    except socket.error as e:
        log(f"[ERROR] Socket error: {e}")
        return False
    except Exception as e:
        log(f"[ERROR] Unexpected error: {e}")
        return False

Retry Logic

Retry with exponential backoff:

def retry_with_backoff(func, max_retries=3, initial_delay=1):
    """Retry function with exponential backoff"""
    delay = initial_delay

    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                log(f"[ERROR] Failed after {max_retries} attempts: {e}")
                raise

            log(f"[WARN] Attempt {attempt + 1} failed: {e}, retrying in {delay}s")
            time.sleep(delay)
            delay *= 2  # Exponential backoff

# Use it
def announce():
    sys.stdout.write("announce route 100.10.0.0/24 next-hop self\n")
    sys.stdout.flush()

retry_with_backoff(announce, max_retries=3)

Circuit Breaker Pattern

Prevent cascade failures:

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.last_failure_time = None
        self.state = 'closed'  # closed, open, half-open

    def call(self, func):
        """Call function with circuit breaker protection"""
        if self.state == 'open':
            # Check if timeout expired
            if time.time() - self.last_failure_time >= self.timeout:
                self.state = 'half-open'
                log("[CIRCUIT] Half-open, trying again")
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = func()

            # Success - reset the counter so that only consecutive
            # failures count toward the threshold
            self.failure_count = 0
            if self.state == 'half-open':
                self.state = 'closed'
                log("[CIRCUIT] Closed, service recovered")

            return result

        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()

            if self.failure_count >= self.failure_threshold:
                self.state = 'open'
                log(f"[CIRCUIT] OPEN after {self.failure_count} failures")

            raise

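# Use it
# The breaker counts exceptions only: wrap a check that raises on failure.
# A check that catches every error and returns False will never open it.
breaker = CircuitBreaker(failure_threshold=5, timeout=60)

while True:
    try:
        healthy = breaker.call(check_health)
        if healthy:
            announce_route('100.10.0.100/32')
    except Exception as e:
        log(f"[ERROR] Circuit breaker: {e}")

    time.sleep(5)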

State Management

Track What's Announced

Avoid redundant announcements:

# Global state tracking
announced_routes = set()

def announce_if_needed(prefix):
    """Only announce if not already announced"""
    global announced_routes

    if prefix not in announced_routes:
        sys.stdout.write(f"announce route {prefix} next-hop self\n")
        sys.stdout.flush()
        announced_routes.add(prefix)
        log(f"[ANNOUNCE] {prefix}")
    else:
        log(f"[SKIP] {prefix} already announced")

def withdraw_if_needed(prefix):
    """Only withdraw if currently announced"""
    global announced_routes

    if prefix in announced_routes:
        sys.stdout.write(f"withdraw route {prefix}\n")
        sys.stdout.flush()
        announced_routes.remove(prefix)
        log(f"[WITHDRAW] {prefix}")
    else:
        log(f"[SKIP] {prefix} not announced")
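
When a program manages several prefixes, the same idea extends to comparing the set of routes that should be announced with the set that currently is. The sketch below assumes both are Python sets of prefix strings; `reconcile` and `apply_changes` are hypothetical helper names:

```python
import sys


def reconcile(desired, announced):
    """Return the commands that move `announced` to `desired`.

    Withdrawals come first, then announcements, each sorted for stable output.
    """
    commands = [f"withdraw route {p}" for p in sorted(announced - desired)]
    commands += [
        f"announce route {p} next-hop self" for p in sorted(desired - announced)
    ]
    return commands


def apply_changes(desired, announced, out=sys.stdout):
    """Send only the needed commands and return the new announced set."""
    for command in reconcile(desired, announced):
        out.write(command + "\n")
        out.flush()
    return set(desired)
```

If nothing changed, `reconcile` returns an empty list and nothing is written.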

State Machine Pattern

Formal state management:

class ServiceState:
    def __init__(self):
        self.current_state = 'unknown'
        self.route_announced = False

    def transition(self, new_state):
        """Handle state transitions"""
        if new_state == self.current_state:
            return  # No change

        log(f"[STATE] {self.current_state} -> {new_state}")

        self.current_state = new_state

        # Handle transitions
        if new_state == 'healthy' and not self.route_announced:
            self.announce()
        elif new_state == 'unhealthy' and self.route_announced:
            self.withdraw()

    def announce(self):
        """Announce route"""
        sys.stdout.write("announce route 100.10.0.100/32 next-hop self\n")
        sys.stdout.flush()
        self.route_announced = True
        log("[ACTION] Route announced")

    def withdraw(self):
        """Withdraw route"""
        sys.stdout.write("withdraw route 100.10.0.100/32\n")
        sys.stdout.flush()
        self.route_announced = False
        log("[ACTION] Route withdrawn")

# Use it
state = ServiceState()

while True:
    healthy = check_health()
    state.transition('healthy' if healthy else 'unhealthy')
    time.sleep(5)

Production Patterns

Health Check Patterns

HTTP health check:

import urllib.request

def http_health_check(url, timeout=2):
    """HTTP GET health check"""
    try:
        req = urllib.request.Request(url)
        response = urllib.request.urlopen(req, timeout=timeout)
        return response.status == 200
    except Exception as e:
        log(f"[HEALTH] HTTP check failed: {e}")
        return False

# Use it
if http_health_check('http://127.0.0.1:8080/health'):
    announce_route('100.10.0.100/32')

TCP port check:

import socket

def tcp_port_check(host, port, timeout=2):
    """TCP connection check"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        result = sock.connect_ex((host, port))
        sock.close()
        return result == 0
    except Exception as e:
        log(f"[HEALTH] TCP check failed: {e}")
        return False

# Use it
if tcp_port_check('127.0.0.1', 80):
    announce_route('100.10.0.100/32')

Command execution check:

import subprocess

def command_health_check(command, timeout=5):
    """Execute command, check exit code"""
    try:
        result = subprocess.run(
            command,
            shell=True,
            capture_output=True,
            timeout=timeout
        )
        return result.returncode == 0
    except Exception as e:
        log(f"[HEALTH] Command check failed: {e}")
        return False

# Use it
if command_health_check('curl -sf http://localhost/health'):
    announce_route('100.10.0.100/32')
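
`shell=True` hands the whole string to a shell, which is convenient but risky if any part of the command comes from configuration or user input. A sketch of the same check without a shell, assuming the command can be split into arguments with `shlex.split()`; the function name `command_health_check_noshell` is illustrative:

```python
import shlex
import subprocess
import sys


def command_health_check_noshell(command, timeout=5):
    """Run `command` without a shell; healthy means exit code 0."""
    try:
        result = subprocess.run(
            shlex.split(command),        # "curl -sf URL" -> ["curl", "-sf", "URL"]
            stdout=subprocess.DEVNULL,   # never let child output reach ExaBGP
            stderr=subprocess.DEVNULL,
            timeout=timeout,
        )
        return result.returncode == 0
    except (OSError, subprocess.SubprocessError) as e:
        # Covers a missing executable and subprocess.TimeoutExpired
        sys.stderr.write(f"[HEALTH] Command check failed: {e}\n")
        sys.stderr.flush()
        return False
```

Discarding the child's STDOUT matters here: anything the command prints would otherwise travel down the same pipe as your commands to ExaBGP.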

Hysteresis Pattern

Prevent route flapping:

class HealthTracker:
    def __init__(self, threshold_up=3, threshold_down=2):
        self.threshold_up = threshold_up
        self.threshold_down = threshold_down
        self.consecutive_up = 0
        self.consecutive_down = 0
        self.current_state = 'down'

    def update(self, healthy):
        """Update health state with hysteresis"""
        if healthy:
            self.consecutive_up += 1
            self.consecutive_down = 0

            if self.consecutive_up >= self.threshold_up:
                if self.current_state != 'up':
                    log(f"[HEALTH] Service UP (after {self.consecutive_up} checks)")
                    self.current_state = 'up'
        else:
            self.consecutive_down += 1
            self.consecutive_up = 0

            if self.consecutive_down >= self.threshold_down:
                if self.current_state != 'down':
                    log(f"[HEALTH] Service DOWN (after {self.consecutive_down} checks)")
                    self.current_state = 'down'

        return self.current_state

# Use it
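health = HealthTracker(threshold_up=3, threshold_down=2)
last_state = health.current_state  # 'down': nothing announced yet

while True:
    check_result = check_health()
    state = health.update(check_result)

    # Act only on a state change to avoid redundant commands
    if state != last_state:
        if state == 'up':
            announce_route('100.10.0.100/32')
        else:
            withdraw_route('100.10.0.100/32')
        last_state = state

    time.sleep(5)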

Graceful Shutdown

Handle SIGTERM properly:

import signal

def shutdown_handler(signum, frame):
    """Handle shutdown signal"""
    log("[SHUTDOWN] Received SIGTERM, cleaning up...")

    # Withdraw all announced routes
    for prefix in announced_routes:
        sys.stdout.write(f"withdraw route {prefix}\n")
        sys.stdout.flush()
        log(f"[CLEANUP] Withdrawn {prefix}")

    log("[SHUTDOWN] Complete")
    sys.exit(0)

# Register handler
signal.signal(signal.SIGTERM, shutdown_handler)
signal.signal(signal.SIGINT, shutdown_handler)
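
A signal handler can run while the main loop is in the middle of writing a command, so some programs prefer to keep the handler minimal and do the cleanup in the main loop instead. The following sketch shows that variant using `threading.Event`. The names `stop_requested`, `request_stop`, `serve`, and `install_handlers` are illustrative, and `check_once` stands in for your own health-check step:

```python
import signal
import sys
import threading

stop_requested = threading.Event()


def request_stop(signum, frame):
    """Signal handler: only record that shutdown was requested."""
    stop_requested.set()


def serve(check_once, announced_routes, interval=5, out=sys.stdout):
    """Run check_once() until a stop is requested, then withdraw everything."""
    while not stop_requested.is_set():
        check_once()
        # wait() returns early once the event is set, unlike time.sleep()
        stop_requested.wait(interval)

    for prefix in sorted(announced_routes):
        out.write(f"withdraw route {prefix}\n")
    out.flush()


def install_handlers():
    """Route SIGTERM and SIGINT to the minimal handler."""
    signal.signal(signal.SIGTERM, request_stop)
    signal.signal(signal.SIGINT, request_stop)
```

With this layout the withdrawals are always written from the main loop, never from inside the handler.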

Complete Examples

Example 1: Production Health Check (Python)

#!/usr/bin/env python3
"""
healthcheck.py - Production-grade health check for ExaBGP
"""
import sys
import time
import socket
import signal
import urllib.request

# Configuration
SERVICE_IP = "100.10.0.100"
SERVICE_PORT = 80
CHECK_INTERVAL = 5
HEALTH_THRESHOLD_UP = 3
HEALTH_THRESHOLD_DOWN = 2
TIMEOUT = 2

# State
announced = False
consecutive_healthy = 0
consecutive_unhealthy = 0

def log(message):
    """Log to STDERR"""
    timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
    sys.stderr.write(f"[{timestamp}] {message}\n")
    sys.stderr.flush()

def signal_handler(signum, frame):
    """Handle shutdown gracefully"""
    log(f"Received signal {signum}, withdrawing routes and exiting")
    if announced:
        sys.stdout.write(f"withdraw route {SERVICE_IP}/32\n")
        sys.stdout.flush()
    sys.exit(0)

def check_tcp_port(host, port, timeout):
    """Check if TCP port is open"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        result = sock.connect_ex((host, port))
        sock.close()
        return result == 0
    except Exception as e:
        log(f"TCP check error: {e}")
        return False

def check_http(url, timeout):
    """Check HTTP endpoint"""
    try:
        req = urllib.request.Request(url)
        response = urllib.request.urlopen(req, timeout=timeout)
        return response.status == 200
    except Exception as e:
        log(f"HTTP check error: {e}")
        return False

def is_healthy():
    """Perform health checks"""
    # TCP port check
    if not check_tcp_port('127.0.0.1', SERVICE_PORT, TIMEOUT):
        return False

    # HTTP health endpoint check
    if not check_http(f'http://127.0.0.1:{SERVICE_PORT}/health', TIMEOUT):
        return False

    return True

def announce_route():
    """Announce route"""
    global announced
    sys.stdout.write(f"announce route {SERVICE_IP}/32 next-hop self\n")
    sys.stdout.flush()
    announced = True
    log(f"ANNOUNCED {SERVICE_IP}/32")

def withdraw_route():
    """Withdraw route"""
    global announced
    sys.stdout.write(f"withdraw route {SERVICE_IP}/32\n")
    sys.stdout.flush()
    announced = False
    log(f"WITHDRAWN {SERVICE_IP}/32")

def main():
    """Main loop with hysteresis"""
    global consecutive_healthy, consecutive_unhealthy

    # Register signal handlers
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    log("Starting health check daemon")
    time.sleep(2)  # Wait for ExaBGP
    log("Ready")

    while True:
        try:
            healthy = is_healthy()

            if healthy:
                consecutive_healthy += 1
                consecutive_unhealthy = 0

                # Announce after N consecutive healthy checks
                if consecutive_healthy >= HEALTH_THRESHOLD_UP and not announced:
                    announce_route()
            else:
                consecutive_unhealthy += 1
                consecutive_healthy = 0

                # Withdraw after N consecutive unhealthy checks
                if consecutive_unhealthy >= HEALTH_THRESHOLD_DOWN and announced:
                    withdraw_route()

            time.sleep(CHECK_INTERVAL)

        except Exception as e:
            log(f"Error in main loop: {e}")
            time.sleep(CHECK_INTERVAL)

if __name__ == '__main__':
    main()

Example 2: BGP Route Monitor (Python)

#!/usr/bin/env python3
"""
route_monitor.py - Monitor BGP routes and trigger actions
"""
import sys
import json
import subprocess

# Track routes
routes = {}

def log(message):
    """Log to STDERR"""
    sys.stderr.write(f"{message}\n")
    sys.stderr.flush()

def trigger_script(action, prefix, nexthop):
    """Trigger external script on route change"""
    script = f"/etc/exabgp/scripts/on_{action}.sh"
    try:
        subprocess.run([script, prefix, nexthop], timeout=5)
        log(f"[TRIGGER] Executed {script} for {prefix}")
    except Exception as e:
        log(f"[ERROR] Script execution failed: {e}")

def handle_announcement(prefix, attrs):
    """Handle route announcement"""
    nexthop = attrs.get('next-hop', 'unknown')

    if prefix not in routes:
        log(f"[NEW] {prefix} via {nexthop}")
        routes[prefix] = attrs
        trigger_script('announce', prefix, nexthop)
    else:
        log(f"[UPDATE] {prefix} via {nexthop}")
        routes[prefix] = attrs

def handle_withdrawal(prefix):
    """Handle route withdrawal"""
    if prefix in routes:
        nexthop = routes[prefix].get('next-hop', 'unknown')
        log(f"[WITHDRAWN] {prefix}")
        del routes[prefix]
        trigger_script('withdraw', prefix, nexthop)

def main():
    """Main message processing loop"""
    log("[START] BGP route monitor")

    while True:
        line = sys.stdin.readline()
        if not line:
            break

        try:
            msg = json.loads(line.strip())

            if msg['type'] == 'update':
                update = msg['neighbor']['message']['update']

                # Process announcements
                if 'announce' in update:
                    if 'ipv4 unicast' in update['announce']:
                        for prefix, attrs_list in update['announce']['ipv4 unicast'].items():
                            handle_announcement(prefix, attrs_list[0])

                # Process withdrawals
                if 'withdraw' in update:
                    if 'ipv4 unicast' in update['withdraw']:
                        for prefix in update['withdraw']['ipv4 unicast'].keys():
                            handle_withdrawal(prefix)

            elif msg['type'] == 'state':
                peer = msg['neighbor']['address']['peer']
                state = msg['neighbor']['state']
                log(f"[STATE] BGP session {peer}: {state}")

        except json.JSONDecodeError as e:
            log(f"[ERROR] JSON parse error: {e}")
        except Exception as e:
            log(f"[ERROR] Processing error: {e}")

if __name__ == '__main__':
    main()

Example 3: Health Check (Go)

package main

import (
    "fmt"
    "net"
    "os"
    "os/signal"
    "syscall"
    "time"
)

const (
    serviceIP     = "100.10.0.100"
    servicePort   = 80
    checkInterval = 5 * time.Second
)

var announced = false

func log(message string) {
    timestamp := time.Now().Format("2006-01-02 15:04:05")
    fmt.Fprintf(os.Stderr, "[%s] %s\n", timestamp, message)
}

func checkHealth() bool {
    conn, err := net.DialTimeout("tcp", fmt.Sprintf("127.0.0.1:%d", servicePort), 2*time.Second)
    if err != nil {
        return false
    }
    conn.Close()
    return true
}

func announceRoute() {
    fmt.Printf("announce route %s/32 next-hop self\n", serviceIP)
    announced = true
    log(fmt.Sprintf("ANNOUNCED %s/32", serviceIP))
}

func withdrawRoute() {
    fmt.Printf("withdraw route %s/32\n", serviceIP)
    announced = false
    log(fmt.Sprintf("WITHDRAWN %s/32", serviceIP))
}

func main() {
    // Handle signals
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)

    go func() {
        sig := <-sigChan
        log(fmt.Sprintf("Received signal %v, exiting", sig))
        if announced {
            withdrawRoute()
        }
        os.Exit(0)
    }()

    log("Starting health check daemon")
    time.Sleep(2 * time.Second)  // Wait for ExaBGP
    log("Ready")

    for {
        healthy := checkHealth()

        if healthy && !announced {
            announceRoute()
        } else if !healthy && announced {
            withdrawRoute()
        }

        time.Sleep(checkInterval)
    }
}

Example 4: Health Check (Bash)

#!/bin/bash
# healthcheck.sh - Simple health check in bash

SERVICE_IP="100.10.0.100"
SERVICE_PORT=80
CHECK_INTERVAL=5

ANNOUNCED=0

log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" >&2
}

check_health() {
    # Use nc (netcat) or curl for health check
    timeout 2 nc -z 127.0.0.1 $SERVICE_PORT 2>/dev/null
    return $?
}

announce_route() {
    echo "announce route ${SERVICE_IP}/32 next-hop self"
    ANNOUNCED=1
    log "ANNOUNCED ${SERVICE_IP}/32"
}

withdraw_route() {
    echo "withdraw route ${SERVICE_IP}/32"
    ANNOUNCED=0
    log "WITHDRAWN ${SERVICE_IP}/32"
}

cleanup() {
    log "Received SIGTERM, cleaning up"
    if [ $ANNOUNCED -eq 1 ]; then
        withdraw_route
    fi
    exit 0
}

trap cleanup SIGTERM SIGINT

log "Starting health check daemon"
sleep 2  # Wait for ExaBGP
log "Ready"

while true; do
    if check_health; then
        if [ $ANNOUNCED -eq 0 ]; then
            announce_route
        fi
    else
        if [ $ANNOUNCED -eq 1 ]; then
            withdraw_route
        fi
    fi

    sleep $CHECK_INTERVAL
done

Testing Your Programs

Manual Testing

Test commands without ExaBGP:

# Test your program's output
./announce.py | head -5

# Expected output:
# announce route 100.10.0.0/24 next-hop self

Test with mock stdin:

# Send test JSON to your receive program
# (the state handler reads neighbor.address.peer, so include it)
echo '{"type":"state","neighbor":{"address":{"peer":"192.168.1.1"},"state":"up"}}' | ./receive.py

Unit Testing

Python unittest example:

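import unittest
from contextlib import redirect_stdout
from io import StringIO

# Assumes the functions under test live in an importable module;
# "healthcheck" is a placeholder name, adjust it to your program.
from healthcheck import tcp_port_check, announce_route

class TestHealthCheck(unittest.TestCase):
    def test_tcp_check(self):
        """Test TCP health check"""
        result = tcp_port_check('127.0.0.1', 80, timeout=2)
        self.assertIsInstance(result, bool)

    def test_announce_format(self):
        """Test announce command format"""
        output = StringIO()
        # redirect_stdout restores sys.stdout even if the call raises
        with redirect_stdout(output):
            announce_route('100.10.0.0/24')

        self.assertIn('announce route 100.10.0.0/24', output.getvalue())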

if __name__ == '__main__':
    unittest.main()
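
Message-processing code is easier to unit test when the loop reads from a stream passed in as a parameter rather than from `sys.stdin` directly. A sketch of that idea, with a hypothetical `process_stream` function and a handler table keyed on the message `type`; the `state` message used here is a trimmed-down example, not the full ExaBGP payload:

```python
import io
import json


def process_stream(stream, handlers):
    """Dispatch each JSON line in `stream` to handlers[msg['type']].

    Returns the number of lines that could not be parsed or handled.
    """
    errors = 0
    for line in stream:
        line = line.strip()
        if not line:
            continue
        try:
            msg = json.loads(line)
            handler = handlers.get(msg.get("type"))
            if handler is not None:
                handler(msg)
        except (json.JSONDecodeError, AttributeError, KeyError, TypeError):
            errors += 1
    return errors


def demo():
    """Feed two lines through the loop without ExaBGP."""
    seen = []
    handlers = {"state": lambda m: seen.append(m["neighbor"]["state"])}
    fake_stdin = io.StringIO(
        '{"type": "state", "neighbor": {"state": "up"}}\n'
        "not json\n"
    )
    errors = process_stream(fake_stdin, handlers)
    return seen, errors
```

In production the same function would be called as `process_stream(sys.stdin, handlers)`; in tests an `io.StringIO` stands in for the pipe.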

Integration Testing

Test with ExaBGP:

# 1. Start ExaBGP in one terminal
exabgp /etc/exabgp/exabgp.conf

# 2. Check ExaBGP logs for your program output
tail -f /var/log/exabgp.log

# 3. Trigger health changes
# - Stop service -> Should withdraw route
# - Start service -> Should announce route

Common Pitfalls

1. Forgot to Flush

Problem:

print("announce route 100.10.0.0/24 next-hop self")
# Route never announced - stuck in buffer

Solution:

print("announce route 100.10.0.0/24 next-hop self")
sys.stdout.flush()  # Always flush!

2. Process Exits Immediately

Problem:

print("announce route 100.10.0.0/24 next-hop self")
sys.stdout.flush()
# Script exits, ExaBGP restarts it repeatedly

Solution:

print("announce route 100.10.0.0/24 next-hop self")
sys.stdout.flush()

# Keep running
while True:
    time.sleep(60)

3. No Error Handling

Problem:

# Crashes on any error
result = check_health()

Solution:

try:
    result = check_health()
except Exception as e:
    log(f"Health check failed: {e}")
    result = False

4. Route Flapping

Problem:

# Announces and withdraws on every tiny change
if check_health():
    announce()
else:
    withdraw()

Solution:

# Use hysteresis (require N consecutive checks)
if consecutive_healthy >= 3:
    announce()
elif consecutive_unhealthy >= 2:
    withdraw()
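
To see how the thresholds damp a flapping service, the following sketch replays a sequence of check results through the same rule (3 consecutive successes to go up, 2 consecutive failures to go down). The generator name `hysteresis` is illustrative:

```python
def hysteresis(checks, threshold_up=3, threshold_down=2):
    """Yield the reported state ('up' or 'down') after each boolean check."""
    state = "down"
    up = down = 0
    for healthy in checks:
        if healthy:
            up, down = up + 1, 0
            if up >= threshold_up:
                state = "up"
        else:
            up, down = 0, down + 1
            if down >= threshold_down:
                state = "down"
        yield state


checks = [True, True, True, False, True, False, False]
print(list(hysteresis(checks)))
# ['down', 'down', 'up', 'up', 'up', 'up', 'down']
```

The single failure at position four does not withdraw the route; only the two consecutive failures at the end do.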

5. Redundant Announcements

Problem:

# Announces same route repeatedly
while True:
    announce_route('100.10.0.0/24')  # Wastes bandwidth
    time.sleep(5)

Solution:

# Track state
if healthy and not announced:
    announce_route('100.10.0.0/24')
    announced = True

Best Practices

1. Always Flush Output

sys.stdout.flush()  # After every command

2. Use STDERR for Logging

# STDOUT = commands to ExaBGP
# STDERR = logging (goes to ExaBGP log)
sys.stderr.write("[INFO] Service healthy\n")

3. Track State

# Avoid redundant operations
if new_state != old_state:
    take_action()

4. Handle Signals

signal.signal(signal.SIGTERM, shutdown_handler)

5. Use Hysteresis

# Require N consecutive checks before changing state
if consecutive_healthy >= THRESHOLD:
    announce()

6. Add Timeouts

# All I/O operations should timeout
sock.settimeout(2)
urllib.request.urlopen(req, timeout=2)
subprocess.run(cmd, timeout=5)

7. Validate Input

# When receiving JSON
try:
    msg = json.loads(line)
    # Validate structure
    if 'type' not in msg:
        raise ValueError("Missing type field")
except Exception as e:
    log(f"Invalid message: {e}")

8. Log Everything Important

log(f"[STATE] Service: {state}")
log(f"[ACTION] Announced {prefix}")
log(f"[ERROR] Health check failed: {error}")

9. Use Configuration

import os

# Don't hardcode - use config file or environment
SERVICE_IP = os.getenv('SERVICE_IP', '100.10.0.100')
CHECK_INTERVAL = int(os.getenv('CHECK_INTERVAL', '5'))
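
Values read from the environment are worth validating at startup, so that a typo fails immediately instead of producing a malformed command later. A minimal sketch, assuming the same two variables as above; `load_config` is a hypothetical helper:

```python
import ipaddress
import os


def load_config(env=None):
    """Read and validate settings; raises ValueError on bad input."""
    env = os.environ if env is None else env

    service_ip = env.get("SERVICE_IP", "100.10.0.100")
    ipaddress.ip_address(service_ip)      # raises ValueError if malformed

    interval = int(env.get("CHECK_INTERVAL", "5"))
    if interval <= 0:
        raise ValueError("CHECK_INTERVAL must be a positive integer")

    return {"service_ip": service_ip, "check_interval": interval}
```

Passing a plain dict as `env` makes the function easy to test without touching the real environment.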

10. Test Without ExaBGP

# Make your program testable independently
if __name__ == '__main__':
    # Can run without ExaBGP for testing
    main()

Ready for production? See Production Best Practices →


👻 Ghost written by Claude (Anthropic AI)
