Factor order request processing into new func

basic_orders
Tyler Goodlet 2021-03-07 16:25:47 -05:00
parent e71bcb363c
commit 5deea50963
1 changed files with 149 additions and 129 deletions

View File

@ -448,72 +448,16 @@ async def process_broker_trades(
await ctx.send_yield(resp)
@tractor.stream
async def _emsd_main(
async def process_order_cmds(
ctx: tractor.Context,
client_actor_name: str,
broker: str,
cmd_stream: 'tractor.ReceiveStream', # noqa
symbol: str,
_mode: str = 'dark', # ('paper', 'dark', 'live')
feed: 'Feed', # noqa
client: 'Client', # noqa
dark_book: _DarkBook,
) -> None:
"""EMS (sub)actor entrypoint providing the
execution management (micro)service which conducts broker
order control on behalf of clients.
This is the daemon (child) side routine which starts an EMS runtime
(one per broker-feed) and and begins streaming back alerts from
broker executions/fills.
``send_order_cmds()`` is called here to execute in a task back in
the actor which started this service (spawned this actor), presuming
capabilities allow it, such that requests for EMS executions are
received in a stream from that client actor and then responses are
streamed back up to the original calling task in the same client.
The task tree is:
- ``_emsd_main()``:
accepts order cmds, registers execs with exec loop
- ``exec_loop()``:
run (dark) conditions on inputs and trigger broker submissions
- ``process_broker_trades()``:
accept normalized trades responses, process and relay to ems client(s)
"""
from ._client import send_order_cmds
dark_book = get_dark_book(broker)
# get a portal back to the client
async with tractor.wait_for_actor(client_actor_name) as portal:
# spawn one task per broker feed
async with trio.open_nursery() as n:
# TODO: eventually support N-brokers
# start the condition scan loop
quote, feed, client = await n.start(
exec_loop,
ctx,
broker,
symbol,
_mode,
)
await n.start(
process_broker_trades,
ctx,
feed,
dark_book,
)
# connect back to the calling actor (the one that is
# acting as an EMS client and will submit orders) to
# receive requests pushed over a tractor stream
# using (for now) an async generator.
async for cmd in await portal.run(send_order_cmds):
async for cmd in cmd_stream:
log.info(f'Received order cmd:\n{pformat(cmd)}')
@ -549,7 +493,7 @@ async def _emsd_main(
trigger_price = cmd['price']
size = cmd['size']
brokers = cmd['brokers']
exec_mode = cmd.get('exec_mode', _mode)
exec_mode = cmd['exec_mode']
broker = brokers[0]
last = dark_book.lasts[(broker, sym)]
@ -642,4 +586,80 @@ async def _emsd_main(
'oid': oid
})
# continue and wait on next order cmd
@tractor.stream
async def _emsd_main(
ctx: tractor.Context,
client_actor_name: str,
broker: str,
symbol: str,
_mode: str = 'dark', # ('paper', 'dark', 'live')
) -> None:
"""EMS (sub)actor entrypoint providing the
execution management (micro)service which conducts broker
order control on behalf of clients.
This is the daemon (child) side routine which starts an EMS runtime
(one per broker-feed) and and begins streaming back alerts from
broker executions/fills.
``send_order_cmds()`` is called here to execute in a task back in
the actor which started this service (spawned this actor), presuming
capabilities allow it, such that requests for EMS executions are
received in a stream from that client actor and then responses are
streamed back up to the original calling task in the same client.
The task tree is:
- ``_emsd_main()``:
accepts order cmds, registers execs with exec loop
- ``exec_loop()``:
run (dark) conditions on inputs and trigger broker submissions
- ``process_broker_trades()``:
accept normalized trades responses, process and relay to ems client(s)
"""
from ._client import send_order_cmds
dark_book = get_dark_book(broker)
# get a portal back to the client
async with tractor.wait_for_actor(client_actor_name) as portal:
# spawn one task per broker feed
async with trio.open_nursery() as n:
# TODO: eventually support N-brokers
# start the condition scan loop
quote, feed, client = await n.start(
exec_loop,
ctx,
broker,
symbol,
_mode,
)
await n.start(
process_broker_trades,
ctx,
feed,
dark_book,
)
# connect back to the calling actor (the one that is
# acting as an EMS client and will submit orders) to
# receive requests pushed over a tractor stream
# using (for now) an async generator.
order_stream = await portal.run(send_order_cmds)
# start inbound order request processing
await process_order_cmds(
ctx,
order_stream,
symbol,
feed,
client,
dark_book,
)