From 6669ba65909228677cf259fbb84922d38f77cbb7 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Tue, 23 Aug 2022 16:15:35 -0300 Subject: [PATCH] Switch back to using async for and dont install signal handlers on cryptofeed --- piker/brokers/deribit/api.py | 8 ++++++-- piker/brokers/deribit/feed.py | 15 +++++---------- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index 5e2ed10b..32db7c01 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -613,7 +613,9 @@ class CryptoFeedRelay: }) if not self._fh.running: - self._fh.run(start_loop=False) + self._fh.run( + start_loop=False, + install_signal_handlers=False) self._loop = asyncio.get_event_loop() # sync with trio @@ -664,7 +666,9 @@ class CryptoFeedRelay: }) if not self._fh.running: - self._fh.run(start_loop=False) + self._fh.run( + start_loop=False, + install_signal_handlers=False) self._loop = asyncio.get_event_loop() # sync with trio diff --git a/piker/brokers/deribit/feed.py b/piker/brokers/deribit/feed.py index 8d2967db..b3daed7d 100644 --- a/piker/brokers/deribit/feed.py +++ b/piker/brokers/deribit/feed.py @@ -154,10 +154,10 @@ async def stream_quotes( if len(last_trades) == 0: last_trade = None - while not last_trade: - typ, quote = await stream.receive() + async for typ, quote in stream: if typ == 'trade': last_trade = Trade(**(quote['data'])) + break else: last_trade = Trade(**(last_trades[0])) @@ -177,14 +177,9 @@ async def stream_quotes( feed_is_live.set() - try: - while True: - typ, quote = await stream.receive() - topic = quote['symbol'] - await send_chan.send({topic: quote}) - - except trio.ClosedResourceError: - ... + async for typ, quote in stream: + topic = quote['symbol'] + await send_chan.send({topic: quote}) @tractor.context