diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index 0ab5e2be..96adac9e 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -557,69 +557,68 @@ async def maybe_open_feed_handler() -> trio.abc.ReceiveStream: yield fh +async def aio_price_feed_relay( + fh: FeedHandler, + instrument: Symbol, + from_trio: asyncio.Queue, + to_trio: trio.abc.SendChannel, +) -> None: + async def _trade(data: dict, receipt_timestamp): + to_trio.send_nowait(('trade', { + 'symbol': cb_sym_to_deribit_inst( + str_to_cb_sym(data.symbol)).lower(), + 'last': data, + 'broker_ts': time.time(), + 'data': data.to_dict(), + 'receipt': receipt_timestamp + })) + + async def _l1(data: dict, receipt_timestamp): + to_trio.send_nowait(('l1', { + 'symbol': cb_sym_to_deribit_inst( + str_to_cb_sym(data.symbol)).lower(), + 'ticks': [ + {'type': 'bid', + 'price': float(data.bid_price), 'size': float(data.bid_size)}, + {'type': 'bsize', + 'price': float(data.bid_price), 'size': float(data.bid_size)}, + {'type': 'ask', + 'price': float(data.ask_price), 'size': float(data.ask_size)}, + {'type': 'asize', + 'price': float(data.ask_price), 'size': float(data.ask_size)} + ] + })) + + fh.add_feed( + DERIBIT, + channels=[TRADES, L1_BOOK], + symbols=[piker_sym_to_cb_sym(instrument)], + callbacks={ + TRADES: _trade, + L1_BOOK: _l1 + }) + + if not fh.running: + fh.run( + start_loop=False, + install_signal_handlers=False) + + # sync with trio + to_trio.send_nowait(None) + + await asyncio.sleep(float('inf')) + + @acm async def open_price_feed( - instrument: str -) -> trio.abc.ReceiveStream: - - # XXX: hangs when going into this ctx mngr + instrument: str) -> trio.abc.ReceiveStream: async with maybe_open_feed_handler() as fh: - - async def relay( - from_trio: asyncio.Queue, - to_trio: trio.abc.SendChannel, - ) -> None: - async def _trade(data: dict, receipt_timestamp): - to_trio.send_nowait(('trade', { - 'symbol': cb_sym_to_deribit_inst( - str_to_cb_sym(data.symbol)).lower(), - 'last': data, - 'broker_ts': time.time(), - 'data': data.to_dict(), - 'receipt': receipt_timestamp - })) - - async def _l1(data: dict, receipt_timestamp): - to_trio.send_nowait(('l1', { - 'symbol': cb_sym_to_deribit_inst( - str_to_cb_sym(data.symbol)).lower(), - 'ticks': [ - {'type': 'bid', - 'price': float(data.bid_price), 'size': float(data.bid_size)}, - {'type': 'bsize', - 'price': float(data.bid_price), 'size': float(data.bid_size)}, - {'type': 'ask', - 'price': float(data.ask_price), 'size': float(data.ask_size)}, - {'type': 'asize', - 'price': float(data.ask_price), 'size': float(data.ask_size)} - ] - })) - - fh.add_feed( - DERIBIT, - channels=[TRADES, L1_BOOK], - symbols=[instrument], - callbacks={ - TRADES: _trade, - L1_BOOK: _l1 - }) - - if not fh.running: - fh.run( - start_loop=False, - install_signal_handlers=False) - - # sync with trio - to_trio.send_nowait(None) - - try: - await asyncio.sleep(float('inf')) - - except asyncio.exceptions.CancelledError: - ... - async with to_asyncio.open_channel_from( - relay + partial( + aio_price_feed_relay, + fh, + instrument + ) ) as (first, chan): yield chan @@ -642,51 +641,55 @@ async def maybe_open_price_feed( else: yield feed + + +async def aio_order_feed_relay( + fh: FeedHandler, + instrument: Symbol, + from_trio: asyncio.Queue, + to_trio: trio.abc.SendChannel, +) -> None: + async def _fill(data: dict, receipt_timestamp): + breakpoint() + + async def _order_info(data: dict, receipt_timestamp): + breakpoint() + + fh.add_feed( + DERIBIT, + channels=[FILLS, ORDER_INFO], + symbols=[instrument.upper()], + callbacks={ + FILLS: _fill, + ORDER_INFO: _order_info, + }) + + if not fh.running: + fh.run( + start_loop=False, + install_signal_handlers=False) + + # sync with trio + to_trio.send_nowait(None) + + await asyncio.sleep(float('inf')) + + @acm async def open_order_feed( instrument: List[str] ) -> trio.abc.ReceiveStream: - async with maybe_open_feed_handler() as fh: - - async def relay( - from_trio: asyncio.Queue, - to_trio: trio.abc.SendChannel, - ) -> None: - async def _fill(data: dict, receipt_timestamp): - breakpoint() - - async def _order_info(data: dict, receipt_timestamp): - breakpoint() - - fh.add_feed( - DERIBIT, - channels=[FILLS, ORDER_INFO], - symbols=[instrument], - callbacks={ - FILLS: _fill, - ORDER_INFO: _order_info, - }) - - if not fh.running: - fh.run( - start_loop=False, - install_signal_handlers=False) - - # sync with trio - to_trio.send_nowait(None) - - try: - await asyncio.sleep(float('inf')) - - except asyncio.exceptions.CancelledError: - ... - async with to_asyncio.open_channel_from( - relay + partial( + aio_order_feed_relay, + fh, + instrument + ) ) as (first, chan): yield chan + @acm async def maybe_open_order_feed( instrument: str @@ -696,7 +699,8 @@ async def maybe_open_order_feed( async with maybe_open_context( acm_func=open_order_feed, kwargs={ - 'instrument': instrument + 'instrument': instrument, + 'fh': fh }, key=f'{instrument}-order', ) as (cache_hit, feed): diff --git a/requirements.txt b/requirements.txt index 91bb8918..8f35a63f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ # we require a pinned dev branch to get some edge features that # are often untested in tractor's CI and/or being tested by us # first before committing as core features in tractor's base. --e git+https://github.com/goodboy/tractor.git@master#egg=tractor +-e git+https://github.com/goodboy/tractor.git@reentrant_moc#egg=tractor # `pyqtgraph` peeps keep breaking, fixing, improving so might as well # pin this to a dev branch that we have more control over especially