Port clearing engine to new tractor stream api

tractor_open_stream_from
Tyler Goodlet 2021-04-29 08:40:16 -04:00
parent 56db2c812d
commit d3b50b9920
2 changed files with 133 additions and 131 deletions

View File

@ -246,23 +246,24 @@ async def open_ems(
async with maybe_open_emsd(broker) as portal:
trades_stream = await portal.run(
async with portal.open_stream_from(
_emsd_main,
client_actor_name=actor.name,
broker=broker,
symbol=symbol.key,
)
with trio.fail_after(10):
await book._ready_to_receive.wait()
) as trades_stream:
with trio.fail_after(10):
await book._ready_to_receive.wait()
try:
yield book, trades_stream
try:
yield book, trades_stream
finally:
# TODO: we want to eventually keep this up (by having
# the exec loop keep running in the pikerd tree) but for
# now we have to kill the context to avoid backpressure
# build-up on the shm write loop.
with trio.CancelScope(shield=True):
await trades_stream.aclose()
finally:
# TODO: we want to eventually keep this up (by having
# the exec loop keep running in the pikerd tree) but for
# now we have to kill the context to avoid backpressure
# build-up on the shm write loop.
with trio.CancelScope(shield=True):
await trades_stream.aclose()

View File

@ -339,130 +339,131 @@ async def process_broker_trades(
"""
broker = feed.mod.name
with trio.fail_after(5):
# in the paper engine case this is just a mem receive channel
trades_stream = await feed.recv_trades_data()
# TODO: make this a context
# in the paper engine case this is just a mem receive channel
async with feed.receive_trades_data() as trades_stream:
first = await trades_stream.__anext__()
# startup msg expected as first from broker backend
assert first['local_trades'] == 'start'
task_status.started()
# startup msg expected as first from broker backend
assert first['local_trades'] == 'start'
task_status.started()
async for event in trades_stream:
async for event in trades_stream:
name, msg = event['local_trades']
name, msg = event['local_trades']
log.info(f'Received broker trade event:\n{pformat(msg)}')
log.info(f'Received broker trade event:\n{pformat(msg)}')
if name == 'position':
msg['resp'] = 'position'
# relay through
await ctx.send_yield(msg)
continue
# Get the broker (order) request id, this **must** be normalized
# into messaging provided by the broker backend
reqid = msg['reqid']
# make response packet to EMS client(s)
oid = book._broker2ems_ids.get(reqid)
if oid is None:
# paper engine race case: ``Client.submit_limit()`` hasn't
# returned yet and provided an output reqid to register
# locally, so we need to retreive the oid that was already
# packed at submission since we already know it ahead of
# time
paper = msg.get('paper_info')
if paper:
oid = paper['oid']
else:
msg.get('external')
if not msg:
log.error(f"Unknown trade event {event}")
if name == 'position':
msg['resp'] = 'position'
# relay through
await ctx.send_yield(msg)
continue
resp = {
'resp': None, # placeholder
'oid': oid
}
# Get the broker (order) request id, this **must** be normalized
# into messaging provided by the broker backend
reqid = msg['reqid']
if name in (
'error',
):
# TODO: figure out how this will interact with EMS clients
# for ex. on an error do we react with a dark orders
# management response, like cancelling all dark orders?
# make response packet to EMS client(s)
oid = book._broker2ems_ids.get(reqid)
# This looks like a supervision policy for pending orders on
# some unexpected failure - something we need to think more
# about. In most default situations, with composed orders
# (ex. brackets), most brokers seem to use a oca policy.
if oid is None:
# paper engine race case: ``Client.submit_limit()`` hasn't
# returned yet and provided an output reqid to register
# locally, so we need to retreive the oid that was already
# packed at submission since we already know it ahead of
# time
paper = msg.get('paper_info')
if paper:
oid = paper['oid']
message = msg['message']
# XXX should we make one when it's blank?
log.error(pformat(message))
# TODO: getting this bs, prolly need to handle status messages
# 'Market data farm connection is OK:usfarm.nj'
# another stupid ib error to handle
# if 10147 in message: cancel
# don't relay message to order requester client
continue
elif name in (
'status',
):
# TODO: templating the ib statuses in comparison with other
# brokers is likely the way to go:
# https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313
# short list:
# - PendingSubmit
# - PendingCancel
# - PreSubmitted (simulated orders)
# - ApiCancelled (cancelled by client before submission to routing)
# - Cancelled
# - Filled
# - Inactive (reject or cancelled but not by trader)
# everyone doin camel case
status = msg['status'].lower()
if status == 'filled':
# conditional execution is fully complete, no more
# fills for the noted order
if not msg['remaining']:
resp['resp'] = 'broker_executed'
log.info(f'Execution for {oid} is complete!')
# just log it
else:
log.info(f'{broker} filled {msg}')
msg.get('external')
if not msg:
log.error(f"Unknown trade event {event}")
else:
# one of (submitted, cancelled)
resp['resp'] = 'broker_' + status
continue
elif name in (
'fill',
):
# proxy through the "fill" result(s)
resp['resp'] = 'broker_filled'
resp.update(msg)
resp = {
'resp': None, # placeholder
'oid': oid
}
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
if name in (
'error',
):
# TODO: figure out how this will interact with EMS clients
# for ex. on an error do we react with a dark orders
# management response, like cancelling all dark orders?
# respond to requesting client
await ctx.send_yield(resp)
# This looks like a supervision policy for pending orders on
# some unexpected failure - something we need to think more
# about. In most default situations, with composed orders
# (ex. brackets), most brokers seem to use a oca policy.
message = msg['message']
# XXX should we make one when it's blank?
log.error(pformat(message))
# TODO: getting this bs, prolly need to handle status messages
# 'Market data farm connection is OK:usfarm.nj'
# another stupid ib error to handle
# if 10147 in message: cancel
# don't relay message to order requester client
continue
elif name in (
'status',
):
# TODO: templating the ib statuses in comparison with other
# brokers is likely the way to go:
# https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313
# short list:
# - PendingSubmit
# - PendingCancel
# - PreSubmitted (simulated orders)
# - ApiCancelled (cancelled by client before submission
# to routing)
# - Cancelled
# - Filled
# - Inactive (reject or cancelled but not by trader)
# everyone doin camel case
status = msg['status'].lower()
if status == 'filled':
# conditional execution is fully complete, no more
# fills for the noted order
if not msg['remaining']:
resp['resp'] = 'broker_executed'
log.info(f'Execution for {oid} is complete!')
# just log it
else:
log.info(f'{broker} filled {msg}')
else:
# one of (submitted, cancelled)
resp['resp'] = 'broker_' + status
elif name in (
'fill',
):
# proxy through the "fill" result(s)
resp['resp'] = 'broker_filled'
resp.update(msg)
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
# respond to requesting client
await ctx.send_yield(resp)
async def process_order_cmds(
@ -675,17 +676,17 @@ async def _emsd_main(
# acting as an EMS client and will submit orders) to
# receive requests pushed over a tractor stream
# using (for now) an async generator.
order_stream = await portal.run(
async with portal.open_stream_from(
send_order_cmds,
symbol_key=symbol,
)
) as order_stream:
# start inbound order request processing
await process_order_cmds(
ctx,
order_stream,
symbol,
feed,
client,
dark_book,
)
# start inbound order request processing
await process_order_cmds(
ctx,
order_stream,
symbol,
feed,
client,
dark_book,
)