WIP - ensure `asyncio` pumps the event loop each send

emit_clear_ticks_only_on_ts_change
Tyler Goodlet 2023-03-03 16:24:44 -05:00 committed by jaredgoldman
parent f830a776ab
commit dc02c115ba
2 changed files with 78 additions and 34 deletions

View File

@ -1,3 +1,6 @@
# piker: trading gear for hackers
# Copyright (C) Jared Goldman (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers
# Copyright (C) Jared Goldman (in stewardship for piker0)
# Copyright (C) Jared Goldman (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@ -103,13 +103,16 @@ async def mk_stream_quotes(
async with (
open_cached_client(exchange.lower()) as client,
send_chan as send_chan
# send_chan as send_chan,
):
pairs = await client.cache_pairs()
pair_data = pairs[sym]
async with maybe_open_price_feed(pair_data, exchange, channels) as stream:
async with maybe_open_price_feed(
pair_data,
exchange,
channels,
) as stream:
init_msgs = {
sym: {
@ -121,30 +124,41 @@ async def mk_stream_quotes(
quote_msg = {"symbol": pair_data["name"], "last": 0, "ticks": []}
task_status.started((init_msgs, quote_msg))
feed_is_live.set()
async for typ, quote in stream:
topic = quote["symbol"]
await send_chan.send({topic: quote})
log.info(
f'sending {typ} quote:\n'
f'{quote}'
)
# try:
# async for typ, quote in stream:
# print(f'streaming {typ} quote: {quote}')
# topic = quote["symbobl"]
# await send_chan.send({topic: quote})
# finally:
# breakpoint()
while True:
with trio.move_on_after(4) as cancel_scope:
log.warning(f'WAITING FOR MESSAGE')
msg = await stream.receive()
log.warning(f'RECEIVED MSG: {msg}')
topic = msg["symbol"]
await send_chan.send({topic: msg})
log.warning(f'SENT TO CHAN')
if cancel_scope.cancelled_caught:
await tractor.breakpoint()
# while True:
# with trio.move_on_after(16) as cancel_scope:
# log.warning(f'WAITING FOR MESSAGE')
# typ, quote = await stream.receive()
# log.warning(f'RECEIVED MSG: {quote}')
# topic = quote["symbol"]
# await send_chan.send({topic: quote})
# log.warning(f'SENT TO CHAN')
# if cancel_scope.cancelled_caught:
# await tractor.breakpoint()
@acm
async def maybe_open_price_feed(
pair_data: Symbol, exchange: str, channels
pair_data: Symbol,
exchange: str,
channels,
) -> trio.abc.ReceiveStream:
# TODO: add a predicate to maybe_open_context
# TODO: ensure we can dynamically pass down args here
@ -166,7 +180,13 @@ async def open_price_feed(
) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler(exchange) as fh:
async with to_asyncio.open_channel_from(
partial(aio_price_feed_relay, pair_data, exchange, channels, fh)
partial(
aio_price_feed_relay,
pair_data,
exchange,
channels,
fh,
)
) as (first, chan):
yield chan
@ -195,9 +215,15 @@ async def aio_price_feed_relay(
exchange: str,
channels: list[str],
fh: FeedHandler,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
# sync with trio
to_trio.send_nowait(None)
async def _trade(data: dict, receipt_timestamp):
data = data.to_dict()
message = (
@ -214,13 +240,23 @@ async def aio_price_feed_relay(
}],
},
)
print(f'trade message: {message}')
# try:
try:
to_trio.send_nowait(message)
# except trio.WouldBlock as e:
#breakpoint()
await asyncio.sleep(0.001)
except trio.WouldBlock as e:
log.exception(
'l1: OVERRUN ASYNCIO -> TRIO\n'
f'TO_TRIO.stats -> {to_trio.statistics()}'
)
await asyncio.sleep(0)
async def _l1(
data: dict,
receipt_timestamp: str | None,
) -> None:
log.info(f'RECV L1 {receipt_timestamp}')
async def _l1(data: dict, receipt_timestamp):
bid = data.book.to_dict()['bid']
ask = data.book.to_dict()['ask']
l1_ask_price, l1_ask_size = next(iter(ask.items()))
@ -256,8 +292,16 @@ async def aio_price_feed_relay(
)
try:
to_trio.send_nowait(message)
await asyncio.sleep(0.001)
except trio.WouldBlock as e:
print(e)
log.exception(
'l1: OVERRUN ASYNCIO -> TRIO\n'
f'TO_TRIO.stats -> {to_trio.statistics()}'
)
await asyncio.sleep(0)
# breakpoint()
# raise
fh.add_feed(
exchange,
@ -269,7 +313,4 @@ async def aio_price_feed_relay(
if not fh.running:
fh.run(start_loop=False, install_signal_handlers=False)
# sync with trio
to_trio.send_nowait(None)
await asyncio.sleep(float("inf"))