Drop status event processing at large

Since we figured out how to pass through ems dialog ids to the
`openOrders` sub we don't really need to do much with status updates
other than error handling. This drops `process_status()` and moves the
error handling logic into a status handler sub-block; we now just
info-log status updates for troubleshooting purposes.
kraken_userref_hackzin
Tyler Goodlet 2022-08-01 14:08:45 -04:00
parent 1cbf45b4c4
commit 69e501764a
2 changed files with 59 additions and 143 deletions
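For reference, the handler shape this change lands on collapses all three `[add/edit/cancel]OrderStatus` events into a single status case with an error sub-block; a rough sketch (names such as `ems_stream`, `oid`, and `BrokerdError` are taken from the diff below; the exact pattern and guard may differ):

    match event:
        case {
            'event': etype,
            'status': status,
            'reqid': reqid,
            **rest,
        }:
            if status == 'error':
                # translate the failure into an ems error msg
                action = etype.removesuffix('OrderStatus')
                errmsg = rest['errorMessage']
                await ems_stream.send(BrokerdError(
                    oid=oid,
                    reqid=reqid,
                    reason=f'Failed {action}:\n{errmsg}',
                    broker_details=event,
                ))
            else:
                # successful updates are now only info-logged
                # for troubleshooting
                log.info(f'{etype}: {event}')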


@@ -140,11 +140,8 @@ async def handle_order_requests(
                 try:
                     txid = reqids2txids[reqid]
                 except KeyError:
-                    assert 0
+                    # XXX: not sure if this block ever gets hit now?
+                    log.error('TOO FAST EDIT')
                     reqids2txids[reqid] = TooFastEdit(reqid)
                     await ems_order_stream.send(
                         BrokerdError(
@@ -971,68 +968,15 @@ async def handle_order_updates(
                     chain = apiflows[reqid]
                     chain.maps.append(event)
-                    resps, errored = process_status(
-                        event,
-                        oid,
-                        token,
-                        chain,
-                        reqids2txids,
-                    )
-                    if resps:
-                        for resp in resps:
-                            await ems_stream.send(resp)
-                    txid = txid or lasttxid
-                    if (
-                        # errored likely on a rate limit or bad input
-                        errored
-                        and txid
-
-                        # we throttle too-fast-requests on the ems side
-                        or (txid and isinstance(txid, TooFastEdit))
-                    ):
-                        # client was editting too quickly
-                        # so we instead cancel this order
-                        log.cancel(
-                            f'Cancelling {reqid}@{txid} due to error:\n {event}')
-                        await ws.send_msg({
-                            'event': 'cancelOrder',
-                            'token': token,
-                            'reqid': reqid or 0,
-                            'txid': [txid],
-                        })

                 case _:
                     log.warning(f'Unhandled trades update msg: {msg}')
-def process_status(
-    event: dict[str, str],
-    oid: str,
-    token: str,
-    chain: ChainMap,
-    reqids2txids: dict[int, str],
-) -> tuple[list[MsgUnion], bool]:
-    '''
-    Process `'[add/edit/cancel]OrderStatus'` events by translating to
-    and returning the equivalent EMS-msg responses.
-    '''
-    match event:
-        case {
-            'event': etype,
-            'status': 'error',
-            'reqid': reqid,
-            'errorMessage': errmsg,
-        }:
+                    if status == 'error':
                         # any of ``{'add', 'edit', 'cancel'}``
                         action = etype.removesuffix('OrderStatus')
+                        errmsg = rest['errorMessage']
                         log.error(
                             f'Failed to {action} order {reqid}:\n'
                             f'{errmsg}'
                         )
-            resp = BrokerdError(
+                        await ems_stream.send(BrokerdError(
                             oid=oid,
                             # XXX: use old reqid in case it changed?
                             reqid=reqid,
@@ -1040,68 +984,28 @@ def process_status(
                             reason=f'Failed {action}:\n{errmsg}',
                             broker_details=event
                         )
-            return [resp], True
+                        ))

-        # successful request cases
-        case {
-            'event': 'addOrderStatus',
-            'status': "ok",
-            'reqid': reqid,  # oid from ems side
-            'txid': txid,
-            'descr': descr,  # only on success?
-        }:
-            log.info(
-                f'Submitted order: {descr}\n'
-                f'ems oid: {oid}\n'
-                f'brokerd reqid: {reqid}\n'
-                f'txid: {txid}\n'
-            )
-            return [], False
+                        txid = txid or lasttxid
+                        if (
+                            txid
-        case {
-            'event': 'editOrderStatus',
-            'status': "ok",
-            'reqid': reqid,  # oid from ems side
-            'descr': descr,
+                            # we throttle too-fast-requests on the ems side
+                            or (txid and isinstance(txid, TooFastEdit))
+                        ):
+                            # client was editting too quickly
+                            # so we instead cancel this order
+                            log.cancel(
+                                f'Cancelling {reqid}@{txid} due to:\n {event}')
+                            await ws.send_msg({
+                                'event': 'cancelOrder',
+                                'token': token,
+                                'reqid': reqid or 0,
+                                'txid': [txid],
+                            })
-            # NOTE: for edit request this is a new value
-            'txid': txid,
-            'originaltxid': origtxid,
-        }:
-            log.info(
-                f'Editting order {oid}[requid={reqid}]:\n'
-                f'brokerd reqid: {reqid}\n'
-                f'txid: {origtxid} -> {txid}\n'
-                f'{descr}'
-            )
-            # XXX: update the expected txid since the ``openOrders`` sub
-            # doesn't relay through the ``userref`` value..
-            # (hopefully kraken will fix this so we don't need this
-            # line.)
-            # reqids2txids[reqid] = txid
-            # deliver another ack to update the ems-side `.reqid`.
-            return [], False

-        case {
-            "event": "cancelOrderStatus",
-            "status": "ok",
-            'reqid': reqid,
-            # XXX: sometimes this isn't provided!?
-            # 'txid': txids,
-            **rest,
-        }:
-            for txid in rest.get('txid', [chain['reqid']]):
-                log.info(
-                    f'Cancelling order {oid}[requid={reqid}]:\n'
-                    f'brokerd reqid: {reqid}\n'
-                )
-            # if txid == reqids2txids[reqid]:
-            #     reqids2txids.pop(reqid)
-            return [], False

-        case _:
-            log.warning(f'Unhandled trades update msg: {msg}')

 def norm_trade_records(
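As an aside, the `apiflows[reqid]` object appended to above is a `collections.ChainMap` used to trace the per-request msg flow; a minimal illustration of that pattern (only the `apiflows`/`chain` names come from the diff, the rest is hypothetical):

    from collections import ChainMap, defaultdict

    # one msg-chain per in-flight request id
    apiflows: defaultdict[int, ChainMap] = defaultdict(ChainMap)

    def trace(reqid: int, event: dict) -> ChainMap:
        # appending to ``.maps`` keeps every update inspectable
        # while earlier entries retain lookup precedence
        chain = apiflows[reqid]
        chain.maps.append(event)
        return chain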


@@ -39,7 +39,11 @@ from docker.errors import (
     APIError,
     # ContainerError,
 )
-from requests.exceptions import ConnectionError, ReadTimeout
+import requests
+from requests.exceptions import (
+    ConnectionError,
+    ReadTimeout,
+)

 from ..log import get_logger, get_console_log
 from .. import config
@@ -188,13 +192,12 @@ class Container:
     def hard_kill(self, start: float) -> None:
         delay = time.time() - start
-        log.error(
-            f'Failed to kill container {self.cntr.id} after {delay}s\n'
-            'sending SIGKILL..'
-        )
         # get out the big guns, bc apparently marketstore
         # doesn't actually know how to terminate gracefully
         # :eyeroll:...
+        log.error(
+            f'SIGKILL-ing: {self.cntr.id} after {delay}s\n'
+        )
         self.try_signal('SIGKILL')
         self.cntr.wait(
             timeout=3,
@@ -218,21 +221,26 @@ class Container:
         self.try_signal('SIGINT')

         start = time.time()
-        for _ in range(30):
+        for _ in range(6):

             with trio.move_on_after(0.5) as cs:
                 cs.shield = True
+                log.cancel('polling for CNTR logs...')

-                await self.process_logs_until(stop_msg)
-
-                # if we aren't cancelled on above checkpoint then we
-                # assume we read the expected stop msg and terminated.
-                break
+                try:
+                    await self.process_logs_until(stop_msg)
+                except ApplicationLogError:
+                    hard_kill = True
+                else:
+                    # if we aren't cancelled on above checkpoint then we
+                    # assume we read the expected stop msg and
+                    # terminated.
+                    break

             if cs.cancelled_caught:
                 # on timeout just try a hard kill after
                 # a quick container sync-wait.
                 hard_kill = True

             try:
                 log.info(f'Polling for container shutdown:\n{cid}')
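The retry loop above leans on trio's cancel scopes rather than manual timers; a self-contained sketch of the same poll-until-msg-or-timeout pattern (illustrative only, not piker code):

    import trio

    async def wait_for_msg(read_line, stop_msg: str, tries: int = 6) -> bool:
        # give each attempt half a second, mirroring the
        # ``Container.cancel()`` loop above
        for _ in range(tries):
            with trio.move_on_after(0.5) as cs:
                while stop_msg not in await read_line():
                    pass
            if not cs.cancelled_caught:
                # loop exited before the deadline: msg was seen
                return True
        return False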
@@ -254,9 +262,16 @@ class Container:
             except (
                 docker.errors.APIError,
                 ConnectionError,
+                requests.exceptions.ConnectionError,
+                trio.Cancelled,
             ):
                 log.exception('Docker connection failure')
                 self.hard_kill(start)
+                raise
+
+            except trio.Cancelled:
+                log.exception('trio cancelled...')
+                self.hard_kill(start)

         else:
             hard_kill = True
@@ -305,15 +320,12 @@ async def open_ahabd(
     ))

     try:
+        # TODO: we might eventually want a proxy-style msg-prot here
+        # to allow remote control of containers without needing
+        # callers to have root perms?
         await trio.sleep_forever()

     finally:
+        # needed?
         with trio.CancelScope(shield=True):
             await cntr.cancel(stop_msg)
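The shielded scope in that `finally:` block is the standard trio idiom for guaranteeing async teardown even when the enclosing scope is already cancelled; a generic sketch (illustrative only):

    import trio

    async def run_then_teardown(main, teardown) -> None:
        try:
            await main()
        finally:
            # without the shield an in-flight cancellation would
            # also cancel the teardown's own await points
            with trio.CancelScope(shield=True):
                await teardown()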