diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 1cf4e9fe..433c058b 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -19,6 +19,8 @@ Order api and machinery ''' from contextlib import asynccontextmanager as acm +from functools import partial +from itertools import chain from pprint import pformat import time from typing import ( @@ -234,20 +236,49 @@ async def handle_order_requests( log.error(f'Unknown order command: {request_msg}') -def make_auth_sub(data: dict[str, Any]) -> dict[str, str]: +@acm +async def subscribe( + ws: wsproto.WSConnection, + token: str, + subs: list[str] = ['ownTrades', 'openOrders'], +): ''' - Create a request subscription packet dict. - - ## TODO: point to the auth urls + Setup ws api subscriptions: https://docs.kraken.com/websockets/#message-subscribe + By default we sign up for trade and order update events. + ''' - # eg. specific logic for this in kraken's sync client: + # more specific logic for this in kraken's sync client: # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 - return { - 'event': 'subscribe', - 'subscription': data, - } + + assert token + for sub in subs: + msg = { + 'event': 'subscribe', + 'subscription': { + 'name': sub, + 'token': token, + } + } + + # TODO: we want to eventually allow unsubs which should + # be completely fine to request from a separate task + # since internally the ws methods appear to be FIFO + # locked. + await ws.send_msg(msg) + + yield + + for sub in subs: + # unsub from all pairs on teardown + await ws.send_msg({ + 'event': 'unsubscribe', + 'subscription': [sub], + }) + + # XXX: do we need to ack the unsub? + # await ws.recv_msg() @tractor.context @@ -259,33 +290,6 @@ async def trades_dialogue( # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) - @acm - async def subscribe(ws: wsproto.WSConnection, token: str): - # XXX: setup subs - # https://docs.kraken.com/websockets/#message-subscribe - # specific logic for this in kraken's shitty sync client: - # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 - trades_sub = make_auth_sub( - {'name': 'ownTrades', 'token': token} - ) - - # TODO: we want to eventually allow unsubs which should - # be completely fine to request from a separate task - # since internally the ws methods appear to be FIFO - # locked. - await ws.send_msg(trades_sub) - - yield - - # unsub from all pairs on teardown - await ws.send_msg({ - 'event': 'unsubscribe', - 'subscription': ['ownTrades'], - }) - - # XXX: do we need to ack the unsub? - # await ws.recv_msg() - async with get_client() as client: # TODO: make ems flip to paper mode via @@ -347,8 +351,10 @@ async def trades_dialogue( ctx.open_stream() as ems_stream, open_autorecon_ws( 'wss://ws-auth.kraken.com/', - fixture=subscribe, - token=token, + fixture=partial( + subscribe, + token=token, + ), ) as ws, trio.open_nursery() as n, ): @@ -356,7 +362,6 @@ async def trades_dialogue( n.start_soon(handle_order_requests, client, ems_stream) count: int = 0 - ledger_txids = {r.tid for r in trans} # process and relay trades events to ems # https://docs.kraken.com/websockets/#message-ownTrades @@ -367,62 +372,116 @@ async def trades_dialogue( 'ownTrades', {'sequence': seq}, ]: - # ensure that we are only processing new trades + # XXX: do we actually need this orrr? + # ensure that we are only processing new trades? assert seq > count count += 1 - for entries in trades_msgs: - for tid, msg in entries.items(): + # flatten msgs for processing + trades = { + tid: trade + for entry in trades_msgs + for (tid, trade) in entry.items() - if tid in ledger_txids: - log.debug(f'Skipping ledgered {tid}:{msg}') - continue + # only emit entries which are already not-in-ledger + if tid not in {r.tid for r in trans} + } + for tid, trade in trades.items(): - # yield trade - reqid = msg['ordertxid'] - action = msg['type'] - price = float(msg['price']) - size = float(msg['vol']) - broker_time = float(msg['time']) + # parse-cast + reqid = trade['ordertxid'] + action = trade['type'] + price = float(trade['price']) + size = float(trade['vol']) + broker_time = float(trade['time']) - # send a fill msg for gui update - fill_msg = BrokerdFill( - reqid=reqid, - time_ns=time.time_ns(), + # send a fill msg for gui update + fill_msg = BrokerdFill( + reqid=reqid, + time_ns=time.time_ns(), - action=action, - size=size, - price=price, - # TODO: maybe capture more msg data - # i.e fees? - broker_details={'name': 'kraken'}, - broker_time=broker_time - ) + action=action, + size=size, + price=price, + # TODO: maybe capture more msg data + # i.e fees? + broker_details={'name': 'kraken'}, + broker_time=broker_time + ) + await ems_stream.send(fill_msg.dict()) - await ems_stream.send(fill_msg.dict()) - filled_msg = BrokerdStatus( - reqid=reqid, - time_ns=time.time_ns(), + filled_msg = BrokerdStatus( + reqid=reqid, + time_ns=time.time_ns(), - account=acc_name, - status='filled', - filled=size, - reason='Order filled by kraken', - broker_details={ - 'name': 'kraken', - 'broker_time': broker_time - }, + account=acc_name, + status='filled', + filled=size, + reason='Order filled by kraken', + broker_details={ + 'name': 'kraken', + 'broker_time': broker_time + }, - # TODO: figure out if kraken gives a count - # of how many units of underlying were - # filled. Alternatively we can decrement - # this value ourselves by associating and - # calcing from the diff with the original - # client-side request, see: - # https://github.com/pikers/piker/issues/296 - remaining=0, - ) - await ems_stream.send(filled_msg.dict()) + # TODO: figure out if kraken gives a count + # of how many units of underlying were + # filled. Alternatively we can decrement + # this value ourselves by associating and + # calcing from the diff with the original + # client-side request, see: + # https://github.com/pikers/piker/issues/296 + remaining=0, + ) + await ems_stream.send(filled_msg.dict()) + + # update ledger and position tracking + trans = await update_ledger(acctid, trades) + active, closed = pp.update_pps_conf( + 'kraken', + acctid, + trade_records=trans, + ledger_reload={}.fromkeys( + t.bsuid for t in trans), + ) + + # emit pp msgs + for pos in filter( + bool, + chain(active.values(), closed.values()), + ): + pp_msg = BrokerdPosition( + broker='kraken', + + # XXX: ok so this is annoying, we're + # relaying an account name with the + # backend suffix prefixed but when + # reading accounts from ledgers we + # don't need it and/or it's prefixed + # in the section table.. we should + # just strip this from the message + # right since `.broker` is already + # included? + account=f'kraken.{acctid}', + symbol=pos.symbol.front_fqsn(), + size=pos.size, + avg_price=pos.be_price, + + # TODO + # currency='' + ) + await ems_stream.send(pp_msg.dict()) + + case [ + trades_msgs, + 'openOrders', + {'sequence': seq}, + ]: + # TODO: async order update handling which we + # should remove from `handle_order_requests()` + # above: + # https://github.com/pikers/piker/issues/293 + # https://github.com/pikers/piker/issues/310 + log.info(f'Order update {seq}:{trades_msgs}') case _: log.warning(f'Unhandled trades msg: {msg}') @@ -452,7 +511,7 @@ def norm_trade_records( size=float(size), price=float(record['price']), cost=float(record['fee']), - dt=pendulum.from_timestamp(record['time']), + dt=pendulum.from_timestamp(float(record['time'])), bsuid=bsuid, # XXX: there are no derivs on kraken right?