From 8e91e215b3d142d31308cad4d4f2011b42864114 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 3 Mar 2023 16:24:44 -0500 Subject: [PATCH] WIP - ensure `asyncio` pumps the event loop each send --- piker/brokers/kucoin.py | 3 ++ piker/data/cryptofeeds.py | 109 ++++++++++++++++++++++++++------------ 2 files changed, 78 insertions(+), 34 deletions(-) diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index edf5d508..14929d53 100644 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -1,3 +1,6 @@ +# piker: trading gear for hackers +# Copyright (C) Jared Goldman (in stewardship for pikers) + # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or diff --git a/piker/data/cryptofeeds.py b/piker/data/cryptofeeds.py index 7e6e9de2..5605993d 100644 --- a/piker/data/cryptofeeds.py +++ b/piker/data/cryptofeeds.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) Jared Goldman (in stewardship for piker0) +# Copyright (C) Jared Goldman (in stewardship for pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -102,14 +102,17 @@ async def mk_stream_quotes( sym = symbols[0] async with ( - open_cached_client(exchange.lower()) as client, - send_chan as send_chan + open_cached_client(exchange.lower()) as client, + # send_chan as send_chan, ): pairs = await client.cache_pairs() - pair_data = pairs[sym] - async with maybe_open_price_feed(pair_data, exchange, channels) as stream: + async with maybe_open_price_feed( + pair_data, + exchange, + channels, + ) as stream: init_msgs = { sym: { @@ -121,30 +124,41 @@ async def mk_stream_quotes( quote_msg = {"symbol": pair_data["name"], "last": 0, "ticks": []} task_status.started((init_msgs, quote_msg)) - feed_is_live.set() + + async for typ, quote in stream: + topic = quote["symbol"] + await send_chan.send({topic: quote}) + log.info( + f'sending {typ} quote:\n' + f'{quote}' + ) # try: - # async for typ, quote in stream: - # print(f'streaming {typ} quote: {quote}') - # topic = quote["symbobl"] - # await send_chan.send({topic: quote}) # finally: # breakpoint() - - while True: - with trio.move_on_after(4) as cancel_scope: - log.warning(f'WAITING FOR MESSAGE') - msg = await stream.receive() - log.warning(f'RECEIVED MSG: {msg}') - topic = msg["symbol"] - await send_chan.send({topic: msg}) - log.warning(f'SENT TO CHAN') - if cancel_scope.cancelled_caught: - await tractor.breakpoint() + + # while True: + # with trio.move_on_after(16) as cancel_scope: + + # log.warning(f'WAITING FOR MESSAGE') + # typ, quote = await stream.receive() + + # log.warning(f'RECEIVED MSG: {quote}') + + # topic = quote["symbol"] + # await send_chan.send({topic: quote}) + + # log.warning(f'SENT TO CHAN') + + # if cancel_scope.cancelled_caught: + # await tractor.breakpoint() @acm async def maybe_open_price_feed( - pair_data: Symbol, exchange: str, channels + pair_data: Symbol, + exchange: str, + channels, + ) -> trio.abc.ReceiveStream: # TODO: add a predicate to maybe_open_context # TODO: ensure we can dynamically pass down args here @@ -166,7 +180,13 @@ async def open_price_feed( ) -> trio.abc.ReceiveStream: async with maybe_open_feed_handler(exchange) as fh: async with to_asyncio.open_channel_from( - partial(aio_price_feed_relay, pair_data, exchange, channels, fh) + partial( + aio_price_feed_relay, + pair_data, + exchange, + channels, + fh, + ) ) as (first, chan): yield chan @@ -195,9 +215,15 @@ async def aio_price_feed_relay( exchange: str, channels: list[str], fh: FeedHandler, + from_trio: asyncio.Queue, to_trio: trio.abc.SendChannel, + ) -> None: + + # sync with trio + to_trio.send_nowait(None) + async def _trade(data: dict, receipt_timestamp): data = data.to_dict() message = ( @@ -214,18 +240,28 @@ async def aio_price_feed_relay( }], }, ) - print(f'trade message: {message}') - # try: - to_trio.send_nowait(message) - # except trio.WouldBlock as e: - #breakpoint() + try: + to_trio.send_nowait(message) + await asyncio.sleep(0.001) + except trio.WouldBlock as e: + log.exception( + 'l1: OVERRUN ASYNCIO -> TRIO\n' + f'TO_TRIO.stats -> {to_trio.statistics()}' + + ) + await asyncio.sleep(0) + + async def _l1( + data: dict, + receipt_timestamp: str | None, + ) -> None: + log.info(f'RECV L1 {receipt_timestamp}') - async def _l1(data: dict, receipt_timestamp): bid = data.book.to_dict()['bid'] ask = data.book.to_dict()['ask'] l1_ask_price, l1_ask_size = next(iter(ask.items())) l1_bid_price, l1_bid_size = next(iter(bid.items())) - message = ( + message = ( "l1", { "symbol": cf_sym_to_fqsn(data.symbol), @@ -256,8 +292,16 @@ async def aio_price_feed_relay( ) try: to_trio.send_nowait(message) + await asyncio.sleep(0.001) except trio.WouldBlock as e: - print(e) + log.exception( + 'l1: OVERRUN ASYNCIO -> TRIO\n' + f'TO_TRIO.stats -> {to_trio.statistics()}' + + ) + await asyncio.sleep(0) + # breakpoint() + # raise fh.add_feed( exchange, @@ -268,8 +312,5 @@ async def aio_price_feed_relay( if not fh.running: fh.run(start_loop=False, install_signal_handlers=False) - - # sync with trio - to_trio.send_nowait(None) await asyncio.sleep(float("inf"))