
Routing speed #8

@james1236

MakeDevice Backend: Python Performance Optimizations

Current Performance Bottlenecks Analysis

Based on the codebase analysis, the following performance bottlenecks have been identified:

1. Concurrency Limitations ⚠️

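  • Single-threaded routing: Each job runs in its own thread, but routing is CPU-bound, so extra threads add little throughput
  • GIL (Global Interpreter Lock): In CPython, the GIL lets only one thread execute Python bytecode at a time, so CPU-bound threads do not run in parallel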
  • File I/O blocking: Frequent file operations for progress tracking and keepalive
  • Sequential processing: Left and right routing happen sequentially

2. Algorithmic Inefficiencies ⚠️

  • A* pathfinding: Runs for each socket individually
  • Grid recreation: New grid created for each pathfinding operation
  • Redundant calculations: Same geometric calculations repeated
  • Memory allocation: Large NumPy arrays created frequently
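
The grid-recreation point above can be illustrated with a small, pure-Python sketch: the occupancy grid is built once and every socket is routed against that same grid with A*. The grid layout, the `astar` and `route_all` helpers, and the socket coordinates are illustrative assumptions, not the backend's actual BusRouter code.

```python
import heapq

def astar(grid, start, goal):
    """A* on a 4-connected grid of 0 (free) / 1 (blocked) cells.

    Returns the list of cells from start to goal, or None if unreachable.
    """
    rows, cols = len(grid), len(grid[0])

    def h(cell):
        # Manhattan distance is admissible for 4-connected unit-cost moves
        return abs(cell[0] - goal[0]) + abs(cell[1] - goal[1])

    open_heap = [(h(start), 0, start)]
    came_from = {}
    best_cost = {start: 0}
    while open_heap:
        _, cost, cell = heapq.heappop(open_heap)
        if cell == goal:
            path = [cell]
            while cell in came_from:
                cell = came_from[cell]
                path.append(cell)
            return path[::-1]
        if cost > best_cost[cell]:
            continue  # stale heap entry, a cheaper route was already found
        r, c = cell
        for nxt in ((r + 1, c), (r - 1, c), (r, c + 1), (r, c - 1)):
            nr, nc = nxt
            if 0 <= nr < rows and 0 <= nc < cols and grid[nr][nc] == 0:
                new_cost = cost + 1
                if new_cost < best_cost.get(nxt, float("inf")):
                    best_cost[nxt] = new_cost
                    came_from[nxt] = cell
                    heapq.heappush(open_heap, (new_cost + h(nxt), new_cost, nxt))
    return None

def route_all(grid, sockets, bus_cell):
    """Route every socket to bus_cell on the same grid instead of rebuilding it."""
    return {socket: astar(grid, socket, bus_cell) for socket in sockets}

# Hypothetical 5x5 board with a wall in column 2 that leaves only row 4 open
grid = [[0] * 5 for _ in range(5)]
for row in range(0, 4):
    grid[row][2] = 1

paths = route_all(grid, sockets=[(0, 0), (4, 0)], bus_cell=(0, 4))
```

In a real router, each finished track would presumably also be marked as blocked on the shared grid before the next socket is routed; the sketch leaves that out.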

3. I/O Operations ⚠️

  • File-based progress tracking: Writing to disk for each progress update
  • Image generation: Debug images created during routing
  • Keepalive mechanism: File system polling every few seconds
  • Large file operations: Gerber file processing and generation

4. Resource Management ⚠️

  • Memory leaks: Large objects not properly garbage collected
  • File descriptor limits: Multiple file operations without proper cleanup
  • CPU utilization: Single-threaded routing underutilizes multi-core systems
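
Before treating the memory-leak item as confirmed, it may help to measure it. A minimal sketch using the standard-library `tracemalloc` module, where `run_job` is a hypothetical stand-in for one routing job and the buffer size is arbitrary:

```python
import tracemalloc

_retained = []  # simulates a leak: references that outlive the job

def run_job(leak):
    """Hypothetical stand-in for one routing job that allocates a large buffer."""
    buffer = bytearray(1_000_000)
    if leak:
        _retained.append(buffer)  # buffer stays reachable after the job returns

def retained_bytes(job, runs=5):
    """Return how much traced memory is still allocated after `runs` jobs."""
    tracemalloc.start()
    try:
        before, _ = tracemalloc.get_traced_memory()
        for _ in range(runs):
            job()
        after, _ = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return after - before

clean = retained_bytes(lambda: run_job(leak=False))
leaky = retained_bytes(lambda: run_job(leak=True))
```

If the retained amount grows with the number of jobs, as it does for the leaky variant here, something is holding references; `tracemalloc.take_snapshot()` can then be used to find the allocating lines.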

Recommended Optimizations

1. Concurrency & Parallelization 🚀

A. Multiprocessing for CPU-Intensive Tasks

# Current: Single-threaded routing
left_router = BusRouter(board, tracks_layer=top_layer, buses_layer=bottom_layer, side="left")
left_router.route()
right_router = BusRouter(board, tracks_layer=bottom_layer, buses_layer=top_layer, side="right")
right_router.route()

# Optimized: Parallel routing
import multiprocessing as mp

def parallel_routing(board_data, tracks_layer, buses_layer, side, result_queue):
    # Create the router inside the child process; every argument must be picklable
    router = BusRouter(board_data, tracks_layer=tracks_layer, buses_layer=buses_layer, side=side)
    result = router.route()
    result_queue.put((side, result))

if __name__ == "__main__":
    # 'spawn' re-imports this module in each child, so process creation must be guarded
    ctx = mp.get_context("spawn")
    result_queue = ctx.Queue()
    left_process = ctx.Process(target=parallel_routing, args=(board_data, top_layer, bottom_layer, "left", result_queue))
    right_process = ctx.Process(target=parallel_routing, args=(board_data, bottom_layer, top_layer, "right", result_queue))

    # Run left and right routing in parallel
    left_process.start()
    right_process.start()

    # Drain the queue before join(): a child still flushing a large result cannot exit
    results = dict(result_queue.get() for _ in range(2))
    left_process.join()
    right_process.join()
    # Each child worked on its own copy of board_data, so the two results
    # still have to be merged into a single board here

B. Thread Pool for I/O Operations

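import asyncio
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

class OptimizedServer:
    def __init__(self):
        self.thread_pool = ThreadPoolExecutor(max_workers=10)
        self.process_pool = ProcessPoolExecutor(max_workers=4)

    async def handle_routing_request(self, job_id, project_data):
        loop = asyncio.get_running_loop()

        # Submit routing to the process pool. run_routing is assumed to be a
        # module-level function: a bound method would require pickling self,
        # and the executors it holds cannot be pickled.
        routing = loop.run_in_executor(self.process_pool, run_routing, project_data)

        # Handle progress updates in the thread pool (monitor_progress is
        # likewise assumed to be a module-level, blocking function)
        progress = loop.run_in_executor(self.thread_pool, monitor_progress, job_id)

        return await routing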

C. Async/Await for I/O Operations

import aiofiles
import asyncio

class AsyncProgressTracker:
    def __init__(self, job_id):
        self.job_id = job_id
        self.progress_file = f"storage/jobs/{job_id}/progress.txt"
    
    async def update_progress(self, progress):
        async with aiofiles.open(self.progress_file, 'w') as f:
            await f.write(str(progress))
    
    async def check_keepalive(self):
        async with aiofiles.open(f"storage/jobs/{self.job_id}/keepalive_time", 'r') as f:
            content = await f.read()
            return content.strip()
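
If adding `aiofiles` as a dependency is not desirable, much the same effect can likely be had with the standard library alone: `asyncio.to_thread` (Python 3.9+) runs a blocking call in a worker thread so the event loop stays free. The class name and the file layout below are assumptions for illustration.

```python
import asyncio
import tempfile
from pathlib import Path

class ThreadedProgressTracker:
    """Reads and writes progress via worker threads so the event loop is not blocked."""

    def __init__(self, job_dir):
        self.progress_file = Path(job_dir) / "progress.txt"

    async def update_progress(self, progress):
        # write_text is a blocking call; to_thread runs it off the event loop
        await asyncio.to_thread(self.progress_file.write_text, str(progress))

    async def read_progress(self):
        try:
            text = await asyncio.to_thread(self.progress_file.read_text)
        except FileNotFoundError:
            return 0.0  # no progress written yet
        return float(text.strip())

async def demo():
    with tempfile.TemporaryDirectory() as job_dir:
        tracker = ThreadedProgressTracker(job_dir)
        initial = await tracker.read_progress()   # file does not exist yet
        await tracker.update_progress(0.25)
        return initial, await tracker.read_progress()

result = asyncio.run(demo())
```

Either way the disk write still happens; the gain is only that request handlers are not stalled while it does.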

2. Algorithm Optimizations 🧠

A. Batch Pathfinding

class OptimizedBusRouter:
    def __init__(self, board, tracks_layer, buses_layer, side):
        self.board = board
        self.tracks_layer = tracks_layer
        self.buses_layer = buses_layer
        self.side = side
        
        # Pre-compute and cache grids
        self.grid_cache = {}
        self.path_cache = {}
    
    def route_batch(self, socket_groups):
        """Route multiple sockets in batches for better performance"""
        results = []
        
        # Group sockets by proximity for batch processing
        batched_sockets = self._group_sockets_by_proximity(socket_groups)
        
        for batch in batched_sockets:
            # Create shared grid for batch
            shared_grid = self._create_shared_grid(batch)
            
            # Route all sockets in batch using shared grid
            batch_results = self._route_batch_on_grid(shared_grid, batch)
            results.extend(batch_results)
        
        return results
    
    def _group_sockets_by_proximity(self, socket_groups):
        """Group sockets that are close to each other for efficient routing"""
        # Implementation: Use spatial partitioning (quadtree/octree)
        pass
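
`_group_sockets_by_proximity` is left as a stub above. One simple way to fill it in, sketched here under the assumption that each socket can be reduced to an (x, y) position, is a uniform grid hash rather than a full quadtree: sockets falling in the same cell form one batch. The `cell_size` value and the tuple representation are illustrative.

```python
from collections import defaultdict

def group_sockets_by_proximity(sockets, cell_size=10.0):
    """Bucket (x, y) socket positions into square cells of side cell_size.

    Returns a list of batches, each a list of sockets sharing one cell,
    ordered by cell index so the result is deterministic.
    """
    cells = defaultdict(list)
    for x, y in sockets:
        key = (int(x // cell_size), int(y // cell_size))
        cells[key].append((x, y))
    return [cells[key] for key in sorted(cells)]

sockets = [(1.0, 2.0), (3.5, 8.0), (25.0, 4.0), (27.0, 1.0), (12.0, 40.0)]
batches = group_sockets_by_proximity(sockets)
```

A known limitation of this approach is that two sockets on either side of a cell boundary land in different batches even when they are close, which is the case a quadtree or a neighbor-cell merge step would handle better.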

B. Grid Optimization

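import numpy as np
from numba import njit

@njit(cache=True)
def _mark_obstacles(grid, obstacles):
    """JIT-compiled obstacle marking; obstacles is an (N, 2) integer array"""
    for i in range(obstacles.shape[0]):
        x = obstacles[i, 0]
        y = obstacles[i, 1]
        if 0 <= x < grid.shape[0] and 0 <= y < grid.shape[1]:
            grid[x, y] = 1

class OptimizedGrid:
    def __init__(self, dimensions, resolution):
        self.dimensions = dimensions
        self.resolution = resolution
        self.grid = np.zeros(dimensions, dtype=np.int8)
        self.obstacle_cache = {}

    def mark_obstacles(self, obstacles):
        """Mark obstacle cells on the grid"""
        # nopython mode cannot compile a method that takes an ordinary Python
        # `self`, so the loop lives in the module-level function above
        _mark_obstacles(self.grid, np.asarray(obstacles, dtype=np.int64).reshape(-1, 2))

    def get_subgrid(self, bounds):
        """Extract subgrid for localized pathfinding"""
        x_min, x_max, y_min, y_max = bounds
        return self.grid[x_min:x_max, y_min:y_max].copy()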

C. Caching and Memoization

from functools import lru_cache
import hashlib

class PathfindingCache:
    def __init__(self, max_size=1000):
        self.max_size = max_size
        # Wrap the bound method per instance. Decorating the method with
        # @lru_cache directly would share one cache across all instances and
        # keep each instance alive for as long as its entries remain cached.
        self.find_path_cached = lru_cache(maxsize=max_size)(self._find_path)

    def get_cache_key(self, start, end, obstacles_hash):
        """Generate a string key (useful if results are stored outside the process)"""
        return hashlib.md5(f"{start}_{end}_{obstacles_hash}".encode()).hexdigest()

    def _find_path(self, start, end, obstacles_hash):
        """Actual pathfinding goes here; all arguments must be hashable"""
        raise NotImplementedError
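
The cached lookup above depends on an `obstacles_hash` argument whose construction is not shown. A minimal sketch, assuming obstacles are (x, y) integer cells: hash a sorted, de-duplicated encoding so the same obstacle set always yields the same key regardless of insertion order. `find_path` here is a counting placeholder, not a real pathfinder.

```python
import hashlib
from functools import lru_cache

def obstacles_hash(obstacles):
    """Order-independent digest of a collection of (x, y) obstacle cells."""
    canonical = ";".join(f"{x},{y}" for x, y in sorted(set(obstacles)))
    return hashlib.sha256(canonical.encode()).hexdigest()

calls = 0

@lru_cache(maxsize=1000)
def find_path(start, end, obstacles_digest):
    """Placeholder pathfinder: counts invocations and returns a straight segment."""
    global calls
    calls += 1
    return (start, end)

h1 = obstacles_hash([(1, 2), (3, 4)])
h2 = obstacles_hash([(3, 4), (1, 2), (1, 2)])  # same set, different order

find_path((0, 0), (5, 5), h1)
find_path((0, 0), (5, 5), h2)                          # cache hit: identical key
find_path((0, 0), (5, 5), obstacles_hash([(9, 9)]))    # different obstacles: miss
```

The digest only serves as a cache key; a real pathfinder would still need access to the obstacle set itself, for example by looking it up from the digest.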

3. Memory Management 💾

A. Object Pooling

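from queue import Queue, Empty, Full

class ObjectPool:
    def __init__(self, object_class, pool_size=100):
        self.object_class = object_class
        self.pool = Queue(maxsize=pool_size)
        self._initialize_pool(pool_size)

    def _initialize_pool(self, pool_size):
        """Pre-create pool_size objects so early requests skip allocation"""
        for _ in range(pool_size):
            self.pool.put_nowait(self.object_class())

    def get_object(self):
        """Get object from pool or create new one"""
        try:
            return self.pool.get_nowait()
        except Empty:
            return self.object_class()

    def return_object(self, obj):
        """Return object to pool (the pooled class must provide reset())"""
        try:
            obj.reset()  # Reset object state
            self.pool.put_nowait(obj)
        except Full:
            pass  # Pool is full, discard object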

B. Memory-Mapped Files

import mmap

class MemoryMappedProgress:
    def __init__(self, job_id):
        self.file_path = f"storage/jobs/{job_id}/progress.mmap"
        self._create_mmap_file()
    
    def _create_mmap_file(self):
        """Create memory-mapped file for progress tracking"""
        with open(self.file_path, 'wb') as f:
            f.write(b'\x00' * 1024)  # 1KB buffer
        
        self.mmap_file = open(self.file_path, 'r+b')
        self.mmap = mmap.mmap(self.mmap_file.fileno(), 1024)
    
    def update_progress(self, progress):
        """Update progress through the mapping, avoiding an open/write/close cycle per update"""
        progress_str = f"{progress:.4f}".encode()
        self.mmap.seek(0)
        self.mmap.write(progress_str.ljust(1024, b'\x00'))
        self.mmap.flush()
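
A fixed-width binary encoding may be a simpler fit for the mapped region than a padded string: an 8-byte double can be written and read at a known offset. The sketch below uses only `mmap` and `struct`; the file name and the single-value layout are assumptions.

```python
import mmap
import os
import struct
import tempfile

RECORD = struct.Struct("<d")  # one little-endian 8-byte float

def create_progress_file(path):
    """Create a file just large enough to hold one progress value."""
    with open(path, "wb") as f:
        f.write(RECORD.pack(0.0))

def write_progress(path, progress):
    """Store progress at offset 0 of the mapped file."""
    with open(path, "r+b") as f, mmap.mmap(f.fileno(), RECORD.size) as m:
        RECORD.pack_into(m, 0, progress)
        m.flush()  # push the change to the underlying file

def read_progress(path):
    """Read the progress value back through a read-only mapping."""
    with open(path, "rb") as f, mmap.mmap(f.fileno(), RECORD.size, access=mmap.ACCESS_READ) as m:
        return RECORD.unpack_from(m, 0)[0]

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "progress.mmap")
    create_progress_file(path)
    write_progress(path, 0.4375)
    value = read_progress(path)
```

The helpers open and map the file on every call to keep the example short; a long-running job would keep one mapping open for its lifetime, as the class above does.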

4. I/O Optimizations 📁

A. Asynchronous File Operations

import aiofiles
import aiofiles.os

class AsyncFileManager:
    def __init__(self, job_id):
        self.job_id = job_id
        self.base_path = f"storage/jobs/{job_id}"
    
    async def write_progress(self, progress):
        """Asynchronous progress writing"""
        async with aiofiles.open(f"{self.base_path}/progress.txt", 'w') as f:
            await f.write(str(progress))
    
    async def read_progress(self):
        """Asynchronous progress reading"""
        try:
            async with aiofiles.open(f"{self.base_path}/progress.txt", 'r') as f:
                return float((await f.read()).strip())
        except FileNotFoundError:
            return 0.0
    
    async def cleanup_job_files(self):
        """Asynchronous cleanup of job files"""
        await aiofiles.os.remove(f"{self.base_path}/progress.txt")
        await aiofiles.os.remove(f"{self.base_path}/keepalive_time")

B. Buffered Writing

class BufferedProgressTracker:
    def __init__(self, job_id, buffer_size=100):
        self.job_id = job_id
        self.buffer_size = buffer_size
        self.latest_progress = None  # only the newest value is ever written
        self.pending_updates = 0

    def update_progress(self, progress):
        """Buffer progress updates and write once every buffer_size updates"""
        self.latest_progress = progress
        self.pending_updates += 1

        if self.pending_updates >= self.buffer_size:
            self.flush()

    def flush(self):
        """Write the latest buffered value; call once more when the job finishes"""
        if self.pending_updates:
            with open(f"storage/jobs/{self.job_id}/progress.txt", 'w') as f:
                f.write(str(self.latest_progress))
            self.pending_updates = 0
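
Count-based buffering can leave a slow job's progress stale for a long time. A time-based variant, sketched below, writes at most once per interval and replaces the file atomically so a reader never sees a half-written value. The class name, the `min_interval` default, and the injectable `clock` are assumptions for illustration.

```python
import os
import tempfile
import time

class ThrottledProgressWriter:
    def __init__(self, path, min_interval=5.0, clock=time.monotonic):
        self.path = path
        self.min_interval = min_interval
        self.clock = clock
        self.last_write = None
        self.writes = 0

    def update(self, progress, force=False):
        """Write progress unless a write happened less than min_interval ago."""
        now = self.clock()
        if not force and self.last_write is not None and now - self.last_write < self.min_interval:
            return False
        tmp_path = self.path + ".tmp"
        with open(tmp_path, "w") as f:
            f.write(str(progress))
        os.replace(tmp_path, self.path)  # atomic rename on the same filesystem
        self.last_write = now
        self.writes += 1
        return True

# Demo with a fake clock so the behaviour is deterministic
fake_now = [0]
with tempfile.TemporaryDirectory() as tmp:
    writer = ThrottledProgressWriter(os.path.join(tmp, "progress.txt"), clock=lambda: fake_now[0])
    for step in range(10):
        fake_now[0] = step                # ten updates, one "second" apart
        writer.update(step / 10)
    writer.update(1.0, force=True)        # always persist the final value
    with open(writer.path) as f:
        final = f.read()
```

Only three writes reach the disk in the demo: the first update, the one five seconds later, and the forced final value.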

5. Server Architecture Improvements 🏗️

A. FastAPI Migration

from fastapi import FastAPI, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
import uvicorn

app = FastAPI()

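# Add CORS middleware (wide open for development; restrict allow_origins
# before deploying, especially with allow_credentials=True)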
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.post("/routingStart")
async def routing_start(request: RoutingStartRequest, background_tasks: BackgroundTasks):
    job_id = generate_id()
    
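    # Start routing in background. Background tasks run inside the server
    # process, so run_routing_async should hand CPU-bound work to a process pool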
    background_tasks.add_task(run_routing_async, job_id, request.project)
    
    return RoutingStartResponse(endpoint="routingStart", result={"jobId": job_id})

@app.post("/routingProgress")
async def routing_progress(request: RoutingProgressRequest):
    progress = await get_progress_async(request.jobId)
    return RoutingProgressResponse(
        endpoint="routingProgress",
        result=progress
    )

if __name__ == "__main__":
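    # workers > 1 requires the app as an import string; "server:app" assumes this file is server.py
    uvicorn.run("server:app", host="0.0.0.0", port=3333, workers=4)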

B. Redis for State Management

import redis
import json
import time

class RedisJobManager:
    def __init__(self):
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
    
    def create_job(self, job_id, project_data):
        """Store job data in Redis"""
        job_data = {
            'status': 'running',
            'progress': 0.0,
            'project': json.dumps(project_data),  # hash values must be flat, so nested data is stored as JSON
            'created_at': time.time()
        }
        self.redis_client.hset(f"job:{job_id}", mapping=job_data)
        self.redis_client.expire(f"job:{job_id}", 3600)  # 1 hour TTL
    
    def update_progress(self, job_id, progress):
        """Update job progress in Redis"""
        self.redis_client.hset(f"job:{job_id}", 'progress', progress)
    
    def get_job_status(self, job_id):
        """Get job status from Redis"""
        job_data = self.redis_client.hgetall(f"job:{job_id}")
        if job_data:
            return {
                'status': job_data[b'status'].decode(),
                'progress': float(job_data[b'progress']),
                'created_at': float(job_data[b'created_at'])
            }
        return None

6. Docker & Deployment Optimizations 🐳

A. Multi-Stage Docker Build

# Build stage
FROM python:3.11-slim AS builder

WORKDIR /app
COPY requirements.txt .
RUN pip install --user -r requirements.txt

# Runtime stage
FROM python:3.11-slim

WORKDIR /app
COPY --from=builder /root/.local /root/.local
COPY . .

ENV PATH=/root/.local/bin:$PATH
ENV PYTHONUNBUFFERED=1

# Use uvicorn for better performance
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "3333", "--workers", "4"]

B. Resource Limits and Monitoring

# docker-compose.yml
version: '3.8'
services:
  makedevice-backend:
    image: makedevice-backend:latest
    ports:
      - "3333:3333"
    environment:
      - MAX_WORKERS=4
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis
    deploy:
      resources:
        limits:
          cpus: '4.0'
          memory: 8G
        reservations:
          cpus: '2.0'
          memory: 4G
  
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"

7. Monitoring & Profiling 📊

A. Performance Monitoring

import time
import psutil
import logging
from functools import wraps

class PerformanceMonitor:
    def __init__(self):
        self.metrics = {}
    
    def monitor(self, operation_name):
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                start_time = time.time()
                start_memory = psutil.Process().memory_info().rss
                
                result = func(*args, **kwargs)
                
                end_time = time.time()
                end_memory = psutil.Process().memory_info().rss
                
                duration = end_time - start_time
                memory_used = end_memory - start_memory
                
                self.metrics[operation_name] = {
                    'duration': duration,
                    'memory_used': memory_used,
                    'timestamp': start_time
                }
                
                logging.info(f"{operation_name}: {duration:.2f}s, {memory_used/1024/1024:.2f}MB")
                return result
            return wrapper
        return decorator

# Usage
monitor = PerformanceMonitor()

@monitor.monitor("routing_operation")
def route_board(board):
    # Routing logic
    pass

B. Health Checks

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {
        "status": "healthy",
        "timestamp": time.time(),
        "memory_usage": psutil.Process().memory_info().rss / 1024 / 1024,
        "cpu_usage": psutil.Process().cpu_percent(),
        "active_jobs": len(active_jobs)
    }

Implementation Priority

Phase 1: Quick Wins (1-2 weeks)

  1. Async file operations - Replace blocking file I/O
  2. Progress buffering - Reduce file write frequency
  3. Memory-mapped progress - Avoid reopening and rewriting the progress file on every update
  4. Object pooling - Reduce garbage collection overhead

Phase 2: Algorithm Improvements (2-4 weeks)

  1. Batch pathfinding - Route multiple sockets together
  2. Grid caching - Reuse grids between operations
  3. JIT compilation - Use Numba for critical loops
  4. Spatial partitioning - Group nearby sockets

Phase 3: Architecture Overhaul (4-8 weeks)

  1. FastAPI migration - Better async support
  2. Redis integration - Centralized state management
  3. Multiprocessing - True parallel routing
  4. Microservices - Separate routing from web server

Phase 4: Advanced Optimizations (8-12 weeks)

  1. Custom pathfinding - Optimized for PCB routing
  2. GPU acceleration - CUDA/OpenCL for large grids
  3. Distributed processing - Multiple server instances
  4. Predictive caching - Pre-compute common patterns

Expected Performance Improvements

Optimization            | Expected Improvement      | Implementation Effort
Async I/O               | 20-30% faster             | Low
Multiprocessing         | 2-4x faster (multi-core)  | Medium
Algorithm optimization  | 50-100% faster            | High
Memory optimization     | 30-50% less memory        | Medium
FastAPI migration       | 10-20% faster             | Low
Redis caching           | 40-60% faster             | Medium
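
The multi-core estimate in the table can be sanity-checked with Amdahl's law, S = 1 / ((1 - p) + p / n), where p is the fraction of a job that parallelizes and n is the number of workers. The fractions used below are made-up inputs to show the arithmetic, not measurements of the backend.

```python
def amdahl_speedup(parallel_fraction, workers):
    """Upper bound on speedup when only parallel_fraction of the work scales."""
    if not 0.0 <= parallel_fraction <= 1.0 or workers < 1:
        raise ValueError("parallel_fraction must be in [0, 1] and workers >= 1")
    return 1.0 / ((1.0 - parallel_fraction) + parallel_fraction / workers)

# Assumed parallel fractions, evaluated for two and four worker processes
for p in (0.5, 0.8, 0.95):
    two = amdahl_speedup(p, 2)
    four = amdahl_speedup(p, 4)
    print(f"p={p:.2f}: 2 workers -> {two:.2f}x, 4 workers -> {four:.2f}x")
```

With only two sides to route, left/right parallelism alone cannot exceed 2x; reaching the upper end of the range would require finer-grained parallel work within each side.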

Monitoring & Validation

Performance Metrics to Track

  • Request latency (p50, p95, p99)
  • Throughput (requests/second)
  • Memory usage (peak and average)
  • CPU utilization (per core)
  • Job completion time (average and distribution)
  • Error rates (routing failures, timeouts)
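
The latency percentiles listed above can be computed from raw samples with the standard library. A minimal sketch, where the sample values are synthetic and only illustrate the call:

```python
import statistics

def latency_percentiles(samples):
    """Return p50, p95 and p99 of a list of latencies in seconds."""
    # quantiles(n=100) returns the 99 cut points for percentiles 1..99
    cuts = statistics.quantiles(samples, n=100, method="inclusive")
    return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}

# Synthetic samples: 1..100 ms expressed in seconds
samples = [i / 1000 for i in range(1, 101)]
summary = latency_percentiles(samples)
```

Tracking p95 and p99 alongside the median matters here because a few long routing jobs can dominate the tail while leaving the median unchanged.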

Load Testing

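import asyncio
import aiohttp
import time

# Assumed server address; port 3333 and /routingStart come from the FastAPI example above
URL = "http://localhost:3333/routingStart"

async def send_routing_request(session, i):
    """POST one routing request and return its latency in seconds (payload is a placeholder)"""
    start = time.perf_counter()
    async with session.post(URL, json={"project": {"name": f"load-test-{i}"}}) as response:
        await response.read()
    return time.perf_counter() - start

async def load_test():
    async with aiohttp.ClientSession() as session:
        start_time = time.perf_counter()
        # 100 concurrent requests
        tasks = [asyncio.create_task(send_routing_request(session, i)) for i in range(100)]
        latencies = await asyncio.gather(*tasks)
        elapsed = time.perf_counter() - start_time

        print(f"Completed {len(latencies)} requests in {elapsed:.2f}s")
        print(f"Throughput: {len(latencies) / elapsed:.2f} requests/s")
        # Mean of per-request latencies; elapsed / count would measure throughput instead
        print(f"Average response time: {sum(latencies) / len(latencies):.2f}s")

if __name__ == "__main__":
    asyncio.run(load_test())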

Conclusion

These optimizations target the performance of the MakeDevice backend under high demand. The key is to implement them incrementally: start with the quick wins, measure each change against the metrics above, and only then move to the more complex architectural changes.

The most impactful optimizations will be:

  1. Multiprocessing for routing - Leverages multi-core systems
  2. Async I/O operations - Reduces blocking
  3. Algorithm improvements - More efficient pathfinding
  4. Memory optimization - Better resource utilization

With these optimizations, the system should be able to handle higher concurrent load while maintaining response times and reliability. The improvement figures listed earlier are estimates and should be confirmed by load testing.
