Introduce piker protocol in stream_messages
parent
54e989320d
commit
d5ffad01b7
|
@ -271,7 +271,6 @@ async def get_client() -> Client:
|
|||
async def open_aio_cryptofeed_relay(
|
||||
from_trio: asyncio.Queue,
|
||||
to_trio: trio.abc.SendChannel,
|
||||
event_consumers: dict[str, trio.Event],
|
||||
instruments: List[str] = []
|
||||
) -> None:
|
||||
|
||||
|
@ -300,31 +299,39 @@ async def open_aio_cryptofeed_relay(
|
|||
instruments = [format_sym(i) for i in instruments]
|
||||
|
||||
async def trade_cb(data: dict, receipt_timestamp):
|
||||
to_trio.send_nowait({
|
||||
'type': 'trade',
|
||||
data.symbol: data.to_dict(),
|
||||
'receipt': receipt_timestamp})
|
||||
breakpoint()
|
||||
# to_trio.send_nowait(('trade', {
|
||||
# 'symbol': data.symbol.lower(),
|
||||
# 'last': data.
|
||||
# 'broker_ts': time.time(),
|
||||
# 'data': data.to_dict(),
|
||||
# 'receipt': receipt_timestamp}))
|
||||
|
||||
async def l1_book_cb(data: dict, receipt_timestamp):
|
||||
to_trio.send_nowait({
|
||||
'type': 'l1_book',
|
||||
data.symbol: data.to_dict(),
|
||||
'receipt': receipt_timestamp})
|
||||
to_trio.send_nowait(('l1', {
|
||||
'symbol': data.symbol.lower(),
|
||||
'ticks': [
|
||||
{'type': 'bid',
|
||||
'price': float(data.bid_price), 'size': float(data.bid_size)},
|
||||
{'type': 'bsize',
|
||||
'price': float(data.bid_price), 'size': float(data.bid_size)},
|
||||
{'type': 'ask',
|
||||
'price': float(data.ask_price), 'size': float(data.ask_size)},
|
||||
{'type': 'asize',
|
||||
'price': float(data.ask_price), 'size': float(data.ask_size)}
|
||||
]}))
|
||||
|
||||
fh = FeedHandler(config=conf)
|
||||
fh.run(start_loop=False)
|
||||
|
||||
fh.add_feed(
|
||||
DERIBIT,
|
||||
channels=[TRADES],
|
||||
channels=[L1_BOOK, TRADES],
|
||||
symbols=instruments,
|
||||
callbacks={TRADES: TradeCallback(trade_cb)})
|
||||
|
||||
fh.add_feed(
|
||||
DERIBIT,
|
||||
channels=[L1_BOOK],
|
||||
symbols=instruments,
|
||||
callbacks={L1_BOOK: L1BookCallback(l1_book_cb)})
|
||||
callbacks={
|
||||
L1_BOOK: L1BookCallback(l1_book_cb),
|
||||
TRADES: TradeCallback(trade_cb)
|
||||
})
|
||||
|
||||
# sync with trio
|
||||
to_trio.send_nowait(None)
|
||||
|
@ -332,33 +339,26 @@ async def open_aio_cryptofeed_relay(
|
|||
await from_trio.get()
|
||||
|
||||
|
||||
@acm
|
||||
async def open_cryptofeeds(instruments: List[str]):
|
||||
async def open_cryptofeeds(
|
||||
instruments: List[str],
|
||||
to_chart: trio.abc.SendChannel,
|
||||
|
||||
# try:
|
||||
event_table = {}
|
||||
|
||||
async with (
|
||||
to_asyncio.open_channel_from(
|
||||
# startup sync
|
||||
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
|
||||
):
|
||||
async with to_asyncio.open_channel_from(
|
||||
open_aio_cryptofeed_relay,
|
||||
event_consumers=event_table,
|
||||
instruments=instruments
|
||||
) as (first, chan),
|
||||
trio.open_nursery() as n,
|
||||
):
|
||||
instruments=instruments,
|
||||
) as (first, chan):
|
||||
assert first is None
|
||||
|
||||
async def relay_events():
|
||||
async with chan.subscribe() as msg_stream:
|
||||
async for msg in msg_stream:
|
||||
print(msg)
|
||||
|
||||
n.start_soon(relay_events)
|
||||
|
||||
yield chan
|
||||
|
||||
await chan.send(None)
|
||||
|
||||
async with chan.subscribe() as msg_stream:
|
||||
task_status.started()
|
||||
async for msg in msg_stream:
|
||||
await to_chart.send(msg)
|
||||
|
||||
|
||||
@acm
|
||||
async def open_history_client(
|
||||
|
@ -391,12 +391,13 @@ async def open_history_client(
|
|||
|
||||
|
||||
async def backfill_bars(
|
||||
instrument: str,
|
||||
symbol: str,
|
||||
shm: ShmArray, # type: ignore # noqa
|
||||
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
||||
) -> None:
|
||||
"""Fill historical bars into shared mem / storage afap.
|
||||
"""
|
||||
instrument = symbol
|
||||
with trio.CancelScope() as cs:
|
||||
async with open_cached_client('deribit') as client:
|
||||
bars = await client.bars(instrument)
|
||||
|
@ -418,21 +419,41 @@ async def stream_quotes(
|
|||
# XXX: required to propagate ``tractor`` loglevel to piker logging
|
||||
get_console_log(loglevel or tractor.current_actor().loglevel)
|
||||
|
||||
sym_infos = {}
|
||||
uid = 0
|
||||
sym = symbols[0]
|
||||
to_chart, from_feed = trio.open_memory_channel(1)
|
||||
|
||||
async with (
|
||||
open_cached_client('deribit') as client,
|
||||
send_chan as send_chan,
|
||||
open_cryptofeeds(symbols) as feed_chan
|
||||
trio.open_nursery() as n
|
||||
):
|
||||
await n.start(
|
||||
open_cryptofeeds, symbols, to_chart)
|
||||
|
||||
init_msgs = {
|
||||
# pass back token, and bool, signalling if we're the writer
|
||||
# and that history has been written
|
||||
sym: {
|
||||
'symbol_info': {},
|
||||
'shm_write_opts': {'sum_tick_vml': False},
|
||||
'fqsn': sym,
|
||||
},
|
||||
}
|
||||
|
||||
# keep client cached for real-time section
|
||||
cache = await client.cache_symbols()
|
||||
|
||||
async with feed_chan.subscribe() as msg_stream:
|
||||
async for msg in msg_stream:
|
||||
print(msg)
|
||||
async with from_feed:
|
||||
typ, quote = await anext(from_feed)
|
||||
|
||||
while typ != 'trade':
|
||||
typ, quote = await anext(from_feed)
|
||||
|
||||
task_status.started((init_msgs, quote))
|
||||
|
||||
async for typ, msg in from_feed:
|
||||
topic = msg['symbol']
|
||||
await send_chan.send({topic: msg})
|
||||
|
||||
|
||||
@tractor.context
|
||||
|
|
Loading…
Reference in New Issue