Cache EMS trade relay tasks on feed fqsn

Except for paper accounts (in which case we need a trades dialog and
paper engine per symbol to enable simulated clearing), we can rely on
the instrument feed (symbol name) as the caching key. Use
`tractor.trionics.maybe_open_context()` together with the new
key-as-callable support so that the paper case still gets a separate
paper clearing loop per symbol.

Requires https://github.com/goodboy/tractor/pull/329
branch offline_dark_clearing
author Tyler Goodlet, 2022-10-06 17:37:05 -04:00
parent 2bc25e3593
commit 94f81587ab
1 changed file with 95 additions and 59 deletions
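
For context on the caching primitive referenced in the commit message:
`tractor.trionics.maybe_open_context()` enters a given async context
manager factory at most once per cache key within an actor and hands
every later caller the already-yielded value along with a `cache_hit`
flag; when a callable is passed as `key`, it is invoked with the same
kwargs as the factory and its return value becomes the cache key. The
following is a minimal, illustrative sketch of that pattern, not code
from this changeset: the `open_resource()`, `key_from_kwargs()` and
`consumer()` names are invented for the example, and it assumes the
key-as-callable support from the tractor PR linked above plus an open
actor runtime (here via `tractor.open_root_actor()`).

    from contextlib import asynccontextmanager as acm

    import trio
    import tractor
    from tractor.trionics import maybe_open_context


    @acm
    async def open_resource(name: str):
        # stand-in for an expensive shared resource (eg. a brokerd
        # trades dialogue): only entered once per unique cache key.
        print(f'opening resource for {name!r}')
        yield f'resource-{name}'


    def key_from_kwargs(name: str):
        # invoked with the same kwargs handed to ``acm_func``; the
        # return value is used as the cache key.
        return name


    async def consumer(name: str):
        async with maybe_open_context(
            acm_func=open_resource,
            kwargs={'name': name},
            key=key_from_kwargs,
        ) as (cache_hit, resource):
            # later callers with an identical key get ``cache_hit=True``
            # and the value yielded by the first (and only) entry.
            print(name, cache_hit, resource)
            await trio.sleep(0.1)


    async def main():
        # the cache is kept per actor, so run inside an actor runtime
        async with tractor.open_root_actor():
            async with trio.open_nursery() as n:
                n.start_soon(consumer, 'xbtusdt.kraken')
                n.start_soon(consumer, 'xbtusdt.kraken')  # cache hit
                n.start_soon(consumer, 'ethusdt.kraken')  # separate entry


    if __name__ == '__main__':
        trio.run(main)

Only two `open_resource()` entries should occur, one per distinct fqsn
key, with the second 'xbtusdt.kraken' task reporting a cache hit; the
diff below applies exactly this pattern to the EMS trade relay startup.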


@@ -31,6 +31,7 @@ from typing import (
     AsyncIterator,
     Any,
     Callable,
+    Hashable,
     Optional,
 )
@@ -1265,6 +1266,24 @@ async def process_client_order_cmds(
             )


+@acm
+async def maybe_open_trade_relays(
+    router: Router,
+    fqsn: str,
+    exec_mode: str,  # ('paper', 'live')
+    loglevel: str = 'info',
+
+) -> tuple:
+
+    relay, feed, client_ready = await _router.nursery.start(
+        _router.open_trade_relays,
+        fqsn,
+        exec_mode,
+        loglevel,
+    )
+    yield relay, feed, client_ready
+
+
 @tractor.context
 async def _emsd_main(
     ctx: tractor.Context,
@@ -1329,69 +1348,86 @@ async def _emsd_main(
     feed: Feed
     client_ready: trio.Event

-    # open a stream with the brokerd backend for order flow dialogue
-    # only open if one isn't already up: we try to keep as few duplicate
-    # streams as necessary.
-    # TODO: should we try using `tractor.trionics.maybe_open_context()`
-    # here?
-    relay, feed, client_ready = await _router.nursery.start(
-        _router.open_trade_relays,
-        fqsn,
-        exec_mode,
-        loglevel,
-    )
-    brokerd_stream = relay.brokerd_stream
-    dark_book = _router.get_dark_book(broker)
+    # NOTE: open a stream with the brokerd backend for order flow
+    # dialogue and dark clearing but only open one: we try to keep as
+    # few duplicate streams as necessary per ems actor.
+    def cache_on_fqsn_unless_paper(
+        router: Router,
+        fqsn: str,
+        exec_mode: str,  # ('paper', 'live')
+        loglevel: str = 'info',
+    ) -> Hashable:
+        if exec_mode == 'paper':
+            return f'paper_{fqsn}'
+        else:
+            return fqsn

-    # signal to client that we're started and deliver
-    # all known pps and accounts for this ``brokerd``.
-    await ems_ctx.started((
-        relay.positions,
-        list(relay.accounts),
-        dark_book._active,
-    ))
+    async with tractor.trionics.maybe_open_context(
+        acm_func=maybe_open_trade_relays,
+        kwargs={
+            'router': _router,
+            'fqsn': fqsn,
+            'exec_mode': exec_mode,
+            'loglevel': loglevel,
+        },
+        key=cache_on_fqsn_unless_paper,
+
-    # establish 2-way stream with requesting order-client and
-    # begin handling inbound order requests and updates
-    async with ems_ctx.open_stream() as client_stream:
+    ) as (
+        cache_hit,
+        (relay, feed, client_ready)
+    ):
+        brokerd_stream = relay.brokerd_stream
+        dark_book = _router.get_dark_book(broker)

-        # register the client side before starting the
-        # brokerd-side relay task to ensure the client is
-        # delivered all exisiting open orders on startup.
-        # TODO: instead of by fqsn we need a subscription
-        # system/schema here to limit what each new client is
-        # allowed to see in terms of broadcasted order flow
-        # updates per dialog.
-        _router.subscribers[fqsn].add(client_stream)
-        client_ready.set()
+        # signal to client that we're started and deliver
+        # all known pps and accounts for this ``brokerd``.
+        await ems_ctx.started((
+            relay.positions,
+            list(relay.accounts),
+            dark_book._active,
+        ))

-        # start inbound (from attached client) order request processing
-        # main entrypoint, run here until cancelled.
-        try:
-            await process_client_order_cmds(
-                client_stream,
-                brokerd_stream,
-                fqsn,
-                feed,
-                dark_book,
-                _router,
-            )
+        # establish 2-way stream with requesting order-client and
+        # begin handling inbound order requests and updates
+        async with ems_ctx.open_stream() as client_stream:

-        finally:
-            # try to remove client from subscription registry
-            _router.subscribers[fqsn].remove(client_stream)
+            # register the client side before starting the
+            # brokerd-side relay task to ensure the client is
+            # delivered all exisiting open orders on startup.
+            # TODO: instead of by fqsn we need a subscription
+            # system/schema here to limit what each new client is
+            # allowed to see in terms of broadcasted order flow
+            # updates per dialog.
+            _router.subscribers[fqsn].add(client_stream)
+            client_ready.set()

-            for oid, client_streams in _router.dialogs.items():
-                client_streams.discard(client_stream)
+            # start inbound (from attached client) order request processing
+            # main entrypoint, run here until cancelled.
+            try:
+                await process_client_order_cmds(
+                    client_stream,
+                    brokerd_stream,
+                    fqsn,
+                    feed,
+                    dark_book,
+                    _router,
+                )

-                # TODO: for order dialogs left "alive" in
-                # the ems this is where we should allow some
-                # system to take over management. Likely we
-                # want to allow the user to choose what kind
-                # of policy to use (eg. cancel all orders
-                # from client, run some algo, etc.)
-                if not client_streams:
-                    log.warning(
-                        f'Order dialog is not being monitored:\n'
-                        f'{oid} ->\n{client_stream._ctx.chan.uid}'
-                    )
+            finally:
+                # try to remove client from subscription registry
+                _router.subscribers[fqsn].remove(client_stream)
+
+                for oid, client_streams in _router.dialogs.items():
+                    client_streams.discard(client_stream)
+
+                    # TODO: for order dialogs left "alive" in
+                    # the ems this is where we should allow some
+                    # system to take over management. Likely we
+                    # want to allow the user to choose what kind
+                    # of policy to use (eg. cancel all orders
+                    # from client, run some algo, etc.)
+                    if not client_streams:
+                        log.warning(
+                            f'Order dialog is not being monitored:\n'
+                            f'{oid} ->\n{client_stream._ctx.chan.uid}'
+                        )
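
To spell out the keying above: live dialogs cache on the bare fqsn
while paper dialogs cache on a `paper_`-prefixed fqsn, so simulated
clearing never shares a relay with live trading for the same
instrument, yet both modes still get exactly one clearing loop per
symbol. Below is a standalone illustration of just that key logic; the
`router` and `loglevel` parameters (which do not affect the key) are
left out for brevity, and this snippet is not part of the diff.

    # mirrors the keying behaviour of ``cache_on_fqsn_unless_paper()``
    # from the diff above (illustrative only).
    def key_for(fqsn: str, exec_mode: str) -> str:
        if exec_mode == 'paper':
            return f'paper_{fqsn}'
        return fqsn


    # live dialogs for the same instrument resolve to one cache key
    # and thus share a single relay task,
    assert key_for('xbtusdt.kraken', 'live') == key_for('xbtusdt.kraken', 'live')

    # paper clearing for that instrument is keyed (and cached) separately,
    assert key_for('xbtusdt.kraken', 'paper') != key_for('xbtusdt.kraken', 'live')

    # and paper keys remain per-symbol: one simulated clearing loop
    # (and paper engine) per fqsn.
    assert key_for('xbtusdt.kraken', 'paper') != key_for('ethusdt.kraken', 'paper')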