|
11 | 11 | import logging |
12 | 12 | from collections import defaultdict |
13 | 13 | from datetime import datetime, timedelta |
| 14 | +from itertools import islice |
14 | 15 |
|
| 16 | +from blueapps.core.celery.celery import app |
15 | 17 | from celery.schedules import crontab |
16 | 18 | from django.core.cache import cache |
17 | 19 | from django.utils.translation import gettext as _ |
18 | 20 |
|
19 | 21 | from backend.components import CCApi |
20 | 22 | from backend.db_dirty.models import DirtyMachine |
21 | 23 | from backend.db_meta.models import Machine |
| 24 | +from backend.db_periodic_task.local_tasks.context_manager import start_new_span |
22 | 25 | from backend.db_periodic_task.local_tasks.register import register_periodic_task |
| 26 | +from backend.db_periodic_task.utils import TimeUnit, calculate_countdown |
23 | 27 | from backend.db_proxy.constants import DB_CLOUD_MACHINE_EXPIRE_TIME |
24 | 28 | from backend.utils.redis import RedisConn |
25 | 29 |
|
@@ -187,3 +191,67 @@ def sync_machine_ip_cache(): |
187 | 191 | RedisConn.expire(cache_key, DB_CLOUD_MACHINE_EXPIRE_TIME) |
188 | 192 |
|
189 | 193 | logger.info("cache machine task is finished, number is %s", len(hosts)) |
| 194 | + |
| 195 | + |
| 196 | +def batch_generator_large(queryset, batch_size=2): |
| 197 | + iterator = queryset.iterator() |
| 198 | + while batch := list(islice(iterator, batch_size)): |
| 199 | + yield batch |
| 200 | + |
| 201 | + |
| 202 | +@register_periodic_task(run_every=crontab(minute=30)) |
| 203 | +def update_all_host_property(): |
| 204 | + try: |
| 205 | + machines = Machine.objects.all() |
| 206 | + machine_count = machines.count() |
| 207 | + task_count = machine_count // 2 if not machine_count % 2 else machine_count // 2 + 1 |
| 208 | + |
| 209 | + for index, batch in enumerate(batch_generator_large(machines, 2)): |
| 210 | + countdown = calculate_countdown(count=task_count, index=index, duration=12 * TimeUnit.MINUTE) |
| 211 | + with start_new_span(update_machine_field): |
| 212 | + bk_host_ids = [machine.bk_host_id for machine in batch] |
| 213 | + update_machine_field.apply_async(kwargs={"bk_host_ids": bk_host_ids}, countdown=countdown) |
| 214 | + |
| 215 | + except Exception as e: |
| 216 | + logger.exception(f"Error during sync_update_host_property: {e}") |
| 217 | + |
| 218 | + |
| 219 | +@app.task |
| 220 | +def update_machine_field(bk_host_ids): |
| 221 | + logger.info(f"111111111111111111111, begin to update machine field:{bk_host_ids}") |
| 222 | + # 初始化请求参数 |
| 223 | + params = {"bk_fields": DEFAULT_BK_FIELDS, "page": {"start": 0, "limit": 10}} |
| 224 | + batch_machines = Machine.objects.filter(bk_host_id__in=bk_host_ids) |
| 225 | + machine_ip_map = {machine.bk_host_id: machine for machine in batch_machines} |
| 226 | + machine_fields = [ |
| 227 | + ("bk_os_name", "bk_os_name"), |
| 228 | + ("bk_idc_area", "bk_idc_area"), |
| 229 | + ("bk_idc_area_id", "bk_idc_area_id"), |
| 230 | + ("bk_sub_zone", "sub_zone"), |
| 231 | + ("bk_sub_zone_id", "sub_zone_id"), |
| 232 | + ("bk_rack", "rack"), |
| 233 | + ("bk_rack_id", "rack_id"), |
| 234 | + ("bk_svr_device_cls_name", "bk_svr_device_cls_name"), |
| 235 | + ("bk_city_id", "idc_city_id"), |
| 236 | + ] |
| 237 | + |
| 238 | + params["host_property_filter"] = { |
| 239 | + "condition": "AND", |
| 240 | + "rules": [{"field": "bk_host_id", "operator": "in", "value": bk_host_ids}], |
| 241 | + } |
| 242 | + res = CCApi.list_hosts_without_biz(params, use_admin=True) |
| 243 | + host_infos = res.get("info", []) |
| 244 | + host_updates = { |
| 245 | + host["bk_host_id"]: { |
| 246 | + # 确保包含_id的字段不为None 否则更新会出错 |
| 247 | + field_name: 0 if (field_name.endswith("_id") and host.get(detail_name) is None) else host.get(detail_name) |
| 248 | + for field_name, detail_name in machine_fields |
| 249 | + } |
| 250 | + for host in host_infos |
| 251 | + } |
| 252 | + # 批量更新machine属性 |
| 253 | + machines_to_update = update_hosts(machine_ip_map, host_updates) |
| 254 | + if machines_to_update: |
| 255 | + Machine.objects.bulk_update( |
| 256 | + machines_to_update, fields=[field for field, __ in machine_fields if hasattr(Machine, field)] |
| 257 | + ) |
0 commit comments