From f830a776ab5b82a9a9939be6a52e50788c98855b Mon Sep 17 00:00:00 2001 From: jaredgoldman Date: Fri, 3 Mar 2023 15:24:32 -0500 Subject: [PATCH] Update trade message format --- piker/data/cryptofeeds.py | 80 +++++++++++++++++++++++---------------- 1 file changed, 48 insertions(+), 32 deletions(-) diff --git a/piker/data/cryptofeeds.py b/piker/data/cryptofeeds.py index 727c3a3c..7e6e9de2 100644 --- a/piker/data/cryptofeeds.py +++ b/piker/data/cryptofeeds.py @@ -101,33 +101,46 @@ async def mk_stream_quotes( sym = symbols[0] - async with (open_cached_client(exchange.lower()) as client, send_chan as send_chan): + async with ( + open_cached_client(exchange.lower()) as client, + send_chan as send_chan + ): pairs = await client.cache_pairs() pair_data = pairs[sym] async with maybe_open_price_feed(pair_data, exchange, channels) as stream: + init_msgs = { - # pass back token, and bool, signalling if we're the writer - # and that history has been written sym: { "symbol_info": {"asset_type": "crypto", "price_tick_size": 0.0005}, "shm_write_opts": {"sum_tick_vml": False}, "fqsn": sym, }, } - - # broker schemas to validate symbol data quote_msg = {"symbol": pair_data["name"], "last": 0, "ticks": []} + task_status.started((init_msgs, quote_msg)) feed_is_live.set() - - async for typ, quote in stream: - print(f'streaming {typ} quote: {quote}') - topic = quote["symbol"] - await send_chan.send({topic: quote}) - + # try: + # async for typ, quote in stream: + # print(f'streaming {typ} quote: {quote}') + # topic = quote["symbobl"] + # await send_chan.send({topic: quote}) + # finally: + # breakpoint() + + while True: + with trio.move_on_after(4) as cancel_scope: + log.warning(f'WAITING FOR MESSAGE') + msg = await stream.receive() + log.warning(f'RECEIVED MSG: {msg}') + topic = msg["symbol"] + await send_chan.send({topic: msg}) + log.warning(f'SENT TO CHAN') + if cancel_scope.cancelled_caught: + await tractor.breakpoint() @acm async def maybe_open_price_feed( @@ -144,10 +157,7 @@ async def maybe_open_price_feed( }, key=pair_data["name"], ) as (cache_hit, feed): - if cache_hit: - yield broadcast_receiver(feed, 10) - else: - yield feed + yield feed @acm @@ -189,32 +199,37 @@ async def aio_price_feed_relay( to_trio: trio.abc.SendChannel, ) -> None: async def _trade(data: dict, receipt_timestamp): - print(f' trade data: {data}') - to_trio.send_nowait( - ( + data = data.to_dict() + message = ( "trade", { - "symbol": cf_sym_to_fqsn(data.symbol), - "last": float(data.to_dict()['price']), + "symbol": cf_sym_to_fqsn(data['symbol']), + "last": float(data['price']), "broker_ts": time.time(), - "data": data.to_dict(), - "receipt": receipt_timestamp, + "ticks": [{ + 'type': 'trade', + 'price': float(data['price']), + 'size': float(data['amount']), + 'broker_ts': receipt_timestamp + }], }, ) - ) + print(f'trade message: {message}') + # try: + to_trio.send_nowait(message) + # except trio.WouldBlock as e: + #breakpoint() async def _l1(data: dict, receipt_timestamp): - print(f'l2 data: {data}') bid = data.book.to_dict()['bid'] ask = data.book.to_dict()['ask'] l1_ask_price, l1_ask_size = next(iter(ask.items())) l1_bid_price, l1_bid_size = next(iter(bid.items())) - - to_trio.send_nowait( - ( + message = ( "l1", { "symbol": cf_sym_to_fqsn(data.symbol), + "broker_ts": time.time(), "ticks": [ { "type": "bid", @@ -239,7 +254,10 @@ async def aio_price_feed_relay( ] } ) - ) + try: + to_trio.send_nowait(message) + except trio.WouldBlock as e: + print(e) fh.add_feed( exchange, @@ -249,10 +267,8 @@ async def aio_price_feed_relay( ) if not fh.running: - try: - fh.run(start_loop=False, install_signal_handlers=False) - except BaseExceptionGroup as e: - breakpoint() + fh.run(start_loop=False, install_signal_handlers=False) + # sync with trio to_trio.send_nowait(None)