Introduce piker protocol in stream_messages

deribit
Guillermo Rodriguez 2022-06-06 15:58:53 -03:00
parent a0b415095a
commit e558e5837e
No known key found for this signature in database
GPG Key ID: EC3AB66D5D83B392
1 changed files with 67 additions and 46 deletions

View File

@ -271,7 +271,6 @@ async def get_client() -> Client:
async def open_aio_cryptofeed_relay( async def open_aio_cryptofeed_relay(
from_trio: asyncio.Queue, from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel, to_trio: trio.abc.SendChannel,
event_consumers: dict[str, trio.Event],
instruments: List[str] = [] instruments: List[str] = []
) -> None: ) -> None:
@ -300,31 +299,39 @@ async def open_aio_cryptofeed_relay(
instruments = [format_sym(i) for i in instruments] instruments = [format_sym(i) for i in instruments]
async def trade_cb(data: dict, receipt_timestamp): async def trade_cb(data: dict, receipt_timestamp):
to_trio.send_nowait({ breakpoint()
'type': 'trade', # to_trio.send_nowait(('trade', {
data.symbol: data.to_dict(), # 'symbol': data.symbol.lower(),
'receipt': receipt_timestamp}) # 'last': data.
# 'broker_ts': time.time(),
# 'data': data.to_dict(),
# 'receipt': receipt_timestamp}))
async def l1_book_cb(data: dict, receipt_timestamp): async def l1_book_cb(data: dict, receipt_timestamp):
to_trio.send_nowait({ to_trio.send_nowait(('l1', {
'type': 'l1_book', 'symbol': data.symbol.lower(),
data.symbol: data.to_dict(), 'ticks': [
'receipt': receipt_timestamp}) {'type': 'bid',
'price': float(data.bid_price), 'size': float(data.bid_size)},
{'type': 'bsize',
'price': float(data.bid_price), 'size': float(data.bid_size)},
{'type': 'ask',
'price': float(data.ask_price), 'size': float(data.ask_size)},
{'type': 'asize',
'price': float(data.ask_price), 'size': float(data.ask_size)}
]}))
fh = FeedHandler(config=conf) fh = FeedHandler(config=conf)
fh.run(start_loop=False) fh.run(start_loop=False)
fh.add_feed( fh.add_feed(
DERIBIT, DERIBIT,
channels=[TRADES], channels=[L1_BOOK, TRADES],
symbols=instruments, symbols=instruments,
callbacks={TRADES: TradeCallback(trade_cb)}) callbacks={
L1_BOOK: L1BookCallback(l1_book_cb),
fh.add_feed( TRADES: TradeCallback(trade_cb)
DERIBIT, })
channels=[L1_BOOK],
symbols=instruments,
callbacks={L1_BOOK: L1BookCallback(l1_book_cb)})
# sync with trio # sync with trio
to_trio.send_nowait(None) to_trio.send_nowait(None)
@ -332,33 +339,26 @@ async def open_aio_cryptofeed_relay(
await from_trio.get() await from_trio.get()
@acm async def open_cryptofeeds(
async def open_cryptofeeds(instruments: List[str]): instruments: List[str],
to_chart: trio.abc.SendChannel,
# try: # startup sync
event_table = {} task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
):
async with ( async with to_asyncio.open_channel_from(
to_asyncio.open_channel_from( open_aio_cryptofeed_relay,
open_aio_cryptofeed_relay, instruments=instruments,
event_consumers=event_table, ) as (first, chan):
instruments=instruments
) as (first, chan),
trio.open_nursery() as n,
):
assert first is None assert first is None
async def relay_events():
async with chan.subscribe() as msg_stream:
async for msg in msg_stream:
print(msg)
n.start_soon(relay_events)
yield chan
await chan.send(None) await chan.send(None)
async with chan.subscribe() as msg_stream:
task_status.started()
async for msg in msg_stream:
await to_chart.send(msg)
@acm @acm
async def open_history_client( async def open_history_client(
@ -391,12 +391,13 @@ async def open_history_client(
async def backfill_bars( async def backfill_bars(
instrument: str, symbol: str,
shm: ShmArray, # type: ignore # noqa shm: ShmArray, # type: ignore # noqa
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
"""Fill historical bars into shared mem / storage afap. """Fill historical bars into shared mem / storage afap.
""" """
instrument = symbol
with trio.CancelScope() as cs: with trio.CancelScope() as cs:
async with open_cached_client('deribit') as client: async with open_cached_client('deribit') as client:
bars = await client.bars(instrument) bars = await client.bars(instrument)
@ -418,21 +419,41 @@ async def stream_quotes(
# XXX: required to propagate ``tractor`` loglevel to piker logging # XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel) get_console_log(loglevel or tractor.current_actor().loglevel)
sym_infos = {} sym = symbols[0]
uid = 0 to_chart, from_feed = trio.open_memory_channel(1)
async with ( async with (
open_cached_client('deribit') as client, open_cached_client('deribit') as client,
send_chan as send_chan, send_chan as send_chan,
open_cryptofeeds(symbols) as feed_chan trio.open_nursery() as n
): ):
await n.start(
open_cryptofeeds, symbols, to_chart)
init_msgs = {
# pass back token, and bool, signalling if we're the writer
# and that history has been written
sym: {
'symbol_info': {},
'shm_write_opts': {'sum_tick_vml': False},
'fqsn': sym,
},
}
# keep client cached for real-time section # keep client cached for real-time section
cache = await client.cache_symbols() cache = await client.cache_symbols()
async with feed_chan.subscribe() as msg_stream: async with from_feed:
async for msg in msg_stream: typ, quote = await anext(from_feed)
print(msg)
while typ != 'trade':
typ, quote = await anext(from_feed)
task_status.started((init_msgs, quote))
async for typ, msg in from_feed:
topic = msg['symbol']
await send_chan.send({topic: msg})
@tractor.context @tractor.context