Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
ea0bc7d
feat: add selective log deletion functionality to improve log management
Aug 28, 2025
3237ed1
refactor: move conversation deletion to service layer with Celery tasks
Aug 28, 2025
0d1ba93
Fix conversation clearing and prevent 404 errors
Sep 2, 2025
642cd31
Merge upstream main and resolve conflicts
Sep 2, 2025
4e0979a
Optimize PR based on feedback - improve code quality and accessibility
Sep 4, 2025
8518150
Fix linter formatting issues
Sep 4, 2025
7870f70
Fix overly aggressive localStorage clearing - address PR feedback
Sep 4, 2025
5a7be42
Fix import error for CONVERSATION_ID_INFO constant
Sep 4, 2025
e774b84
Merge branch 'main' into feat/selective-log-deletion
crazywoola Sep 8, 2025
1f305b2
[autofix.ci] apply automated fixes
autofix-ci[bot] Sep 12, 2025
f3d11cf
Simplify conversation deletion parameter parsing and resolve merge co…
connermo Sep 13, 2025
b014204
Fix merge conflict in imports: add missing Account import
connermo Sep 13, 2025
d6f2b70
Merge branch 'main' into feat/selective-log-deletion
crazywoola Sep 16, 2025
d2b90cc
Merge branch 'main' into feat/selective-log-deletion
connermo Sep 16, 2025
19b38d8
Merge branch 'main' into feat/selective-log-deletion
crazywoola Oct 15, 2025
a2eba9a
[autofix.ci] apply automated fixes
autofix-ci[bot] Oct 15, 2025
a259b4c
Merge branch 'main' into feat/selective-log-deletion
asukaminato0721 Oct 19, 2025
034ce08
[autofix.ci] apply automated fixes
autofix-ci[bot] Oct 19, 2025
41844ec
refactor: consolidate conversation deletion logic and fix critical is…
connermo Nov 2, 2025
c7d6d92
Merge upstream/main into feat/selective-log-deletion
connermo Nov 2, 2025
31f6a06
fix: remove unused handleRowClick function
connermo Nov 2, 2025
ec9dc0a
fix: improve log clearing and prevent 404 errors
connermo Nov 3, 2025
8da334c
feat: prevent duplicate log clearing tasks with Redis locks and optim…
connermo Nov 3, 2025
5817c66
fix: enable auto-refresh for log list on focus and reconnect
connermo Nov 3, 2025
6349de7
fix: implement optimistic updates for log deletion
connermo Nov 3, 2025
1c7a4b6
fix: refresh log list when navigating from other pages
connermo Nov 3, 2025
f223eaa
fix: refresh conversation list when navigating to configuration page
connermo Nov 3, 2025
ce1f293
Merge branch 'main' into feat/selective-log-deletion
connermo Nov 3, 2025
f9f089c
fix: ensure conversation list refreshes after deletion in same tab
connermo Nov 3, 2025
b56b6d1
feat: implement universal conversation list sync mechanism
connermo Nov 3, 2025
8ae7537
Merge upstream/main into feat/selective-log-deletion
connermo Nov 4, 2025
43d6fb3
Merge upstream/main into feat/selective-log-deletion
connermo Nov 24, 2025
1a19809
Merge branch 'main' into feat/selective-log-deletion
connermo Nov 24, 2025
0983027
Merge branch 'main' into feat/selective-log-deletion
crazywoola Nov 26, 2025
5bbd7b0
Merge branch 'main' into feat/selective-log-deletion
connermo Nov 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions api/controllers/console/app/conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,47 @@

return conversations

@api.doc("clear_completion_conversations")

Check failure on line 387 in api/controllers/console/app/conversation.py

View workflow job for this annotation

GitHub Actions / Style Check / Python Style

"api" is not defined (reportUndefinedVariable)
@api.doc(description="Clear completion conversations and related data")

Check failure on line 388 in api/controllers/console/app/conversation.py

View workflow job for this annotation

GitHub Actions / Style Check / Python Style

"api" is not defined (reportUndefinedVariable)
@api.doc(params={"app_id": "Application ID"})

Check failure on line 389 in api/controllers/console/app/conversation.py

View workflow job for this annotation

GitHub Actions / Style Check / Python Style

"api" is not defined (reportUndefinedVariable)
@api.expect(

Check failure on line 390 in api/controllers/console/app/conversation.py

View workflow job for this annotation

GitHub Actions / Style Check / Python Style

"api" is not defined (reportUndefinedVariable)
api.parser().add_argument(

Check failure on line 391 in api/controllers/console/app/conversation.py

View workflow job for this annotation

GitHub Actions / Style Check / Python Style

"api" is not defined (reportUndefinedVariable)
"conversation_ids",
type=list,
location="json",
required=False,
help="Optional list of conversation IDs to clear. If not provided, all conversations will be cleared.",
)
)
@api.response(202, "Clearing task queued successfully")

Check failure on line 399 in api/controllers/console/app/conversation.py

View workflow job for this annotation

GitHub Actions / Style Check / Python Style

"api" is not defined (reportUndefinedVariable)
@api.response(403, "Insufficient permissions")

Check failure on line 400 in api/controllers/console/app/conversation.py

View workflow job for this annotation

GitHub Actions / Style Check / Python Style

"api" is not defined (reportUndefinedVariable)
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=AppMode.COMPLETION)
@edit_permission_required
def delete(self, app_model):
from services.errors.conversation import ConversationClearInProgressError

current_user, _ = current_account_with_tenant()
parser = reqparse.RequestParser()

Check failure on line 410 in api/controllers/console/app/conversation.py

View workflow job for this annotation

GitHub Actions / Style Check / Python Style

"reqparse" is not defined (reportUndefinedVariable)
parser.add_argument("conversation_ids", type=list, location="json", required=False, default=None)
args = parser.parse_args()

# Convert conversation IDs to strings if provided and non-empty
conversation_ids_raw = args.get("conversation_ids")
conversation_ids = (
[str(id) for id in conversation_ids_raw] if conversation_ids_raw and len(conversation_ids_raw) > 0 else None
)

try:
result = ConversationService.clear_conversations(
app_model=app_model, user=current_user, conversation_ids=conversation_ids
)
return result, 202
except ConversationClearInProgressError as e:
return {"message": str(e), "code": "task_in_progress"}, 409


@console_ns.route("/apps/<uuid:app_id>/completion-conversations/<uuid:conversation_id>")
class CompletionConversationDetailApi(Resource):
Expand Down Expand Up @@ -536,6 +577,47 @@

return conversations

@api.doc("clear_chat_conversations")

Check failure on line 580 in api/controllers/console/app/conversation.py

View workflow job for this annotation

GitHub Actions / Style Check / Python Style

"api" is not defined (reportUndefinedVariable)
@api.doc(description="Clear chat conversations and related data")

Check failure on line 581 in api/controllers/console/app/conversation.py

View workflow job for this annotation

GitHub Actions / Style Check / Python Style

"api" is not defined (reportUndefinedVariable)
@api.doc(params={"app_id": "Application ID"})
@api.expect(
api.parser().add_argument(
"conversation_ids",
type=list,
location="json",
required=False,
help="Optional list of conversation IDs to clear. If not provided, all conversations will be cleared.",
)
)
@api.response(202, "Clearing task queued successfully")
@api.response(403, "Insufficient permissions")
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT])
@edit_permission_required
def delete(self, app_model):
from services.errors.conversation import ConversationClearInProgressError

current_user, _ = current_account_with_tenant()
parser = reqparse.RequestParser()
parser.add_argument("conversation_ids", type=list, location="json", required=False, default=None)
args = parser.parse_args()

# Convert conversation IDs to strings if provided and non-empty
conversation_ids_raw = args.get("conversation_ids")
conversation_ids = (
[str(id) for id in conversation_ids_raw] if conversation_ids_raw and len(conversation_ids_raw) > 0 else None
)

try:
result = ConversationService.clear_conversations(
app_model=app_model, user=current_user, conversation_ids=conversation_ids
)
return result, 202
except ConversationClearInProgressError as e:
return {"message": str(e), "code": "task_in_progress"}, 409


@console_ns.route("/apps/<uuid:app_id>/chat-conversations/<uuid:conversation_id>")
class ChatConversationDetailApi(Resource):
Expand Down
117 changes: 115 additions & 2 deletions api/services/conversation_service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import contextlib
import hashlib
import logging
from collections.abc import Callable, Sequence
from typing import Any, Union
Expand All @@ -11,18 +12,21 @@
from core.variables.types import SegmentType
from core.workflow.nodes.variable_assigner.common.impl import conversation_variable_updater_factory
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from factories import variable_factory
from libs.datetime_utils import naive_utc_now
from libs.infinite_scroll_pagination import InfiniteScrollPagination
from models import Account, ConversationVariable
from models.model import App, Conversation, EndUser, Message
from services.errors.conversation import (
ConversationClearInProgressError,
ConversationNotExistsError,
ConversationVariableNotExistsError,
ConversationVariableTypeMismatchError,
LastConversationNotExistsError,
)
from services.errors.message import MessageNotExistsError
from tasks.clear_conversation_task import clear_conversations_task
from tasks.delete_conversation_task import delete_conversation_related_data

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -128,7 +132,18 @@ def rename(
else:
conversation.name = name
conversation.updated_at = naive_utc_now()
db.session.commit()
try:
db.session.commit()
except Exception as e:
# Handle case where conversation was deleted after we retrieved it
from sqlalchemy.orm.exc import StaleDataError

db.session.rollback()
if isinstance(e, StaleDataError):
# Conversation was likely deleted, raise ConversationNotExistsError
raise ConversationNotExistsError()
else:
raise

return conversation

Expand All @@ -152,7 +167,18 @@ def auto_generate_name(cls, app_model: App, conversation: Conversation):
)
conversation.name = name

db.session.commit()
try:
db.session.commit()
except Exception as e:
# Handle case where conversation was deleted after we retrieved it
from sqlalchemy.orm.exc import StaleDataError

db.session.rollback()
if isinstance(e, StaleDataError):
# Conversation was likely deleted, raise ConversationNotExistsError
raise ConversationNotExistsError()
else:
raise

return conversation

Expand Down Expand Up @@ -194,6 +220,93 @@ def delete(cls, app_model: App, conversation_id: str, user: Union[Account, EndUs
db.session.rollback()
raise e

@classmethod
def clear_conversations(
cls, app_model: App, user: Union[Account, EndUser] | None, conversation_ids: list[str] | None = None
) -> dict[str, Any]:
"""
Clear conversations and related data, optionally for specific conversation IDs.
Uses Celery task for handling large datasets.

Args:
app_model: The app model
user: The user (Account or EndUser)
conversation_ids: Optional list of specific conversation IDs to clear

Returns:
dict with task info and estimated counts

Raises:
ConversationClearInProgressError: If a clearing task is already in progress
"""
# Validate conversation ownership if specific IDs provided
if conversation_ids:
for conversation_id in conversation_ids:
cls.get_conversation(app_model, conversation_id, user)

# Get conversation mode for task
if app_model.mode == "completion":
mode = "completion"
else:
mode = "chat" # covers chat, agent-chat, advanced-chat

# Generate unique lock key to prevent duplicate tasks
# Key format: clear_conversations:{app_id}:{mode}[:{conversation_ids_hash}]
lock_key = f"clear_conversations:{app_model.id}:{mode}"
if conversation_ids:
# For selective deletion, include hash of conversation IDs
ids_hash = hashlib.md5(",".join(sorted(conversation_ids)).encode()).hexdigest()[:8]
lock_key += f":{ids_hash}"

# Try to acquire lock (expires in 10 minutes)
# nx=True means set only if key doesn't exist
lock_acquired = redis_client.set(lock_key, "1", ex=600, nx=True)
if not lock_acquired:
# Another clearing task is already in progress
raise ConversationClearInProgressError(
"A conversation clearing task is already in progress for this app. Please wait for it to complete."
)

try:
# Queue the Celery task with lock_key for cleanup
task = clear_conversations_task.delay(
app_id=app_model.id,
conversation_mode=mode,
conversation_ids=conversation_ids,
user_id=user.id if user else None,
user_type="account" if isinstance(user, Account) else "end_user" if isinstance(user, EndUser) else None,
lock_key=lock_key,
)

# Get estimated counts for response
if conversation_ids:
conversation_count = len(conversation_ids)
else:
# Estimate total conversations for this app
conversation_count = (
db.session.query(Conversation)
.filter(
Conversation.app_id == app_model.id,
Conversation.mode == mode,
Conversation.from_source == ("api" if isinstance(user, EndUser) else "console"),
Conversation.from_end_user_id == (user.id if isinstance(user, EndUser) else None),
Conversation.from_account_id == (user.id if isinstance(user, Account) else None),
Conversation.is_deleted == False,
)
.count()
)

return {
"task_id": task.id,
"status": "queued",
"estimated_conversations": conversation_count,
"mode": "selective" if conversation_ids else "all",
}
except Exception:
# Release lock if task queueing failed
redis_client.delete(lock_key)
raise

@classmethod
def get_conversational_variable(
cls,
Expand Down
6 changes: 6 additions & 0 deletions api/services/errors/conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,9 @@ class ConversationVariableNotExistsError(BaseServiceError):

class ConversationVariableTypeMismatchError(BaseServiceError):
pass


class ConversationClearInProgressError(BaseServiceError):
"""Raised when a conversation clear operation is already in progress for the same app/mode."""

pass
39 changes: 39 additions & 0 deletions api/tasks/clean_uploaded_files_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import logging

from celery import shared_task

from extensions.ext_database import db
from extensions.ext_storage import storage
from models.model import UploadFile

logger = logging.getLogger(__name__)


@shared_task(queue="dataset")
def clean_uploaded_files_task(upload_file_ids: list[str]):
"""
Asynchronously clean uploaded files from storage.
This task is called after database records have been successfully deleted.

Args:
upload_file_ids: List of upload file IDs to delete from storage
"""
if not upload_file_ids:
return

success_count = 0
failed_count = 0

for upload_file_id in upload_file_ids:
try:
upload_file = db.session.query(UploadFile).where(UploadFile.id == upload_file_id).first()

if upload_file and upload_file.key:
storage.delete(upload_file.key)
success_count += 1

except Exception:
logger.exception("Failed to delete upload file %s", upload_file_id)
failed_count += 1

return {"total": len(upload_file_ids), "success": success_count, "failed": failed_count}
Loading
Loading