Update trade message format

emit_clear_ticks_only_on_ts_change
jaredgoldman 2023-03-03 15:24:32 -05:00
parent 79feb1073d
commit f830a776ab
1 changed files with 48 additions and 32 deletions

View File

@ -101,33 +101,46 @@ async def mk_stream_quotes(
sym = symbols[0]
async with (open_cached_client(exchange.lower()) as client, send_chan as send_chan):
async with (
open_cached_client(exchange.lower()) as client,
send_chan as send_chan
):
pairs = await client.cache_pairs()
pair_data = pairs[sym]
async with maybe_open_price_feed(pair_data, exchange, channels) as stream:
init_msgs = {
# pass back token, and bool, signalling if we're the writer
# and that history has been written
sym: {
"symbol_info": {"asset_type": "crypto", "price_tick_size": 0.0005},
"shm_write_opts": {"sum_tick_vml": False},
"fqsn": sym,
},
}
# broker schemas to validate symbol data
quote_msg = {"symbol": pair_data["name"], "last": 0, "ticks": []}
task_status.started((init_msgs, quote_msg))
feed_is_live.set()
# try:
# async for typ, quote in stream:
# print(f'streaming {typ} quote: {quote}')
# topic = quote["symbobl"]
# await send_chan.send({topic: quote})
# finally:
# breakpoint()
async for typ, quote in stream:
print(f'streaming {typ} quote: {quote}')
topic = quote["symbol"]
await send_chan.send({topic: quote})
while True:
with trio.move_on_after(4) as cancel_scope:
log.warning(f'WAITING FOR MESSAGE')
msg = await stream.receive()
log.warning(f'RECEIVED MSG: {msg}')
topic = msg["symbol"]
await send_chan.send({topic: msg})
log.warning(f'SENT TO CHAN')
if cancel_scope.cancelled_caught:
await tractor.breakpoint()
@acm
async def maybe_open_price_feed(
@ -144,10 +157,7 @@ async def maybe_open_price_feed(
},
key=pair_data["name"],
) as (cache_hit, feed):
if cache_hit:
yield broadcast_receiver(feed, 10)
else:
yield feed
yield feed
@acm
@ -189,32 +199,37 @@ async def aio_price_feed_relay(
to_trio: trio.abc.SendChannel,
) -> None:
async def _trade(data: dict, receipt_timestamp):
print(f' trade data: {data}')
to_trio.send_nowait(
(
data = data.to_dict()
message = (
"trade",
{
"symbol": cf_sym_to_fqsn(data.symbol),
"last": float(data.to_dict()['price']),
"symbol": cf_sym_to_fqsn(data['symbol']),
"last": float(data['price']),
"broker_ts": time.time(),
"data": data.to_dict(),
"receipt": receipt_timestamp,
"ticks": [{
'type': 'trade',
'price': float(data['price']),
'size': float(data['amount']),
'broker_ts': receipt_timestamp
}],
},
)
)
print(f'trade message: {message}')
# try:
to_trio.send_nowait(message)
# except trio.WouldBlock as e:
#breakpoint()
async def _l1(data: dict, receipt_timestamp):
print(f'l2 data: {data}')
bid = data.book.to_dict()['bid']
ask = data.book.to_dict()['ask']
l1_ask_price, l1_ask_size = next(iter(ask.items()))
l1_bid_price, l1_bid_size = next(iter(bid.items()))
to_trio.send_nowait(
(
message = (
"l1",
{
"symbol": cf_sym_to_fqsn(data.symbol),
"broker_ts": time.time(),
"ticks": [
{
"type": "bid",
@ -239,7 +254,10 @@ async def aio_price_feed_relay(
]
}
)
)
try:
to_trio.send_nowait(message)
except trio.WouldBlock as e:
print(e)
fh.add_feed(
exchange,
@ -249,10 +267,8 @@ async def aio_price_feed_relay(
)
if not fh.running:
try:
fh.run(start_loop=False, install_signal_handlers=False)
except BaseExceptionGroup as e:
breakpoint()
fh.run(start_loop=False, install_signal_handlers=False)
# sync with trio
to_trio.send_nowait(None)