Skip to content

Commit a51b033

Browse files
connermoclaude
andcommitted
feat: prevent duplicate log clearing tasks with Redis locks and optimistic UI updates
This commit addresses the reviewer's concern about duplicate task submissions by implementing a multi-layered prevention strategy. ## Backend improvements: 1. **Redis-based task deduplication** - Added Redis lock mechanism in ConversationService.clear_conversations() - Lock key format: `clear_conversations:{app_id}:{mode}[:{conversation_ids_hash}]` - Lock expires in 10 minutes, preventing stale locks - Returns 409 Conflict if clearing task already in progress 2. **New exception handling** - Added ConversationClearInProgressError exception - Controllers return 409 status with clear error message - Lock is automatically released on task completion or max retries 3. **Celery task improvements** - Task accepts lock_key parameter for cleanup - Releases lock on successful completion - Releases lock after max retries to prevent deadlock ## Frontend improvements: 1. **Optimistic UI updates** - List immediately cleared from UI when user clicks "Clear All" - For selective deletion, removed items are filtered out instantly - Prevents user confusion and repeat submissions 2. **Improved user experience** - No need to wait for backend processing to see results - Clear visual feedback that operation started - Backend revalidation happens after optimistic update ## How it works: 1. User clicks "Clear All" → List immediately disappears (optimistic update) 2. Frontend sends DELETE request → Backend checks Redis lock 3. If lock exists → Return 409 error (task already running) 4. If no lock → Acquire lock and queue Celery task 5. Celery task processes deletion in background 6. Task releases lock when done or after max retries 7. Frontend revalidates data after request completes This prevents duplicate tasks even in edge cases like: - Cross-tab submissions - Slow network + multiple clicks - Page refresh during operation 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent 4454c47 commit a51b033

File tree

6 files changed

+158
-50
lines changed

6 files changed

+158
-50
lines changed

api/controllers/console/app/conversation.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,8 @@ def get(self, app_model):
149149
@get_app_model(mode=AppMode.COMPLETION)
150150
@edit_permission_required
151151
def delete(self, app_model):
152+
from services.errors.conversation import ConversationClearInProgressError
153+
152154
current_user, _ = current_account_with_tenant()
153155
parser = reqparse.RequestParser()
154156
parser.add_argument("conversation_ids", type=list, location="json", required=False, default=None)
@@ -160,11 +162,13 @@ def delete(self, app_model):
160162
[str(id) for id in conversation_ids_raw] if conversation_ids_raw and len(conversation_ids_raw) > 0 else None
161163
)
162164

163-
result = ConversationService.clear_conversations(
164-
app_model=app_model, user=current_user, conversation_ids=conversation_ids
165-
)
166-
167-
return result, 202
165+
try:
166+
result = ConversationService.clear_conversations(
167+
app_model=app_model, user=current_user, conversation_ids=conversation_ids
168+
)
169+
return result, 202
170+
except ConversationClearInProgressError as e:
171+
return {"message": str(e), "code": "task_in_progress"}, 409
168172

169173

170174
@console_ns.route("/apps/<uuid:app_id>/completion-conversations/<uuid:conversation_id>")
@@ -394,6 +398,8 @@ def get(self, app_model):
394398
@get_app_model(mode=[AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT])
395399
@edit_permission_required
396400
def delete(self, app_model):
401+
from services.errors.conversation import ConversationClearInProgressError
402+
397403
current_user, _ = current_account_with_tenant()
398404
parser = reqparse.RequestParser()
399405
parser.add_argument("conversation_ids", type=list, location="json", required=False, default=None)
@@ -405,11 +411,13 @@ def delete(self, app_model):
405411
[str(id) for id in conversation_ids_raw] if conversation_ids_raw and len(conversation_ids_raw) > 0 else None
406412
)
407413

408-
result = ConversationService.clear_conversations(
409-
app_model=app_model, user=current_user, conversation_ids=conversation_ids
410-
)
411-
412-
return result, 202
414+
try:
415+
result = ConversationService.clear_conversations(
416+
app_model=app_model, user=current_user, conversation_ids=conversation_ids
417+
)
418+
return result, 202
419+
except ConversationClearInProgressError as e:
420+
return {"message": str(e), "code": "task_in_progress"}, 409
413421

414422

415423
@console_ns.route("/apps/<uuid:app_id>/chat-conversations/<uuid:conversation_id>")

api/services/conversation_service.py

Lines changed: 59 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import contextlib
2+
import hashlib
23
import logging
34
from collections.abc import Callable, Sequence
45
from typing import Any, Union
@@ -11,12 +12,14 @@
1112
from core.variables.types import SegmentType
1213
from core.workflow.nodes.variable_assigner.common.impl import conversation_variable_updater_factory
1314
from extensions.ext_database import db
15+
from extensions.ext_redis import redis_client
1416
from factories import variable_factory
1517
from libs.datetime_utils import naive_utc_now
1618
from libs.infinite_scroll_pagination import InfiniteScrollPagination
1719
from models import Account, ConversationVariable
1820
from models.model import App, Conversation, EndUser, Message
1921
from services.errors.conversation import (
22+
ConversationClearInProgressError,
2023
ConversationNotExistsError,
2124
ConversationVariableNotExistsError,
2225
ConversationVariableTypeMismatchError,
@@ -232,6 +235,9 @@ def clear_conversations(
232235
233236
Returns:
234237
dict with task info and estimated counts
238+
239+
Raises:
240+
ConversationClearInProgressError: If a clearing task is already in progress
235241
"""
236242
# Validate conversation ownership if specific IDs provided
237243
if conversation_ids:
@@ -244,39 +250,62 @@ def clear_conversations(
244250
else:
245251
mode = "chat" # covers chat, agent-chat, advanced-chat
246252

247-
# Queue the Celery task
248-
task = clear_conversations_task.delay(
249-
app_id=app_model.id,
250-
conversation_mode=mode,
251-
conversation_ids=conversation_ids,
252-
user_id=user.id if user else None,
253-
user_type="account" if isinstance(user, Account) else "end_user" if isinstance(user, EndUser) else None,
254-
)
255-
256-
# Get estimated counts for response
253+
# Generate unique lock key to prevent duplicate tasks
254+
# Key format: clear_conversations:{app_id}:{mode}[:{conversation_ids_hash}]
255+
lock_key = f"clear_conversations:{app_model.id}:{mode}"
257256
if conversation_ids:
258-
conversation_count = len(conversation_ids)
259-
else:
260-
# Estimate total conversations for this app
261-
conversation_count = (
262-
db.session.query(Conversation)
263-
.filter(
264-
Conversation.app_id == app_model.id,
265-
Conversation.mode == mode,
266-
Conversation.from_source == ("api" if isinstance(user, EndUser) else "console"),
267-
Conversation.from_end_user_id == (user.id if isinstance(user, EndUser) else None),
268-
Conversation.from_account_id == (user.id if isinstance(user, Account) else None),
269-
Conversation.is_deleted == False,
270-
)
271-
.count()
257+
# For selective deletion, include hash of conversation IDs
258+
ids_hash = hashlib.md5(",".join(sorted(conversation_ids)).encode()).hexdigest()[:8]
259+
lock_key += f":{ids_hash}"
260+
261+
# Try to acquire lock (expires in 10 minutes)
262+
# nx=True means set only if key doesn't exist
263+
lock_acquired = redis_client.set(lock_key, "1", ex=600, nx=True)
264+
if not lock_acquired:
265+
# Another clearing task is already in progress
266+
raise ConversationClearInProgressError(
267+
"A conversation clearing task is already in progress for this app. Please wait for it to complete."
268+
)
269+
270+
try:
271+
# Queue the Celery task with lock_key for cleanup
272+
task = clear_conversations_task.delay(
273+
app_id=app_model.id,
274+
conversation_mode=mode,
275+
conversation_ids=conversation_ids,
276+
user_id=user.id if user else None,
277+
user_type="account" if isinstance(user, Account) else "end_user" if isinstance(user, EndUser) else None,
278+
lock_key=lock_key,
272279
)
273280

274-
return {
275-
"task_id": task.id,
276-
"status": "queued",
277-
"estimated_conversations": conversation_count,
278-
"mode": "selective" if conversation_ids else "all",
279-
}
281+
# Get estimated counts for response
282+
if conversation_ids:
283+
conversation_count = len(conversation_ids)
284+
else:
285+
# Estimate total conversations for this app
286+
conversation_count = (
287+
db.session.query(Conversation)
288+
.filter(
289+
Conversation.app_id == app_model.id,
290+
Conversation.mode == mode,
291+
Conversation.from_source == ("api" if isinstance(user, EndUser) else "console"),
292+
Conversation.from_end_user_id == (user.id if isinstance(user, EndUser) else None),
293+
Conversation.from_account_id == (user.id if isinstance(user, Account) else None),
294+
Conversation.is_deleted == False,
295+
)
296+
.count()
297+
)
298+
299+
return {
300+
"task_id": task.id,
301+
"status": "queued",
302+
"estimated_conversations": conversation_count,
303+
"mode": "selective" if conversation_ids else "all",
304+
}
305+
except Exception:
306+
# Release lock if task queueing failed
307+
redis_client.delete(lock_key)
308+
raise
280309

281310
@classmethod
282311
def get_conversational_variable(

api/services/errors/conversation.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,9 @@ class ConversationVariableNotExistsError(BaseServiceError):
1919

2020
class ConversationVariableTypeMismatchError(BaseServiceError):
2121
pass
22+
23+
24+
class ConversationClearInProgressError(BaseServiceError):
25+
"""Raised when a conversation clear operation is already in progress for the same app/mode."""
26+
27+
pass

api/tasks/clear_conversation_task.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from sqlalchemy.orm import sessionmaker
66

77
from extensions.ext_database import db
8+
from extensions.ext_redis import redis_client
89
from extensions.ext_storage import storage
910
from models import (
1011
Conversation,
@@ -32,6 +33,7 @@ def clear_conversations_task(
3233
conversation_ids: list[str] | None = None,
3334
user_id: str | None = None,
3435
user_type: str | None = None,
36+
lock_key: str | None = None,
3537
):
3638
"""
3739
Celery task to clear conversations and related data.
@@ -42,6 +44,7 @@ def clear_conversations_task(
4244
conversation_ids: Optional list of specific conversation IDs to clear
4345
user_id: The user ID for permission validation
4446
user_type: 'account' or 'end_user'
47+
lock_key: Redis lock key to release after completion
4548
"""
4649
start_time = time.time()
4750
total_deleted = {
@@ -160,6 +163,14 @@ def clear_conversations_task(
160163
execution_time = time.time() - start_time
161164
logger.info("Conversation cleanup completed in %.2fs. Deleted: %s", execution_time, total_deleted)
162165

166+
# Release Redis lock on successful completion
167+
if lock_key:
168+
try:
169+
redis_client.delete(lock_key)
170+
logger.info("Released lock: %s", lock_key)
171+
except Exception as e:
172+
logger.warning("Failed to release lock %s: %s", lock_key, e)
173+
163174
return {
164175
"status": "completed",
165176
"execution_time": execution_time,
@@ -178,6 +189,13 @@ def clear_conversations_task(
178189
raise self.retry(exc=exc, countdown=60 * (2**self.request.retries))
179190
except MaxRetriesExceededError:
180191
logger.exception("Max retries exceeded for conversation cleanup, app_id=%s", app_id)
192+
# Release lock after max retries
193+
if lock_key:
194+
try:
195+
redis_client.delete(lock_key)
196+
logger.info("Released lock after max retries: %s", lock_key)
197+
except Exception as e:
198+
logger.warning("Failed to release lock %s: %s", lock_key, e)
181199
raise
182200

183201

web/app/components/app/log/index.tsx

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,8 @@ const Logs: FC<ILogsProps> = ({ appDetail }) => {
107107
if (conversationIds) {
108108
// Remove specific conversation IDs from selection
109109
setSelectedItems(prev => prev.filter(id => !conversationIds.includes(id)))
110-
} else {
110+
}
111+
else {
111112
// Clear all selections
112113
setSelectedItems([])
113114
}
@@ -129,13 +130,13 @@ const Logs: FC<ILogsProps> = ({ appDetail }) => {
129130
{total === undefined
130131
? <Loading type='app' />
131132
: total > 0
132-
? <List
133-
logs={isChatMode ? chatConversations : completionConversations}
134-
appDetail={appDetail}
135-
onRefresh={isChatMode ? mutateChatList : mutateCompletionList}
136-
selectedItems={selectedItems}
137-
onSelectionChange={setSelectedItems}
138-
/>
133+
? <List
134+
logs={isChatMode ? chatConversations : completionConversations}
135+
appDetail={appDetail}
136+
onRefresh={isChatMode ? mutateChatList : mutateCompletionList}
137+
selectedItems={selectedItems}
138+
onSelectionChange={setSelectedItems}
139+
/>
139140
: <EmptyElement appUrl={`${appDetail.site.app_base_url}${basePath}/${getWebAppType(appDetail.mode)}/${appDetail.site.access_token}`} />
140141
}
141142
{/* Show Pagination only if the total is more than the limit */}

web/service/log.ts

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,29 @@ export const clearChatConversations = async ({ appId, conversationIds }: { appId
8888
? { conversation_ids: conversationIds }
8989
: { conversation_ids: [] }
9090

91+
// Optimistic update: immediately clear the list from UI
92+
// This prevents users from thinking the operation failed and clicking again
93+
mutate(
94+
key =>
95+
typeof key === 'object' && key !== null && 'url' in key
96+
&& key.url === `/apps/${appId}/chat-conversations`,
97+
(data: any) => {
98+
if (!data)
99+
return { data: [], total: 0 }
100+
// For selective deletion, filter out deleted conversations
101+
if (conversationIds && conversationIds.length > 0) {
102+
return {
103+
...data,
104+
data: data.data?.filter((conv: any) => !conversationIds.includes(conv.id)) || [],
105+
total: Math.max(0, (data.total || 0) - conversationIds.length),
106+
}
107+
}
108+
// For "clear all", return empty list
109+
return { data: [], total: 0 }
110+
},
111+
{ revalidate: false },
112+
)
113+
91114
const result = await del<any>(`/apps/${appId}/chat-conversations`, { body })
92115

93116
// Clear localStorage to prevent 404 errors
@@ -96,7 +119,7 @@ export const clearChatConversations = async ({ appId, conversationIds }: { appId
96119
// Clear SWR caches to force reload of conversation lists
97120
await Promise.all([
98121
// Clear log list caches (key is an object with url and params)
99-
// Force cache invalidation with populateCache: false
122+
// Now revalidate after the deletion is confirmed
100123
mutate(
101124
key =>
102125
typeof key === 'object' && key !== null && 'url' in key
@@ -141,6 +164,29 @@ export const clearCompletionConversations = async ({ appId, conversationIds }: {
141164
? { conversation_ids: conversationIds }
142165
: { conversation_ids: [] }
143166

167+
// Optimistic update: immediately clear the list from UI
168+
// This prevents users from thinking the operation failed and clicking again
169+
mutate(
170+
key =>
171+
typeof key === 'object' && key !== null && 'url' in key
172+
&& key.url === `/apps/${appId}/completion-conversations`,
173+
(data: any) => {
174+
if (!data)
175+
return { data: [], total: 0 }
176+
// For selective deletion, filter out deleted conversations
177+
if (conversationIds && conversationIds.length > 0) {
178+
return {
179+
...data,
180+
data: data.data?.filter((conv: any) => !conversationIds.includes(conv.id)) || [],
181+
total: Math.max(0, (data.total || 0) - conversationIds.length),
182+
}
183+
}
184+
// For "clear all", return empty list
185+
return { data: [], total: 0 }
186+
},
187+
{ revalidate: false },
188+
)
189+
144190
const result = await del<any>(`/apps/${appId}/completion-conversations`, { body })
145191

146192
// Clear localStorage to prevent 404 errors
@@ -149,7 +195,7 @@ export const clearCompletionConversations = async ({ appId, conversationIds }: {
149195
// Clear SWR caches to force reload of conversation lists
150196
await Promise.all([
151197
// Clear log list caches (key is an object with url and params)
152-
// Force cache invalidation with populateCache: false
198+
// Now revalidate after the deletion is confirmed
153199
mutate(
154200
key =>
155201
typeof key === 'object' && key !== null && 'url' in key

0 commit comments

Comments
 (0)