Writing API Programs
Complete guide to building robust ExaBGP API programs
- Introduction
- Language-Agnostic Patterns
- Choosing Text vs JSON API
- Program Structure
- STDIN/STDOUT Communication
- Error Handling Strategies
- State Management
- Production Patterns
- Complete Examples
- Testing Your Programs
- Common Pitfalls
- Best Practices
ExaBGP API programs are external processes that communicate with ExaBGP via STDIN/STDOUT pipes. This design is language-agnostic, simple, and powerful.
Essential characteristics:
- ✅ Robust error handling - Don't crash on unexpected input
- ✅ Proper buffering - Always flush STDOUT
- ✅ State tracking - Avoid redundant announcements
- ✅ Logging - Use STDERR for diagnostics
- ✅ Signal handling - Graceful shutdown on SIGTERM
- ✅ Health awareness - React to service state changes
Important principle:
🔴 ExaBGP does NOT manipulate the RIB/FIB. Your program controls WHEN routes are announced; ExaBGP handles HOW they are sent via BGP. Route installation happens on the router, not in ExaBGP.
ExaBGP works with any language that can read/write streams.
All languages follow this pattern:
1. Wait for ExaBGP to be ready (sleep 2-5 seconds)
2. Initialize state (track what's announced)
3. Enter main loop:
   a. Check service health
   b. Compare with current state
   c. Send commands if state changed
   d. FLUSH output (critical!)
   e. Sleep/wait for next check
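
The steps above can be sketched in Python as follows. This is a minimal illustration rather than a reference implementation: `run_cycle`, `run`, and their parameters are hypothetical names introduced here, the prefix is a placeholder, and the health check is passed in as a callable so the loop can be exercised without ExaBGP.

```python
import sys
import time


def run_cycle(is_healthy, announced, prefix, out=sys.stdout):
    """Run one pass of steps a-d and return the new 'announced' state.

    is_healthy: callable returning True/False (hypothetical health check).
    announced:  whether the route is currently announced.
    """
    healthy = is_healthy()                      # a. check service health
    if healthy and not announced:               # b. compare with current state
        out.write(f"announce route {prefix} next-hop self\n")  # c. send command
        announced = True
    elif not healthy and announced:
        out.write(f"withdraw route {prefix}\n")
        announced = False
    out.flush()                                 # d. flush output
    return announced


def run(is_healthy, prefix, interval=5, startup_delay=2, max_cycles=None, out=sys.stdout):
    """Steps 1-3: wait, initialise state, then loop (forever if max_cycles is None)."""
    time.sleep(startup_delay)                   # 1. wait for ExaBGP
    announced = False                           # 2. initialise state
    cycles = 0
    while max_cycles is None or cycles < max_cycles:
        announced = run_cycle(is_healthy, announced, prefix, out)
        cycles += 1
        time.sleep(interval)                    # e. sleep until the next check
    return announced
```

Keeping one cycle in its own function means the state comparison can be tested with a fake health check and an in-memory stream.
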
Python

Advantages:
- ✅ Standard library (no dependencies needed)
- ✅ Easy JSON parsing
- ✅ Good string handling
- ✅ Rich libraries (socket, subprocess, etc.)
Basic template:
#!/usr/bin/env python3
import sys
import time

# Wait for ExaBGP
time.sleep(2)

# Main loop
while True:
    # Your logic here
    command = "announce route 100.10.0.0/24 next-hop self"
    sys.stdout.write(command + "\n")
    sys.stdout.flush()  # CRITICAL
    time.sleep(5)

Bash

Advantages:
- ✅ No dependencies (available everywhere)
- ✅ Simple for basic tasks
- ✅ Easy to wrap existing tools

Disadvantages:
- ❌ Limited JSON parsing
- ❌ Harder to manage state
- ❌ Less robust error handling
Basic template:
#!/bin/bash
# Wait for ExaBGP
sleep 2

# Main loop
while true; do
    # Your logic here
    echo "announce route 100.10.0.0/24 next-hop self"
    sleep 5
done

Go

Advantages:
- ✅ Fast and efficient
- ✅ Static binary (no dependencies)
- ✅ Excellent concurrency
- ✅ Strong typing
Basic template:
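package main

import (
	"fmt"
	"time"
)

func main() {
	// Wait for ExaBGP
	time.Sleep(2 * time.Second)

	// Main loop
	for {
		// Your logic here
		// fmt.Println writes directly to os.Stdout, which Go does not buffer,
		// so no explicit flush is needed (unused imports would not compile)
		fmt.Println("announce route 100.10.0.0/24 next-hop self")
		time.Sleep(5 * time.Second)
	}
}

Ruby

Basic template: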
#!/usr/bin/env ruby
# Wait for ExaBGP
sleep 2

# Main loop
loop do
  # Your logic here
  puts "announce route 100.10.0.0/24 next-hop self"
  STDOUT.flush # CRITICAL
  sleep 5
end

Node.js

Basic template:
#!/usr/bin/env node
// Wait for ExaBGP
setTimeout(() => {
  // Main loop
  setInterval(() => {
    // Your logic here
    console.log("announce route 100.10.0.0/24 next-hop self");
  }, 5000);
}, 2000);

Use text encoder when:
- ✅ Only sending commands (announce/withdraw)
- ✅ Simple use cases (static routes, basic health checks)
- ✅ Want human-readable format
- ✅ Testing manually
Configuration:
process announce {
    run /etc/exabgp/api/announce.py;
    encoder text;
}

Example:
import sys

print("announce route 100.10.0.0/24 next-hop self")
sys.stdout.flush()

Use JSON encoder when:
- ✅ Receiving BGP updates from router
- ✅ Need structured data parsing
- ✅ Building complex integrations
- ✅ Processing routes from peers
Configuration:
process receive {
    run /etc/exabgp/api/receive.py;
    encoder json;
    receive {
        parsed;
        updates;
        neighbor-changes;
    }
}

Example:
import json
import sys

for line in sys.stdin:
    msg = json.loads(line)
    if msg['type'] == 'update':
        # Process BGP updates
        pass

Best practice for complex scenarios:
# Process 1: Send commands (text)
process announce {
    run /etc/exabgp/api/announce.py;
    encoder text;
}

# Process 2: Receive updates (JSON)
process receive {
    run /etc/exabgp/api/receive.py;
    encoder json;
    receive {
        parsed;
        updates;
    }
}

neighbor 192.168.1.1 {
    router-id 192.168.1.2;
    local-address 192.168.1.2;
    local-as 65001;
    peer-as 65000;
    api {
        processes [ announce, receive ];
    }
}

Every API program should have:
#!/usr/bin/env python3
"""
program.py - Description of what this does
"""
import sys
import time
import signal

# Configuration
CONFIG = {
    'check_interval': 5,
    'service_ip': '100.10.0.100',
    'service_port': 80
}

# Global state
announced = False


def log(message):
    """Log to STDERR (goes to ExaBGP log)"""
    sys.stderr.write(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {message}\n")
    sys.stderr.flush()


def signal_handler(signum, frame):
    """Handle SIGTERM gracefully"""
    log(f"Received signal {signum}, shutting down")
    sys.exit(0)


def check_health():
    """Check if service is healthy"""
    # Your health check logic
    return True


def announce_route(prefix):
    """Announce route to ExaBGP"""
    global announced
    sys.stdout.write(f"announce route {prefix} next-hop self\n")
    sys.stdout.flush()
    announced = True
    log(f"Announced {prefix}")


def withdraw_route(prefix):
    """Withdraw route from ExaBGP"""
    global announced
    sys.stdout.write(f"withdraw route {prefix}\n")
    sys.stdout.flush()
    announced = False
    log(f"Withdrawn {prefix}")


def main():
    """Main program loop"""
    # Register signal handler
    signal.signal(signal.SIGTERM, signal_handler)

    # Wait for ExaBGP
    log("Starting, waiting for ExaBGP...")
    time.sleep(2)
    log("Ready")

    # Main loop
    prefix = f"{CONFIG['service_ip']}/32"
    while True:
        try:
            healthy = check_health()
            if healthy and not announced:
                announce_route(prefix)
            elif not healthy and announced:
                withdraw_route(prefix)
            time.sleep(CONFIG['check_interval'])
        except Exception as e:
            log(f"Error in main loop: {e}")
            time.sleep(CONFIG['check_interval'])


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        log("Interrupted by user")
        sys.exit(0)
    except Exception as e:
        log(f"Fatal error: {e}")
        sys.exit(1)

For programs that process BGP updates:
#!/usr/bin/env python3
"""
receive.py - Process incoming BGP updates
"""
import sys
import json


def log(message):
    """Log to STDERR"""
    sys.stderr.write(f"{message}\n")
    sys.stderr.flush()


def handle_update(msg):
    """Process UPDATE messages"""
    try:
        update = msg['neighbor']['message']['update']
        # Process announcements
        if 'announce' in update:
            if 'ipv4 unicast' in update['announce']:
                routes = update['announce']['ipv4 unicast']
                for prefix, attrs_list in routes.items():
                    for attrs in attrs_list:
                        nexthop = attrs.get('next-hop', 'unknown')
                        log(f"[ANNOUNCE] {prefix} via {nexthop}")
        # Process withdrawals
        if 'withdraw' in update:
            if 'ipv4 unicast' in update['withdraw']:
                for prefix in update['withdraw']['ipv4 unicast'].keys():
                    log(f"[WITHDRAW] {prefix}")
    except Exception as e:
        log(f"[ERROR] Failed to process update: {e}")


def handle_state(msg):
    """Process STATE messages"""
    try:
        peer = msg['neighbor']['address']['peer']
        state = msg['neighbor']['state']
        log(f"[STATE] BGP session with {peer}: {state}")
    except Exception as e:
        log(f"[ERROR] Failed to process state: {e}")


def handle_notification(msg):
    """Process NOTIFICATION messages"""
    try:
        peer = msg['neighbor']['address']['peer']
        notification = msg['neighbor']['message']['notification']
        code = notification.get('code', 'unknown')
        log(f"[NOTIFICATION] From {peer}: code={code}")
    except Exception as e:
        log(f"[ERROR] Failed to process notification: {e}")


def main():
    """Main message processing loop"""
    log("[INFO] Starting BGP message processor")
    while True:
        line = sys.stdin.readline()
        if not line:
            break  # EOF
        try:
            msg = json.loads(line.strip())
            msg_type = msg.get('type', 'unknown')
            if msg_type == 'update':
                handle_update(msg)
            elif msg_type == 'state':
                handle_state(msg)
            elif msg_type == 'notification':
                handle_notification(msg)
            elif msg_type == 'keepalive':
                # Usually don't need to process
                pass
            else:
                log(f"[WARN] Unknown message type: {msg_type}")
        except json.JSONDecodeError as e:
            log(f"[ERROR] JSON parse error: {e}")
        except Exception as e:
            log(f"[ERROR] Processing error: {e}")


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        log("[INFO] Interrupted")
        sys.exit(0)

ALWAYS flush STDOUT:
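
One way to make the flush hard to forget is to send every command through a single helper. The sketch below is an illustration, not part of ExaBGP: `send_command` is a hypothetical name, and treating a closed pipe as "ExaBGP has gone away" is an assumption about how you want the program to react.

```python
import sys


def send_command(command, out=sys.stdout):
    """Write one command line and flush it immediately.

    Returns False if the pipe is closed (assumed here to mean ExaBGP has
    exited), True otherwise.
    """
    try:
        out.write(command.rstrip("\n") + "\n")  # exactly one trailing newline
        out.flush()                             # push the line through the pipe now
        return True
    except (BrokenPipeError, ValueError):
        # BrokenPipeError: reader closed the pipe; ValueError: stream already closed
        return False
```

The individual ways of flushing are compared next.
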
import sys

# WRONG - buffered, ExaBGP won't see it immediately
print("announce route 100.10.0.0/24 next-hop self")

# CORRECT - flushed
print("announce route 100.10.0.0/24 next-hop self")
sys.stdout.flush()

# ALSO CORRECT - flush parameter
print("announce route 100.10.0.0/24 next-hop self", flush=True)

# ALSO CORRECT - write + flush
sys.stdout.write("announce route 100.10.0.0/24 next-hop self\n")
sys.stdout.flush()

Python unbuffered mode (recommended):
#!/usr/bin/env -S python3 -u
# The -u flag disables buffering.
# On Linux a shebang passes everything after the interpreter path as a single
# argument, so "env python3 -u" fails; "env -S" (GNU coreutils 8.30 or later)
# splits the arguments.

Or in configuration:
process announce {
    run python3 -u /etc/exabgp/api/announce.py;
    encoder text;
}

Environment variable:
export PYTHONUNBUFFERED=1
python3 /etc/exabgp/api/announce.py

Blocking read (simple):
import sys

while True:
    line = sys.stdin.readline()
    if not line:
        break  # EOF
    process(line)  # process() stands for your own handler

Non-blocking read with select() (advanced):
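
A caveat when combining `select()` with `sys.stdin.readline()`: Python may pull several lines into its internal buffer in one read, after which `select()` reports nothing ready even though complete lines are waiting. The sketch below avoids this by reading the file descriptor directly on a Unix-like system. `LineReader` and its methods are hypothetical names, and the 4096-byte read size is an arbitrary choice.

```python
import os
import select


class LineReader:
    """Collect complete lines from a file descriptor without blocking."""

    def __init__(self, fd):
        self.fd = fd
        self.buffer = b""
        self.eof = False

    def poll(self, timeout=1.0):
        """Return the complete lines available within `timeout` seconds."""
        if not self.eof:
            ready, _, _ = select.select([self.fd], [], [], timeout)
            if ready:
                chunk = os.read(self.fd, 4096)   # unbuffered read
                if chunk:
                    self.buffer += chunk
                else:
                    self.eof = True              # writer closed the pipe
        # Split off complete lines; keep any partial line for the next call
        *lines, self.buffer = self.buffer.split(b"\n")
        return [line.decode("utf-8", errors="replace") for line in lines]
```

A program would typically create it with `LineReader(sys.stdin.fileno())` and call `poll()` once per loop iteration. The shorter `readline()`-based form looks like this:
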
import select
import sys

# Check if data available with timeout
ready, _, _ = select.select([sys.stdin], [], [], 1.0)
if ready:
    line = sys.stdin.readline()
    if line:  # An empty string means EOF
        process(line)

ExaBGP 4.x and 5.x support ACK responses (enabled by default):
import json
import select
import sys
import time


def wait_for_ack(expected_count=1, timeout=30):
    """
    Wait for ACK responses with polling loop.

    ExaBGP may not respond immediately, so we poll until the timeout.
    Handles both text and JSON encoder formats:
    - Text: "done", "error", "shutdown"
    - JSON: {"answer": "done|error|shutdown", "message": "..."}
    """
    received = 0
    start_time = time.time()
    while received < expected_count:
        if time.time() - start_time >= timeout:
            return False
        # select() already waits up to 0.1s, so no extra sleep is needed
        ready, _, _ = select.select([sys.stdin], [], [], 0.1)
        if not ready:
            continue
        line = sys.stdin.readline()
        if not line:
            return False  # EOF: ExaBGP closed the pipe
        line = line.strip()
        # Parse response (could be text or JSON)
        answer = None
        if line.startswith('{'):
            try:
                answer = json.loads(line).get('answer')
            except json.JSONDecodeError:
                pass  # Not valid JSON, ignore this line
        else:
            answer = line
        if answer == "done":
            received += 1
        elif answer == "error":
            return False
        elif answer == "shutdown":
            raise SystemExit(0)
    return True


def send_with_ack(command):
    """Send command and wait for ACK"""
    sys.stdout.write(command + "\n")
    sys.stdout.flush()
    return wait_for_ack(expected_count=1)


# Use it
if send_with_ack("announce route 100.10.0.0/24 next-hop self"):
    # Command succeeded
    pass
else:
    # Command failed - handle error
    sys.exit(1)

Always use try/except:
import socket


def check_health():
    """Check service health with error handling"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.settimeout(2)
            return sock.connect_ex(('100.10.0.100', 80)) == 0
        finally:
            sock.close()  # Release the socket even if the check raises
    except socket.error as e:
        log(f"[ERROR] Socket error: {e}")
        return False
    except Exception as e:
        log(f"[ERROR] Unexpected error: {e}")
        return False

Retry with exponential backoff:
import sys
import time


def retry_with_backoff(func, max_retries=3, initial_delay=1):
    """Retry function with exponential backoff"""
    delay = initial_delay
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                log(f"[ERROR] Failed after {max_retries} attempts: {e}")
                raise
            log(f"[WARN] Attempt {attempt + 1} failed: {e}, retrying in {delay}s")
            time.sleep(delay)
            delay *= 2  # Exponential backoff


# Use it
def announce():
    sys.stdout.write("announce route 100.10.0.0/24 next-hop self\n")
    sys.stdout.flush()


retry_with_backoff(announce, max_retries=3)

Prevent cascade failures:
import time


class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.last_failure_time = None
        self.state = 'closed'  # closed, open, half-open

    def call(self, func):
        """Call function with circuit breaker protection"""
        if self.state == 'open':
            # Check if timeout expired
            if time.time() - self.last_failure_time >= self.timeout:
                self.state = 'half-open'
                log("[CIRCUIT] Half-open, trying again")
            else:
                raise Exception("Circuit breaker is OPEN")
        try:
            result = func()
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = 'open'
                log(f"[CIRCUIT] OPEN after {self.failure_count} failures")
            raise
        # Success - reset circuit
        if self.state == 'half-open':
            self.state = 'closed'
            log("[CIRCUIT] Closed, service recovered")
        self.failure_count = 0  # Count consecutive failures only
        return result


# Use it
# Note: func must raise on failure for the breaker to count it. A check that
# only returns False never trips the breaker.
breaker = CircuitBreaker(failure_threshold=5, timeout=60)
while True:
    try:
        healthy = breaker.call(check_health)
        if healthy:
            announce_route('100.10.0.100/32')
    except Exception as e:
        log(f"[ERROR] Circuit breaker: {e}")
    time.sleep(5)

Avoid redundant announcements:
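
When several prefixes are involved, one option is to compare the set you want announced with the set already announced and emit only the difference. The sketch below illustrates that idea. `reconcile` is a hypothetical helper, and `next-hop self` is carried over from the examples in this guide.

```python
def reconcile(desired, announced):
    """Return the commands needed to move from `announced` to `desired`.

    Both arguments are sets of prefix strings. Nothing is written here;
    the caller sends the commands and updates its own state.
    """
    commands = []
    for prefix in sorted(desired - announced):      # wanted but not yet announced
        commands.append(f"announce route {prefix} next-hop self")
    for prefix in sorted(announced - desired):      # announced but no longer wanted
        commands.append(f"withdraw route {prefix}")
    return commands
```

Because the function only computes commands, it can be checked without touching STDOUT. For one prefix at a time, the same check can be written as:
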
import sys

# Global state tracking
announced_routes = set()


def announce_if_needed(prefix):
    """Only announce if not already announced"""
    if prefix not in announced_routes:
        sys.stdout.write(f"announce route {prefix} next-hop self\n")
        sys.stdout.flush()
        announced_routes.add(prefix)
        log(f"[ANNOUNCE] {prefix}")
    else:
        log(f"[SKIP] {prefix} already announced")


def withdraw_if_needed(prefix):
    """Only withdraw if currently announced"""
    if prefix in announced_routes:
        sys.stdout.write(f"withdraw route {prefix}\n")
        sys.stdout.flush()
        announced_routes.remove(prefix)
        log(f"[WITHDRAW] {prefix}")
    else:
        log(f"[SKIP] {prefix} not announced")

Formal state management:
class ServiceState:
    def __init__(self):
        self.current_state = 'unknown'
        self.route_announced = False

    def transition(self, new_state):
        """Handle state transitions"""
        if new_state == self.current_state:
            return  # No change
        log(f"[STATE] {self.current_state} -> {new_state}")
        self.current_state = new_state
        # Handle transitions
        if new_state == 'healthy' and not self.route_announced:
            self.announce()
        elif new_state == 'unhealthy' and self.route_announced:
            self.withdraw()

    def announce(self):
        """Announce route"""
        sys.stdout.write("announce route 100.10.0.100/32 next-hop self\n")
        sys.stdout.flush()
        self.route_announced = True
        log("[ACTION] Route announced")

    def withdraw(self):
        """Withdraw route"""
        sys.stdout.write("withdraw route 100.10.0.100/32\n")
        sys.stdout.flush()
        self.route_announced = False
        log("[ACTION] Route withdrawn")


# Use it
state = ServiceState()
while True:
    healthy = check_health()
    state.transition('healthy' if healthy else 'unhealthy')
    time.sleep(5)

HTTP health check:
import urllib.request


def http_health_check(url, timeout=2):
    """HTTP GET health check"""
    try:
        req = urllib.request.Request(url)
        # The with-block closes the connection once the status is read
        with urllib.request.urlopen(req, timeout=timeout) as response:
            return response.status == 200
    except Exception as e:
        log(f"[HEALTH] HTTP check failed: {e}")
        return False


# Use it
if http_health_check('http://127.0.0.1:8080/health'):
    announce_route('100.10.0.100/32')

TCP port check:
import socket


def tcp_port_check(host, port, timeout=2):
    """TCP connection check"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        result = sock.connect_ex((host, port))
        sock.close()
        return result == 0
    except Exception as e:
        log(f"[HEALTH] TCP check failed: {e}")
        return False


# Use it
if tcp_port_check('127.0.0.1', 80):
    announce_route('100.10.0.100/32')

Command execution check:
import subprocess


def command_health_check(command, timeout=5):
    """Execute command, check exit code"""
    try:
        result = subprocess.run(
            command,
            shell=True,
            capture_output=True,
            timeout=timeout
        )
        return result.returncode == 0
    except Exception as e:
        log(f"[HEALTH] Command check failed: {e}")
        return False


# Use it
if command_health_check('curl -sf http://localhost/health'):
    announce_route('100.10.0.100/32')

Prevent route flapping:
class HealthTracker:
    def __init__(self, threshold_up=3, threshold_down=2):
        self.threshold_up = threshold_up
        self.threshold_down = threshold_down
        self.consecutive_up = 0
        self.consecutive_down = 0
        self.current_state = 'down'

    def update(self, healthy):
        """Update health state with hysteresis"""
        if healthy:
            self.consecutive_up += 1
            self.consecutive_down = 0
            if self.consecutive_up >= self.threshold_up:
                if self.current_state != 'up':
                    log(f"[HEALTH] Service UP (after {self.consecutive_up} checks)")
                self.current_state = 'up'
        else:
            self.consecutive_down += 1
            self.consecutive_up = 0
            if self.consecutive_down >= self.threshold_down:
                if self.current_state != 'down':
                    log(f"[HEALTH] Service DOWN (after {self.consecutive_down} checks)")
                self.current_state = 'down'
        return self.current_state


# Use it
health = HealthTracker(threshold_up=3, threshold_down=2)
last_state = 'down'
while True:
    state = health.update(check_health())
    if state != last_state:  # Act only on transitions, not on every check
        if state == 'up':
            announce_route('100.10.0.100/32')
        else:
            withdraw_route('100.10.0.100/32')
        last_state = state
    time.sleep(5)

Handle SIGTERM properly:
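
A conservative variant keeps the signal handler minimal: it only records that shutdown was requested, and the main loop performs the withdrawals. Writing to STDOUT from inside a handler can raise a reentrancy error if the signal arrives while the main loop is itself in the middle of a write. The sketch below is one way to arrange this. All names in it (`request_shutdown`, `install_handlers`, `withdraw_all`, `run_until_shutdown`) are hypothetical, and the 0.1-second nap is an arbitrary choice.

```python
import signal
import sys
import time

_shutdown = {"requested": False}   # mutable flag shared with the handler


def request_shutdown(signum, frame):
    """Signal handler: only record the request, no I/O here."""
    _shutdown["requested"] = True


def install_handlers():
    """Register the handler for SIGTERM and SIGINT (call once at startup)."""
    signal.signal(signal.SIGTERM, request_shutdown)
    signal.signal(signal.SIGINT, request_shutdown)


def withdraw_all(announced_routes, out=sys.stdout):
    """Withdraw every announced prefix, then forget them."""
    for prefix in sorted(announced_routes):
        out.write(f"withdraw route {prefix}\n")
    out.flush()
    announced_routes.clear()


def run_until_shutdown(announced_routes, work=lambda: None, interval=5, out=sys.stdout):
    """Call work() every `interval` seconds until shutdown is requested."""
    while not _shutdown["requested"]:
        work()                                   # health checks, announcements
        deadline = time.monotonic() + interval
        while not _shutdown["requested"] and time.monotonic() < deadline:
            time.sleep(0.1)                      # short naps keep shutdown responsive
    withdraw_all(announced_routes, out)          # clean up outside the handler
```

The direct approach, withdrawing from inside the handler, looks like this:
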
import signal
import sys


def shutdown_handler(signum, frame):
    """Handle shutdown signal"""
    log(f"[SHUTDOWN] Received signal {signum}, cleaning up...")
    # Withdraw all announced routes
    for prefix in announced_routes:
        sys.stdout.write(f"withdraw route {prefix}\n")
        sys.stdout.flush()
        log(f"[CLEANUP] Withdrawn {prefix}")
    log("[SHUTDOWN] Complete")
    sys.exit(0)


# Register handler
signal.signal(signal.SIGTERM, shutdown_handler)
signal.signal(signal.SIGINT, shutdown_handler)

#!/usr/bin/env python3
"""
healthcheck.py - Production-grade health check for ExaBGP
"""
import sys
import time
import socket
import signal
import urllib.request

# Configuration
SERVICE_IP = "100.10.0.100"
SERVICE_PORT = 80
CHECK_INTERVAL = 5
HEALTH_THRESHOLD_UP = 3
HEALTH_THRESHOLD_DOWN = 2
TIMEOUT = 2

# State
announced = False
consecutive_healthy = 0
consecutive_unhealthy = 0


def log(message):
    """Log to STDERR"""
    timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
    sys.stderr.write(f"[{timestamp}] {message}\n")
    sys.stderr.flush()


def signal_handler(signum, frame):
    """Handle shutdown gracefully"""
    log(f"Received signal {signum}, withdrawing routes and exiting")
    if announced:
        sys.stdout.write(f"withdraw route {SERVICE_IP}/32\n")
        sys.stdout.flush()
    sys.exit(0)


def check_tcp_port(host, port, timeout):
    """Check if TCP port is open"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        result = sock.connect_ex((host, port))
        sock.close()
        return result == 0
    except Exception as e:
        log(f"TCP check error: {e}")
        return False


def check_http(url, timeout):
    """Check HTTP endpoint"""
    try:
        req = urllib.request.Request(url)
        with urllib.request.urlopen(req, timeout=timeout) as response:
            return response.status == 200
    except Exception as e:
        log(f"HTTP check error: {e}")
        return False


def is_healthy():
    """Perform health checks"""
    # TCP port check
    if not check_tcp_port('127.0.0.1', SERVICE_PORT, TIMEOUT):
        return False
    # HTTP health endpoint check
    if not check_http(f'http://127.0.0.1:{SERVICE_PORT}/health', TIMEOUT):
        return False
    return True


def announce_route():
    """Announce route"""
    global announced
    sys.stdout.write(f"announce route {SERVICE_IP}/32 next-hop self\n")
    sys.stdout.flush()
    announced = True
    log(f"ANNOUNCED {SERVICE_IP}/32")


def withdraw_route():
    """Withdraw route"""
    global announced
    sys.stdout.write(f"withdraw route {SERVICE_IP}/32\n")
    sys.stdout.flush()
    announced = False
    log(f"WITHDRAWN {SERVICE_IP}/32")


def main():
    """Main loop with hysteresis"""
    global consecutive_healthy, consecutive_unhealthy

    # Register signal handlers
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    log("Starting health check daemon")
    time.sleep(2)  # Wait for ExaBGP
    log("Ready")

    while True:
        try:
            healthy = is_healthy()
            if healthy:
                consecutive_healthy += 1
                consecutive_unhealthy = 0
                # Announce after N consecutive healthy checks
                if consecutive_healthy >= HEALTH_THRESHOLD_UP and not announced:
                    announce_route()
            else:
                consecutive_unhealthy += 1
                consecutive_healthy = 0
                # Withdraw after N consecutive unhealthy checks
                if consecutive_unhealthy >= HEALTH_THRESHOLD_DOWN and announced:
                    withdraw_route()
            time.sleep(CHECK_INTERVAL)
        except Exception as e:
            log(f"Error in main loop: {e}")
            time.sleep(CHECK_INTERVAL)


if __name__ == '__main__':
    main()

#!/usr/bin/env python3
"""
route_monitor.py - Monitor BGP routes and trigger actions
"""
import sys
import json
import subprocess

# Track routes
routes = {}


def log(message):
    """Log to STDERR"""
    sys.stderr.write(f"{message}\n")
    sys.stderr.flush()


def trigger_script(action, prefix, nexthop):
    """Trigger external script on route change"""
    script = f"/etc/exabgp/scripts/on_{action}.sh"
    try:
        subprocess.run([script, prefix, nexthop], timeout=5)
        log(f"[TRIGGER] Executed {script} for {prefix}")
    except Exception as e:
        log(f"[ERROR] Script execution failed: {e}")


def handle_announcement(prefix, attrs):
    """Handle route announcement"""
    nexthop = attrs.get('next-hop', 'unknown')
    if prefix not in routes:
        log(f"[NEW] {prefix} via {nexthop}")
        routes[prefix] = attrs
        trigger_script('announce', prefix, nexthop)
    else:
        log(f"[UPDATE] {prefix} via {nexthop}")
        routes[prefix] = attrs


def handle_withdrawal(prefix):
    """Handle route withdrawal"""
    if prefix in routes:
        nexthop = routes[prefix].get('next-hop', 'unknown')
        log(f"[WITHDRAWN] {prefix}")
        del routes[prefix]
        trigger_script('withdraw', prefix, nexthop)


def main():
    """Main message processing loop"""
    log("[START] BGP route monitor")
    while True:
        line = sys.stdin.readline()
        if not line:
            break
        try:
            msg = json.loads(line.strip())
            if msg['type'] == 'update':
                update = msg['neighbor']['message']['update']
                # Process announcements
                if 'announce' in update:
                    if 'ipv4 unicast' in update['announce']:
                        for prefix, attrs_list in update['announce']['ipv4 unicast'].items():
                            handle_announcement(prefix, attrs_list[0])
                # Process withdrawals
                if 'withdraw' in update:
                    if 'ipv4 unicast' in update['withdraw']:
                        for prefix in update['withdraw']['ipv4 unicast'].keys():
                            handle_withdrawal(prefix)
            elif msg['type'] == 'state':
                peer = msg['neighbor']['address']['peer']
                state = msg['neighbor']['state']
                log(f"[STATE] BGP session {peer}: {state}")
        except json.JSONDecodeError as e:
            log(f"[ERROR] JSON parse error: {e}")
        except Exception as e:
            log(f"[ERROR] Processing error: {e}")


if __name__ == '__main__':
    main()

package main

import (
	"fmt"
	"net"
	"os"
	"os/signal"
	"syscall"
	"time"
)

const (
	serviceIP     = "100.10.0.100"
	servicePort   = 80
	checkInterval = 5 * time.Second
)

var announced = false

func log(message string) {
	timestamp := time.Now().Format("2006-01-02 15:04:05")
	fmt.Fprintf(os.Stderr, "[%s] %s\n", timestamp, message)
}

func checkHealth() bool {
	conn, err := net.DialTimeout("tcp", fmt.Sprintf("127.0.0.1:%d", servicePort), 2*time.Second)
	if err != nil {
		return false
	}
	conn.Close()
	return true
}

func announceRoute() {
	fmt.Printf("announce route %s/32 next-hop self\n", serviceIP)
	announced = true
	log(fmt.Sprintf("ANNOUNCED %s/32", serviceIP))
}

func withdrawRoute() {
	fmt.Printf("withdraw route %s/32\n", serviceIP)
	announced = false
	log(fmt.Sprintf("WITHDRAWN %s/32", serviceIP))
}

func main() {
	// Handle signals
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
	go func() {
		sig := <-sigChan
		log(fmt.Sprintf("Received signal %v, exiting", sig))
		if announced {
			withdrawRoute()
		}
		os.Exit(0)
	}()

	log("Starting health check daemon")
	time.Sleep(2 * time.Second) // Wait for ExaBGP
	log("Ready")

	for {
		healthy := checkHealth()
		if healthy && !announced {
			announceRoute()
		} else if !healthy && announced {
			withdrawRoute()
		}
		time.Sleep(checkInterval)
	}
}

#!/bin/bash
# healthcheck.sh - Simple health check in bash
SERVICE_IP="100.10.0.100"
SERVICE_PORT=80
CHECK_INTERVAL=5
ANNOUNCED=0

log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" >&2
}

check_health() {
    # Use nc (netcat) or curl for health check
    timeout 2 nc -z 127.0.0.1 "$SERVICE_PORT" 2>/dev/null
    return $?
}

announce_route() {
    echo "announce route ${SERVICE_IP}/32 next-hop self"
    ANNOUNCED=1
    log "ANNOUNCED ${SERVICE_IP}/32"
}

withdraw_route() {
    echo "withdraw route ${SERVICE_IP}/32"
    ANNOUNCED=0
    log "WITHDRAWN ${SERVICE_IP}/32"
}

cleanup() {
    log "Received SIGTERM, cleaning up"
    if [ "$ANNOUNCED" -eq 1 ]; then
        withdraw_route
    fi
    exit 0
}

trap cleanup SIGTERM SIGINT

log "Starting health check daemon"
sleep 2 # Wait for ExaBGP
log "Ready"

while true; do
    if check_health; then
        if [ "$ANNOUNCED" -eq 0 ]; then
            announce_route
        fi
    else
        if [ "$ANNOUNCED" -eq 1 ]; then
            withdraw_route
        fi
    fi
    sleep "$CHECK_INTERVAL"
done

Test commands without ExaBGP:
# Test your program's output
./announce.py | head -5
# Expected output:
# announce route 100.10.0.0/24 next-hop self

Test with mock stdin:
# Send test JSON to your receive program
# (the state handler reads neighbor.address.peer, so include it)
echo '{"type":"state","neighbor":{"address":{"peer":"192.168.1.1"},"state":"up"}}' | ./receive.py

Python unittest example:
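
A self-contained variant that does not depend on a live service is sketched here. It assumes the command-writing function accepts the output stream as a parameter. The `out` argument is an assumption made for this sketch, not something the programs in this guide do. The stand-in `announce_route` below is defined only so the example runs on its own.

```python
import io
import sys
import unittest


def announce_route(prefix, out=sys.stdout):
    """Function under test: a stand-in for your program's announce helper."""
    out.write(f"announce route {prefix} next-hop self\n")
    out.flush()


class TestAnnounceFormat(unittest.TestCase):
    def test_single_line_with_newline(self):
        out = io.StringIO()
        announce_route("100.10.0.0/24", out=out)
        self.assertEqual(out.getvalue(), "announce route 100.10.0.0/24 next-hop self\n")

    def test_one_command_per_call(self):
        out = io.StringIO()
        announce_route("100.10.0.0/24", out=out)
        announce_route("100.10.1.0/24", out=out)
        self.assertEqual(len(out.getvalue().splitlines()), 2)
```

Passing the stream in keeps the test from having to replace `sys.stdout`. A test written against the helper functions shown earlier in this guide looks like this:
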
import io
import unittest
from contextlib import redirect_stdout

# Assumes tcp_port_check and announce_route are importable from your program,
# for example: from healthcheck import tcp_port_check, announce_route


class TestHealthCheck(unittest.TestCase):
    def test_tcp_check(self):
        """Test TCP health check"""
        result = tcp_port_check('127.0.0.1', 80, timeout=2)
        self.assertIsInstance(result, bool)

    def test_announce_format(self):
        """Test announce command format"""
        output = io.StringIO()
        # redirect_stdout restores sys.stdout even if the call raises
        with redirect_stdout(output):
            announce_route('100.10.0.0/24')
        self.assertIn('announce route 100.10.0.0/24', output.getvalue())


if __name__ == '__main__':
    unittest.main()

Test with ExaBGP:
# 1. Start ExaBGP in one terminal
exabgp /etc/exabgp/exabgp.conf

# 2. Check ExaBGP logs for your program output
tail -f /var/log/exabgp.log

# 3. Trigger health changes
# - Stop service -> Should withdraw route
# - Start service -> Should announce route

Problem:
print("announce route 100.10.0.0/24 next-hop self")
# Route never announced - stuck in buffer

Solution:
print("announce route 100.10.0.0/24 next-hop self")
sys.stdout.flush()  # Always flush!

Problem:
print("announce route 100.10.0.0/24 next-hop self")
sys.stdout.flush()
# Script exits, ExaBGP restarts it repeatedly

Solution:
print("announce route 100.10.0.0/24 next-hop self")
sys.stdout.flush()
# Keep running
while True:
    time.sleep(60)

Problem:
# Crashes on any error
result = check_health()

Solution:
try:
    result = check_health()
except Exception as e:
    log(f"Health check failed: {e}")
    result = False

Problem:
# Announces and withdraws on every tiny change
if check_health():
    announce()
else:
    withdraw()

Solution:
# Use hysteresis (require N consecutive checks)
if consecutive_healthy >= 3:
    announce()
elif consecutive_unhealthy >= 2:
    withdraw()

Problem:
# Announces same route repeatedly
while True:
    announce_route('100.10.0.0/24')  # Wastes bandwidth
    time.sleep(5)

Solution:
# Track state
if healthy and not announced:
    announce_route('100.10.0.0/24')
    announced = True

sys.stdout.flush()  # After every command

# STDOUT = commands to ExaBGP
# STDERR = logging (goes to ExaBGP log)
sys.stderr.write("[INFO] Service healthy\n")

# Avoid redundant operations
if new_state != old_state:
    take_action()

signal.signal(signal.SIGTERM, shutdown_handler)

# Require N consecutive checks before changing state
if consecutive_healthy >= THRESHOLD:
    announce()

# All I/O operations should timeout
sock.settimeout(2)
urllib.request.urlopen(req, timeout=2)
subprocess.run(cmd, timeout=5)

# When receiving JSON
try:
    msg = json.loads(line)
    # Validate structure
    if 'type' not in msg:
        raise ValueError("Missing type field")
except Exception as e:
    log(f"Invalid message: {e}")

log(f"[STATE] Service: {state}")
log(f"[ACTION] Announced {prefix}")
log(f"[ERROR] Health check failed: {error}")

# Don't hardcode - use config file or environment
import os

SERVICE_IP = os.getenv('SERVICE_IP', '100.10.0.100')
CHECK_INTERVAL = int(os.getenv('CHECK_INTERVAL', '5'))

# Make your program testable independently
if __name__ == '__main__':
    # Can run without ExaBGP for testing
    main()

- API Overview - API architecture and concepts
- Text API Reference - Command syntax reference
- JSON API Reference - JSON message format
- Error Handling - Comprehensive error handling guide
- Production Best Practices - Production deployment
- Service High Availability - HA patterns
- Configuration Syntax - Process configuration
Ready for production? See Production Best Practices →
👻 Ghost written by Claude (Anthropic AI)