From 2766fad719a889df517879810de0db3b8dc4c614 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 27 Jul 2022 11:16:39 -0400 Subject: [PATCH] Fix #222 multi-symbol paper engine support --- piker/clearing/_client.py | 8 ++++++++ piker/clearing/_ems.py | 36 ++++++++++++++++++++------------- piker/clearing/_paper_engine.py | 19 +++++++++-------- 3 files changed, 41 insertions(+), 22 deletions(-) diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 91cb94fa..95d80986 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -31,6 +31,7 @@ from ..log import get_logger from ._ems import _emsd_main from .._daemon import maybe_open_emsd from ._messages import Order, Cancel +from ..brokers import get_brokermod log = get_logger(__name__) @@ -203,14 +204,21 @@ async def open_ems( from ..data._source import unpack_fqsn broker, symbol, suffix = unpack_fqsn(fqsn) + mode: str = 'live' + async with maybe_open_emsd(broker) as portal: + mod = get_brokermod(broker) + if not getattr(mod, 'trades_dialogue', None): + mode = 'paper' + async with ( # connect to emsd portal.open_context( _emsd_main, fqsn=fqsn, + exec_mode=mode, ) as (ctx, (positions, accounts)), diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 2b9f50cd..1a764812 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -331,12 +331,11 @@ class Router(Struct): @asynccontextmanager async def maybe_open_brokerd_trades_dialogue( - self, feed: Feed, symbol: str, dark_book: _DarkBook, - _exec_mode: str, + exec_mode: str, loglevel: str, ) -> tuple[dict, tractor.MsgStream]: @@ -346,14 +345,23 @@ class Router(Struct): ''' relay = self.relays.get(feed.mod.name) - if relay is None: + if ( + relay is None + + # We always want to spawn a new relay for the paper engine + # per symbol since we need a new tractor context to be + # opened for every every symbol such that a new data feed + # and ``PaperBoi`` client will be created and then used to + # simulate clearing events. + or exec_mode == 'paper' + ): relay = await self.nursery.start( open_brokerd_trades_dialogue, self, feed, symbol, - _exec_mode, + exec_mode, loglevel, ) @@ -380,7 +388,7 @@ async def open_brokerd_trades_dialogue( router: Router, feed: Feed, symbol: str, - _exec_mode: str, + exec_mode: str, loglevel: str, task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED, @@ -404,20 +412,20 @@ async def open_brokerd_trades_dialogue( # when the data feed closes it may result in a half-closed # channel that the brokerd side thinks is still open somehow!? async with maybe_spawn_brokerd( - broker, loglevel=loglevel, ) as portal: - - if trades_endpoint is None or _exec_mode == 'paper': - + if ( + trades_endpoint is None + or exec_mode == 'paper' + ): # for paper mode we need to mock this trades response feed - # so we load bidir stream to a new sub-actor running a - # paper-simulator clearing engine. + # so we load bidir stream to a new sub-actor running + # a paper-simulator clearing engine. # load the paper trading engine - _exec_mode = 'paper' + exec_mode = 'paper' log.warning(f'Entering paper trading mode for {broker}') # load the paper trading engine as a subactor of this emsd @@ -1009,8 +1017,8 @@ async def _emsd_main( ctx: tractor.Context, fqsn: str, + exec_mode: str, # ('paper', 'live') - _exec_mode: str = 'dark', # ('paper', 'dark', 'live') loglevel: str = 'info', ) -> None: @@ -1086,7 +1094,7 @@ async def _emsd_main( feed, symbol, dark_book, - _exec_mode, + exec_mode, loglevel, ) as relay, diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 7d2ceebd..2c7c8603 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -268,7 +268,7 @@ class PaperBoi: tid=oid, size=size, price=price, - cost=1., # todo cost model + cost=0, # TODO: cost model dt=pendulum.from_timestamp(fill_time_s), bsuid=symbol, ) @@ -466,7 +466,6 @@ async def trades_dialogue( tractor.log.get_console_log(loglevel) async with ( - data.open_feed( [fqsn], loglevel=loglevel, @@ -482,7 +481,6 @@ async def trades_dialogue( ctx.open_stream() as ems_stream, trio.open_nursery() as n, ): - client = PaperBoi( broker, ems_stream, @@ -498,7 +496,11 @@ async def trades_dialogue( _trade_ledger={}, ) - n.start_soon(handle_order_requests, client, ems_stream) + n.start_soon( + handle_order_requests, + client, + ems_stream, + ) # paper engine simulator clearing task await simulate_fills(feed.stream, client) @@ -526,16 +528,17 @@ async def open_paperboi( # (we likely don't need more then one proc for basic # simulated order clearing) if portal is None: + log.info('Starting new paper-engine actor') portal = await tn.start_actor( service_name, enable_modules=[__name__] ) async with portal.open_context( - trades_dialogue, - broker=broker, - fqsn=fqsn, - loglevel=loglevel, + trades_dialogue, + broker=broker, + fqsn=fqsn, + loglevel=loglevel, ) as (ctx, first): yield ctx, first