Fix #222 multi-symbol paper engine support
parent
ae71168216
commit
2766fad719
|
@ -31,6 +31,7 @@ from ..log import get_logger
|
||||||
from ._ems import _emsd_main
|
from ._ems import _emsd_main
|
||||||
from .._daemon import maybe_open_emsd
|
from .._daemon import maybe_open_emsd
|
||||||
from ._messages import Order, Cancel
|
from ._messages import Order, Cancel
|
||||||
|
from ..brokers import get_brokermod
|
||||||
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
@ -203,14 +204,21 @@ async def open_ems(
|
||||||
from ..data._source import unpack_fqsn
|
from ..data._source import unpack_fqsn
|
||||||
broker, symbol, suffix = unpack_fqsn(fqsn)
|
broker, symbol, suffix = unpack_fqsn(fqsn)
|
||||||
|
|
||||||
|
mode: str = 'live'
|
||||||
|
|
||||||
async with maybe_open_emsd(broker) as portal:
|
async with maybe_open_emsd(broker) as portal:
|
||||||
|
|
||||||
|
mod = get_brokermod(broker)
|
||||||
|
if not getattr(mod, 'trades_dialogue', None):
|
||||||
|
mode = 'paper'
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
# connect to emsd
|
# connect to emsd
|
||||||
portal.open_context(
|
portal.open_context(
|
||||||
|
|
||||||
_emsd_main,
|
_emsd_main,
|
||||||
fqsn=fqsn,
|
fqsn=fqsn,
|
||||||
|
exec_mode=mode,
|
||||||
|
|
||||||
) as (ctx, (positions, accounts)),
|
) as (ctx, (positions, accounts)),
|
||||||
|
|
||||||
|
|
|
@ -331,12 +331,11 @@ class Router(Struct):
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def maybe_open_brokerd_trades_dialogue(
|
async def maybe_open_brokerd_trades_dialogue(
|
||||||
|
|
||||||
self,
|
self,
|
||||||
feed: Feed,
|
feed: Feed,
|
||||||
symbol: str,
|
symbol: str,
|
||||||
dark_book: _DarkBook,
|
dark_book: _DarkBook,
|
||||||
_exec_mode: str,
|
exec_mode: str,
|
||||||
loglevel: str,
|
loglevel: str,
|
||||||
|
|
||||||
) -> tuple[dict, tractor.MsgStream]:
|
) -> tuple[dict, tractor.MsgStream]:
|
||||||
|
@ -346,14 +345,23 @@ class Router(Struct):
|
||||||
'''
|
'''
|
||||||
relay = self.relays.get(feed.mod.name)
|
relay = self.relays.get(feed.mod.name)
|
||||||
|
|
||||||
if relay is None:
|
if (
|
||||||
|
relay is None
|
||||||
|
|
||||||
|
# We always want to spawn a new relay for the paper engine
|
||||||
|
# per symbol since we need a new tractor context to be
|
||||||
|
# opened for every every symbol such that a new data feed
|
||||||
|
# and ``PaperBoi`` client will be created and then used to
|
||||||
|
# simulate clearing events.
|
||||||
|
or exec_mode == 'paper'
|
||||||
|
):
|
||||||
|
|
||||||
relay = await self.nursery.start(
|
relay = await self.nursery.start(
|
||||||
open_brokerd_trades_dialogue,
|
open_brokerd_trades_dialogue,
|
||||||
self,
|
self,
|
||||||
feed,
|
feed,
|
||||||
symbol,
|
symbol,
|
||||||
_exec_mode,
|
exec_mode,
|
||||||
loglevel,
|
loglevel,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -380,7 +388,7 @@ async def open_brokerd_trades_dialogue(
|
||||||
router: Router,
|
router: Router,
|
||||||
feed: Feed,
|
feed: Feed,
|
||||||
symbol: str,
|
symbol: str,
|
||||||
_exec_mode: str,
|
exec_mode: str,
|
||||||
loglevel: str,
|
loglevel: str,
|
||||||
|
|
||||||
task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED,
|
||||||
|
@ -404,20 +412,20 @@ async def open_brokerd_trades_dialogue(
|
||||||
# when the data feed closes it may result in a half-closed
|
# when the data feed closes it may result in a half-closed
|
||||||
# channel that the brokerd side thinks is still open somehow!?
|
# channel that the brokerd side thinks is still open somehow!?
|
||||||
async with maybe_spawn_brokerd(
|
async with maybe_spawn_brokerd(
|
||||||
|
|
||||||
broker,
|
broker,
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
|
|
||||||
) as portal:
|
) as portal:
|
||||||
|
if (
|
||||||
if trades_endpoint is None or _exec_mode == 'paper':
|
trades_endpoint is None
|
||||||
|
or exec_mode == 'paper'
|
||||||
|
):
|
||||||
# for paper mode we need to mock this trades response feed
|
# for paper mode we need to mock this trades response feed
|
||||||
# so we load bidir stream to a new sub-actor running a
|
# so we load bidir stream to a new sub-actor running
|
||||||
# paper-simulator clearing engine.
|
# a paper-simulator clearing engine.
|
||||||
|
|
||||||
# load the paper trading engine
|
# load the paper trading engine
|
||||||
_exec_mode = 'paper'
|
exec_mode = 'paper'
|
||||||
log.warning(f'Entering paper trading mode for {broker}')
|
log.warning(f'Entering paper trading mode for {broker}')
|
||||||
|
|
||||||
# load the paper trading engine as a subactor of this emsd
|
# load the paper trading engine as a subactor of this emsd
|
||||||
|
@ -1009,8 +1017,8 @@ async def _emsd_main(
|
||||||
|
|
||||||
ctx: tractor.Context,
|
ctx: tractor.Context,
|
||||||
fqsn: str,
|
fqsn: str,
|
||||||
|
exec_mode: str, # ('paper', 'live')
|
||||||
|
|
||||||
_exec_mode: str = 'dark', # ('paper', 'dark', 'live')
|
|
||||||
loglevel: str = 'info',
|
loglevel: str = 'info',
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -1086,7 +1094,7 @@ async def _emsd_main(
|
||||||
feed,
|
feed,
|
||||||
symbol,
|
symbol,
|
||||||
dark_book,
|
dark_book,
|
||||||
_exec_mode,
|
exec_mode,
|
||||||
loglevel,
|
loglevel,
|
||||||
|
|
||||||
) as relay,
|
) as relay,
|
||||||
|
|
|
@ -268,7 +268,7 @@ class PaperBoi:
|
||||||
tid=oid,
|
tid=oid,
|
||||||
size=size,
|
size=size,
|
||||||
price=price,
|
price=price,
|
||||||
cost=1., # todo cost model
|
cost=0, # TODO: cost model
|
||||||
dt=pendulum.from_timestamp(fill_time_s),
|
dt=pendulum.from_timestamp(fill_time_s),
|
||||||
bsuid=symbol,
|
bsuid=symbol,
|
||||||
)
|
)
|
||||||
|
@ -466,7 +466,6 @@ async def trades_dialogue(
|
||||||
tractor.log.get_console_log(loglevel)
|
tractor.log.get_console_log(loglevel)
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
|
|
||||||
data.open_feed(
|
data.open_feed(
|
||||||
[fqsn],
|
[fqsn],
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
|
@ -482,7 +481,6 @@ async def trades_dialogue(
|
||||||
ctx.open_stream() as ems_stream,
|
ctx.open_stream() as ems_stream,
|
||||||
trio.open_nursery() as n,
|
trio.open_nursery() as n,
|
||||||
):
|
):
|
||||||
|
|
||||||
client = PaperBoi(
|
client = PaperBoi(
|
||||||
broker,
|
broker,
|
||||||
ems_stream,
|
ems_stream,
|
||||||
|
@ -498,7 +496,11 @@ async def trades_dialogue(
|
||||||
_trade_ledger={},
|
_trade_ledger={},
|
||||||
)
|
)
|
||||||
|
|
||||||
n.start_soon(handle_order_requests, client, ems_stream)
|
n.start_soon(
|
||||||
|
handle_order_requests,
|
||||||
|
client,
|
||||||
|
ems_stream,
|
||||||
|
)
|
||||||
|
|
||||||
# paper engine simulator clearing task
|
# paper engine simulator clearing task
|
||||||
await simulate_fills(feed.stream, client)
|
await simulate_fills(feed.stream, client)
|
||||||
|
@ -526,6 +528,7 @@ async def open_paperboi(
|
||||||
# (we likely don't need more then one proc for basic
|
# (we likely don't need more then one proc for basic
|
||||||
# simulated order clearing)
|
# simulated order clearing)
|
||||||
if portal is None:
|
if portal is None:
|
||||||
|
log.info('Starting new paper-engine actor')
|
||||||
portal = await tn.start_actor(
|
portal = await tn.start_actor(
|
||||||
service_name,
|
service_name,
|
||||||
enable_modules=[__name__]
|
enable_modules=[__name__]
|
||||||
|
|
Loading…
Reference in New Issue