Inline `process_trade_msgs()` into relay loop

notokeninwswrapper
Tyler Goodlet 2022-07-03 11:18:45 -04:00
parent 47777e4192
commit aea7bec2c3
1 changed files with 72 additions and 89 deletions

View File

@ -37,9 +37,13 @@ import wsproto
from piker import pp
from piker.clearing._messages import (
BrokerdPosition, BrokerdOrder, BrokerdStatus,
BrokerdOrderAck, BrokerdError, BrokerdCancel,
BrokerdCancel,
BrokerdError,
BrokerdFill,
BrokerdOrder,
BrokerdOrderAck,
BrokerdPosition,
BrokerdStatus,
)
from .api import (
Client,
@ -338,6 +342,7 @@ async def trades_dialogue(
token = resp['result']['token']
ws: NoBsWs
async with (
ctx.open_stream() as ems_stream,
open_autorecon_ws(
@ -350,55 +355,78 @@ async def trades_dialogue(
# task for processing inbound requests from ems
n.start_soon(handle_order_requests, client, ems_stream)
# begin trade event processing
async for trade in process_trade_msgs(
ws,
trans, # pass in prior ledger transactions
):
match trade:
# prepare and send a filled status update
case Trade():
filled_msg = BrokerdStatus(
reqid=trade.reqid,
time_ns=time.time_ns(),
count: int = 0
ledger_txids = {r.tid for r in trans}
account=acc_name,
status='filled',
filled=float(trade.size),
reason='Order filled by kraken',
broker_details={
'name': 'kraken',
'broker_time': trade.broker_time
},
# process and relay trades events to ems
# https://docs.kraken.com/websockets/#message-ownTrades
async for msg in stream_messages(ws):
match msg:
case [
trades_msgs,
'ownTrades',
{'sequence': seq},
]:
# ensure that we are only processing new trades
assert seq > count
count += 1
# TODO: figure out if kraken gives a count
# of how many units of underlying were
# filled. Alternatively we can decrement
# this value ourselves by associating and
# calcing from the diff with the original
# client-side request, see:
# https://github.com/pikers/piker/issues/296
remaining=0,
)
await ems_stream.send(filled_msg.dict())
for entries in trades_msgs:
for tid, msg in entries.items():
# send a fill msg for gui update
fill_msg = BrokerdFill(
reqid=trade.reqid,
time_ns=time.time_ns(),
if tid in ledger_txids:
log.debug(f'Skipping ledgered {tid}:{msg}')
continue
action=trade.action,
size=float(trade.size),
price=float(trade.price),
# TODO: maybe capture more msg data i.e fees?
broker_details={'name': 'kraken'},
broker_time=float(trade.broker_time)
)
# yield trade
reqid = msg['ordertxid']
action = msg['type']
price = float(msg['price'])
size = float(msg['vol'])
broker_time = float(msg['time'])
await ems_stream.send(fill_msg.dict())
filled_msg = BrokerdStatus(
reqid=reqid,
time_ns=time.time_ns(),
account=acc_name,
status='filled',
filled=size,
reason='Order filled by kraken',
broker_details={
'name': 'kraken',
'broker_time': broker_time
},
# TODO: figure out if kraken gives a count
# of how many units of underlying were
# filled. Alternatively we can decrement
# this value ourselves by associating and
# calcing from the diff with the original
# client-side request, see:
# https://github.com/pikers/piker/issues/296
remaining=0,
)
await ems_stream.send(filled_msg.dict())
# send a fill msg for gui update
fill_msg = BrokerdFill(
reqid=reqid,
time_ns=time.time_ns(),
action=action,
size=size,
price=price,
# TODO: maybe capture more msg data
# i.e fees?
broker_details={'name': 'kraken'},
broker_time=broker_time
)
await ems_stream.send(fill_msg.dict())
case _:
log.warning(f'Unhandled trades msg: {trade}')
log.warning(f'Unhandled trades msg: {msg}')
await tractor.breakpoint()
@ -452,48 +480,3 @@ async def update_ledger(
# normalize to transaction form
records = norm_trade_records(trade_entries)
return records
async def process_trade_msgs(
ws: NoBsWs,
trans: list[pp.Transaction],
):
'''
Parse and pack trades subscription messages, deliver framed
sequences of messages?
Ws api docs:
https://docs.kraken.com/websockets/#message-ownTrades
'''
count: int = 0
ledger_txids = {r.tid for r in trans}
async for msg in stream_messages(ws):
sub = msg[1]
seq = msg[2]['sequence']
# stream sanity checks
assert sub == 'ownTrades'
# ensure that we are only processing new trades
assert seq > count
count += 1
trade_events = msg[0]
for trade_event in trade_events:
for tid, trade_data in trade_event.items():
if tid in ledger_txids:
continue
trade = Trade(
reqid=msg['ordertxid'],
action=msg['type'],
price=msg['price'],
size=msg['vol'],
broker_time=msg['time']
)
yield trade