diff --git a/chronos/asyncsync.nim b/chronos/asyncsync.nim index 5fab9b2ab..505915b3c 100644 --- a/chronos/asyncsync.nim +++ b/chronos/asyncsync.nim @@ -257,6 +257,12 @@ proc popLastImpl[T](aq: AsyncQueue[T]): T = aq.putters.wakeupNext() res +proc peekFirstImpl[T](aq: AsyncQueue[T]): T = + aq.queue.peekFirst() + +proc peekLastImpl[T](aq: AsyncQueue[T]): T = + aq.queue.peekLast() + proc addFirstNoWait*[T](aq: AsyncQueue[T], item: T) {. raises: [AsyncQueueFullError].} = ## Put an item ``item`` to the beginning of the queue ``aq`` immediately. @@ -293,6 +299,26 @@ proc popLastNoWait*[T](aq: AsyncQueue[T]): T {. raise newException(AsyncQueueEmptyError, "AsyncQueue is empty!") aq.popLastImpl() +proc peekFirstNoWait*[T](aq: AsyncQueue[T]): T {. + raises: [AsyncQueueEmptyError].} = + ## Get an item from the beginning of the queue ``aq`` immediately but without + ## removing it. + ## + ## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised. + if aq.empty(): + raise newException(AsyncQueueEmptyError, "AsyncQueue is empty!") + aq.peekFirstImpl() + +proc peekLastNoWait*[T](aq: AsyncQueue[T]): T {. + raises: [AsyncQueueEmptyError].} = + ## Get an item from the end of the queue ``aq`` immediately but without + ## removing it. + ## + ## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised. + if aq.empty(): + raise newException(AsyncQueueEmptyError, "AsyncQueue is empty!") + aq.peekLastImpl() + proc addFirst*[T](aq: AsyncQueue[T], item: T) {. async: (raises: [CancelledError]).} = ## Put an ``item`` to the beginning of the queue ``aq``. If the queue is full, @@ -357,6 +383,42 @@ proc popLast*[T](aq: AsyncQueue[T]): Future[T] {. raise exc aq.popLastImpl() +proc peekFirst*[T](aq: AsyncQueue[T]): Future[T] {. + async: (raises: [CancelledError]).} = + ## Return an ``item`` without removing it from the beginning of the queue + ## ``aq``. If the queue is empty, wait until an item is available. + while aq.empty(): + let getter = + Future[void].Raising([CancelledError]).init("AsyncQueue.peekFirst") + aq.getters.add(getter) + try: + await getter + except CancelledError as exc: + if not(aq.empty()) and not(getter.cancelled()): + aq.getters.wakeupNext() + raise exc + let res = aq.peekFirstImpl() + aq.getters.wakeupNext() + res + +proc peekLast*[T](aq: AsyncQueue[T]): Future[T] {. + async: (raises: [CancelledError]).} = + ## Return an ``item`` without removing it from the end of the queue ``aq``. + ## If the queue is empty, wait until an item is available. + while aq.empty(): + let getter = + Future[void].Raising([CancelledError]).init("AsyncQueue.peekLast") + aq.getters.add(getter) + try: + await getter + except CancelledError as exc: + if not(aq.empty()) and not(getter.cancelled()): + aq.getters.wakeupNext() + raise exc + let res = aq.peekLastImpl() + aq.getters.wakeupNext() + res + proc putNoWait*[T](aq: AsyncQueue[T], item: T) {. raises: [AsyncQueueFullError].} = ## Alias of ``addLastNoWait()``. @@ -367,6 +429,11 @@ proc getNoWait*[T](aq: AsyncQueue[T]): T {. ## Alias of ``popFirstNoWait()``. aq.popFirstNoWait() +proc peekNoWait*[T](aq: AsyncQueue[T]): T {. + raises: [AsyncQueueEmptyError].} = + ## Alias of ``peekFirstNoWait()``. + aq.peekFirstNoWait() + proc put*[T](aq: AsyncQueue[T], item: T): Future[void] {. async: (raw: true, raises: [CancelledError]).} = ## Alias of ``addLast()``. @@ -377,6 +444,11 @@ proc get*[T](aq: AsyncQueue[T]): Future[T] {. ## Alias of ``popFirst()``. aq.popFirst() +proc peek*[T](aq: AsyncQueue[T]): Future[T] {. + async: (raw: true, raises: [CancelledError]).} = + ## Alias of ``peekFirst()``. + aq.peekFirst() + proc clear*[T](aq: AsyncQueue[T]) {.inline.} = ## Clears all elements of queue ``aq``. aq.queue.clear() diff --git a/tests/testsync.nim b/tests/testsync.nim index 4ade20531..85dd19a67 100644 --- a/tests/testsync.nim +++ b/tests/testsync.nim @@ -353,6 +353,57 @@ suite "Asynchronous sync primitives test suite": test "AsyncQueue() contains test": check test9() == true + test "AsyncQueue() peek test": + let q = newAsyncQueue[int]() + q.putNoWait(1) + q.putNoWait(2) + + check: + q.peekNoWait() == 1 + q.peekFirstNoWait() == 1 + q.peekLastNoWait() == 2 + (waitFor q.peek()) == 1 + (waitFor q.peekFirst()) == 1 + (waitFor q.peekLast()) == 2 + + test "AsyncQueue() peek before pop test": + let q = newAsyncQueue[int]() + q.putNoWait(1) + + let + a = q.peekFirst() + b = q.popFirst() + + check: + a.completed() == true + b.completed() == true + a.read() == 1 + b.read() == 1 + q.len() == 0 + + test "AsyncQueue() peek after pop test": + let q = newAsyncQueue[int]() + q.putNoWait(1) + + let + a = q.popFirst() + b = q.peekFirst() + + check: + a.completed() == true + b.completed() == false + a.read() == 1 + q.len() == 0 + + q.putNoWait(2) + poll() + + check: + a.completed() == true + b.completed() == true + b.read() == 2 + q.len() == 1 + test "AsyncEventQueue() behavior test": let eventQueue = newAsyncEventQueue[int]() let key = eventQueue.register()