diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 9cb3e9b3..25c8c1a7 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -104,7 +104,7 @@ async def handle_order_requests( # XXX: UGH, let's unify this.. with ``msgspec``. msg: dict[str, Any] order: BrokerdOrder - counter = count() + counter = count(1) async for msg in ems_order_stream: log.info(f'Rx order msg:\n{pformat(msg)}') @@ -114,16 +114,32 @@ async def handle_order_requests( }: cancel = BrokerdCancel(**msg) reqid = ids[cancel.oid] - txid = reqids2txids[reqid] - # call ws api to cancel: - # https://docs.kraken.com/websockets/#message-cancelOrder - await ws.send_msg({ - 'event': 'cancelOrder', - 'token': token, - 'reqid': reqid, - 'txid': [txid], # should be txid from submission - }) + try: + txid = reqids2txids[reqid] + except KeyError: + # XXX: not sure if this block ever gets hit now? + log.error('TOO FAST CANCEL/EDIT') + reqids2txids[reqid] = TooFastEdit(reqid) + await ems_order_stream.send( + BrokerdError( + oid=msg['oid'], + symbol=msg['symbol'], + reason=( + f'TooFastEdit reqid:{reqid}, could not cancelling..' + ), + + ) + ) + else: + # call ws api to cancel: + # https://docs.kraken.com/websockets/#message-cancelOrder + await ws.send_msg({ + 'event': 'cancelOrder', + 'token': token, + 'reqid': reqid, + 'txid': [txid], # should be txid from submission + }) case { 'account': 'kraken.spot' as account, @@ -138,8 +154,10 @@ async def handle_order_requests( ep = 'editOrder' reqid = ids[order.oid] # integer not txid try: - txid = reqids2txids.pop(reqid) + txid = reqids2txids[reqid] except KeyError: + # XXX: not sure if this block ever gets hit now? + log.error('TOO FAST EDIT') reqids2txids[reqid] = TooFastEdit(reqid) await ems_order_stream.send( BrokerdError( @@ -151,10 +169,11 @@ async def handle_order_requests( ) ) - - extra = { - 'orderid': txid, # txid - } + else: + extra = { + 'orderid': txid, # txid + 'newuserref': str(reqid), + } else: ep = 'addOrder' @@ -188,18 +207,22 @@ async def handle_order_requests( 'event': ep, 'token': token, - # XXX: this seems to always get an error response? - # 'userref': f"'{reqid}'", - 'reqid': reqid, # remapped-to-int uid from ems + # XXX: we set these to the same value since for us + # a request dialog and an order's state-liftime are + # treated the same. Also this used to not work, the + # values used to be mutex for some odd reason until + # we dealt with support about it, and then they + # fixed it and pretended like we were crazy and the + # issue was never there lmao... coorps bro. + # 'userref': str(reqid), + 'userref': str(reqid), 'pair': pair, 'price': str(order.price), 'volume': str(order.size), - - # only ensures request is valid, nothing more - # validate: 'true', - + # validate: 'true', # validity check, nothing more } | extra + log.info(f'Submitting WS order request:\n{pformat(req)}') await ws.send_msg(req) @@ -220,9 +243,7 @@ async def handle_order_requests( symbol=msg['symbol'], reason=( 'Invalid request msg:\n{msg}' - ), - - ) + )) ) @@ -230,9 +251,17 @@ async def handle_order_requests( async def subscribe( ws: wsproto.WSConnection, token: str, - subs: list[str] = [ - 'ownTrades', - 'openOrders', + subs: list[tuple[str, dict]] = [ + ('ownTrades', { + # don't send first 50 trades on startup, + # we already pull this manually from the rest endpoint. + 'snapshot': False, + },), + ('openOrders', { + # include rate limit counters + 'ratecounter': True, + },), + ], ): ''' @@ -244,14 +273,16 @@ async def subscribe( ''' # more specific logic for this in kraken's sync client: # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 - assert token - for sub in subs: + subnames: set[str] = set() + + for name, sub_opts in subs: msg = { 'event': 'subscribe', 'subscription': { - 'name': sub, + 'name': name, 'token': token, + **sub_opts, } } @@ -260,7 +291,34 @@ async def subscribe( # since internally the ws methods appear to be FIFO # locked. await ws.send_msg(msg) + subnames.add(name) + # wait on subscriptionn acks + with trio.move_on_after(5): + while True: + match (msg := await ws.recv_msg()): + case { + 'event': 'subscriptionStatus', + 'status': 'subscribed', + 'subscription': sub_opts, + } as msg: + log.info( + f'Sucessful subscribe for {sub_opts}:\n' + f'{pformat(msg)}' + ) + subnames.remove(sub_opts['name']) + if not subnames: + break + + case { + 'event': 'subscriptionStatus', + 'status': 'error', + 'errorMessage': errmsg, + } as msg: + raise RuntimeError( + f'{errmsg}\n\n' + f'{pformat(msg)}' + ) yield for sub in subs: @@ -490,16 +548,9 @@ async def trades_dialogue( ) await ctx.started((ppmsgs, [acc_name])) - # XXX: not fucking clue but putting this finally block - # will suppress errors inside the direct await below!?! - # likely something to do with the exist stack inside - # the nobsws stuff... - # try: - # Get websocket token for authenticated data stream # Assert that a token was actually received. resp = await client.endpoint('GetWebSocketsToken', {}) - err = resp.get('error') if err: raise BrokerError(err) @@ -573,18 +624,37 @@ async def handle_order_updates( defined in the signature clear to the reader. ''' - async for msg in ws_stream: match msg: - # process and relay clearing trade events to ems - # https://docs.kraken.com/websockets/#message-ownTrades + # TODO: turns out you get the fill events from the # `openOrders` before you get this, so it might be better # to do all fill/status/pp updates in that sub and just use # this one for ledger syncs? + + # XXX: ASK SUPPORT ABOUT THIS! + # For eg. we could take the "last 50 trades" and do a diff # with the ledger and then only do a re-sync if something # seems amiss? + + # process and relay clearing trade events to ems + # https://docs.kraken.com/websockets/#message-ownTrades + # format as tid -> trade event map + # eg. received msg format, + # [{'TOKWHY-SMTUB-G5DOI6': { + # 'cost': '95.29047', + # 'fee': '0.24776', + # 'margin': '0.00000', + # 'ordertxid': 'OKSUXM-3OLSB-L7TN72', + # 'ordertype': 'limit', + # 'pair': 'XBT/EUR', + # 'postxid': 'TKH2SE-M7IF5-CFI7LT', + # 'price': '21268.20000', + # 'time': '1657990947.640891', + # 'type': 'buy', + # 'vol': '0.00448042' + # }}] case [ trades_msgs, 'ownTrades', @@ -594,63 +664,36 @@ async def handle_order_updates( f'ownTrades update_{seq}:\n' f'{pformat(trades_msgs)}' ) - # XXX: a fix / todo - # see the comment in the caller about weird error - # suppression around a commented `try:` - # assert 0 - - # format as tid -> trade event map - # eg. received msg format, - # [{'TOKWHY-SMTUB-G5DOI6': {'cost': '95.29047', - # 'fee': '0.24776', - # 'margin': '0.00000', - # 'ordertxid': 'OKSUXM-3OLSB-L7TN72', - # 'ordertype': 'limit', - # 'pair': 'XBT/EUR', - # 'postxid': 'TKH2SE-M7IF5-CFI7LT', - # 'price': '21268.20000', - # 'time': '1657990947.640891', - # 'type': 'buy', - # 'vol': '0.00448042'}}] trades = { tid: trade for entry in trades_msgs for (tid, trade) in entry.items() # don't re-process datums we've already seen - if tid not in ledger_trans + # if tid not in ledger_trans } for tid, trade in trades.items(): + assert tid not in ledger_trans txid = trade['ordertxid'] + reqid = trade.get('userref') - # NOTE: yet again, here we don't have any ref to the - # reqid that's generated by us (as the client) and - # sent in the order request, so we have to look it - # up from our own registry... - reqid = reqids2txids.inverse[txid] + if not reqid: + # NOTE: yet again, here we don't have any ref to the + # reqid that's generated by us (as the client) and + # sent in the order request, so we have to look it + # up from our own registry... + reqid = reqids2txids.inverse[txid] + if not reqid: + log.warning(f'Unknown trade dialog: {txid}') action = trade['type'] price = float(trade['price']) size = float(trade['vol']) broker_time = float(trade['time']) - # send a fill msg for gui update - fill_msg = BrokerdFill( - time_ns=time.time_ns(), - reqid=reqid, - - action=action, - size=size, - price=price, - - # TODO: maybe capture more msg data - # i.e fees? - broker_details={'name': 'kraken'} | trade, - broker_time=broker_time - ) - await ems_stream.send(fill_msg) - - filled_msg = BrokerdStatus( + # TODO: we can emit this on the "closed" state in + # the `openOrders` sub-block below. + status_msg = BrokerdStatus( reqid=reqid, time_ns=time.time_ns(), @@ -672,7 +715,7 @@ async def handle_order_updates( # https://github.com/pikers/piker/issues/296 remaining=0, ) - await ems_stream.send(filled_msg) + await ems_stream.send(status_msg) new_trans = norm_trade_records(trades) ppmsgs = trades2pps( @@ -700,19 +743,31 @@ async def handle_order_updates( txid, update_msg = list(order_msg.items())[0] match update_msg: - # we ignore internal order updates triggered by - # kraken's "edit" endpoint. - case { - 'cancel_reason': 'Order replaced', - 'status': status, - # 'userref': reqid, # XXX: always zero bug XD - # **rest, - }: - log.info( - f'Order {txid} was replaced' - ) - continue - + # XXX: eg. of full msg schema: + # {'avg_price': _, + # 'cost': _, + # 'descr': { + # 'close': None, + # 'leverage': None, + # 'order': descr, + # 'ordertype': 'limit', + # 'pair': 'XMR/EUR', + # 'price': '74.94000000', + # 'price2': '0.00000000', + # 'type': 'buy' + # }, + # 'expiretm': None, + # 'fee': '0.00000000', + # 'limitprice': '0.00000000', + # 'misc': '', + # 'oflags': 'fciq', + # 'opentm': '1656966131.337344', + # 'refid': None, + # 'starttm': None, + # 'stopprice': '0.00000000', + # 'timeinforce': 'GTC', + # 'vol': submit_vlm, # '13.34400854', + # 'vol_exec': exec_vlm} # 0.0000 case { 'userref': reqid, @@ -721,42 +776,7 @@ async def handle_order_updates( # actual status updates.. see case above. 'status': status, **rest, - - # XXX: eg. of remaining msg schema: - # 'avg_price': _, - # 'cost': _, - # 'descr': { - # 'close': None, - # 'leverage': None, - # 'order': descr, - # 'ordertype': 'limit', - # 'pair': 'XMR/EUR', - # 'price': '74.94000000', - # 'price2': '0.00000000', - # 'type': 'buy' - # }, - # 'expiretm': None, - # 'fee': '0.00000000', - # 'limitprice': '0.00000000', - # 'misc': '', - # 'oflags': 'fciq', - # 'opentm': '1656966131.337344', - # 'refid': None, - # 'starttm': None, - # 'stopprice': '0.00000000', - # 'timeinforce': 'GTC', - # 'vol': submit_vlm, # '13.34400854', - # 'vol_exec': exec_vlm, # 0.0000 }: - ems_status = { - 'open': 'submitted', - 'closed': 'filled', - 'canceled': 'cancelled', - # do we even need to forward - # this state to the ems? - 'pending': 'pending', - }[status] - # TODO: store this in a ChainMap instance # per order dialog. # submit_vlm = rest.get('vol', 0) @@ -766,23 +786,70 @@ async def handle_order_updates( else: vlm = rest.get('vol_exec', 0) - ourreqid = reqids2txids.inverse.get(txid) + if status == 'canceled': + reqids2txids.pop(reqid) - if ourreqid != reqid: - log.warning( - 'REQID MISMATCH due to kraken api bugs..\n' - f'msg:{reqid}, ours:{ourreqid}' - ) - reqid = ourreqid + # we specially ignore internal order + # updates triggered by kraken's "edit" + # endpoint. + if rest['cancel_reason'] == 'Order replaced': + # TODO: + # - put the edit order status update + # code here? + # - send open order status msg. + log.info( + f'Order replaced: {txid}@reqid={reqid}' + ) + + # we don't do normal msg emission on + # a replacement cancel since it's + # the result of an "edited order" + # and thus we mask the kraken + # backend cancel then create details + # from the ems side. + continue + else: + # XXX: keep kraken engine's ``txid`` synced + # with the ems dialog's ``reqid``. + reqids2txids[reqid] = txid + + ourreqid = reqids2txids.inverse.get(txid) + if ourreqid is None: + log.info( + 'Mapping new txid to our reqid:\n' + f'{reqid} -> {txid}' + ) oid = ids.inverse.get(reqid) if ( status == 'open' and ( + # XXX: too fast edit handled by the + # request handler task: this + # scenario occurs when ems side + # requests are coming in too quickly + # such that there is no known txid + # yet established for the ems + # dialog's last reqid when the + # request handler task is already + # receceiving a new update for that + # reqid. In this case we simply mark + # the reqid as being "too fast" and + # then when we get the next txid + # update from kraken's backend, and + # thus the new txid, we simply + # cancel the order for now. - # TOO fast edit handled by the - # request handler task. + # TODO: Ideally we eventually + # instead make the client side of + # the ems block until a submission + # is confirmed by the backend + # instead of this hacky throttle + # style approach and avoid requests + # coming in too quickly on the other + # side of the ems, aka the client + # <-> ems dialog. (toofast := isinstance( reqids2txids.get(reqid), TooFastEdit @@ -798,7 +865,7 @@ async def handle_order_updates( # by not moving the client side line # until an edit confirmation # arrives... - log.warning( + log.cancel( f'Received too fast edit {txid}:\n' f'{update_msg}\n' 'Cancelling order for now!..' @@ -808,7 +875,7 @@ async def handle_order_updates( # TODO: handle these and relay them # through the EMS to the client / UI # side! - log.warning( + log.cancel( f'Rx unknown active order {txid}:\n' f'{update_msg}\n' 'Cancelling order for now!..' @@ -824,6 +891,18 @@ async def handle_order_updates( }) continue + # remap statuses to ems set. + ems_status = { + 'open': 'submitted', + 'closed': 'filled', + 'canceled': 'cancelled', + # do we even need to forward + # this state to the ems? + 'pending': 'pending', + }[status] + # TODO: i like the open / closed semantics + # more we should consider them for internals + # send BrokerdStatus messages for all # order state updates resp = BrokerdStatus( @@ -855,27 +934,45 @@ async def handle_order_updates( apiflows[reqid].maps.append(update_msg) await ems_stream.send(resp) - # fill event. - # there is no `status` field + # fill msg. + # eg. contents (in total): + # { + # 'vol_exec': '0.84709869', + # 'cost': '101.25370642', + # 'fee': '0.26325964', + # 'avg_price': '119.53000001', + # 'userref': 0, + # } + # NOTE: there is no `status` field case { 'vol_exec': vlm, + 'avg_price': price, + 'userref': reqid, **rest, - }: - # eg. fill msg contents (in total): - # { - # 'vol_exec': '0.84709869', - # 'cost': '101.25370642', - # 'fee': '0.26325964', - # 'avg_price': '119.53000001', - # 'userref': 0, - # } - # TODO: emit fill msg from here - reqid = reqids2txids.inverse[txid] + } as msg: + + ourreqid = reqids2txids.inverse[txid] + assert reqid == ourreqid log.info( f'openOrders vlm={vlm} Fill for {reqid}:\n' f'{update_msg}' ) - continue + + fill_msg = BrokerdFill( + time_ns=time.time_ns(), + reqid=reqid, + + # action=action, # just use size value + # for now? + size=vlm, + price=price, + + # TODO: maybe capture more msg data + # i.e fees? + broker_details={'name': 'kraken'} | trade, + broker_time=broker_time + ) + await ems_stream.send(fill_msg) case _: log.warning( @@ -883,21 +980,7 @@ async def handle_order_updates( f'{txid}:{order_msg}' ) - # TODO: given the 'openOrders' sub , pretty - # much all the msgs we get for this sub are duplicate - # of the (incremental) updates in that one though we still - # need them because that sub seems to have a bug where the - # `userref` field is always 0 instead of our generated reqid - # value... - # Not sure why kraken devs decided to repeat themselves but - # it almost seems as though we could drop this entire sub - # and get everything we need by just parsing msgs correctly - # above? The only reason for this seems to be remapping - # underlying `txid` values on order "edits" which the - # `openOrders` sub doesn't seem to have any knowledge of. - # I'd also like to ask them which event guarantees that the - # the live order is now in the book, since these status ones - # almost seem more like request-acks then state guarantees. + # order request status updates case { 'event': etype, 'status': status, @@ -914,9 +997,13 @@ async def handle_order_updates( f'{etype}:\n' f'{pformat(msg)}' ) - oid = ids.inverse.get(reqid) + + txid = rest.get('txid') + lasttxid = reqids2txids.get(reqid) + # TODO: relay these to EMS once it supports # open order loading. + oid = ids.inverse.get(reqid) if not oid: log.warning( 'Unknown order status update?:\n' @@ -924,136 +1011,50 @@ async def handle_order_updates( ) continue - lasttxid = reqids2txids.get(reqid) - txid = rest.get('txid') - # update the msg chain chain = apiflows[reqid] chain.maps.append(event) - resps, errored = process_status( - event, - oid, - token, - chain, - ) - if resps: - for resp in resps: - await ems_stream.send(resp) + if status == 'error': + # any of ``{'add', 'edit', 'cancel'}`` + action = etype.removesuffix('OrderStatus') + errmsg = rest['errorMessage'] + log.error( + f'Failed to {action} order {reqid}:\n' + f'{errmsg}' + ) + await ems_stream.send(BrokerdError( + oid=oid, + # XXX: use old reqid in case it changed? + reqid=reqid, + symbol=chain.get('symbol', 'N/A'), - if txid: + reason=f'Failed {action}:\n{errmsg}', + broker_details=event + )) + + txid = txid or lasttxid if ( - isinstance(lasttxid, TooFastEdit) - or errored + txid + + # we throttle too-fast-requests on the ems side + or (txid and isinstance(txid, TooFastEdit)) ): # client was editting too quickly # so we instead cancel this order - log.cancel(f'Cancelling order for {reqid}@{txid}') + log.cancel( + f'Cancelling {reqid}@{txid} due to:\n {event}') await ws.send_msg({ 'event': 'cancelOrder', 'token': token, 'reqid': reqid or 0, 'txid': [txid], }) - else: - # XXX: we **must** do this mapping for edit order - # status updates since the `openOrders` sub above - # never relays back the correct client-side `reqid` - # that is put in the order request.. - reqids2txids[reqid] = txid - case _: + log.warning(f'Unhandled trades update msg: {msg}') -def process_status( - event: dict[str, str], - oid: str, - token: str, - chain: ChainMap, - -) -> tuple[list[MsgUnion], bool]: - ''' - Process `'[add/edit/cancel]OrderStatus'` events by translating to - and returning the equivalent EMS-msg responses. - - ''' - match event: - case { - 'event': etype, - 'status': 'error', - 'reqid': reqid, - 'errorMessage': errmsg, - }: - # any of ``{'add', 'edit', 'cancel'}`` - action = etype.removesuffix('OrderStatus') - log.error( - f'Failed to {action} order {reqid}:\n' - f'{errmsg}' - ) - resp = BrokerdError( - oid=oid, - # XXX: use old reqid in case it changed? - reqid=reqid, - symbol=chain.get('symbol', 'N/A'), - - reason=f'Failed {action}:\n{errmsg}', - broker_details=event - ) - return [resp], True - - # successful request cases - case { - 'event': 'addOrderStatus', - 'status': "ok", - 'reqid': reqid, # oid from ems side - 'txid': txid, - 'descr': descr, # only on success? - }: - log.info( - f'Submitted order: {descr}\n' - f'ems oid: {oid}\n' - f'brokerd reqid: {reqid}\n' - f'txid: {txid}\n' - ) - return [], False - - case { - 'event': 'editOrderStatus', - 'status': "ok", - 'reqid': reqid, # oid from ems side - 'descr': descr, - - # NOTE: for edit request this is a new value - 'txid': txid, - 'originaltxid': origtxid, - }: - log.info( - f'Editting order {oid}[requid={reqid}]:\n' - f'brokerd reqid: {reqid}\n' - f'txid: {origtxid} -> {txid}\n' - f'{descr}' - ) - # deliver another ack to update the ems-side `.reqid`. - return [], False - - case { - "event": "cancelOrderStatus", - "status": "ok", - 'reqid': reqid, - - # XXX: sometimes this isn't provided!? - # 'txid': txids, - **rest, - }: - for txid in rest.get('txid', [chain['reqid']]): - log.info( - f'Cancelling order {oid}[requid={reqid}]:\n' - f'brokerd reqid: {reqid}\n' - ) - return [], False - - def norm_trade_records( ledger: dict[str, Any], diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index 0f41a3ec..7c589d85 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -152,19 +152,8 @@ async def stream_messages( continue - case { - 'connectionID': _, - 'event': 'systemStatus', - 'status': 'online', - 'version': _, - } as msg: - log.info( - 'WS connection is up:\n' - f'{msg}' - ) - continue - case _: + # passthrough sub msgs yield msg diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py index 6910d206..218d46e0 100644 --- a/piker/data/_ahab.py +++ b/piker/data/_ahab.py @@ -39,7 +39,11 @@ from docker.errors import ( APIError, # ContainerError, ) -from requests.exceptions import ConnectionError, ReadTimeout +import requests +from requests.exceptions import ( + ConnectionError, + ReadTimeout, +) from ..log import get_logger, get_console_log from .. import config @@ -188,13 +192,12 @@ class Container: def hard_kill(self, start: float) -> None: delay = time.time() - start - log.error( - f'Failed to kill container {self.cntr.id} after {delay}s\n' - 'sending SIGKILL..' - ) # get out the big guns, bc apparently marketstore # doesn't actually know how to terminate gracefully # :eyeroll:... + log.error( + f'SIGKILL-ing: {self.cntr.id} after {delay}s\n' + ) self.try_signal('SIGKILL') self.cntr.wait( timeout=3, @@ -218,20 +221,25 @@ class Container: self.try_signal('SIGINT') start = time.time() - for _ in range(30): + for _ in range(6): with trio.move_on_after(0.5) as cs: - cs.shield = True log.cancel('polling for CNTR logs...') try: await self.process_logs_until(stop_msg) except ApplicationLogError: hard_kill = True + else: + # if we aren't cancelled on above checkpoint then we + # assume we read the expected stop msg and + # terminated. + break - # if we aren't cancelled on above checkpoint then we - # assume we read the expected stop msg and terminated. - break + if cs.cancelled_caught: + # on timeout just try a hard kill after + # a quick container sync-wait. + hard_kill = True try: log.info(f'Polling for container shutdown:\n{cid}') @@ -254,9 +262,16 @@ class Container: except ( docker.errors.APIError, ConnectionError, + requests.exceptions.ConnectionError, + trio.Cancelled, ): log.exception('Docker connection failure') self.hard_kill(start) + raise + + except trio.Cancelled: + log.exception('trio cancelled...') + self.hard_kill(start) else: hard_kill = True @@ -305,16 +320,13 @@ async def open_ahabd( )) try: - # TODO: we might eventually want a proxy-style msg-prot here # to allow remote control of containers without needing # callers to have root perms? await trio.sleep_forever() finally: - # needed? - with trio.CancelScope(shield=True): - await cntr.cancel(stop_msg) + await cntr.cancel(stop_msg) async def start_ahab(