Switch back to using async for and dont install signal handlers on cryptofeed
parent
73fcd72256
commit
a5481e6746
|
@ -613,7 +613,9 @@ class CryptoFeedRelay:
|
||||||
})
|
})
|
||||||
|
|
||||||
if not self._fh.running:
|
if not self._fh.running:
|
||||||
self._fh.run(start_loop=False)
|
self._fh.run(
|
||||||
|
start_loop=False,
|
||||||
|
install_signal_handlers=False)
|
||||||
self._loop = asyncio.get_event_loop()
|
self._loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
# sync with trio
|
# sync with trio
|
||||||
|
@ -664,7 +666,9 @@ class CryptoFeedRelay:
|
||||||
})
|
})
|
||||||
|
|
||||||
if not self._fh.running:
|
if not self._fh.running:
|
||||||
self._fh.run(start_loop=False)
|
self._fh.run(
|
||||||
|
start_loop=False,
|
||||||
|
install_signal_handlers=False)
|
||||||
self._loop = asyncio.get_event_loop()
|
self._loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
# sync with trio
|
# sync with trio
|
||||||
|
|
|
@ -154,10 +154,10 @@ async def stream_quotes(
|
||||||
|
|
||||||
if len(last_trades) == 0:
|
if len(last_trades) == 0:
|
||||||
last_trade = None
|
last_trade = None
|
||||||
while not last_trade:
|
async for typ, quote in stream:
|
||||||
typ, quote = await stream.receive()
|
|
||||||
if typ == 'trade':
|
if typ == 'trade':
|
||||||
last_trade = Trade(**(quote['data']))
|
last_trade = Trade(**(quote['data']))
|
||||||
|
break
|
||||||
|
|
||||||
else:
|
else:
|
||||||
last_trade = Trade(**(last_trades[0]))
|
last_trade = Trade(**(last_trades[0]))
|
||||||
|
@ -177,14 +177,9 @@ async def stream_quotes(
|
||||||
|
|
||||||
feed_is_live.set()
|
feed_is_live.set()
|
||||||
|
|
||||||
try:
|
async for typ, quote in stream:
|
||||||
while True:
|
topic = quote['symbol']
|
||||||
typ, quote = await stream.receive()
|
await send_chan.send({topic: quote})
|
||||||
topic = quote['symbol']
|
|
||||||
await send_chan.send({topic: quote})
|
|
||||||
|
|
||||||
except trio.ClosedResourceError:
|
|
||||||
...
|
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
|
|
Loading…
Reference in New Issue