MakeDevice Backend: Python Performance Optimizations
Current Performance Bottlenecks Analysis
Based on the codebase analysis, the following performance bottlenecks have been identified:
1. Concurrency Limitations ⚠️
- Single-threaded routing: Each job runs in a separate thread but routing is CPU-intensive
- GIL (Global Interpreter Lock): Python's GIL prevents threads from executing Python bytecode in parallel, so CPU-bound routing threads do not scale across cores
- File I/O blocking: Frequent file operations for progress tracking and keepalive
- Sequential processing: Left and right routing happen sequentially
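Before restructuring for concurrency, it may help to confirm where routing time actually goes. The following is a minimal sketch using the standard library's cProfile. The names `profile_call` and `busy_work` are hypothetical, and `busy_work` only stands in for a CPU-bound call such as `left_router.route()`.

```python
import cProfile
import io
import pstats


def profile_call(func, *args, top=5, **kwargs):
    """Run func under cProfile and return (result, text report of the top entries)."""
    profiler = cProfile.Profile()
    result = profiler.runcall(func, *args, **kwargs)
    stream = io.StringIO()
    stats = pstats.Stats(profiler, stream=stream)
    # Sort by cumulative time so the most expensive call chains come first
    stats.sort_stats("cumulative").print_stats(top)
    return result, stream.getvalue()


def busy_work(n):
    """Stand-in for a CPU-bound routing call (assumption, not the real router)."""
    return sum(i * i for i in range(n))


if __name__ == "__main__":
    _, report = profile_call(busy_work, 200_000)
    print(report)
```

If the report is dominated by pathfinding and grid construction rather than file access, that would support prioritizing the multiprocessing and algorithm changes over the I/O changes.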
2. Algorithmic Inefficiencies ⚠️
- A* pathfinding: Runs for each socket individually
- Grid recreation: New grid created for each pathfinding operation
- Redundant calculations: Same geometric calculations repeated
- Memory allocation: Large NumPy arrays created frequently
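To illustrate the grid-recreation point, here is a minimal sketch of routing several connections on one grid that is built once and updated in place. It is a generic 4-connected A* on a list-of-lists grid, not the MakeDevice router; `astar` and `route_all` are hypothetical names, and cells hold 0 for free and 1 for blocked.

```python
import heapq


def astar(grid, start, goal):
    """Return the shortest 4-connected path from start to goal as a list of cells, or None."""
    rows, cols = len(grid), len(grid[0])

    def heuristic(cell):
        # Manhattan distance is admissible for 4-connected unit-cost moves
        return abs(cell[0] - goal[0]) + abs(cell[1] - goal[1])

    open_heap = [(heuristic(start), 0, start)]
    came_from = {start: None}
    best_cost = {start: 0}
    while open_heap:
        _, cost, cell = heapq.heappop(open_heap)
        if cell == goal:
            path = []
            while cell is not None:
                path.append(cell)
                cell = came_from[cell]
            return path[::-1]
        if cost > best_cost[cell]:
            continue  # stale heap entry
        r, c = cell
        for nxt in ((r + 1, c), (r - 1, c), (r, c + 1), (r, c - 1)):
            nr, nc = nxt
            if 0 <= nr < rows and 0 <= nc < cols and grid[nr][nc] == 0:
                new_cost = cost + 1
                if new_cost < best_cost.get(nxt, float("inf")):
                    best_cost[nxt] = new_cost
                    came_from[nxt] = cell
                    heapq.heappush(open_heap, (new_cost + heuristic(nxt), new_cost, nxt))
    return None


def route_all(grid, connections):
    """Route every (start, goal) pair on one shared grid instead of rebuilding it per pair."""
    paths = []
    for start, goal in connections:
        path = astar(grid, start, goal)
        paths.append(path)
        if path is not None:
            for r, c in path:
                grid[r][c] = 1  # routed cells become obstacles for later connections
    return paths
```

The design choice here is that the only per-connection work on the grid is marking the new path, which is proportional to the path length rather than to the board area.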
3. I/O Operations ⚠️
- File-based progress tracking: Writing to disk for each progress update
- Image generation: Debug images created during routing
- Keepalive mechanism: File system polling every few seconds
- Large file operations: Gerber file processing and generation
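One low-effort way to reduce per-update disk writes is to throttle them by time. The sketch below is an illustration under assumptions, not the backend's current code: `ThrottledProgressWriter`, `min_interval`, and the injectable `clock` are hypothetical names. It also writes through a temporary file and `os.replace`, so a reader polling the progress file should not observe a half-written value.

```python
import os
import tempfile
import time


class ThrottledProgressWriter:
    """Write progress to disk at most once per min_interval seconds."""

    def __init__(self, path, min_interval=0.5, clock=time.monotonic):
        self.path = path
        self.min_interval = min_interval
        self.clock = clock  # injectable for testing
        self._last_write = None
        self.writes = 0

    def update(self, progress, force=False):
        """Return True if the value was written, False if it was skipped."""
        now = self.clock()
        too_soon = self._last_write is not None and now - self._last_write < self.min_interval
        if too_soon and not force:
            return False
        directory = os.path.dirname(self.path) or "."
        # Write to a temporary file in the same directory, then swap it into place
        fd, tmp_path = tempfile.mkstemp(dir=directory)
        with os.fdopen(fd, "w") as f:
            f.write(f"{progress:.4f}")
        os.replace(tmp_path, self.path)
        self._last_write = now
        self.writes += 1
        return True
```

Passing `force=True` for the final update ensures the completed value reaches disk even if the interval has not elapsed.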
4. Resource Management ⚠️
- Memory leaks: Large objects not properly garbage collected
- File descriptor limits: Multiple file operations without proper cleanup
- CPU utilization: Single-threaded routing underutilizes multi-core systems
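The memory-leak item can be checked before any pooling work is started. A minimal sketch with the standard library's tracemalloc follows; `top_allocations` is a hypothetical helper, and the callable passed to it would be whatever routing step is under suspicion.

```python
import tracemalloc


def top_allocations(func, *args, limit=3, **kwargs):
    """Run func and return (result, the largest allocation differences by source line)."""
    tracemalloc.start()
    before = tracemalloc.take_snapshot()
    result = func(*args, **kwargs)
    after = tracemalloc.take_snapshot()
    tracemalloc.stop()
    # compare_to sorts by the absolute size difference, largest first
    stats = after.compare_to(before, "lineno")
    return result, stats[:limit]


if __name__ == "__main__":
    _, stats = top_allocations(lambda: [bytearray(1000) for _ in range(1000)])
    for stat in stats:
        print(stat)
```

Running this around one job and comparing the output across repeated jobs should show whether memory held after a job keeps growing, which is what a leak would look like.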
Recommended Optimizations
1. Concurrency & Parallelization 🚀
A. Multiprocessing for CPU-Intensive Tasks
# Current: left and right routing run one after the other
left_router = BusRouter(board, tracks_layer=top_layer, buses_layer=bottom_layer, side="left")
left_router.route()
right_router = BusRouter(board, tracks_layer=bottom_layer, buses_layer=top_layer, side="right")
right_router.route()

# Optimized: route both sides in parallel processes
import multiprocessing as mp

def parallel_routing(board_data, tracks_layer, buses_layer, side, result_queue):
    # Create the router inside the worker process; board_data and the result must be picklable
    router = BusRouter(board_data, tracks_layer=tracks_layer, buses_layer=buses_layer, side=side)
    result_queue.put((side, router.route()))

if __name__ == "__main__":  # required with the 'spawn' start method
    ctx = mp.get_context("spawn")
    result_queue = ctx.Queue()
    jobs = [
        (top_layer, bottom_layer, "left"),
        (bottom_layer, top_layer, "right"),
    ]
    processes = [
        ctx.Process(target=parallel_routing, args=(board_data, tracks, buses, side, result_queue))
        for tracks, buses, side in jobs
    ]
    for process in processes:
        process.start()
    # Drain the queue before joining; joining first can deadlock on large results
    results = dict(result_queue.get() for _ in processes)
    for process in processes:
        process.join()

B. Thread Pool for I/O Operations
import asyncio
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

class OptimizedServer:
    def __init__(self):
        self.thread_pool = ThreadPoolExecutor(max_workers=10)
        self.process_pool = ProcessPoolExecutor(max_workers=4)

    async def handle_routing_request(self, job_id, project_data):
        # Submit CPU-bound routing to the process pool; run_routing must be a
        # module-level function so it can be pickled
        future = self.process_pool.submit(run_routing, project_data)
        # Monitor progress in the thread pool while routing runs
        self.thread_pool.submit(self.monitor_progress, job_id)
        return await asyncio.wrap_future(future)

C. Async/Await for I/O Operations
import aiofiles

class AsyncProgressTracker:
    def __init__(self, job_id):
        self.job_id = job_id
        self.progress_file = f"storage/jobs/{job_id}/progress.txt"

    async def update_progress(self, progress):
        async with aiofiles.open(self.progress_file, 'w') as f:
            await f.write(str(progress))

    async def check_keepalive(self):
        async with aiofiles.open(f"storage/jobs/{self.job_id}/keepalive_time", 'r') as f:
            content = await f.read()
        return content.strip()

2. Algorithm Optimizations 🧠
A. Batch Pathfinding
class OptimizedBusRouter:
    def __init__(self, board, tracks_layer, buses_layer, side):
        self.board = board
        self.tracks_layer = tracks_layer
        self.buses_layer = buses_layer
        self.side = side
        # Pre-compute and cache grids
        self.grid_cache = {}
        self.path_cache = {}

    def route_batch(self, socket_groups):
        """Route multiple sockets in batches for better performance"""
        results = []
        # Group sockets by proximity for batch processing
        batched_sockets = self._group_sockets_by_proximity(socket_groups)
        for batch in batched_sockets:
            # Create shared grid for batch
            shared_grid = self._create_shared_grid(batch)
            # Route all sockets in batch using shared grid
            batch_results = self._route_batch_on_grid(shared_grid, batch)
            results.extend(batch_results)
        return results

    def _group_sockets_by_proximity(self, socket_groups):
        """Group sockets that are close to each other for efficient routing"""
        # To be implemented with 2D spatial partitioning (e.g. a quadtree)
        raise NotImplementedError

B. Grid Optimization
import numpy as np
from numba import njit

@njit(cache=True)
def _mark_obstacles(grid, obstacles):
    """JIT-compiled obstacle marking; obstacles is an (N, 2) integer array"""
    for i in range(obstacles.shape[0]):
        x, y = obstacles[i, 0], obstacles[i, 1]
        if 0 <= x < grid.shape[0] and 0 <= y < grid.shape[1]:
            grid[x, y] = 1

class OptimizedGrid:
    def __init__(self, dimensions, resolution):
        self.dimensions = dimensions
        self.resolution = resolution
        self.grid = np.zeros(dimensions, dtype=np.int8)
        self.obstacle_cache = {}

    def mark_obstacles(self, obstacles):
        """Mark obstacle cells on the grid"""
        # Numba's nopython mode cannot compile a method of a plain Python class,
        # so the hot loop lives in a module-level function that takes arrays
        _mark_obstacles(self.grid, np.asarray(obstacles, dtype=np.int64).reshape(-1, 2))

    def get_subgrid(self, bounds):
        """Extract subgrid for localized pathfinding"""
        x_min, x_max, y_min, y_max = bounds
        return self.grid[x_min:x_max, y_min:y_max].copy()

C. Caching and Memoization
from functools import lru_cache
import hashlib

class PathfindingCache:
    def __init__(self, max_size=1000):
        self.cache = {}
        self.max_size = max_size

    def get_cache_key(self, start, end, obstacles_hash):
        """Generate cache key for pathfinding operations"""
        return hashlib.md5(f"{start}_{end}_{obstacles_hash}".encode()).hexdigest()

    # Note: lru_cache on a method includes self in the key and keeps the instance
    # alive while entries remain; start, end and obstacles_hash must be hashable
    @lru_cache(maxsize=1000)
    def find_path_cached(self, start, end, obstacles_hash):
        """Cached pathfinding with obstacle hash"""
        # _find_path is the uncached search, defined elsewhere
        return self._find_path(start, end, obstacles_hash)

3. Memory Management 💾
A. Object Pooling
from queue import Empty, Full, Queue

class ObjectPool:
    def __init__(self, object_class, pool_size=100):
        self.object_class = object_class
        self.pool = Queue(maxsize=pool_size)
        self._initialize_pool(pool_size)

    def _initialize_pool(self, pool_size):
        """Pre-create pool_size objects"""
        for _ in range(pool_size):
            self.pool.put_nowait(self.object_class())

    def get_object(self):
        """Get object from pool or create new one"""
        try:
            return self.pool.get_nowait()
        except Empty:
            return self.object_class()

    def return_object(self, obj):
        """Return object to pool"""
        try:
            obj.reset()  # Reset object state
            self.pool.put_nowait(obj)
        except Full:
            pass  # Pool is full, discard object

B. Memory-Mapped Files
import mmap

class MemoryMappedProgress:
    def __init__(self, job_id):
        self.file_path = f"storage/jobs/{job_id}/progress.mmap"
        self._create_mmap_file()

    def _create_mmap_file(self):
        """Create memory-mapped file for progress tracking"""
        with open(self.file_path, 'wb') as f:
            f.write(b'\x00' * 1024)  # 1KB buffer
        self.mmap_file = open(self.file_path, 'r+b')
        self.mmap = mmap.mmap(self.mmap_file.fileno(), 1024)

    def update_progress(self, progress):
        """Update progress through the mapping instead of opening and writing the file each time"""
        progress_str = f"{progress:.4f}".encode()
        self.mmap.seek(0)
        self.mmap.write(progress_str.ljust(1024, b'\x00'))
        self.mmap.flush()

    def close(self):
        """Release the mapping and the file descriptor when the job ends"""
        self.mmap.close()
        self.mmap_file.close()

4. I/O Optimizations 📁
A. Asynchronous File Operations
import aiofiles
import aiofiles.os

class AsyncFileManager:
    def __init__(self, job_id):
        self.job_id = job_id
        self.base_path = f"storage/jobs/{job_id}"

    async def write_progress(self, progress):
        """Asynchronous progress writing"""
        async with aiofiles.open(f"{self.base_path}/progress.txt", 'w') as f:
            await f.write(str(progress))

    async def read_progress(self):
        """Asynchronous progress reading"""
        try:
            async with aiofiles.open(f"{self.base_path}/progress.txt", 'r') as f:
                # Await the read first, then strip the resulting string
                return float((await f.read()).strip())
        except FileNotFoundError:
            return 0.0

    async def cleanup_job_files(self):
        """Asynchronous cleanup of job files"""
        await aiofiles.os.remove(f"{self.base_path}/progress.txt")
        await aiofiles.os.remove(f"{self.base_path}/keepalive_time")

B. Buffered Writing
class BufferedProgressTracker:
    def __init__(self, job_id, buffer_size=100):
        self.job_id = job_id
        self.buffer_size = buffer_size
        self.progress_buffer = []
        self.last_write = 0

    def update_progress(self, progress):
        """Buffer progress updates and write periodically"""
        self.progress_buffer.append(progress)
        if len(self.progress_buffer) >= self.buffer_size:
            self._flush_buffer()

    def _flush_buffer(self):
        """Flush buffered progress updates"""
        # Call this once more when the job finishes, or the final value stays in the buffer
        if self.progress_buffer:
            latest_progress = self.progress_buffer[-1]
            with open(f"storage/jobs/{self.job_id}/progress.txt", 'w') as f:
                f.write(str(latest_progress))
            self.progress_buffer.clear()

5. Server Architecture Improvements 🏗️
A. FastAPI Migration
from fastapi import FastAPI, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
import uvicorn

app = FastAPI()

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.post("/routingStart")
async def routing_start(request: RoutingStartRequest, background_tasks: BackgroundTasks):
    job_id = generate_id()
    # Start routing in background
    background_tasks.add_task(run_routing_async, job_id, request.project)
    return RoutingStartResponse(endpoint="routingStart", result={"jobId": job_id})

@app.post("/routingProgress")
async def routing_progress(request: RoutingProgressRequest):
    progress = await get_progress_async(request.jobId)
    return RoutingProgressResponse(
        endpoint="routingProgress",
        result=progress
    )

if __name__ == "__main__":
    # workers > 1 requires an import string rather than the app object
    uvicorn.run("server:app", host="0.0.0.0", port=3333, workers=4)

B. Redis for State Management
import json
import time

import redis

class RedisJobManager:
    def __init__(self):
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)

    def create_job(self, job_id, project_data):
        """Store job data in Redis"""
        job_data = {
            'status': 'running',
            'progress': 0.0,
            # Hash fields must be flat values, so serialize the nested project
            'project': json.dumps(project_data),
            'created_at': time.time()
        }
        self.redis_client.hset(f"job:{job_id}", mapping=job_data)
        self.redis_client.expire(f"job:{job_id}", 3600)  # 1 hour TTL

    def update_progress(self, job_id, progress):
        """Update job progress in Redis"""
        self.redis_client.hset(f"job:{job_id}", 'progress', progress)

    def get_job_status(self, job_id):
        """Get job status from Redis"""
        job_data = self.redis_client.hgetall(f"job:{job_id}")
        if job_data:
            return {
                'status': job_data[b'status'].decode(),
                'progress': float(job_data[b'progress']),
                'created_at': float(job_data[b'created_at'])
            }
        return None

6. Docker & Deployment Optimizations 🐳
A. Multi-Stage Docker Build
# Build stage
FROM python:3.11-slim AS builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --user -r requirements.txt

# Runtime stage
FROM python:3.11-slim
WORKDIR /app
COPY --from=builder /root/.local /root/.local
COPY . .
ENV PATH=/root/.local/bin:$PATH
ENV PYTHONUNBUFFERED=1

# Use uvicorn for better performance
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "3333", "--workers", "4"]

B. Resource Limits and Monitoring
# docker-compose.yml
version: '3.8'
services:
  makedevice-backend:
    image: makedevice-backend:latest
    ports:
      - "3333:3333"
    environment:
      - MAX_WORKERS=4
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis
    deploy:
      resources:
        limits:
          cpus: '4.0'
          memory: 8G
        reservations:
          cpus: '2.0'
          memory: 4G
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"

7. Monitoring & Profiling 📊
A. Performance Monitoring
import time
import psutil
import logging
from functools import wraps

class PerformanceMonitor:
    def __init__(self):
        self.metrics = {}

    def monitor(self, operation_name):
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                timestamp = time.time()
                # perf_counter is monotonic, so durations are not skewed by clock adjustments
                start_time = time.perf_counter()
                start_memory = psutil.Process().memory_info().rss
                result = func(*args, **kwargs)
                duration = time.perf_counter() - start_time
                end_memory = psutil.Process().memory_info().rss
                memory_used = end_memory - start_memory
                self.metrics[operation_name] = {
                    'duration': duration,
                    'memory_used': memory_used,
                    'timestamp': timestamp
                }
                logging.info(f"{operation_name}: {duration:.2f}s, {memory_used/1024/1024:.2f}MB")
                return result
            return wrapper
        return decorator

# Usage
monitor = PerformanceMonitor()

@monitor.monitor("routing_operation")
def route_board(board):
    # Routing logic
    pass

B. Health Checks
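# Reuse one Process object: cpu_percent() measures since the previous call on
# the same object, and the first call always returns 0.0
_process = psutil.Process()
_process.cpu_percent()

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {
        "status": "healthy",
        "timestamp": time.time(),
        "memory_usage": _process.memory_info().rss / 1024 / 1024,  # MB
        "cpu_usage": _process.cpu_percent(),
        "active_jobs": len(active_jobs)
    }

Implementation Priority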
Phase 1: Quick Wins (1-2 weeks)
- Async file operations - Replace blocking file I/O
- Progress buffering - Reduce file write frequency
- Memory-mapped progress - Eliminate file I/O for progress
- Object pooling - Reduce garbage collection overhead
Phase 2: Algorithm Improvements (2-4 weeks)
- Batch pathfinding - Route multiple sockets together
- Grid caching - Reuse grids between operations
- JIT compilation - Use Numba for critical loops
- Spatial partitioning - Group nearby sockets
Phase 3: Architecture Overhaul (4-8 weeks)
- FastAPI migration - Better async support
- Redis integration - Centralized state management
- Multiprocessing - True parallel routing
- Microservices - Separate routing from web server
Phase 4: Advanced Optimizations (8-12 weeks)
- Custom pathfinding - Optimized for PCB routing
- GPU acceleration - CUDA/OpenCL for large grids
- Distributed processing - Multiple server instances
- Predictive caching - Pre-compute common patterns
Expected Performance Improvements
| Optimization | Expected Improvement | Implementation Effort |
|---|---|---|
| Async I/O | 20-30% faster | Low |
| Multiprocessing | 2-4x faster (multi-core) | Medium |
| Algorithm optimization | 50-100% faster | High |
| Memory optimization | 30-50% less memory | Medium |
| FastAPI migration | 10-20% faster | Low |
| Redis caching | 40-60% faster | Medium |
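The multiprocessing row depends on how much of a job actually runs in parallel. As a rough sanity check (an assumption added here, not a measurement from the MakeDevice backend), Amdahl's law gives an upper bound on speedup for a given parallel fraction and worker count. `amdahl_speedup` is a hypothetical helper name.

```python
def amdahl_speedup(parallel_fraction, workers):
    """Upper bound on speedup when only part of the work runs in parallel."""
    if not 0.0 <= parallel_fraction <= 1.0:
        raise ValueError("parallel_fraction must be between 0 and 1")
    if workers < 1:
        raise ValueError("workers must be at least 1")
    serial_fraction = 1.0 - parallel_fraction
    return 1.0 / (serial_fraction + parallel_fraction / workers)


if __name__ == "__main__":
    for fraction in (0.5, 0.8, 0.95):
        print(f"{fraction:.0%} parallel, 4 workers: {amdahl_speedup(fraction, 4):.2f}x")
```

Under this model, 4 workers reach 2x only when about two thirds of a job parallelizes and 4x only when all of it does, so the upper end of the range is best read as a ceiling.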
Monitoring & Validation
Performance Metrics to Track
- Request latency (p50, p95, p99)
- Throughput (requests/second)
- Memory usage (peak and average)
- CPU utilization (per core)
- Job completion time (average and distribution)
- Error rates (routing failures, timeouts)
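The latency percentiles in this list can be computed from raw samples with a few lines of standard-library code. This is a minimal sketch using the nearest-rank definition; `percentile` and `latency_summary` are hypothetical names, and a production setup would more likely take these figures from its metrics system.

```python
def percentile(samples, p):
    """Nearest-rank percentile of a non-empty sequence, for integer p in 1..100."""
    if not samples:
        raise ValueError("samples must not be empty")
    ordered = sorted(samples)
    # ceil(p * n / 100) computed in integer arithmetic to avoid float rounding
    rank = -(-p * len(ordered) // 100)
    return ordered[max(rank, 1) - 1]


def latency_summary(latencies_ms):
    """Return the p50, p95 and p99 latencies for a list of samples in milliseconds."""
    return {f"p{p}": percentile(latencies_ms, p) for p in (50, 95, 99)}
```

Tracking p95 and p99 alongside p50 matters here because a few slow routing jobs can dominate user-visible latency while leaving the median unchanged.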
Load Testing
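import asyncio
import time

import aiohttp

async def send_routing_request(session, i):
    # Hypothetical request; adjust the URL and payload to the real /routingStart schema
    async with session.post("http://localhost:3333/routingStart", json={"project": {}}) as response:
        return response.status

async def load_test():
    async with aiohttp.ClientSession() as session:
        start_time = time.perf_counter()
        tasks = [
            asyncio.create_task(send_routing_request(session, i))
            for i in range(100)  # 100 concurrent requests
        ]
        results = await asyncio.gather(*tasks)
        elapsed = time.perf_counter() - start_time
    print(f"Completed {len(results)} requests in {elapsed:.2f}s")
    # Requests overlap, so elapsed / count is not a response time; report throughput
    print(f"Throughput: {len(results) / elapsed:.2f} requests/s")

if __name__ == "__main__":
    asyncio.run(load_test())

Conclusion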
These optimizations can significantly improve the performance of the MakeDevice backend under high demand. The key is to implement them incrementally, starting with the quick wins that provide immediate benefits, then moving to more complex architectural changes.
The most impactful optimizations will be:
- Multiprocessing for routing - Leverages multi-core systems
- Async I/O operations - Reduces blocking
- Algorithm improvements - More efficient pathfinding
- Memory optimization - Better resource utilization
With these optimizations, the system should be able to handle significantly higher concurrent load while maintaining response times and reliability.