1+ # analytics.py (версия с устранением задержки и оптимизациями)
2+ import cv2
3+ import sys
4+ import json
5+ import time
6+ import base64
7+ import threading # <--- Импортируем модуль для работы с потоками
8+
9+ try :
10+ from ultralytics import YOLO
11+ except ImportError :
12+ print (json .dumps ({
13+ "status" : "error" ,
14+ "message" : "Ultralytics YOLO library not found. Please run 'pip install ultralytics'."
15+ }), flush = True )
16+ sys .exit (1 )
17+
18+ # Класс для чтения видеопотока в отдельном потоке
19+ class FrameGrabber :
20+ """
21+ Класс для захвата кадров из видеопотока в отдельном потоке,
22+ чтобы избежать накопления задержки в буфере cv2.VideoCapture.
23+ """
24+ def __init__ (self , src = 0 ):
25+ self .stream = cv2 .VideoCapture (src )
26+ if not self .stream .isOpened ():
27+ raise IOError ("Cannot open video stream" )
28+
29+ self .ret , self .frame = self .stream .read ()
30+ self .stopped = False
31+ self .thread = threading .Thread (target = self .update , args = ())
32+ self .thread .daemon = True
33+
34+ def start (self ):
35+ self .stopped = False
36+ self .thread .start ()
37+
38+ def update (self ):
39+ while not self .stopped :
40+ ret , frame = self .stream .read ()
41+ if not ret :
42+ self .stop ()
43+ break
44+ self .ret = ret
45+ self .frame = frame
46+
47+ def read (self ):
48+ return self .ret , self .frame
49+
50+ def stop (self ):
51+ self .stopped = True
52+ if self .thread .is_alive ():
53+ self .thread .join (timeout = 1.0 )
54+ self .stream .release ()
55+
56+
57+ def run_analytics (rtsp_url , config_str ):
58+ try :
59+ model = YOLO ("yolov8n.pt" )
60+ except Exception as e :
61+ print (json .dumps ({"status" : "error" , "message" : f"Failed to load YOLOv8 model: { e } " }), flush = True )
62+ sys .exit (1 )
63+
64+ config = {}
65+ if config_str :
66+ try :
67+ config_json = base64 .b64decode (config_str ).decode ('utf-8' )
68+ config = json .loads (config_json )
69+ except Exception as e :
70+ print (json .dumps ({"status" : "error" , "message" : f"Invalid config provided: { e } " }), flush = True )
71+
72+ roi = config .get ('roi' )
73+ objects_to_detect = config .get ('objects' , None )
74+ confidence_threshold = config .get ('confidence' , 0.5 )
75+ frame_skip = int (config .get ('frame_skip' , 5 ))
76+ if frame_skip < 1 :
77+ frame_skip = 1
78+ resize_width = int (config .get ('resize_width' , 640 ))
79+
80+ # Используем наш новый класс FrameGrabber
81+ try :
82+ frame_grabber = FrameGrabber (rtsp_url )
83+ frame_grabber .start ()
84+ time .sleep (2 ) # Даем время на подключение и заполнение первого кадра
85+ except IOError as e :
86+ print (json .dumps ({"status" : "error" , "message" : str (e )}), flush = True )
87+ sys .exit (1 )
88+
89+ frame_count = 0
90+
91+ # Основной цикл и блок finally для корректного завершения
92+ try :
93+ while not frame_grabber .stopped :
94+ ret , frame = frame_grabber .read ()
95+ if not ret or frame is None :
96+ # Поток мог завершиться, даем ему немного времени и проверяем снова
97+ time .sleep (0.5 )
98+ if frame_grabber .stopped :
99+ break
100+ continue
101+
102+ frame_count += 1
103+ if frame_count % frame_skip != 0 :
104+ # VVV ИЗМЕНЕНИЕ: Замена sleep на continue VVV
105+ # Это более эффективно, так как не вносит искусственную задержку.
106+ # Цикл просто перейдет к следующей итерации.
107+ continue
108+ # ^^^ КОНЕЦ ИЗМЕНЕНИЯ ^^^
109+
110+ original_height , original_width = frame .shape [:2 ]
111+
112+ scale_x , scale_y = 1.0 , 1.0
113+ if resize_width > 0 and original_width > resize_width :
114+ scale_x = original_width / resize_width
115+ new_height = int (original_height / scale_x )
116+ scale_y = original_height / new_height
117+ frame_to_process = cv2 .resize (frame , (resize_width , new_height ), interpolation = cv2 .INTER_AREA )
118+ else :
119+ frame_to_process = frame
120+
121+ results = model (frame_to_process , verbose = False , conf = confidence_threshold )
122+
123+ detected_objects = []
124+ for box in results [0 ].boxes :
125+ class_id = int (box .cls [0 ])
126+ label = model .names [class_id ]
127+
128+ x1 , y1 , x2 , y2 = box .xyxy [0 ]
129+ x , y , w , h = int (x1 ), int (y1 ), int (x2 - x1 ), int (y2 - y1 )
130+
131+ detected_objects .append ({
132+ 'label' : label ,
133+ 'confidence' : float (box .conf [0 ]),
134+ 'box' : {
135+ 'x' : int (x * scale_x ),
136+ 'y' : int (y * scale_y ),
137+ 'w' : int (w * scale_x ),
138+ 'h' : int (h * scale_y )
139+ }
140+ })
141+
142+ filtered_objects = []
143+ for obj in detected_objects :
144+ if objects_to_detect and obj ['label' ] not in objects_to_detect :
145+ continue
146+
147+ if roi :
148+ box = obj ['box' ]
149+ obj_center_x = box ['x' ] + box ['w' ] / 2
150+ obj_center_y = box ['y' ] + box ['h' ] / 2
151+
152+ roi_x1 = roi ['x' ] * original_width
153+ roi_y1 = roi ['y' ] * original_height
154+ roi_x2 = (roi ['x' ] + roi ['w' ]) * original_width
155+ roi_y2 = (roi ['y' ] + roi ['h' ]) * original_height
156+
157+ if not (roi_x1 < obj_center_x < roi_x2 and roi_y1 < obj_center_y < roi_y2 ):
158+ continue
159+
160+ filtered_objects .append (obj )
161+
162+ if len (filtered_objects ) > 0 :
163+ result = {
164+ "status" : "objects_detected" ,
165+ "timestamp" : time .time (),
166+ "objects" : filtered_objects
167+ }
168+ print (json .dumps (result ), flush = True )
169+
170+ finally :
171+ print (json .dumps ({"status" : "info" , "message" : "Analytics process stopping." }), flush = True )
172+ frame_grabber .stop ()
173+
174+ if __name__ == "__main__" :
175+ if len (sys .argv ) > 1 :
176+ rtsp_stream_url = sys .argv [1 ]
177+ config_arg = sys .argv [2 ] if len (sys .argv ) > 2 else None
178+ run_analytics (rtsp_stream_url , config_arg )
179+ else :
180+ print (json .dumps ({"status" : "error" , "message" : "RTSP URL not provided" }), flush = True )
181+ sys .exit (1 )
0 commit comments