Update trade message format
parent
ad9d645782
commit
c751c36a8b
|
@ -101,33 +101,46 @@ async def mk_stream_quotes(
|
||||||
|
|
||||||
sym = symbols[0]
|
sym = symbols[0]
|
||||||
|
|
||||||
async with (open_cached_client(exchange.lower()) as client, send_chan as send_chan):
|
async with (
|
||||||
|
open_cached_client(exchange.lower()) as client,
|
||||||
|
send_chan as send_chan
|
||||||
|
):
|
||||||
pairs = await client.cache_pairs()
|
pairs = await client.cache_pairs()
|
||||||
|
|
||||||
pair_data = pairs[sym]
|
pair_data = pairs[sym]
|
||||||
|
|
||||||
async with maybe_open_price_feed(pair_data, exchange, channels) as stream:
|
async with maybe_open_price_feed(pair_data, exchange, channels) as stream:
|
||||||
|
|
||||||
init_msgs = {
|
init_msgs = {
|
||||||
# pass back token, and bool, signalling if we're the writer
|
|
||||||
# and that history has been written
|
|
||||||
sym: {
|
sym: {
|
||||||
"symbol_info": {"asset_type": "crypto", "price_tick_size": 0.0005},
|
"symbol_info": {"asset_type": "crypto", "price_tick_size": 0.0005},
|
||||||
"shm_write_opts": {"sum_tick_vml": False},
|
"shm_write_opts": {"sum_tick_vml": False},
|
||||||
"fqsn": sym,
|
"fqsn": sym,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
# broker schemas to validate symbol data
|
|
||||||
quote_msg = {"symbol": pair_data["name"], "last": 0, "ticks": []}
|
quote_msg = {"symbol": pair_data["name"], "last": 0, "ticks": []}
|
||||||
|
|
||||||
task_status.started((init_msgs, quote_msg))
|
task_status.started((init_msgs, quote_msg))
|
||||||
|
|
||||||
feed_is_live.set()
|
feed_is_live.set()
|
||||||
|
# try:
|
||||||
async for typ, quote in stream:
|
# async for typ, quote in stream:
|
||||||
print(f'streaming {typ} quote: {quote}')
|
# print(f'streaming {typ} quote: {quote}')
|
||||||
topic = quote["symbol"]
|
# topic = quote["symbobl"]
|
||||||
await send_chan.send({topic: quote})
|
# await send_chan.send({topic: quote})
|
||||||
|
# finally:
|
||||||
|
# breakpoint()
|
||||||
|
|
||||||
|
while True:
|
||||||
|
with trio.move_on_after(4) as cancel_scope:
|
||||||
|
log.warning(f'WAITING FOR MESSAGE')
|
||||||
|
msg = await stream.receive()
|
||||||
|
log.warning(f'RECEIVED MSG: {msg}')
|
||||||
|
topic = msg["symbol"]
|
||||||
|
await send_chan.send({topic: msg})
|
||||||
|
log.warning(f'SENT TO CHAN')
|
||||||
|
if cancel_scope.cancelled_caught:
|
||||||
|
await tractor.breakpoint()
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def maybe_open_price_feed(
|
async def maybe_open_price_feed(
|
||||||
|
@ -144,10 +157,7 @@ async def maybe_open_price_feed(
|
||||||
},
|
},
|
||||||
key=pair_data["name"],
|
key=pair_data["name"],
|
||||||
) as (cache_hit, feed):
|
) as (cache_hit, feed):
|
||||||
if cache_hit:
|
yield feed
|
||||||
yield broadcast_receiver(feed, 10)
|
|
||||||
else:
|
|
||||||
yield feed
|
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
|
@ -189,32 +199,37 @@ async def aio_price_feed_relay(
|
||||||
to_trio: trio.abc.SendChannel,
|
to_trio: trio.abc.SendChannel,
|
||||||
) -> None:
|
) -> None:
|
||||||
async def _trade(data: dict, receipt_timestamp):
|
async def _trade(data: dict, receipt_timestamp):
|
||||||
print(f' trade data: {data}')
|
data = data.to_dict()
|
||||||
to_trio.send_nowait(
|
message = (
|
||||||
(
|
|
||||||
"trade",
|
"trade",
|
||||||
{
|
{
|
||||||
"symbol": cf_sym_to_fqsn(data.symbol),
|
"symbol": cf_sym_to_fqsn(data['symbol']),
|
||||||
"last": float(data.to_dict()['price']),
|
"last": float(data['price']),
|
||||||
"broker_ts": time.time(),
|
"broker_ts": time.time(),
|
||||||
"data": data.to_dict(),
|
"ticks": [{
|
||||||
"receipt": receipt_timestamp,
|
'type': 'trade',
|
||||||
|
'price': float(data['price']),
|
||||||
|
'size': float(data['amount']),
|
||||||
|
'broker_ts': receipt_timestamp
|
||||||
|
}],
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
)
|
print(f'trade message: {message}')
|
||||||
|
# try:
|
||||||
|
to_trio.send_nowait(message)
|
||||||
|
# except trio.WouldBlock as e:
|
||||||
|
#breakpoint()
|
||||||
|
|
||||||
async def _l1(data: dict, receipt_timestamp):
|
async def _l1(data: dict, receipt_timestamp):
|
||||||
print(f'l2 data: {data}')
|
|
||||||
bid = data.book.to_dict()['bid']
|
bid = data.book.to_dict()['bid']
|
||||||
ask = data.book.to_dict()['ask']
|
ask = data.book.to_dict()['ask']
|
||||||
l1_ask_price, l1_ask_size = next(iter(ask.items()))
|
l1_ask_price, l1_ask_size = next(iter(ask.items()))
|
||||||
l1_bid_price, l1_bid_size = next(iter(bid.items()))
|
l1_bid_price, l1_bid_size = next(iter(bid.items()))
|
||||||
|
message = (
|
||||||
to_trio.send_nowait(
|
|
||||||
(
|
|
||||||
"l1",
|
"l1",
|
||||||
{
|
{
|
||||||
"symbol": cf_sym_to_fqsn(data.symbol),
|
"symbol": cf_sym_to_fqsn(data.symbol),
|
||||||
|
"broker_ts": time.time(),
|
||||||
"ticks": [
|
"ticks": [
|
||||||
{
|
{
|
||||||
"type": "bid",
|
"type": "bid",
|
||||||
|
@ -239,7 +254,10 @@ async def aio_price_feed_relay(
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
)
|
try:
|
||||||
|
to_trio.send_nowait(message)
|
||||||
|
except trio.WouldBlock as e:
|
||||||
|
print(e)
|
||||||
|
|
||||||
fh.add_feed(
|
fh.add_feed(
|
||||||
exchange,
|
exchange,
|
||||||
|
@ -249,10 +267,8 @@ async def aio_price_feed_relay(
|
||||||
)
|
)
|
||||||
|
|
||||||
if not fh.running:
|
if not fh.running:
|
||||||
try:
|
fh.run(start_loop=False, install_signal_handlers=False)
|
||||||
fh.run(start_loop=False, install_signal_handlers=False)
|
|
||||||
except BaseExceptionGroup as e:
|
|
||||||
breakpoint()
|
|
||||||
# sync with trio
|
# sync with trio
|
||||||
to_trio.send_nowait(None)
|
to_trio.send_nowait(None)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue