aio_open_interest_feed_relay

max_pain_deribit
Nelson Torres 2024-11-26 15:15:38 -03:00
parent 17249205c9
commit 564cd63014
1 changed files with 62 additions and 0 deletions

View File

@ -769,6 +769,68 @@ async def maybe_open_price_feed(
yield feed
async def aio_open_interest_feed_relay(
fh: FeedHandler,
instruments: list,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _trade(
trade: Trade, # cryptofeed, NOT ours from `.venues`!
receipt_timestamp: int,
) -> None:
'''
Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side.
'''
# Get timestamp and convert it to isoformat
date = (datetime.utcfromtimestamp(trade.timestamp)).isoformat()
print('Trade...')
print(date)
print(trade)
print('=======================')
to_trio.send_nowait(('trade', trade))
# trade and oi are user defined functions that
# will be called when trade and open interest updates are received
# data type is not dict, is an object: cryptofeed.types.OpenINterest
async def _oi(
oi: OpenInterest,
receipt_timestamp: int,
) -> None:
'''
Proxy-thru `cryptofeed.FeedHandler` "oi" to `piker`-side.
'''
# Get timestamp and convert it to isoformat
date = (datetime.utcfromtimestamp(oi.timestamp)).isoformat()
print('>>>> Open Interest...')
print(date)
print(oi)
print('==========================')
to_trio.send_nowait(('oi', oi))
callbacks = {TRADES: _trade, OPEN_INTEREST: _oi}
fh.add_feed(
DERIBIT,
channels=[TRADES, OPEN_INTEREST],
symbols=instruments,
callbacks=callbacks
)
if not fh.running:
fh.run(
start_loop=False,
install_signal_handlers=False
)
# sync with trio
to_trio.send_nowait(None)
# run until cancelled
await asyncio.sleep(float('inf'))
# TODO, move all to `.broker` submod!
# async def aio_order_feed_relay(