diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 8a6fa84c..ac22c8df 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -140,11 +140,8 @@ async def handle_order_requests( try: txid = reqids2txids[reqid] except KeyError: - assert 0 - # XXX: not sure if this block ever gets hit now? log.error('TOO FAST EDIT') - reqids2txids[reqid] = TooFastEdit(reqid) await ems_order_stream.send( BrokerdError( @@ -971,139 +968,46 @@ async def handle_order_updates( chain = apiflows[reqid] chain.maps.append(event) - resps, errored = process_status( - event, - oid, - token, - chain, - reqids2txids, - ) + if status == 'error': + # any of ``{'add', 'edit', 'cancel'}`` + action = etype.removesuffix('OrderStatus') + errmsg = rest['errorMessage'] + log.error( + f'Failed to {action} order {reqid}:\n' + f'{errmsg}' + ) + await ems_stream.send(BrokerdError( + oid=oid, + # XXX: use old reqid in case it changed? + reqid=reqid, + symbol=chain.get('symbol', 'N/A'), - if resps: - for resp in resps: - await ems_stream.send(resp) + reason=f'Failed {action}:\n{errmsg}', + broker_details=event + )) - txid = txid or lasttxid - if ( - # errored likely on a rate limit or bad input - errored - and txid + txid = txid or lasttxid + if ( + txid + + # we throttle too-fast-requests on the ems side + or (txid and isinstance(txid, TooFastEdit)) + ): + # client was editting too quickly + # so we instead cancel this order + log.cancel( + f'Cancelling {reqid}@{txid} due to:\n {event}') + await ws.send_msg({ + 'event': 'cancelOrder', + 'token': token, + 'reqid': reqid or 0, + 'txid': [txid], + }) - # we throttle too-fast-requests on the ems side - or (txid and isinstance(txid, TooFastEdit)) - ): - # client was editting too quickly - # so we instead cancel this order - log.cancel( - f'Cancelling {reqid}@{txid} due to error:\n {event}') - await ws.send_msg({ - 'event': 'cancelOrder', - 'token': token, - 'reqid': reqid or 0, - 'txid': [txid], - }) case _: log.warning(f'Unhandled trades update msg: {msg}') -def process_status( - event: dict[str, str], - oid: str, - token: str, - chain: ChainMap, - reqids2txids: dict[int, str], - -) -> tuple[list[MsgUnion], bool]: - ''' - Process `'[add/edit/cancel]OrderStatus'` events by translating to - and returning the equivalent EMS-msg responses. - - ''' - match event: - case { - 'event': etype, - 'status': 'error', - 'reqid': reqid, - 'errorMessage': errmsg, - }: - # any of ``{'add', 'edit', 'cancel'}`` - action = etype.removesuffix('OrderStatus') - log.error( - f'Failed to {action} order {reqid}:\n' - f'{errmsg}' - ) - resp = BrokerdError( - oid=oid, - # XXX: use old reqid in case it changed? - reqid=reqid, - symbol=chain.get('symbol', 'N/A'), - - reason=f'Failed {action}:\n{errmsg}', - broker_details=event - ) - return [resp], True - - # successful request cases - case { - 'event': 'addOrderStatus', - 'status': "ok", - 'reqid': reqid, # oid from ems side - 'txid': txid, - 'descr': descr, # only on success? - }: - log.info( - f'Submitted order: {descr}\n' - f'ems oid: {oid}\n' - f'brokerd reqid: {reqid}\n' - f'txid: {txid}\n' - ) - return [], False - - case { - 'event': 'editOrderStatus', - 'status': "ok", - 'reqid': reqid, # oid from ems side - 'descr': descr, - - # NOTE: for edit request this is a new value - 'txid': txid, - 'originaltxid': origtxid, - }: - log.info( - f'Editting order {oid}[requid={reqid}]:\n' - f'brokerd reqid: {reqid}\n' - f'txid: {origtxid} -> {txid}\n' - f'{descr}' - ) - - # XXX: update the expected txid since the ``openOrders`` sub - # doesn't relay through the ``userref`` value.. - # (hopefully kraken will fix this so we don't need this - # line.) - # reqids2txids[reqid] = txid - # deliver another ack to update the ems-side `.reqid`. - return [], False - - case { - "event": "cancelOrderStatus", - "status": "ok", - 'reqid': reqid, - - # XXX: sometimes this isn't provided!? - # 'txid': txids, - **rest, - }: - for txid in rest.get('txid', [chain['reqid']]): - log.info( - f'Cancelling order {oid}[requid={reqid}]:\n' - f'brokerd reqid: {reqid}\n' - ) - # if txid == reqids2txids[reqid]: - # reqids2txids.pop(reqid) - - return [], False - - def norm_trade_records( ledger: dict[str, Any], diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py index 6910d206..218d46e0 100644 --- a/piker/data/_ahab.py +++ b/piker/data/_ahab.py @@ -39,7 +39,11 @@ from docker.errors import ( APIError, # ContainerError, ) -from requests.exceptions import ConnectionError, ReadTimeout +import requests +from requests.exceptions import ( + ConnectionError, + ReadTimeout, +) from ..log import get_logger, get_console_log from .. import config @@ -188,13 +192,12 @@ class Container: def hard_kill(self, start: float) -> None: delay = time.time() - start - log.error( - f'Failed to kill container {self.cntr.id} after {delay}s\n' - 'sending SIGKILL..' - ) # get out the big guns, bc apparently marketstore # doesn't actually know how to terminate gracefully # :eyeroll:... + log.error( + f'SIGKILL-ing: {self.cntr.id} after {delay}s\n' + ) self.try_signal('SIGKILL') self.cntr.wait( timeout=3, @@ -218,20 +221,25 @@ class Container: self.try_signal('SIGINT') start = time.time() - for _ in range(30): + for _ in range(6): with trio.move_on_after(0.5) as cs: - cs.shield = True log.cancel('polling for CNTR logs...') try: await self.process_logs_until(stop_msg) except ApplicationLogError: hard_kill = True + else: + # if we aren't cancelled on above checkpoint then we + # assume we read the expected stop msg and + # terminated. + break - # if we aren't cancelled on above checkpoint then we - # assume we read the expected stop msg and terminated. - break + if cs.cancelled_caught: + # on timeout just try a hard kill after + # a quick container sync-wait. + hard_kill = True try: log.info(f'Polling for container shutdown:\n{cid}') @@ -254,9 +262,16 @@ class Container: except ( docker.errors.APIError, ConnectionError, + requests.exceptions.ConnectionError, + trio.Cancelled, ): log.exception('Docker connection failure') self.hard_kill(start) + raise + + except trio.Cancelled: + log.exception('trio cancelled...') + self.hard_kill(start) else: hard_kill = True @@ -305,16 +320,13 @@ async def open_ahabd( )) try: - # TODO: we might eventually want a proxy-style msg-prot here # to allow remote control of containers without needing # callers to have root perms? await trio.sleep_forever() finally: - # needed? - with trio.CancelScope(shield=True): - await cntr.cancel(stop_msg) + await cntr.cancel(stop_msg) async def start_ahab(