codeflash-ai bot commented Nov 11, 2025

📄 17% (0.17x) speedup for ui_get_spend_by_tags in litellm/proxy/spend_tracking/spend_management_endpoints.py

⏱️ Runtime : 2.01 milliseconds → 1.72 milliseconds (best of 114 runs)

📝 Explanation and details

The optimization achieves a **16% runtime speedup** by replacing `collections.defaultdict` with plain dictionaries and optimizing data aggregation patterns.

**Key optimizations:**

1. **Eliminated defaultdict overhead**: Replaced `collections.defaultdict(float)` and `collections.defaultdict(int)` with plain `dict[str, float]` and `dict[str, int]`. The line profiler shows defaultdict creation taking ~700ns total (lines with `collections.defaultdict`), which is completely eliminated.

2. **Optimized aggregation logic**:
   - For the "all-tags" path: uses `dict.get(key, default) + value` instead of defaultdict's `+=` operator, reducing per-key lookup overhead.
   - For the "specific-tags" path: uses direct assignment since the SQL query already aggregates data (one row per tag), eliminating unnecessary `+=` operations entirely.

3. **Reduced dictionary method calls**: The line profiler shows the original code's `total_spend_per_tag[tag_name] += tag_spend` taking ~368ns per hit, while the optimized version's `get()` + assignment pattern is more efficient for this access pattern.

**Performance impact**: Plain dictionaries have lower per-operation overhead than defaultdict for this use case where we know the access patterns upfront. The optimization is particularly effective for the "specific-tags" code path where direct assignment replaces unnecessary aggregation operations.

**Test case benefits**: The throughput improvement of 0.9% (95,760 vs 94,920 ops/sec) shows consistent gains across concurrent workloads. The optimization performs well across all test scenarios - from basic single calls to high-load concurrent execution with 500+ simultaneous requests.

This optimization maintains identical behavior while reducing CPU overhead in data structure operations, making it especially valuable for high-frequency spend tracking operations.
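
The following is a minimal, self-contained sketch of the before/after aggregation patterns described above. Variable names and row fields mirror the description and the mock rows used in the tests further down; the surrounding loop is illustrative, not the actual endpoint code.

```python
# Illustrative sketch of the aggregation change (not the actual endpoint code).
from collections import defaultdict

rows = [
    {"individual_request_tag": "tagA", "log_count": 5, "total_spend": 10.25},
    {"individual_request_tag": "tagB", "log_count": 3, "total_spend": 5.75},
]

# Before: defaultdict-based accumulation ("all-tags" path).
total_spend_per_tag = defaultdict(float)
total_requests_per_tag = defaultdict(int)
for row in rows:
    tag = row["individual_request_tag"]
    total_spend_per_tag[tag] += row["total_spend"]
    total_requests_per_tag[tag] += row["log_count"]

# After: plain dicts with dict.get() for the "all-tags" path...
spend: dict[str, float] = {}
requests: dict[str, int] = {}
for row in rows:
    tag = row["individual_request_tag"]
    spend[tag] = spend.get(tag, 0.0) + row["total_spend"]
    requests[tag] = requests.get(tag, 0) + row["log_count"]

# ...and direct assignment for the "specific-tags" path, where the SQL query
# already returns one aggregated row per tag.
spend_specific: dict[str, float] = {}
for row in rows:
    spend_specific[row["individual_request_tag"]] = row["total_spend"]

# Both accumulation patterns produce the same totals.
assert dict(total_spend_per_tag) == spend
```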

**Correctness verification report:**

| Test | Status |
| --- | --- |
| ⚙️ Existing Unit Tests | 🔘 None Found |
| 🌀 Generated Regression Tests | 838 Passed |
| ⏪ Replay Tests | 🔘 None Found |
| 🔎 Concolic Coverage Tests | 🔘 None Found |
| 📊 Tests Coverage | 100.0% |
🌀 Generated Regression Tests and Runtime

import asyncio  # used to run async functions

# function to test
import collections
from typing import TYPE_CHECKING, Any, List, Optional

import pytest  # used for our unit tests
from fastapi import HTTPException
from litellm.proxy.spend_tracking.spend_management_endpoints import (
    ui_get_spend_by_tags,
)


class DummyDB:
    """
    Dummy DB class to simulate async query_raw behavior.
    """

    def __init__(self, response_mapping):
        self.response_mapping = response_mapping
        self.calls = []

    async def query_raw(self, sql_query, *args):
        # Store call for inspection
        self.calls.append((sql_query, args))
        # Return based on tags_list presence
        if len(args) == 2:
            # "all-tags" query
            return self.response_mapping.get("all_tags", [])
        elif len(args) == 3:
            # specific tags query; args[2] is tags_list
            tags_list = args[2]
            return self.response_mapping.get("specific_tags", [])
        return []


class DummyPrismaClient:
    """
    Dummy PrismaClient to simulate prisma_client.db.query_raw
    """

    def __init__(self, response_mapping):
        self.db = DummyDB(response_mapping)


from litellm.proxy.spend_tracking.spend_management_endpoints import (
    ui_get_spend_by_tags,
)

########## UNIT TESTS ##########

# 1. Basic Test Cases

@pytest.mark.asyncio
async def test_ui_get_spend_by_tags_basic_all_tags():
    """
    Test basic functionality for 'all-tags' (should aggregate all tags).
    """
    # Simulate two tags in DB response
    response_mapping = {
        "all_tags": [
            {"individual_request_tag": "tagA", "spend_date": "2024-06-01", "log_count": 5, "total_spend": 10.25},
            {"individual_request_tag": "tagB", "spend_date": "2024-06-01", "log_count": 3, "total_spend": 5.75},
        ]
    }
    prisma_client = DummyPrismaClient(response_mapping)
    result = await ui_get_spend_by_tags("2024-06-01", "2024-06-30", prisma_client, "all-tags")


@pytest.mark.asyncio
async def test_ui_get_spend_by_tags_basic_specific_tags():
    """
    Test basic functionality for specific tags.
    """
    # Simulate DB response for specific tags
    response_mapping = {
        "specific_tags": [
            {"individual_request_tag": "tagB", "log_count": 7, "total_spend": 14.0},
            {"individual_request_tag": "tagC", "log_count": 2, "total_spend": 2.5},
        ]
    }
    prisma_client = DummyPrismaClient(response_mapping)
    result = await ui_get_spend_by_tags("2024-06-01", "2024-06-30", prisma_client, "tagB,tagC")


@pytest.mark.asyncio
async def test_ui_get_spend_by_tags_basic_empty_tags_str():
    """
    Test with empty tags_str (should behave as all-tags).
    """
    response_mapping = {
        "all_tags": [
            {"individual_request_tag": "tagA", "spend_date": "2024-06-01", "log_count": 1, "total_spend": 2.0}
        ]
    }
    prisma_client = DummyPrismaClient(response_mapping)
    result = await ui_get_spend_by_tags("2024-06-01", "2024-06-30", prisma_client, "")


@pytest.mark.asyncio
async def test_ui_get_spend_by_tags_basic_none_tags_str():
    """
    Test with tags_str=None (should behave as all-tags).
    """
    response_mapping = {
        "all_tags": [
            {"individual_request_tag": "tagA", "spend_date": "2024-06-01", "log_count": 4, "total_spend": 8.0}
        ]
    }
    prisma_client = DummyPrismaClient(response_mapping)
    result = await ui_get_spend_by_tags("2024-06-01", "2024-06-30", prisma_client, None)

# 2. Edge Test Cases

@pytest.mark.asyncio
async def test_ui_get_spend_by_tags_no_db_connected():
    """
    Test when prisma_client is None (should raise HTTPException).
    """
    with pytest.raises(HTTPException) as exc_info:
        await ui_get_spend_by_tags("2024-06-01", "2024-06-30", None, "tagA")


@pytest.mark.asyncio
async def test_ui_get_spend_by_tags_all_tags_in_tags_list_with_other_tags():
    """
    Test when tags_str includes 'all-tags' and other tags (should treat as all-tags).
    """
    response_mapping = {
        "all_tags": [
            {"individual_request_tag": "tagA", "spend_date": "2024-06-01", "log_count": 2, "total_spend": 4.0},
            {"individual_request_tag": "tagB", "spend_date": "2024-06-01", "log_count": 1, "total_spend": 2.0},
        ]
    }
    prisma_client = DummyPrismaClient(response_mapping)
    result = await ui_get_spend_by_tags("2024-06-01", "2024-06-30", prisma_client, "all-tags,tagA,tagB")


@pytest.mark.asyncio
async def test_ui_get_spend_by_tags_tag_with_zero_spend_and_count():
    """
    Test tags with zero spend and zero log_count.
    """
    response_mapping = {
        "specific_tags": [
            {"individual_request_tag": "tagZero", "log_count": 0, "total_spend": 0.0},
        ]
    }
    prisma_client = DummyPrismaClient(response_mapping)
    result = await ui_get_spend_by_tags("2024-06-01", "2024-06-30", prisma_client, "tagZero")


@pytest.mark.asyncio
async def test_ui_get_spend_by_tags_empty_db_response():
    """
    Test when DB returns empty list (no tags found).
    """
    response_mapping = {
        "all_tags": []
    }
    prisma_client = DummyPrismaClient(response_mapping)
    result = await ui_get_spend_by_tags("2024-06-01", "2024-06-30", prisma_client, "all-tags")


@pytest.mark.asyncio
async def test_ui_get_spend_by_tags_concurrent_execution():
    """
    Test concurrent execution of multiple calls.
    """
    response_mapping = {
        "all_tags": [
            {"individual_request_tag": "tagA", "spend_date": "2024-06-01", "log_count": 1, "total_spend": 2.0}
        ],
        "specific_tags": [
            {"individual_request_tag": "tagB", "log_count": 2, "total_spend": 3.0}
        ]
    }
    prisma_client = DummyPrismaClient(response_mapping)
    # Run two calls concurrently
    results = await asyncio.gather(
        ui_get_spend_by_tags("2024-06-01", "2024-06-30", prisma_client, "all-tags"),
        ui_get_spend_by_tags("2024-06-01", "2024-06-30", prisma_client, "tagB"),
    )

# 3. Large Scale Test Cases

@pytest.mark.asyncio
async def test_ui_get_spend_by_tags_large_concurrent_calls():
    """
    Test many concurrent calls to the function.
    """
    # Simulate 50 concurrent calls with different tags
    response_mapping = {
        "specific_tags": [
            {"individual_request_tag": "tagX", "log_count": 5, "total_spend": 10.0}
        ]
    }
    prisma_client = DummyPrismaClient(response_mapping)
    coros = [
        ui_get_spend_by_tags("2024-06-01", "2024-06-30", prisma_client, "tagX")
        for _ in range(50)
    ]
    results = await asyncio.gather(*coros)
    for result in results:
        pass

# 4. Throughput Test Cases

@pytest.mark.asyncio
async def test_ui_get_spend_by_tags_throughput_small_load():
    """
    Throughput test with small load (10 concurrent calls).
    """
    response_mapping = {
        "specific_tags": [
            {"individual_request_tag": "tagSmall", "log_count": 2, "total_spend": 3.0}
        ]
    }
    prisma_client = DummyPrismaClient(response_mapping)
    coros = [
        ui_get_spend_by_tags("2024-06-01", "2024-06-30", prisma_client, "tagSmall")
        for _ in range(10)
    ]
    results = await asyncio.gather(*coros)
    for result in results:
        pass


@pytest.mark.asyncio
async def test_ui_get_spend_by_tags_throughput_medium_load():
    """
    Throughput test with medium load (50 concurrent calls).
    """
    response_mapping = {
        "specific_tags": [
            {"individual_request_tag": "tagMedium", "log_count": 7, "total_spend": 14.0}
        ]
    }
    prisma_client = DummyPrismaClient(response_mapping)
    coros = [
        ui_get_spend_by_tags("2024-06-01", "2024-06-30", prisma_client, "tagMedium")
        for _ in range(50)
    ]
    results = await asyncio.gather(*coros)
    for result in results:
        pass


@pytest.mark.asyncio
async def test_ui_get_spend_by_tags_throughput_high_load():
    """
    Throughput test with high load (100 concurrent calls).
    """
    response_mapping = {
        "specific_tags": [
            {"individual_request_tag": "tagHigh", "log_count": 20, "total_spend": 40.0}
        ]
    }
    prisma_client = DummyPrismaClient(response_mapping)
    coros = [
        ui_get_spend_by_tags("2024-06-01", "2024-06-30", prisma_client, "tagHigh")
        for _ in range(100)
    ]
    results = await asyncio.gather(*coros)
    for result in results:
        pass

`codeflash_output` is used to check that the output of the original code is the same as that of the optimized code.
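
In other words, the harness replays identical inputs through both versions and asserts the outputs match. A hand-rolled equivalent might look like the sketch below; `ui_get_spend_by_tags_original` is a hypothetical stand-in for the pre-optimization implementation (not an import that exists in the repository), and `DummyPrismaClient` is the mock defined in the tests above.

```python
# Hypothetical equivalence check; ui_get_spend_by_tags_original stands in for
# the pre-optimization implementation and does not exist as a real import.
import pytest


@pytest.mark.asyncio
async def test_original_and_optimized_agree():
    prisma_client = DummyPrismaClient({
        "specific_tags": [
            {"individual_request_tag": "tagB", "log_count": 7, "total_spend": 14.0},
        ]
    })
    args = ("2024-06-01", "2024-06-30", prisma_client, "tagB")
    expected = await ui_get_spend_by_tags_original(*args)  # hypothetical baseline
    actual = await ui_get_spend_by_tags(*args)
    assert actual == expected
```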

# ------------------------------------------------
import asyncio  # used to run async functions

# function to test
# (copied exactly as provided)
import collections
from typing import TYPE_CHECKING, Any, List, Optional

import pytest  # used for our unit tests
from fastapi import HTTPException
from litellm.proxy._types import TYPE_CHECKING, Any, List, Optional
from litellm.proxy.spend_tracking.spend_management_endpoints import (
    ui_get_spend_by_tags,
)
from litellm.proxy.utils import PrismaClient

if TYPE_CHECKING:
    from litellm.proxy.proxy_server import PrismaClient
else:
    PrismaClient = Any
from litellm.proxy.spend_tracking.spend_management_endpoints import (
    ui_get_spend_by_tags,
)

# ---- TESTS ----

# Minimal mock for prisma_client.db.query_raw

class MockDB:
    def __init__(self, response_map=None):
        self.response_map = response_map or {}

    async def query_raw(self, sql_query, *args):
        # Use args to select response
        # For test purposes, just return what is set in response_map, or default
        key = tuple(args)
        # If a specific response for this query, return it
        if key in self.response_map:
            return self.response_map[key]
        # Otherwise, return the default response
        return self.response_map.get("default", [])


class MockPrismaClient:
    def __init__(self, response_map=None):
        self.db = MockDB(response_map)

# ----------- BASIC TEST CASES -----------

@pytest.mark.asyncio
async def test_ui_get_spend_by_tags_all_tags_basic():
    """
    Basic: Test spend for all tags (tags_str contains 'all-tags')
    """
    mock_response = [
        {"individual_request_tag": "tagA", "spend_date": "2024-06-01", "log_count": 5, "total_spend": 10.0},
        {"individual_request_tag": "tagB", "spend_date": "2024-06-01", "log_count": 3, "total_spend": 5.5},
        {"individual_request_tag": "tagC", "spend_date": "2024-06-01", "log_count": 2, "total_spend": 4.0},
    ]
    prisma_client = MockPrismaClient(response_map={
        ("2024-06-01", "2024-06-30"): mock_response
    })
    result = await ui_get_spend_by_tags(
        start_date="2024-06-01",
        end_date="2024-06-30",
        prisma_client=prisma_client,
        tags_str="all-tags",
    )


@pytest.mark.asyncio
async def test_ui_get_spend_by_tags_empty_tags_str_returns_all_tags():
    """
    Basic: Test empty tags_str returns spend for all tags
    """
    mock_response = [
        {"individual_request_tag": "tagX", "spend_date": "2024-06-01", "log_count": 1, "total_spend": 2.0},
        {"individual_request_tag": "tagY", "spend_date": "2024-06-01", "log_count": 2, "total_spend": 3.0},
    ]
    prisma_client = MockPrismaClient(response_map={
        ("2024-06-01", "2024-06-30"): mock_response
    })
    result = await ui_get_spend_by_tags(
        start_date="2024-06-01",
        end_date="2024-06-30",
        prisma_client=prisma_client,
        tags_str="",
    )


@pytest.mark.asyncio
async def test_ui_get_spend_by_tags_none_tags_str_returns_all_tags():
    """
    Basic: Test None tags_str returns spend for all tags
    """
    mock_response = [
        {"individual_request_tag": "tag1", "spend_date": "2024-06-01", "log_count": 1, "total_spend": 1.0},
    ]
    prisma_client = MockPrismaClient(response_map={
        ("2024-06-01", "2024-06-30"): mock_response
    })
    result = await ui_get_spend_by_tags(
        start_date="2024-06-01",
        end_date="2024-06-30",
        prisma_client=prisma_client,
        tags_str=None,
    )

# ----------- EDGE TEST CASES -----------

@pytest.mark.asyncio
async def test_ui_get_spend_by_tags_no_prisma_client_raises():
    """
    Edge: Should raise HTTPException if prisma_client is None
    """
    with pytest.raises(HTTPException) as exc_info:
        await ui_get_spend_by_tags(
            start_date="2024-06-01",
            end_date="2024-06-30",
            prisma_client=None,
            tags_str="tagA",
        )


@pytest.mark.asyncio
async def test_ui_get_spend_by_tags_concurrent_execution():
    """
    Edge: Test concurrent execution of multiple calls
    """
    mock_response1 = [
        {"individual_request_tag": "tagA", "spend_date": "2024-06-01", "log_count": 2, "total_spend": 4.0},
    ]
    mock_response2 = [
        {"individual_request_tag": "tagB", "spend_date": "2024-06-02", "log_count": 3, "total_spend": 6.0},
    ]
    prisma_client = MockPrismaClient(response_map={
        ("2024-06-01", "2024-06-30"): mock_response1,
        ("2024-06-02", "2024-06-30"): mock_response2,
    })
    # Run two calls concurrently
    results = await asyncio.gather(
        ui_get_spend_by_tags(
            start_date="2024-06-01",
            end_date="2024-06-30",
            prisma_client=prisma_client,
            tags_str="all-tags",
        ),
        ui_get_spend_by_tags(
            start_date="2024-06-02",
            end_date="2024-06-30",
            prisma_client=prisma_client,
            tags_str="all-tags",
        ),
    )


@pytest.mark.asyncio
async def test_ui_get_spend_by_tags_empty_response():
    """
    Edge: Test empty response from db
    """
    prisma_client = MockPrismaClient(response_map={
        ("2024-06-01", "2024-06-30"): [],
    })
    result = await ui_get_spend_by_tags(
        start_date="2024-06-01",
        end_date="2024-06-30",
        prisma_client=prisma_client,
        tags_str="all-tags",
    )

# ----------- LARGE SCALE TEST CASES -----------

@pytest.mark.asyncio
async def test_ui_get_spend_by_tags_throughput_small_load():
    """
    Throughput: Test small load (10 concurrent requests)
    """
    mock_response = [
        {"individual_request_tag": "tagA", "spend_date": "2024-06-01", "log_count": 1, "total_spend": 2.0},
    ]
    prisma_client = MockPrismaClient(response_map={
        ("2024-06-01", "2024-06-30"): mock_response
    })
    tasks = [
        ui_get_spend_by_tags(
            start_date="2024-06-01",
            end_date="2024-06-30",
            prisma_client=prisma_client,
            tags_str="all-tags",
        )
        for _ in range(10)
    ]
    results = await asyncio.gather(*tasks)
    for result in results:
        pass


@pytest.mark.asyncio
async def test_ui_get_spend_by_tags_throughput_medium_load():
    """
    Throughput: Test medium load (100 concurrent requests)
    """
    mock_response = [
        {"individual_request_tag": "tagB", "spend_date": "2024-06-01", "log_count": 2, "total_spend": 4.0},
    ]
    prisma_client = MockPrismaClient(response_map={
        ("2024-06-01", "2024-06-30"): mock_response
    })
    tasks = [
        ui_get_spend_by_tags(
            start_date="2024-06-01",
            end_date="2024-06-30",
            prisma_client=prisma_client,
            tags_str="all-tags",
        )
        for _ in range(100)
    ]
    results = await asyncio.gather(*tasks)
    for result in results:
        pass


@pytest.mark.asyncio
async def test_ui_get_spend_by_tags_throughput_large_load():
    """
    Throughput: Test large load (500 concurrent requests)
    """
    mock_response = [
        {"individual_request_tag": "tagC", "spend_date": "2024-06-01", "log_count": 3, "total_spend": 6.0},
    ]
    prisma_client = MockPrismaClient(response_map={
        ("2024-06-01", "2024-06-30"): mock_response
    })
    tasks = [
        ui_get_spend_by_tags(
            start_date="2024-06-01",
            end_date="2024-06-30",
            prisma_client=prisma_client,
            tags_str="all-tags",
        )
        for _ in range(500)
    ]
    results = await asyncio.gather(*tasks)
    for result in results:
        pass


To edit these changes, `git checkout codeflash/optimize-ui_get_spend_by_tags-mhu1rwwo` and push.

