diff --git a/piker/brokers/deribit.py b/piker/brokers/deribit.py index 98b2a12c..4c43e4dd 100644 --- a/piker/brokers/deribit.py +++ b/piker/brokers/deribit.py @@ -271,7 +271,6 @@ async def get_client() -> Client: async def open_aio_cryptofeed_relay( from_trio: asyncio.Queue, to_trio: trio.abc.SendChannel, - event_consumers: dict[str, trio.Event], instruments: List[str] = [] ) -> None: @@ -300,31 +299,39 @@ async def open_aio_cryptofeed_relay( instruments = [format_sym(i) for i in instruments] async def trade_cb(data: dict, receipt_timestamp): - to_trio.send_nowait({ - 'type': 'trade', - data.symbol: data.to_dict(), - 'receipt': receipt_timestamp}) + breakpoint() + # to_trio.send_nowait(('trade', { + # 'symbol': data.symbol.lower(), + # 'last': data. + # 'broker_ts': time.time(), + # 'data': data.to_dict(), + # 'receipt': receipt_timestamp})) async def l1_book_cb(data: dict, receipt_timestamp): - to_trio.send_nowait({ - 'type': 'l1_book', - data.symbol: data.to_dict(), - 'receipt': receipt_timestamp}) + to_trio.send_nowait(('l1', { + 'symbol': data.symbol.lower(), + 'ticks': [ + {'type': 'bid', + 'price': float(data.bid_price), 'size': float(data.bid_size)}, + {'type': 'bsize', + 'price': float(data.bid_price), 'size': float(data.bid_size)}, + {'type': 'ask', + 'price': float(data.ask_price), 'size': float(data.ask_size)}, + {'type': 'asize', + 'price': float(data.ask_price), 'size': float(data.ask_size)} + ]})) fh = FeedHandler(config=conf) fh.run(start_loop=False) fh.add_feed( DERIBIT, - channels=[TRADES], + channels=[L1_BOOK, TRADES], symbols=instruments, - callbacks={TRADES: TradeCallback(trade_cb)}) - - fh.add_feed( - DERIBIT, - channels=[L1_BOOK], - symbols=instruments, - callbacks={L1_BOOK: L1BookCallback(l1_book_cb)}) + callbacks={ + L1_BOOK: L1BookCallback(l1_book_cb), + TRADES: TradeCallback(trade_cb) + }) # sync with trio to_trio.send_nowait(None) @@ -332,33 +339,26 @@ async def open_aio_cryptofeed_relay( await from_trio.get() -@acm -async def open_cryptofeeds(instruments: List[str]): +async def open_cryptofeeds( + instruments: List[str], + to_chart: trio.abc.SendChannel, - # try: - event_table = {} - - async with ( - to_asyncio.open_channel_from( - open_aio_cryptofeed_relay, - event_consumers=event_table, - instruments=instruments - ) as (first, chan), - trio.open_nursery() as n, - ): + # startup sync + task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, +): + async with to_asyncio.open_channel_from( + open_aio_cryptofeed_relay, + instruments=instruments, + ) as (first, chan): assert first is None - async def relay_events(): - async with chan.subscribe() as msg_stream: - async for msg in msg_stream: - print(msg) - - n.start_soon(relay_events) - - yield chan - await chan.send(None) + async with chan.subscribe() as msg_stream: + task_status.started() + async for msg in msg_stream: + await to_chart.send(msg) + @acm async def open_history_client( @@ -391,12 +391,13 @@ async def open_history_client( async def backfill_bars( - instrument: str, + symbol: str, shm: ShmArray, # type: ignore # noqa task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, ) -> None: """Fill historical bars into shared mem / storage afap. """ + instrument = symbol with trio.CancelScope() as cs: async with open_cached_client('deribit') as client: bars = await client.bars(instrument) @@ -418,21 +419,41 @@ async def stream_quotes( # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) - sym_infos = {} - uid = 0 + sym = symbols[0] + to_chart, from_feed = trio.open_memory_channel(1) async with ( open_cached_client('deribit') as client, send_chan as send_chan, - open_cryptofeeds(symbols) as feed_chan + trio.open_nursery() as n ): + await n.start( + open_cryptofeeds, symbols, to_chart) + + init_msgs = { + # pass back token, and bool, signalling if we're the writer + # and that history has been written + sym: { + 'symbol_info': {}, + 'shm_write_opts': {'sum_tick_vml': False}, + 'fqsn': sym, + }, + } # keep client cached for real-time section cache = await client.cache_symbols() - async with feed_chan.subscribe() as msg_stream: - async for msg in msg_stream: - print(msg) + async with from_feed: + typ, quote = await anext(from_feed) + + while typ != 'trade': + typ, quote = await anext(from_feed) + + task_status.started((init_msgs, quote)) + + async for typ, msg in from_feed: + topic = msg['symbol'] + await send_chan.send({topic: msg}) @tractor.context