@@ -57,11 +57,7 @@ resource.rate_limit.burst: {{resource_rate_limit_burst}}
5757{{ #if enable_request_tracer }}
5858resource.tracer.filename: "../../logs/cel/http-request-trace-*.ndjson"
5959{{ /if }}
60-
6160tags:
62- {{ #if preserve_original_event }}
63- - preserve_original_event
64- {{ /if }}
6561{{ #each tags as |tag |}}
6662 - {{ tag }}
6763{{ /each }}
@@ -75,27 +71,44 @@ processors:
7571
7672state:
7773 want_more: false
74+ next_page_sessions: ""
75+ next: 0
76+ worklist: []
77+ prev_sess: []
78+ sync_delay: "5s"
7879 base:
7980 tenant_id: "{{ azure_tenant_id }} "
80- period : "{{ period }} "
81+ initial_interval : "{{ initial_interval }} "
8182
8283redact:
8384 fields:
8485 - base.tenant_id
8586
8687program: |
87- (
88- has(state.worklist) && size(state.worklist) > 0 ?
88+ (
89+ // Check if pagination is complete for call records or needs to continue
90+ has(state.next_page) && state.next_page == "" ?
8991 state
9092 :
93+ // If there is a next page or it's the initial request, proceed with data fetching
9194 state.with(
9295 request(
9396 "GET",
94- state.url.trim_right("/") + "/communications/callRecords"
97+ // use next_page link if available, otherwise construct the initial query
98+ has(state.next_page) && state.next_page != "" ?
99+ state.next_page
100+ :
101+ state.url.trim_right("/") + "/communications/callRecords?" + "$filter=startDateTime%20ge%20"+ string(now - duration(state.base.initial_interval) - duration(state.sync_delay))
95102 ).do_request().as(resp, resp.StatusCode == 200 ?
96103 bytes(resp.Body).decode_json().as(body, {
97- "worklist": body.value.collate("id"),
98- "next": 0,
104+ //Collect call record IDs into the worklist
105+ "worklist" : state.worklist + body.value.map(mp, {
106+ "id": mp.id,
107+ "start_date_time": mp.startDateTime,
108+ "end_date_time": mp.endDateTime,
109+ }),
110+ "want_more": "@odata.nextLink" in body,
111+ "next_page": "@odata.nextLink" in body ? body["@odata.nextLink"] : "",
99112 })
100113 :
101114 {
@@ -111,55 +124,187 @@ program: |
111124 ),
112125 },
113126 },
114- "want_more": false,
127+ "want_more": false
115128 }
116129 ))
117- ).as(state, state.with(
118- !has(state.worklist) ? state : // Exit early due to GET failure.
119- state.worklist[?state.?next.orValue(-1)].hasValue() ?
120- request(
121- "GET",
122- has(state.next_page) && state.next_page != "" ?
123- state.next_page
124- :
125- state.url + "/communications/callRecords/" + state.worklist[state.next] + "/sessions?$expand=segments"
126- ).do_request().as(resp, resp.StatusCode == 200 ?
127- bytes(resp.Body).decode_json().as(body,{
128- "events": (
129- has(body.value) && size(body.value) > 0 ?
130- body.value.map(e, {
131- "message": e.encode_json()
132- })
133- :
134- [{"message":"retry"}]
135- ),
136- "worklist": int(state.next) + 1 < size(state.worklist) ? state.worklist : [],
137- "next": int(state.next) + 1 < size(state.worklist) ? (has(state.next_page) && state.next_page != "" ? int(state.next) : int(state.next) + 1) : 0,
138- "want_more": int(state.next) + 1 < size(state.worklist) || "@odata.nextLink" in body,
139- "next_page": "@odata.nextLink" in body ? body["@odata.nextLink"] : "",
140- })
130+ ).as(state,
131+ // If worklist has items to process, proceed to session-level data fetching using call_record_id(id)
132+ has(state.worklist) && size(state.worklist) > int(state.next) ?
133+ state.with(
134+ request(
135+ "GET",
136+ has(state.next_page_sessions) && state.next_page_sessions != "" ?
137+ state.next_page_sessions
141138 :
142- {
143- "events": {
144- "error": {
145- "code": string(resp.StatusCode),
146- "id": string(resp.Status),
147- "message": "GET /communications/callRecords/"+state.worklist[state.next].name+"/alerts:"+(
148- size(resp.Body) != 0 ?
149- string(resp.Body)
150- :
151- string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
152- ),
139+ state.url + "/communications/callRecords/" + state.worklist[state.next].id + "/sessions?$expand=segments"
140+ ).do_request().as(resp, resp.StatusCode == 200 ?
141+ bytes(resp.Body).decode_json().as(sess, sess.value.map(session,
142+ {
143+ // Map each session and its segments
144+ "session_id": session.id,
145+ "modalities" : session.modalities,
146+ "segments": session.segments.map(seg,{
147+ "segment_id": seg.id,
148+ "start_date_time": seg.startDateTime,
149+ "end_date_time": seg.endDateTime,
150+ "failure_info": seg.failureInfo,
151+ "caller": has(seg.caller) && has(seg.caller.associatedIdentity) ? {
152+ "associated_identity": {
153+ "id": has(seg.caller.associatedIdentity.id) ? seg.caller.associatedIdentity.id : null,
154+ "display_name": has(seg.caller.associatedIdentity.displayName) ? seg.caller.associatedIdentity.displayName : null,
155+ "tenant_id": has(seg.caller.associatedIdentity.tenantId) ? seg.caller.associatedIdentity.tenantId : null,
156+ "user_principal_name": has(seg.caller.associatedIdentity.userPrincipalName) ? seg.caller.associatedIdentity.userPrincipalName : null
157+ }
158+ } : {},
159+ "callee": has(seg.callee) && has(seg.callee.associatedIdentity) ? {
160+ "associated_identity": {
161+ "id": has(seg.callee.associatedIdentity.id) ? seg.callee.associatedIdentity.id : null,
162+ "display_name": has(seg.callee.associatedIdentity.displayName) ? seg.callee.associatedIdentity.displayName : null,
163+ "tenant_id": has(seg.callee.associatedIdentity.tenantId) ? seg.callee.associatedIdentity.tenantId : null,
164+ "user_principal_name": has(seg.callee.associatedIdentity.userPrincipalName) ? seg.callee.associatedIdentity.userPrincipalName : null
165+ }
166+ } : {},
167+ "media" : seg.media.map(med,
168+ !(med.label in ["data", "unknown"]) ?
169+ {
170+ "label": med.label,
171+ "caller_network": {
172+ "bandwidth_low_event_ratio": med.callerNetwork.bandwidthLowEventRatio,
173+ "basic_service_set_identifier": med.callerNetwork.basicServiceSetIdentifier,
174+ "connection_type": med.callerNetwork.connectionType,
175+ "delay_event_ratio": med.callerNetwork.delayEventRatio,
176+ "dns_suffix": med.callerNetwork.dnsSuffix,
177+ "ip_address": med.callerNetwork.ipAddress,
178+ "link_speed": med.callerNetwork.linkSpeed,
179+ "mac_address": med.callerNetwork.macAddress,
180+ "network_transport_protocol": med.callerNetwork.networkTransportProtocol,
181+ "port": med.callerNetwork.port,
182+ "received_quality_event_ratio": med.callerNetwork.receivedQualityEventRatio,
183+ "reflexive_ip_address": med.callerNetwork.reflexiveIPAddress,
184+ "relay_ip_address": med.callerNetwork.relayIPAddress,
185+ "relay_port": med.callerNetwork.relayPort,
186+ "sent_quality_event_ratio": med.callerNetwork.sentQualityEventRatio,
187+ "subnet": med.callerNetwork.subnet
153188 },
154- },
155- "want_more": false,
189+ "callee_network": {
190+ "bandwidth_low_event_ratio": med.calleeNetwork.bandwidthLowEventRatio,
191+ "basic_service_set_identifier": med.calleeNetwork.basicServiceSetIdentifier,
192+ "connection_type": med.calleeNetwork.connectionType,
193+ "delay_event_ratio": med.calleeNetwork.delayEventRatio,
194+ "dns_suffix": med.calleeNetwork.dnsSuffix,
195+ "ip_address": med.calleeNetwork.ipAddress,
196+ "link_speed": med.calleeNetwork.linkSpeed,
197+ "mac_address": med.calleeNetwork.macAddress,
198+ "network_transport_protocol": med.calleeNetwork.networkTransportProtocol,
199+ "port": med.calleeNetwork.port,
200+ "received_quality_event_ratio": med.calleeNetwork.receivedQualityEventRatio,
201+ "reflexive_ip_address": med.calleeNetwork.reflexiveIPAddress,
202+ "relay_ip_address": med.calleeNetwork.relayIPAddress,
203+ "relay_port": med.calleeNetwork.relayPort,
204+ "sent_quality_event_ratio": med.calleeNetwork.sentQualityEventRatio,
205+ "subnet": med.calleeNetwork.subnet
206+ },
207+ "streams": med.streams.map(stream,{
208+ "audio_codec" : stream.audioCodec,
209+ "average_audio_degradation" : stream.averageAudioDegradation,
210+ "average_audio_network_jitter" : stream.averageAudioNetworkJitter,
211+ "average_bandwidth_estimate" : stream.averageBandwidthEstimate,
212+ "average_freeze_duration" : stream.averageFreezeDuration,
213+ "average_jitter" : stream.averageJitter,
214+ "average_packet_loss_rate" : stream.averagePacketLossRate,
215+ "average_ratio_of_concealed_samples" : stream.averageRatioOfConcealedSamples,
216+ "average_received_frame_rate" : stream.averageReceivedFrameRate,
217+ "average_round_trip_time" : stream.averageRoundTripTime,
218+ "average_video_frame_loss_percentage" : stream.averageVideoFrameLossPercentage,
219+ "average_video_frame_rate" : stream.averageVideoFrameRate,
220+ "average_video_packet_loss_rate" : stream.averageVideoPacketLossRate,
221+ "end_date_time" : stream.endDateTime,
222+ "is_audio_forward_error_correction_used" : stream.isAudioForwardErrorCorrectionUsed,
223+ "low_frame_rate_ratio" : stream.lowFrameRateRatio,
224+ "low_video_processing_capability_ratio" : stream.lowVideoProcessingCapabilityRatio,
225+ "max_audio_network_jitter" : stream.maxAudioNetworkJitter,
226+ "max_jitter" : stream.maxJitter,
227+ "max_packet_loss_rate" : stream.maxPacketLossRate,
228+ "max_ratio_of_concealed_samples" : stream.maxRatioOfConcealedSamples,
229+ "max_round_trip_time" : stream.maxRoundTripTime,
230+ "packet_utilization" : stream.packetUtilization,
231+ "post_forward_error_correction_packet_loss_rate" : stream.postForwardErrorCorrectionPacketLossRate,
232+ "rms_freeze_duration" : stream.rmsFreezeDuration,
233+ "start_date_time" : stream.startDateTime,
234+ "stream_direction" : stream.streamDirection,
235+ "stream_id" : stream.streamId,
236+ "video_codec" : stream.videoCodec,
237+ "was_media_bypassed" : stream.wasMediaBypassed
238+ })
239+ }
240+ : {}
241+ ).filter(m, size(m) > 0)
156242 }
157- )
243+ )
244+ })
245+ .as(mapped_sessions,
246+
247+ // If there's a next link for session pagination, continue fetching
248+ // but do not emit for this case. Populate events
249+ // with a place-holder to be discarded by the ingest
250+ // pipeline. Keep storing sessions in prev_sess for same call_record
251+
252+ ("@odata.nextLink" in sess) ?
253+ {
254+ "events": [{"message": "want_more"}],
255+ "want_more": "@odata.nextLink" in sess || size(state.worklist) > 0,
256+ "next_page_sessions": "@odata.nextLink" in sess ? sess["@odata.nextLink"] : "",
257+ "worklist": state.worklist,
258+ "prev_sess": mapped_sessions + state.prev_sess,
259+ }
260+ :
261+ // If no more session pages, emit final event
262+ {
263+ "events": [
264+ {
265+ "o365":{
266+ "metrics": {
267+ "teams": {
268+ "call":{
269+ "quality": {
270+ "call_record_id" : string(state.worklist[state.next].id),
271+ "start_date_time" : state.worklist[state.next].start_date_time,
272+ "end_date_time" : state.worklist[state.next].end_date_time,
273+ "sessions": mapped_sessions + state.prev_sess
274+ }
275+ }
276+ }
277+ }
278+ }
279+ }
280+ ],
281+ "want_more": "@odata.nextLink" in sess || size(state.worklist) > 0,
282+ "next_page_sessions": "@odata.nextLink" in sess ? sess["@odata.nextLink"] : "",
283+ // Remove the processed call_record_id from the worklist after publishing its event.
284+ "worklist": tail(state.worklist),
285+ "prev_sess": [],
286+ }))
158287 :
159288 {
160- "events": [],
289+ "events": {
290+ "error": {
291+ "code": string(resp.StatusCode),
292+ "id": string(resp.Status),
293+ "message": "GET /sessions?$expand=segments" + (
294+ size(resp.Body) != 0 ?
295+ string(resp.Body)
296+ :
297+ string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
298+ ),
299+ },
300+ },
161301 "want_more": false,
162- "next_page": {},
163302 }
303+ )
164304 )
165- )
305+ :
306+ {
307+ "events": [],
308+ "want_more": false
309+ }
310+ )
0 commit comments