More IB repairs..

Make the throttle error propagate through to `trio` again by adding
`dict`-msg support between the two loops such that errors can be
re-raised on the `trio` side. This is all integrated into the
`MethoProxy` and accompanying result relay task.

Further fix a longer standing issue where sometimes the `ib_insync`
order entry method will raise a weird assertion error because it detects
some internal order-id state issue.. Just ignore those and make relay
back an error to the ems in such cases.

Add a bunch of notes for todos surrounding data feed reset hackery.
broker_bumpz
Tyler Goodlet 2022-03-25 16:06:52 -04:00
parent 3e125625b1
commit 62d073dc18
1 changed files with 69 additions and 21 deletions

View File

@ -673,6 +673,7 @@ class Client:
# against non-known prices. # against non-known prices.
raise RuntimeError("Can not order {symbol}, no live feed?") raise RuntimeError("Can not order {symbol}, no live feed?")
try:
trade = self.ib.placeOrder( trade = self.ib.placeOrder(
contract, contract,
Order( Order(
@ -690,6 +691,10 @@ class Client:
designatedLocation='SMART', designatedLocation='SMART',
), ),
) )
except AssertionError: # errrg insync..
log.warning(f'order for {reqid} already complete?')
# will trigger an error in ems request handler task.
return None
# ib doesn't support setting your own id outside # ib doesn't support setting your own id outside
# their own weird client int counting ids.. # their own weird client int counting ids..
@ -1116,7 +1121,25 @@ class MethodProxy:
chan = self.chan chan = self.chan
# send through method + ``kwargs: dict`` as pair # send through method + ``kwargs: dict`` as pair
await chan.send((meth, kwargs)) await chan.send((meth, kwargs))
return await chan.receive() msg = await chan.receive()
res = msg.get('result')
if res:
return res
err = msg.get('error')
if not err:
raise ValueError(f'Received unexpected asyncio msg {msg}')
raise err
async def wait_for_data_reset(self) -> None:
'''
Send hacker hot keys to ib program and wait
for the event that declares the data feeds to be
back up before unblocking.
'''
...
async def open_aio_client_method_relay( async def open_aio_client_method_relay(
@ -1132,6 +1155,9 @@ async def open_aio_client_method_relay(
): ):
to_trio.send_nowait(client) to_trio.send_nowait(client)
# TODO: separate channel for error handling?
# client.inline_errors(to_trio)
# relay all method requests to ``asyncio``-side client and # relay all method requests to ``asyncio``-side client and
# deliver back results # deliver back results
while not to_trio._closed: while not to_trio._closed:
@ -1143,10 +1169,18 @@ async def open_aio_client_method_relay(
meth_name, kwargs = msg meth_name, kwargs = msg
meth = getattr(client, meth_name) meth = getattr(client, meth_name)
try:
resp = await meth(**kwargs) resp = await meth(**kwargs)
# echo the msg back # echo the msg back
to_trio.send_nowait(resp) to_trio.send_nowait({'result': resp})
except (
RequestError,
# TODO: relay all errors to trio?
# BaseException,
) as err:
to_trio.send_nowait({'error': err})
@acm @acm
@ -1361,6 +1395,7 @@ async def get_bars(
return await get() return await get()
except RequestError as err: except RequestError as err:
# why do we always need to rebind this?
_err = err _err = err
# TODO: retreive underlying ``ib_insync`` error? # TODO: retreive underlying ``ib_insync`` error?
@ -1409,6 +1444,10 @@ async def get_bars(
if not in_throttle: if not in_throttle:
await tractor.breakpoint() await tractor.breakpoint()
# TODO: wait on data con reset event
# then begin backfilling again.
# await proxy.wait_for_data()
in_throttle = True in_throttle = True
fails += 1 fails += 1
continue continue
@ -1463,6 +1502,7 @@ async def backfill_bars(
# on that until we have the `marketstore` daemon in place in which # on that until we have the `marketstore` daemon in place in which
# case the shm size will be driven by user config and available sys # case the shm size will be driven by user config and available sys
# memory. # memory.
# count: int = 120,
count: int = 22, count: int = 22,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
@ -1745,7 +1785,9 @@ async def stream_quotes(
init_msgs = mk_init_msgs() init_msgs = mk_init_msgs()
with trio.move_on_after(1): # TODO: we should instead spawn a task that waits on a feed to start
# and let it wait indefinitely..instead of this hard coded stuff.
with trio.move_on_after(6):
contract, first_ticker, details = await _trio_run_client_method( contract, first_ticker, details = await _trio_run_client_method(
method='get_quote', method='get_quote',
symbol=sym, symbol=sym,
@ -1922,6 +1964,12 @@ async def handle_order_requests(
# counter - collision prone..) # counter - collision prone..)
reqid=order.reqid, reqid=order.reqid,
) )
if reqid is None:
await ems_order_stream.send(BrokerdError(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason='Order already active?',
).dict())
# deliver ack that order has been submitted to broker routing # deliver ack that order has been submitted to broker routing
await ems_order_stream.send( await ems_order_stream.send(