WIP - ensure `asyncio` pumps the event loop each send

small_kucoin_fixes
Tyler Goodlet 2023-03-03 16:24:44 -05:00 committed by jaredgoldman
parent c751c36a8b
commit 8e91e215b3
2 changed files with 78 additions and 34 deletions

View File

@ -1,3 +1,6 @@
# piker: trading gear for hackers
# Copyright (C) Jared Goldman (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or # the Free Software Foundation, either version 3 of the License, or

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers # piker: trading gear for hackers
# Copyright (C) Jared Goldman (in stewardship for piker0) # Copyright (C) Jared Goldman (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
@ -103,13 +103,16 @@ async def mk_stream_quotes(
async with ( async with (
open_cached_client(exchange.lower()) as client, open_cached_client(exchange.lower()) as client,
send_chan as send_chan # send_chan as send_chan,
): ):
pairs = await client.cache_pairs() pairs = await client.cache_pairs()
pair_data = pairs[sym] pair_data = pairs[sym]
async with maybe_open_price_feed(pair_data, exchange, channels) as stream: async with maybe_open_price_feed(
pair_data,
exchange,
channels,
) as stream:
init_msgs = { init_msgs = {
sym: { sym: {
@ -121,30 +124,41 @@ async def mk_stream_quotes(
quote_msg = {"symbol": pair_data["name"], "last": 0, "ticks": []} quote_msg = {"symbol": pair_data["name"], "last": 0, "ticks": []}
task_status.started((init_msgs, quote_msg)) task_status.started((init_msgs, quote_msg))
feed_is_live.set() feed_is_live.set()
async for typ, quote in stream:
topic = quote["symbol"]
await send_chan.send({topic: quote})
log.info(
f'sending {typ} quote:\n'
f'{quote}'
)
# try: # try:
# async for typ, quote in stream:
# print(f'streaming {typ} quote: {quote}')
# topic = quote["symbobl"]
# await send_chan.send({topic: quote})
# finally: # finally:
# breakpoint() # breakpoint()
while True: # while True:
with trio.move_on_after(4) as cancel_scope: # with trio.move_on_after(16) as cancel_scope:
log.warning(f'WAITING FOR MESSAGE')
msg = await stream.receive() # log.warning(f'WAITING FOR MESSAGE')
log.warning(f'RECEIVED MSG: {msg}') # typ, quote = await stream.receive()
topic = msg["symbol"]
await send_chan.send({topic: msg}) # log.warning(f'RECEIVED MSG: {quote}')
log.warning(f'SENT TO CHAN')
if cancel_scope.cancelled_caught: # topic = quote["symbol"]
await tractor.breakpoint() # await send_chan.send({topic: quote})
# log.warning(f'SENT TO CHAN')
# if cancel_scope.cancelled_caught:
# await tractor.breakpoint()
@acm @acm
async def maybe_open_price_feed( async def maybe_open_price_feed(
pair_data: Symbol, exchange: str, channels pair_data: Symbol,
exchange: str,
channels,
) -> trio.abc.ReceiveStream: ) -> trio.abc.ReceiveStream:
# TODO: add a predicate to maybe_open_context # TODO: add a predicate to maybe_open_context
# TODO: ensure we can dynamically pass down args here # TODO: ensure we can dynamically pass down args here
@ -166,7 +180,13 @@ async def open_price_feed(
) -> trio.abc.ReceiveStream: ) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler(exchange) as fh: async with maybe_open_feed_handler(exchange) as fh:
async with to_asyncio.open_channel_from( async with to_asyncio.open_channel_from(
partial(aio_price_feed_relay, pair_data, exchange, channels, fh) partial(
aio_price_feed_relay,
pair_data,
exchange,
channels,
fh,
)
) as (first, chan): ) as (first, chan):
yield chan yield chan
@ -195,9 +215,15 @@ async def aio_price_feed_relay(
exchange: str, exchange: str,
channels: list[str], channels: list[str],
fh: FeedHandler, fh: FeedHandler,
from_trio: asyncio.Queue, from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel, to_trio: trio.abc.SendChannel,
) -> None: ) -> None:
# sync with trio
to_trio.send_nowait(None)
async def _trade(data: dict, receipt_timestamp): async def _trade(data: dict, receipt_timestamp):
data = data.to_dict() data = data.to_dict()
message = ( message = (
@ -214,13 +240,23 @@ async def aio_price_feed_relay(
}], }],
}, },
) )
print(f'trade message: {message}') try:
# try:
to_trio.send_nowait(message) to_trio.send_nowait(message)
# except trio.WouldBlock as e: await asyncio.sleep(0.001)
#breakpoint() except trio.WouldBlock as e:
log.exception(
'l1: OVERRUN ASYNCIO -> TRIO\n'
f'TO_TRIO.stats -> {to_trio.statistics()}'
)
await asyncio.sleep(0)
async def _l1(
data: dict,
receipt_timestamp: str | None,
) -> None:
log.info(f'RECV L1 {receipt_timestamp}')
async def _l1(data: dict, receipt_timestamp):
bid = data.book.to_dict()['bid'] bid = data.book.to_dict()['bid']
ask = data.book.to_dict()['ask'] ask = data.book.to_dict()['ask']
l1_ask_price, l1_ask_size = next(iter(ask.items())) l1_ask_price, l1_ask_size = next(iter(ask.items()))
@ -256,8 +292,16 @@ async def aio_price_feed_relay(
) )
try: try:
to_trio.send_nowait(message) to_trio.send_nowait(message)
await asyncio.sleep(0.001)
except trio.WouldBlock as e: except trio.WouldBlock as e:
print(e) log.exception(
'l1: OVERRUN ASYNCIO -> TRIO\n'
f'TO_TRIO.stats -> {to_trio.statistics()}'
)
await asyncio.sleep(0)
# breakpoint()
# raise
fh.add_feed( fh.add_feed(
exchange, exchange,
@ -269,7 +313,4 @@ async def aio_price_feed_relay(
if not fh.running: if not fh.running:
fh.run(start_loop=False, install_signal_handlers=False) fh.run(start_loop=False, install_signal_handlers=False)
# sync with trio
to_trio.send_nowait(None)
await asyncio.sleep(float("inf")) await asyncio.sleep(float("inf"))