|
60 | 60 | ticks*: Deque[AsyncCallback] |
61 | 61 | trackers*: Table[string, TrackerBase] |
62 | 62 | counters*: Table[string, TrackerCounter] |
63 | | - |
64 | | -proc sentinelCallbackImpl(arg: pointer) {.gcsafe, noreturn.} = |
65 | | - raiseAssert "Sentinel callback MUST not be scheduled" |
66 | | - |
67 | | -const |
68 | | - SentinelCallback = AsyncCallback(function: sentinelCallbackImpl, |
69 | | - udata: nil) |
70 | | - |
71 | | -proc isSentinel(acb: AsyncCallback): bool = |
72 | | - acb == SentinelCallback |
| 63 | + polling*: bool |
| 64 | + ## The event loop is currently running |
73 | 65 |
|
74 | 66 | proc `<`(a, b: TimerCallback): bool = |
75 | 67 | result = a.finishAt < b.finishAt |
76 | 68 |
|
| 69 | +template preparePoll(loop: PDispatcherBase) = |
| 70 | + # If you hit this assert, you've called `poll`, `runForever` or `waitFor` |
| 71 | + # from within an async function which is not supported due to the difficulty |
| 72 | + # to control stack depth and event ordering |
| 73 | + # If you're using `waitFor`, switch to `await` and / or propagate the |
| 74 | + # up the call stack. |
| 75 | + doAssert not loop.polling, "The event loop and chronos functions in general are not reentrant" |
| 76 | + |
| 77 | + loop.polling = true |
| 78 | + defer: loop.polling = false |
| 79 | + |
77 | 80 | func getAsyncTimestamp*(a: Duration): auto {.inline.} = |
78 | 81 | ## Return rounded up value of duration with milliseconds resolution. |
79 | 82 | ## |
@@ -138,10 +141,10 @@ template processTicks(loop: untyped) = |
138 | 141 | loop.callbacks.addLast(loop.ticks.popFirst()) |
139 | 142 |
|
140 | 143 | template processCallbacks(loop: untyped) = |
141 | | - while true: |
142 | | - let callable = loop.callbacks.popFirst() # len must be > 0 due to sentinel |
143 | | - if isSentinel(callable): |
144 | | - break |
| 144 | + # Process existing callbacks but not those that follow, to allow the network |
| 145 | + # to regain control regularly |
| 146 | + for _ in 0..<loop.callbacks.len(): |
| 147 | + let callable = loop.callbacks.popFirst() |
145 | 148 | if not(isNil(callable.function)): |
146 | 149 | callable.function(callable.udata) |
147 | 150 |
|
@@ -333,7 +336,6 @@ elif defined(windows): |
333 | 336 | trackers: initTable[string, TrackerBase](), |
334 | 337 | counters: initTable[string, TrackerCounter]() |
335 | 338 | ) |
336 | | - res.callbacks.addLast(SentinelCallback) |
337 | 339 | initAPI(res) |
338 | 340 | res |
339 | 341 |
|
@@ -581,16 +583,13 @@ elif defined(windows): |
581 | 583 |
|
582 | 584 | proc poll*() = |
583 | 585 | let loop = getThreadDispatcher() |
| 586 | + loop.preparePoll() |
| 587 | + |
584 | 588 | var |
585 | 589 | curTime = Moment.now() |
586 | 590 | curTimeout = DWORD(0) |
587 | 591 | events: array[MaxEventsCount, osdefs.OVERLAPPED_ENTRY] |
588 | 592 |
|
589 | | - # On reentrant `poll` calls from `processCallbacks`, e.g., `waitFor`, |
590 | | - # complete pending work of the outer `processCallbacks` call. |
591 | | - # On non-reentrant `poll` calls, this only removes sentinel element. |
592 | | - processCallbacks(loop) |
593 | | - |
594 | 593 | # Moving expired timers to `loop.callbacks` and calculate timeout |
595 | 594 | loop.processTimersGetTimeout(curTimeout) |
596 | 595 |
|
@@ -656,14 +655,10 @@ elif defined(windows): |
656 | 655 | # We move tick callbacks to `loop.callbacks` always. |
657 | 656 | processTicks(loop) |
658 | 657 |
|
659 | | - # All callbacks which will be added during `processCallbacks` will be |
660 | | - # scheduled after the sentinel and are processed on next `poll()` call. |
661 | | - loop.callbacks.addLast(SentinelCallback) |
| 658 | + # Process the callbacks currently scheduled - new callbacks scheduled during |
| 659 | + # callback execution will run in the next poll iteration |
662 | 660 | processCallbacks(loop) |
663 | 661 |
|
664 | | - # All callbacks done, skip `processCallbacks` at start. |
665 | | - loop.callbacks.addFirst(SentinelCallback) |
666 | | - |
667 | 662 | proc closeSocket*(fd: AsyncFD, aftercb: CallbackFunc = nil) = |
668 | 663 | ## Closes a socket and ensures that it is unregistered. |
669 | 664 | let loop = getThreadDispatcher() |
@@ -754,7 +749,6 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or |
754 | 749 | trackers: initTable[string, TrackerBase](), |
755 | 750 | counters: initTable[string, TrackerCounter]() |
756 | 751 | ) |
757 | | - res.callbacks.addLast(SentinelCallback) |
758 | 752 | initAPI(res) |
759 | 753 | res |
760 | 754 |
|
@@ -1010,14 +1004,11 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or |
1010 | 1004 | proc poll*() {.gcsafe.} = |
1011 | 1005 | ## Perform single asynchronous step. |
1012 | 1006 | let loop = getThreadDispatcher() |
| 1007 | + loop.preparePoll() |
| 1008 | + |
1013 | 1009 | var curTime = Moment.now() |
1014 | 1010 | var curTimeout = 0 |
1015 | 1011 |
|
1016 | | - # On reentrant `poll` calls from `processCallbacks`, e.g., `waitFor`, |
1017 | | - # complete pending work of the outer `processCallbacks` call. |
1018 | | - # On non-reentrant `poll` calls, this only removes sentinel element. |
1019 | | - processCallbacks(loop) |
1020 | | - |
1021 | 1012 | # Moving expired timers to `loop.callbacks` and calculate timeout. |
1022 | 1013 | loop.processTimersGetTimeout(curTimeout) |
1023 | 1014 |
|
@@ -1064,14 +1055,10 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or |
1064 | 1055 | # We move tick callbacks to `loop.callbacks` always. |
1065 | 1056 | processTicks(loop) |
1066 | 1057 |
|
1067 | | - # All callbacks which will be added during `processCallbacks` will be |
1068 | | - # scheduled after the sentinel and are processed on next `poll()` call. |
1069 | | - loop.callbacks.addLast(SentinelCallback) |
| 1058 | + # Process the callbacks currently scheduled - new callbacks scheduled during |
| 1059 | + # callback execution will run in the next poll iteration |
1070 | 1060 | processCallbacks(loop) |
1071 | 1061 |
|
1072 | | - # All callbacks done, skip `processCallbacks` at start. |
1073 | | - loop.callbacks.addFirst(SentinelCallback) |
1074 | | - |
1075 | 1062 | else: |
1076 | 1063 | proc initAPI() = discard |
1077 | 1064 | proc globalInit() = discard |
|
0 commit comments