Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions chronos/asyncsync.nim
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,12 @@ proc popLastImpl[T](aq: AsyncQueue[T]): T =
aq.putters.wakeupNext()
res

proc peakFirstImpl[T](aq: AsyncQueue[T]): T =
aq.queue.peekFirst()

proc peakLastImpl[T](aq: AsyncQueue[T]): T =
aq.queue.peekLast()

proc addFirstNoWait*[T](aq: AsyncQueue[T], item: T) {.
raises: [AsyncQueueFullError].} =
## Put an item ``item`` to the beginning of the queue ``aq`` immediately.
Expand Down Expand Up @@ -293,6 +299,26 @@ proc popLastNoWait*[T](aq: AsyncQueue[T]): T {.
raise newException(AsyncQueueEmptyError, "AsyncQueue is empty!")
aq.popLastImpl()

proc peakFirstNoWait*[T](aq: AsyncQueue[T]): T {.
raises: [AsyncQueueEmptyError].} =
## Get an item from the beginning of the queue ``aq`` immediately but without
## removing it.
##
## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised.
if aq.empty():
raise newException(AsyncQueueEmptyError, "AsyncQueue is empty!")
aq.peakFirstImpl()

proc peakLastNoWait*[T](aq: AsyncQueue[T]): T {.
raises: [AsyncQueueEmptyError].} =
## Get an item from the end of the queue ``aq`` immediately but without
## removing it.
##
## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised.
if aq.empty():
raise newException(AsyncQueueEmptyError, "AsyncQueue is empty!")
aq.peakLastImpl()

proc addFirst*[T](aq: AsyncQueue[T], item: T) {.
async: (raises: [CancelledError]).} =
## Put an ``item`` to the beginning of the queue ``aq``. If the queue is full,
Expand Down Expand Up @@ -357,6 +383,42 @@ proc popLast*[T](aq: AsyncQueue[T]): Future[T] {.
raise exc
aq.popLastImpl()

proc peakFirst*[T](aq: AsyncQueue[T]): Future[T] {.
async: (raises: [CancelledError]).} =
## Return an ``item`` without removing it from the beginning of the queue
## ``aq``. If the queue is empty, wait until an item is available.
while aq.empty():
let getter =
Future[void].Raising([CancelledError]).init("AsyncQueue.peakFirst")
aq.getters.add(getter)
try:
await getter
except CancelledError as exc:
if not(aq.empty()) and not(getter.cancelled()):
aq.getters.wakeupNext()
raise exc
let res = aq.peakFirstImpl()
aq.getters.wakeupNext()
res

proc peakLast*[T](aq: AsyncQueue[T]): Future[T] {.
async: (raises: [CancelledError]).} =
## Return an ``item`` without removing it from the end of the queue ``aq``.
## If the queue is empty, wait until an item is available.
while aq.empty():
let getter =
Future[void].Raising([CancelledError]).init("AsyncQueue.peakLast")
aq.getters.add(getter)
try:
await getter
except CancelledError as exc:
if not(aq.empty()) and not(getter.cancelled()):
aq.getters.wakeupNext()
raise exc
let res = aq.peakLastImpl()
aq.getters.wakeupNext()
res

proc putNoWait*[T](aq: AsyncQueue[T], item: T) {.
raises: [AsyncQueueFullError].} =
## Alias of ``addLastNoWait()``.
Expand All @@ -367,6 +429,11 @@ proc getNoWait*[T](aq: AsyncQueue[T]): T {.
## Alias of ``popFirstNoWait()``.
aq.popFirstNoWait()

proc peakNoWait*[T](aq: AsyncQueue[T]): T {.
raises: [AsyncQueueEmptyError].} =
## Alias of ``peakFirstNoWait()``.
aq.peakFirstNoWait()

proc put*[T](aq: AsyncQueue[T], item: T): Future[void] {.
async: (raw: true, raises: [CancelledError]).} =
## Alias of ``addLast()``.
Expand All @@ -377,6 +444,11 @@ proc get*[T](aq: AsyncQueue[T]): Future[T] {.
## Alias of ``popFirst()``.
aq.popFirst()

proc peak*[T](aq: AsyncQueue[T]): Future[T] {.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
proc peak*[T](aq: AsyncQueue[T]): Future[T] {.
proc peek*[T](aq: AsyncQueue[T]): Future[T] {.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, will fix the spelling shortly.

async: (raw: true, raises: [CancelledError]).} =
## Alias of ``peakFirst()``.
aq.peakFirst()

proc clear*[T](aq: AsyncQueue[T]) {.inline.} =
## Clears all elements of queue ``aq``.
aq.queue.clear()
Expand Down
13 changes: 13 additions & 0 deletions tests/testsync.nim
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,19 @@ suite "Asynchronous sync primitives test suite":
test "AsyncQueue() contains test":
check test9() == true

test "AsyncQueue() peak test":
let q = newAsyncQueue[int]()
q.putNoWait(1)
q.putNoWait(2)

check:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there needs to be a test here that ensures that the following also works:

let a = q.peekFirst()
let b = q.popFirst()

both in that order and in the reverse order - where peek-after-pop should probably not trigger

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added more tests as suggested.

q.peakNoWait() == 1
q.peakFirstNoWait() == 1
q.peakLastNoWait() == 2
(waitFor q.peak()) == 1
(waitFor q.peakFirst()) == 1
(waitFor q.peakLast()) == 2

test "AsyncEventQueue() behavior test":
let eventQueue = newAsyncEventQueue[int]()
let key = eventQueue.register()
Expand Down