Merge pull request #366 from pikers/multisympaper

Fix #222 multi-symbol paper engine support
pydantic_zombie
goodboy 2022-07-27 12:29:05 -04:00 committed by GitHub
commit 01005e40a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 41 additions and 22 deletions

View File

@ -31,6 +31,7 @@ from ..log import get_logger
from ._ems import _emsd_main from ._ems import _emsd_main
from .._daemon import maybe_open_emsd from .._daemon import maybe_open_emsd
from ._messages import Order, Cancel from ._messages import Order, Cancel
from ..brokers import get_brokermod
log = get_logger(__name__) log = get_logger(__name__)
@ -203,14 +204,21 @@ async def open_ems(
from ..data._source import unpack_fqsn from ..data._source import unpack_fqsn
broker, symbol, suffix = unpack_fqsn(fqsn) broker, symbol, suffix = unpack_fqsn(fqsn)
mode: str = 'live'
async with maybe_open_emsd(broker) as portal: async with maybe_open_emsd(broker) as portal:
mod = get_brokermod(broker)
if not getattr(mod, 'trades_dialogue', None):
mode = 'paper'
async with ( async with (
# connect to emsd # connect to emsd
portal.open_context( portal.open_context(
_emsd_main, _emsd_main,
fqsn=fqsn, fqsn=fqsn,
exec_mode=mode,
) as (ctx, (positions, accounts)), ) as (ctx, (positions, accounts)),

View File

@ -331,12 +331,11 @@ class Router(Struct):
@asynccontextmanager @asynccontextmanager
async def maybe_open_brokerd_trades_dialogue( async def maybe_open_brokerd_trades_dialogue(
self, self,
feed: Feed, feed: Feed,
symbol: str, symbol: str,
dark_book: _DarkBook, dark_book: _DarkBook,
_exec_mode: str, exec_mode: str,
loglevel: str, loglevel: str,
) -> tuple[dict, tractor.MsgStream]: ) -> tuple[dict, tractor.MsgStream]:
@ -346,14 +345,23 @@ class Router(Struct):
''' '''
relay = self.relays.get(feed.mod.name) relay = self.relays.get(feed.mod.name)
if relay is None: if (
relay is None
# We always want to spawn a new relay for the paper engine
# per symbol since we need a new tractor context to be
# opened for every every symbol such that a new data feed
# and ``PaperBoi`` client will be created and then used to
# simulate clearing events.
or exec_mode == 'paper'
):
relay = await self.nursery.start( relay = await self.nursery.start(
open_brokerd_trades_dialogue, open_brokerd_trades_dialogue,
self, self,
feed, feed,
symbol, symbol,
_exec_mode, exec_mode,
loglevel, loglevel,
) )
@ -380,7 +388,7 @@ async def open_brokerd_trades_dialogue(
router: Router, router: Router,
feed: Feed, feed: Feed,
symbol: str, symbol: str,
_exec_mode: str, exec_mode: str,
loglevel: str, loglevel: str,
task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED,
@ -404,20 +412,20 @@ async def open_brokerd_trades_dialogue(
# when the data feed closes it may result in a half-closed # when the data feed closes it may result in a half-closed
# channel that the brokerd side thinks is still open somehow!? # channel that the brokerd side thinks is still open somehow!?
async with maybe_spawn_brokerd( async with maybe_spawn_brokerd(
broker, broker,
loglevel=loglevel, loglevel=loglevel,
) as portal: ) as portal:
if (
if trades_endpoint is None or _exec_mode == 'paper': trades_endpoint is None
or exec_mode == 'paper'
):
# for paper mode we need to mock this trades response feed # for paper mode we need to mock this trades response feed
# so we load bidir stream to a new sub-actor running a # so we load bidir stream to a new sub-actor running
# paper-simulator clearing engine. # a paper-simulator clearing engine.
# load the paper trading engine # load the paper trading engine
_exec_mode = 'paper' exec_mode = 'paper'
log.warning(f'Entering paper trading mode for {broker}') log.warning(f'Entering paper trading mode for {broker}')
# load the paper trading engine as a subactor of this emsd # load the paper trading engine as a subactor of this emsd
@ -1009,8 +1017,8 @@ async def _emsd_main(
ctx: tractor.Context, ctx: tractor.Context,
fqsn: str, fqsn: str,
exec_mode: str, # ('paper', 'live')
_exec_mode: str = 'dark', # ('paper', 'dark', 'live')
loglevel: str = 'info', loglevel: str = 'info',
) -> None: ) -> None:
@ -1086,7 +1094,7 @@ async def _emsd_main(
feed, feed,
symbol, symbol,
dark_book, dark_book,
_exec_mode, exec_mode,
loglevel, loglevel,
) as relay, ) as relay,

View File

@ -268,7 +268,7 @@ class PaperBoi:
tid=oid, tid=oid,
size=size, size=size,
price=price, price=price,
cost=1., # todo cost model cost=0, # TODO: cost model
dt=pendulum.from_timestamp(fill_time_s), dt=pendulum.from_timestamp(fill_time_s),
bsuid=symbol, bsuid=symbol,
) )
@ -466,7 +466,6 @@ async def trades_dialogue(
tractor.log.get_console_log(loglevel) tractor.log.get_console_log(loglevel)
async with ( async with (
data.open_feed( data.open_feed(
[fqsn], [fqsn],
loglevel=loglevel, loglevel=loglevel,
@ -482,7 +481,6 @@ async def trades_dialogue(
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,
trio.open_nursery() as n, trio.open_nursery() as n,
): ):
client = PaperBoi( client = PaperBoi(
broker, broker,
ems_stream, ems_stream,
@ -498,7 +496,11 @@ async def trades_dialogue(
_trade_ledger={}, _trade_ledger={},
) )
n.start_soon(handle_order_requests, client, ems_stream) n.start_soon(
handle_order_requests,
client,
ems_stream,
)
# paper engine simulator clearing task # paper engine simulator clearing task
await simulate_fills(feed.stream, client) await simulate_fills(feed.stream, client)
@ -526,6 +528,7 @@ async def open_paperboi(
# (we likely don't need more then one proc for basic # (we likely don't need more then one proc for basic
# simulated order clearing) # simulated order clearing)
if portal is None: if portal is None:
log.info('Starting new paper-engine actor')
portal = await tn.start_actor( portal = await tn.start_actor(
service_name, service_name,
enable_modules=[__name__] enable_modules=[__name__]