Skip to content

Commit df8561b

Browse files
committed
Add example to illustrate how to achieve concurrency with blocking websocket subscriptions by using a simple thread an wrapping the problematic async call.
1 parent e2633f0 commit df8561b

File tree

1 file changed

+56
-0
lines changed

1 file changed

+56
-0
lines changed
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import asyncio
2+
import threading
3+
from typing import Any, Dict
4+
5+
from dydx_v4_client.indexer.candles_resolution import CandlesResolution
6+
from dydx_v4_client.indexer.socket.websocket import IndexerSocket
7+
from dydx_v4_client.network import TESTNET
8+
9+
ETH_USD = "ETH-USD"
10+
RESOLUTION = CandlesResolution.ONE_MINUTE
11+
12+
13+
class LiveCandleRepresentation:
14+
def __init__(self):
15+
self._ws = IndexerSocket(
16+
TESTNET.websocket_indexer,
17+
on_message=self.handle_message,
18+
)
19+
self._count = 1
20+
self.representation: Dict[str, Any] = {}
21+
22+
def wrap_async_func(self) -> None:
23+
# NOTE: ._ws.connect() is a blocking async function call
24+
asyncio.run(self._ws.connect())
25+
26+
def start_websocket_connection(self) -> None:
27+
t = threading.Thread(target=self.wrap_async_func)
28+
t.start()
29+
30+
def handle_message(self, ws: IndexerSocket, message: dict):
31+
if message["type"] == "connected":
32+
ws.candles.subscribe(ETH_USD, RESOLUTION)
33+
34+
if message["type"] == "channel_batch_data":
35+
if candle_dict := message["contents"][0]:
36+
self.representation = candle_dict
37+
print(f"Received {RESOLUTION.value}-candle update #{self._count}.\n")
38+
self._count += 1
39+
40+
41+
async def some_candle_query(live_candle: LiveCandleRepresentation):
42+
while True:
43+
if candle := live_candle.representation:
44+
print(f"Query current candle: {candle}\n")
45+
await asyncio.sleep(20) # Query every 20 seconds
46+
47+
48+
async def test():
49+
live_candle = LiveCandleRepresentation()
50+
live_candle.start_websocket_connection()
51+
52+
tasks = [asyncio.create_task(some_candle_query(live_candle))]
53+
await asyncio.gather(*tasks)
54+
55+
56+
asyncio.run(test())

0 commit comments

Comments
 (0)