From 564cd63014883f09b96611ac3e552531fb39ef4a Mon Sep 17 00:00:00 2001 From: Nelson Torres Date: Tue, 26 Nov 2024 15:15:38 -0300 Subject: [PATCH] aio_open_interest_feed_relay --- piker/brokers/deribit/api.py | 62 ++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index 67301911..15a9f88f 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -769,6 +769,68 @@ async def maybe_open_price_feed( yield feed +async def aio_open_interest_feed_relay( + fh: FeedHandler, + instruments: list, + from_trio: asyncio.Queue, + to_trio: trio.abc.SendChannel, +) -> None: + async def _trade( + trade: Trade, # cryptofeed, NOT ours from `.venues`! + receipt_timestamp: int, + ) -> None: + ''' + Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side. + + ''' + # Get timestamp and convert it to isoformat + date = (datetime.utcfromtimestamp(trade.timestamp)).isoformat() + print('Trade...') + print(date) + print(trade) + print('=======================') + to_trio.send_nowait(('trade', trade)) + + # trade and oi are user defined functions that + # will be called when trade and open interest updates are received + # data type is not dict, is an object: cryptofeed.types.OpenINterest + async def _oi( + oi: OpenInterest, + receipt_timestamp: int, + ) -> None: + ''' + Proxy-thru `cryptofeed.FeedHandler` "oi" to `piker`-side. + + ''' + # Get timestamp and convert it to isoformat + date = (datetime.utcfromtimestamp(oi.timestamp)).isoformat() + print('>>>> Open Interest...') + print(date) + print(oi) + print('==========================') + to_trio.send_nowait(('oi', oi)) + + callbacks = {TRADES: _trade, OPEN_INTEREST: _oi} + fh.add_feed( + DERIBIT, + channels=[TRADES, OPEN_INTEREST], + symbols=instruments, + callbacks=callbacks + ) + + if not fh.running: + fh.run( + start_loop=False, + install_signal_handlers=False + ) + + # sync with trio + to_trio.send_nowait(None) + + # run until cancelled + await asyncio.sleep(float('inf')) + + # TODO, move all to `.broker` submod! # async def aio_order_feed_relay(