Skip to content

Commit 202d11f

Browse files
committed
flush on home button
1 parent 2797716 commit 202d11f

File tree

5 files changed

+132
-24
lines changed

5 files changed

+132
-24
lines changed

Sources/Common/Task+Extensions.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,6 @@ extension Task where Success == Never, Failure == Never {
1212
/// try await Task.sleep(seconds: 3.0)
1313
///
1414
public static func sleep(seconds: TimeInterval) async throws {
15-
try await Task.sleep(nanoseconds: UInt64(seconds * Double(NSEC_PER_SEC)))
15+
try await Task.sleep(nanoseconds: seconds.nanoseconds)
1616
}
1717
}

Sources/Common/Timestamp.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,8 @@ extension TimeInterval {
1010
public var milliseconds: Int64 {
1111
Int64(self * 1000.0)
1212
}
13+
14+
public var nanoseconds: UInt64 {
15+
UInt64(self * Double(NSEC_PER_SEC))
16+
}
1317
}

Sources/LaunchDarklyObservability/Transport/BatchWorker.swift

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ public final actor BatchWorker {
88
enum Constants {
99
static let maxConcurrentCost: Int = 30000
1010
static let maxConcurrentItems: Int = 100
11-
static let maxConcurrentExporters: Int = 2
11+
static let maxConcurrentExporters: Int = 3
1212
static let baseBackoffSeconds: TimeInterval = 2
1313
static let maxBackoffSeconds: TimeInterval = 60
1414
static let backoffJitter: Double = 0.2
@@ -28,7 +28,8 @@ public final actor BatchWorker {
2828
private var costInFlight = 0
2929
private var exportersInFlight = Set<ObjectIdentifier>()
3030
private var exporterBackoff = [ObjectIdentifier: BackOffExporterInfo]()
31-
31+
private var flushableWorker: FlushableWorker?
32+
3233
public init(eventQueue: EventQueue,
3334
log: OSLog) {
3435
self.eventQueue = eventQueue
@@ -40,27 +41,22 @@ public final actor BatchWorker {
4041
exporters[exporterId] = exporter
4142
}
4243

43-
public func start() {
44-
guard task == nil else { return }
45-
46-
task = Task.detached(priority: .background) { [weak self] in
47-
while !Task.isCancelled {
48-
guard let self else { return }
49-
let scheduledCost = await sendQueueItems()
50-
try? await Task.sleep(seconds: scheduledCost > 0 ? minInterval : interval)
51-
}
44+
public func start() async {
45+
if flushableWorker == nil {
46+
flushableWorker = FlushableWorker(interval: 1.5, work: sendQueueItems)
5247
}
48+
await flushableWorker?.start()
5349
}
54-
55-
func sendQueueItems() async -> Int {
50+
51+
func sendQueueItems(isFlushing: Bool) async {
5652
var scheduledCost = 0
5753

5854
while true {
5955
let remainingExporterSlots = Constants.maxConcurrentExporters - exportersInFlight.count
60-
guard remainingExporterSlots > 0 else { break }
56+
guard remainingExporterSlots > 0 || isFlushing else { break }
6157

6258
let budget = Constants.maxConcurrentCost - costInFlight
63-
guard costInFlight == 0 || budget > 0 else { break }
59+
guard costInFlight == 0 || (budget > 0 || isFlushing) else { break }
6460

6561
let now = DispatchTime.now()
6662
var except = exportersInFlight
@@ -96,8 +92,6 @@ public final actor BatchWorker {
9692
scheduledCost += cost
9793
}
9894
}
99-
100-
return scheduledCost
10195
}
10296

10397
private func tryReserve(exporterId: ObjectIdentifier, cost: Int) -> Bool {
@@ -128,8 +122,11 @@ public final actor BatchWorker {
128122
costInFlight -= cost
129123
}
130124

131-
public func stop() {
132-
task?.cancel()
133-
task = nil
125+
public func stop() async {
126+
await flushableWorker?.stop()
127+
}
128+
129+
public func flush() async {
130+
await flushableWorker?.flush()
134131
}
135132
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import Foundation
2+
#if !LD_COCOAPODS
3+
import Common
4+
#endif
5+
6+
actor FlushableWorker {
7+
typealias Work = @Sendable (_ isFlushing: Bool) async -> Void
8+
private enum Trigger {
9+
case tick
10+
case flush
11+
}
12+
13+
private var task: Task<Void, Never>? = nil
14+
private let interval: TimeInterval
15+
private var work: Work
16+
private var continuation: AsyncStream<Trigger>.Continuation? = nil
17+
private var pending: Trigger? = nil
18+
19+
init(interval: TimeInterval, work: @escaping Work) {
20+
self.interval = interval
21+
self.work = work
22+
}
23+
24+
func start() {
25+
guard task == nil else { return }
26+
27+
let stream = AsyncStream<Trigger>(bufferingPolicy: .bufferingNewest(1)) { [weak self] cont in
28+
Task {
29+
await self?.setContinuation(cont)
30+
}
31+
}
32+
33+
let task = Task { [weak self] in
34+
guard let self else { return }
35+
36+
let tickTask = Task {
37+
while !Task.isCancelled {
38+
try? await Task.sleep(seconds: interval)
39+
doTrigger(.tick)
40+
}
41+
}
42+
defer {
43+
tickTask.cancel()
44+
}
45+
46+
for await trigger in stream {
47+
if Task.isCancelled {
48+
break
49+
}
50+
await work(trigger == .flush)
51+
await clearPending()
52+
}
53+
}
54+
}
55+
56+
func stop() {
57+
task?.cancel()
58+
task = nil
59+
continuation?.finish()
60+
continuation = nil
61+
pending = nil
62+
}
63+
64+
func flush() async {
65+
doTrigger(.flush)
66+
}
67+
68+
private func doTrigger(_ next: Trigger) {
69+
guard pending != .flush else {
70+
// flush is already next
71+
return
72+
}
73+
74+
if next == .flush || pending == nil {
75+
pending = next
76+
continuation?.yield(next)
77+
}
78+
}
79+
80+
private func clearPending() {
81+
pending = nil
82+
}
83+
84+
private func setContinuation(_ continuation: AsyncStream<Trigger>.Continuation) {
85+
self.continuation = continuation
86+
}
87+
}
88+

Sources/LaunchDarklyObservability/Transport/TransportService.swift

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import Foundation
2+
import Combine
23

34
public protocol EventSource: AnyObject {
45
func start()
@@ -16,13 +17,31 @@ final class TransportService: TransportServicing {
1617
public let eventQueue: EventQueue
1718
public let sessionManager: SessionManaging
1819
public private(set) var isRunnung: Bool = false
19-
2020
public var batchWorker: BatchWorker
21-
22-
public init(eventQueue: EventQueue, batchWorker: BatchWorker, sessionManager: SessionManaging) {
21+
private let appLifecycleManager: AppLifecycleManaging
22+
private var cancellables = Set<AnyCancellable>()
23+
24+
public init(eventQueue: EventQueue,
25+
batchWorker: BatchWorker,
26+
sessionManager: SessionManaging,
27+
appLifecycleManager: AppLifecycleManaging) {
2328
self.eventQueue = eventQueue
2429
self.batchWorker = batchWorker
2530
self.sessionManager = sessionManager
31+
self.appLifecycleManager = appLifecycleManager
32+
33+
34+
appLifecycleManager
35+
.publisher()
36+
.receive(on: DispatchQueue.global())
37+
.sink { [weak self] event in
38+
if event == .willResignActive {
39+
Task { [weak self] in
40+
await self?.batchWorker.flush()
41+
}
42+
}
43+
}
44+
.store(in: &cancellables)
2645
}
2746

2847
public func start() {

0 commit comments

Comments
 (0)