Skip to content

Commit 74a2076

Browse files
authored
Merge pull request #64 from nineaiyu/dev
Dev
2 parents b7c687e + 557783c commit 74a2076

File tree

9 files changed

+357
-95
lines changed

9 files changed

+357
-95
lines changed

common/base/magic.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from importlib import import_module
1212

1313
from django.core.cache import cache
14-
from django.db import close_old_connections
14+
from django.db import close_old_connections, connection
1515
from django.http.response import HttpResponse
1616

1717
from common.utils import get_logger
@@ -341,3 +341,39 @@ def wrapper(*_args, **_kwargs):
341341
return wrapper
342342

343343
return decorator
344+
345+
346+
import functools
347+
348+
349+
def timeit(func):
350+
@functools.wraps(func)
351+
def wrapper(*args, **kwargs):
352+
start_time = time.time()
353+
result = func(*args, **kwargs)
354+
end_time = time.time()
355+
logger.info(f"{func.__name__} run time:{end_time - start_time}")
356+
return result
357+
358+
return wrapper
359+
360+
361+
class SQLCounter:
362+
def __init__(self):
363+
self.count = 0
364+
365+
def __call__(self, execute, sql, params, many, context):
366+
self.count += 1
367+
return execute(sql, params, many, context)
368+
369+
370+
def count_sql_queries(func):
371+
@functools.wraps(func)
372+
def wrapper(*args, **kwargs):
373+
sql_counter = SQLCounter()
374+
with connection.execute_wrapper(sql_counter):
375+
result = func(*args, **kwargs)
376+
logger.info(f"{func.__name__} sql queries count: {sql_counter.count}")
377+
return result
378+
379+
return wrapper

common/cache/redis.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,11 @@ def format_input(data):
3030

3131
class CacheList(object):
3232

33-
def __init__(self, key, max_size=1024):
33+
def __init__(self, key, max_size=1024, timeout=None):
3434
self.connect = get_redis_connection("default")
3535
self.key = key
3636
self.max_size = max_size
37+
self.timeout = timeout
3738

3839
def auto_ltrim(self):
3940
stop = self.connect.llen(self.key)
@@ -44,6 +45,8 @@ def auto_ltrim(self):
4445
def push(self, json_data, *args):
4546
self.connect.lpush(self.key, json.dumps(json_data), *[json.dumps(x) for x in args])
4647
self.auto_ltrim()
48+
if self.timeout is not None:
49+
self.connect.expire(self.key, self.timeout)
4750

4851
def pop(self):
4952
try:
@@ -56,10 +59,11 @@ def pop(self):
5659
def delete(self):
5760
self.connect.delete(self.key)
5861

62+
def len(self):
63+
return self.connect.llen(self.key)
5964

60-
class RobotMsgCache(CacheList):
61-
def __init__(self, key='', max_size=1024):
62-
super().__init__(f'ai_robot_chat_msg_{key}', max_size)
65+
def get_all(self):
66+
return [format_return(k) for k in self.connect.lrange(self.key, 0, -1)]
6367

6468

6569
class CacheSet(object):

common/core/filter.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from rest_framework.exceptions import NotAuthenticated
2020
from rest_framework.filters import BaseFilterBackend
2121

22+
from common.base.magic import timeit, count_sql_queries
2223
from common.cache.storage import CommonResourceIDsCache
2324
from common.core.db.utils import RelatedManager
2425
from common.utils import get_logger
@@ -140,6 +141,8 @@ def get_filter_q_base(model, permission, user_obj=None, dept_obj=None):
140141
return q1
141142

142143

144+
@timeit
145+
@count_sql_queries
143146
def get_filter_queryset(queryset: QuerySet, user_obj: UserInfo):
144147
"""
145148
1.获取所有数据权限规则

common/core/modelset.py

Lines changed: 45 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@
44
# filename : modelset
55
# author : ly_13
66
# date : 6/2/2023
7+
import itertools
78
import json
9+
import uuid
810
from hashlib import md5
911
from typing import Callable
1012

13+
import math
1114
from django.conf import settings
1215
from django.db import transaction
1316
from django.forms.widgets import SelectMultiple, DateTimeInput
@@ -29,15 +32,32 @@
2932
from common.core.config import SysConfig
3033
from common.core.response import ApiResponse
3134
from common.core.serializers import BasePrimaryKeyRelatedField
32-
from common.core.utils import get_query_post_pks
3335
from common.drf.renders.csv import CSVFileRenderer
3436
from common.drf.renders.excel import ExcelFileRenderer
3537
from common.swagger.utils import get_default_response_schema
38+
from common.tasks import background_task_view_set_job
3639
from common.utils import get_logger
3740

3841
logger = get_logger(__name__)
3942

4043

44+
def run_view_by_celery_task(view, request, kwargs, list_data, batch_length=100):
45+
task = kwargs.get("task", request.query_params.get('task', 'true').lower() in ['true', '1', 'yes']) # 默认为任务异步导入
46+
if task:
47+
view_str = f"{view.__class__.__module__}.{view.__class__.__name__}"
48+
meta = request.META
49+
task_id = uuid.uuid4()
50+
meta["task_count"] = math.ceil(len(list_data) / batch_length)
51+
meta["action"] = view.action
52+
for index, batch in enumerate(itertools.batched(list_data, batch_length)):
53+
meta["task_id"] = f"{task_id}_{index}"
54+
meta["task_index"] = index
55+
res = background_task_view_set_job.apply_async(args=(view_str, meta, json.dumps(batch), view.action_map),
56+
task_id=meta["task_id"])
57+
logger.info(f"add {view_str} task success. {res}")
58+
return ApiResponse(detail=_("Task add success"))
59+
60+
4161
class CacheDetailResponseMixin(object):
4262
def get_cache_key(self, view_instance, view_method, request, args, kwargs):
4363
func_name = f'{view_instance.__class__.__name__}_{view_method.__name__}'
@@ -106,20 +126,14 @@ class RankAction(object):
106126
get_queryset: Callable
107127

108128
@extend_schema(
109-
request=OpenApiRequest(
110-
build_object_type(
111-
properties={'pks': build_array_type(build_basic_type(OpenApiTypes.STR))},
112-
required=['pks'],
113-
description="主键列表"
114-
)
115-
),
129+
request=OpenApiRequest(build_array_type(build_basic_type(OpenApiTypes.STR))),
116130
responses=get_default_response_schema()
117131
)
118132
@action(methods=['post'], detail=False, url_path='rank')
119133
def rank(self, request, *args, **kwargs):
120134
"""{cls}排序"""
121135
rank = 1
122-
for pk in get_query_post_pks(request):
136+
for pk in request.data:
123137
self.filter_queryset(self.get_queryset()).filter(pk=pk).update(rank=rank)
124138
rank += 1
125139
return ApiResponse(detail=_("Sorting saved successfully"))
@@ -336,7 +350,7 @@ def get_input_type(value, info):
336350
return ApiResponse(data=results)
337351

338352

339-
class BaseViewSet(GenericViewSet):
353+
class BaseViewSet(object):
340354
action: Callable
341355
extra_filter_class = []
342356

@@ -373,30 +387,26 @@ class BatchDestroyAction(object):
373387
perform_destroy: Callable
374388

375389
@extend_schema(
376-
request=OpenApiRequest(
377-
build_object_type(
378-
properties={'pks': build_array_type(build_basic_type(OpenApiTypes.STR))},
379-
required=['pks'],
380-
description="主键列表"
381-
)
382-
),
390+
request=OpenApiRequest(build_array_type(build_basic_type(OpenApiTypes.STR))),
383391
responses=get_default_response_schema()
384392
)
385393
@action(methods=['post'], detail=False, url_path='batch-destroy')
386394
def batch_destroy(self, request, *args, **kwargs):
387395
"""批量删除{cls}"""
388-
pks = get_query_post_pks(request)
389-
if not pks:
390-
return ApiResponse(code=1003, detail=_("Operation failed. Primary key list does not exist"))
396+
397+
# response = run_view_by_celery_task(self, request, kwargs, request.data, batch_length=30)
398+
# if response:
399+
# return response
400+
391401
# queryset delete() 方法进行批量删除,并不调用模型上的任何 delete() 方法,需要通过循环对象进行删除
392402
count = 0
393-
for instance in self.filter_queryset(self.get_queryset()).filter(pk__in=pks):
403+
for instance in self.filter_queryset(self.get_queryset()).filter(pk__in=request.data):
394404
try:
395405
deleted, _rows_count = self.perform_destroy(instance)
396406
if deleted:
397407
count += 1
398-
except Exception:
399-
pass
408+
except Exception as e:
409+
logger.error(f"failed to destroy instance {instance} with error {e}")
400410
return ApiResponse(detail=_("Operation successful. Batch deleted {} data").format(count))
401411

402412

@@ -481,6 +491,11 @@ class ImportExportDataAction(CreateAction, UpdateAction, OnlyExportDataAction):
481491
@transaction.atomic
482492
def import_data(self, request, *args, **kwargs):
483493
"""导入{cls}数据"""
494+
495+
response = run_view_by_celery_task(self, request, kwargs, request.data)
496+
if response:
497+
return response
498+
484499
act = request.query_params.get('action')
485500
ignore_error = request.query_params.get('ignore_error', 'false') == 'true'
486501
if act and request.data:
@@ -509,25 +524,25 @@ def import_data(self, request, *args, **kwargs):
509524
return ApiResponse(detail=_("Operation failed. Abnormal data"), code=1001)
510525

511526

512-
class DetailUpdateModelSet(UpdateAction, DetailAction, BaseViewSet):
527+
class DetailUpdateModelSet(BaseViewSet, UpdateAction, DetailAction, GenericViewSet):
513528
pass
514529

515530

516-
class OnlyListModelSet(ListAction, SearchFieldsAction, SearchColumnsAction, BaseViewSet):
531+
class OnlyListModelSet(BaseViewSet, ListAction, SearchFieldsAction, SearchColumnsAction, GenericViewSet):
517532
pass
518533

519534

520535
# 全部 ViewSet 包含增删改查
521-
class BaseModelSet(CreateAction, DestroyAction, UpdateAction, ListAction, DetailAction, SearchFieldsAction,
522-
SearchColumnsAction, BatchDestroyAction, BaseViewSet):
536+
class BaseModelSet(BaseViewSet, CreateAction, DestroyAction, UpdateAction, ListAction, DetailAction, SearchFieldsAction,
537+
SearchColumnsAction, BatchDestroyAction, GenericViewSet):
523538
pass
524539

525540

526541
# 只允许读和删除,不允许创建和修改
527-
class ListDeleteModelSet(DestroyAction, ListAction, DetailAction, SearchFieldsAction, SearchColumnsAction,
528-
BatchDestroyAction, BaseViewSet):
542+
class ListDeleteModelSet(BaseViewSet, DestroyAction, ListAction, DetailAction, SearchFieldsAction, SearchColumnsAction,
543+
BatchDestroyAction, GenericViewSet):
529544
pass
530545

531546

532-
class NoDetailModelSet(UpdateAction, DetailAction, SearchColumnsAction, BaseViewSet):
547+
class NoDetailModelSet(BaseViewSet, UpdateAction, DetailAction, SearchColumnsAction, GenericViewSet):
533548
pass

common/notifications.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from common.models import Monitor
77
from notifications.backends import BACKEND
88
from notifications.models import SystemMsgSubscription
9-
from notifications.notifications import SystemMessage
9+
from notifications.notifications import SystemMessage, UserMessage
1010
from system.models import UserInfo
1111

1212

@@ -128,3 +128,39 @@ def get_monitor_latest_average_value(num=3):
128128

129129
def initial_terminals(self):
130130
self._terminals = [self.get_monitor_latest_average_value()]
131+
132+
133+
class TaskMessage(object):
134+
def get_html_msg(self) -> dict:
135+
context = dict(
136+
subject=self.subject,
137+
name=self.user.nickname,
138+
**self.task,
139+
)
140+
message = render_to_string('notify/msg_task.html', context)
141+
return {
142+
'subject': self.subject,
143+
'message': message
144+
}
145+
146+
147+
class ImportDataMessage(TaskMessage, UserMessage):
148+
category = 'Task Message'
149+
category_label = _('Task Message')
150+
message_type_label = _('Import data message')
151+
152+
def __init__(self, user, task):
153+
super().__init__(user)
154+
self.task = task
155+
self.subject = _('Import {} data {} message').format(self.task.get("view_doc"), self.task.get("status"))
156+
157+
158+
class BatchDeleteDataMessage(TaskMessage, UserMessage):
159+
category = 'Task Message'
160+
category_label = _('Task Message')
161+
message_type_label = _('Batch delete data message')
162+
163+
def __init__(self, user, task):
164+
super().__init__(user)
165+
self.task = task
166+
self.subject = _('Batch delete {} data {} message').format(self.task.get("view_doc"), self.task.get("status"))

common/tasks.py

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,25 @@
66
# date : 7/30/2024
77
import datetime
88
import os
9+
from io import BytesIO
910

1011
from celery import shared_task
1112
from celery.utils.log import get_task_logger
1213
from django.conf import settings
14+
from django.core.handlers.wsgi import WSGIRequest
1315
from django.core.mail import send_mail, EmailMultiAlternatives, get_connection
1416
from django.utils import timezone
17+
from django.utils.module_loading import import_string
1518
from django.utils.translation import gettext_lazy as _
1619
from django_celery_beat.models import PeriodicTask
1720

21+
from common.cache.redis import CacheList
1822
from common.celery.decorator import register_as_period_task, after_app_ready_start
1923
from common.celery.utils import delete_celery_periodic_task, disable_celery_periodic_task, get_celery_periodic_task, \
2024
create_or_update_celery_periodic_tasks
2125
from common.models import Monitor
22-
from common.notifications import ServerPerformanceCheckUtil
26+
from common.notifications import ServerPerformanceCheckUtil, ImportDataMessage, BatchDeleteDataMessage
27+
from common.utils.timezone import local_now_display
2328
from server.celery import app
2429

2530
logger = get_task_logger(__name__)
@@ -130,3 +135,42 @@ def create_or_update_registered_periodic_tasks():
130135
@register_as_period_task(interval=60)
131136
def check_server_performance_period():
132137
ServerPerformanceCheckUtil().check_and_publish()
138+
139+
140+
@shared_task(verbose_name=_("Run background task view set"))
141+
def background_task_view_set_job(view: str, meta: dict, data: str, action_map: dict):
142+
cache = CacheList(f"view_task_{meta.get("task_id").split("_")[0]}", timeout=3600 * 24)
143+
task_info = {
144+
"start_time": local_now_display(),
145+
"task_id": meta.get("task_id"),
146+
"task_index": meta.get("task_index")
147+
}
148+
view_func = import_string(view)
149+
b_data = data.encode("utf-8")
150+
meta["wsgi.input"] = BytesIO(b_data)
151+
meta["CONTENT_TYPE"] = "application/json"
152+
meta["CONTENT_LENGTH"] = len(b_data)
153+
request = WSGIRequest(meta)
154+
result = view_func.as_view(action_map)(request, task=False)
155+
task_info["result"] = result.data.get("detail", result.data)
156+
task_info["end_time"] = local_now_display()
157+
task_info["status"] = result.data.get("code") == 1000
158+
cache.push(task_info)
159+
if cache.len() == meta["task_count"]:
160+
task_results = cache.get_all()
161+
cache.delete()
162+
state = all([task["status"] for task in task_results])
163+
task_info = {
164+
"task_name": view,
165+
"view_doc": view_func.__doc__,
166+
"state": state,
167+
"status": _("Operation successful") if state else _("Operation failed"),
168+
"tasks": sorted(task_results, key=lambda task: task["task_index"])
169+
}
170+
match meta["action"]:
171+
case "import_data":
172+
ImportDataMessage(getattr(request, "user"), task_info).publish()
173+
case "batch_destroy":
174+
BatchDeleteDataMessage(getattr(request, "user"), task_info).publish()
175+
176+
return task_info

0 commit comments

Comments
 (0)