Factor `brokerd` trade dialog init into acm

Connecting to a `brokerd` daemon's trading dialog via a helper `@acm`
func is handy so that arbitrary trading middleware clients **and** the
ems can setup a trading dialog and, at the least, query existing
position state; this is in fact our immediate need when simply querying
for an account's position status in the `.accounting.cli.ledger` cli.

It's now exposed (for now) as `.clearing._ems.open_brokerd_dialog()` and
is called by the `Router.maybe_open_brokerd_dialog()` for every new
relay allocation or paper-account engine instance.
basic_buy_bot
Tyler Goodlet 2023-06-12 19:51:55 -04:00
parent d704d631ba
commit cc3037149c
2 changed files with 190 additions and 116 deletions

View File

@ -23,11 +23,15 @@ from ._client import (
open_ems,
OrderClient,
)
from ._ems import (
open_brokerd_dialog,
)
__all__ = [
'open_ems',
'OrderClient',
'open_brokerd_dialog',
]

View File

@ -34,6 +34,7 @@ from typing import (
Callable,
Hashable,
Optional,
TYPE_CHECKING,
)
from bidict import bidict
@ -50,14 +51,8 @@ from ..accounting._mktinfo import (
unpack_fqme,
float_digits,
)
from ..data.feed import (
Feed,
Flume,
maybe_open_feed,
)
from ..ui._notify import notify_from_ems_status_msg
from ..data.types import Struct
from . import _paper_engine as paper
from ._messages import (
Order,
Status,
@ -70,6 +65,12 @@ from ._messages import (
BrokerdPosition,
)
if TYPE_CHECKING:
from ..data.feed import (
Feed,
Flume,
)
# TODO: numba all of this
def mk_check(
@ -307,15 +308,175 @@ class TradesRelay(Struct):
# map of symbols to dicts of accounts to pp msgs
positions: dict[
# brokername, acctid
# brokername, acctid ->
tuple[str, str],
list[BrokerdPosition],
# fqme -> msg
dict[str, BrokerdPosition],
]
# allowed account names
accounts: tuple[str]
@acm
async def open_brokerd_dialog(
brokermod: ModuleType,
portal: tractor.Portal,
exec_mode: str,
fqme: str | None = None,
loglevel: str | None = None,
) -> tuple[
tractor.MsgStream,
# {(brokername, accountname) -> {fqme -> msg}}
dict[(str, str), dict[str, BrokerdPosition]],
list[str],
]:
'''
Open either a live trades control dialog or a dialog with a new
paper engine instance depending on live trading support for the
broker backend, configuration, or client code usage.
'''
broker: str = brokermod.name
def mk_paper_ep():
from . import _paper_engine as paper_mod
nonlocal brokermod, exec_mode
# for logging purposes
brokermod = paper_mod
# for paper mode we need to mock this trades response feed
# so we load bidir stream to a new sub-actor running
# a paper-simulator clearing engine.
# load the paper trading engine
exec_mode = 'paper'
log.info(f'{broker}: Entering `paper` trading mode')
# load the paper trading engine as a subactor of this emsd
# actor to simulate the real IPC load it'll have when also
# pulling data from feeds
if not fqme:
log.warning(
f'Paper engine activate for {broker} but no fqme provided?'
)
return paper_mod.open_paperboi(
fqme=fqme,
broker=broker,
loglevel=loglevel,
)
# TODO: ideally choose only one of these ep names..
trades_endpoint: Callable
for ep_name in [
'trades_dialogue',
'open_trade_dialog',
]:
trades_endpoint = getattr(
brokermod,
ep_name,
None,
)
break
if (
trades_endpoint is not None
or exec_mode != 'paper'
):
# open live brokerd trades endpoint
open_trades_endpoint = portal.open_context(
trades_endpoint,
loglevel=loglevel,
)
else:
exec_mode: str = 'paper'
@acm
async def maybe_open_paper_ep():
if exec_mode == 'paper':
async with mk_paper_ep() as msg:
yield msg
return
# open trades-dialog endpoint with backend broker
async with open_trades_endpoint as msg:
ctx, first = msg
# runtime indication that the backend can't support live
# order ctrl yet, so boot the paperboi B0
if first == 'paper':
async with mk_paper_ep() as msg:
yield msg
return
else:
# working live ep case B)
yield msg
return
pps_by_broker_account: dict[(str, str), BrokerdPosition] = {}
async with (
maybe_open_paper_ep() as (
brokerd_ctx,
(position_msgs, accounts),
),
brokerd_ctx.open_stream() as brokerd_trades_stream,
):
# XXX: really we only want one stream per `emsd`
# actor to relay global `brokerd` order events
# unless we're going to expect each backend to
# relay only orders affiliated with a particular
# ``trades_dialogue()`` session (seems annoying
# for implementers). So, here we cache the relay
# task and instead of running multiple tasks
# (which will result in multiples of the same
# msg being relayed for each EMS client) we just
# register each client stream to this single
# relay loop in the dialog table.
# begin processing order events from the target
# brokerd backend by receiving order submission
# response messages, normalizing them to EMS
# messages and relaying back to the piker order
# client set.
# locally cache and track positions per account with
# a nested table of msgs:
# tuple(brokername, acctid) ->
# (fqme: str ->
# `BrokerdPosition`)
for msg in position_msgs:
msg = BrokerdPosition(**msg)
log.info(
f'loading pp for {brokermod.__name__}:\n'
f'{pformat(msg.to_dict())}',
)
# TODO: state any mismatch here?
account: str = msg.account
assert account in accounts
pps_by_broker_account.setdefault(
(broker, account),
{},
)[msg.symbol] = msg
# should be unique entries, verdad!
assert len(set(accounts)) == len(accounts)
yield (
brokerd_trades_stream,
pps_by_broker_account,
accounts,
)
class Router(Struct):
'''
Order router which manages and tracks per-broker dark book,
@ -407,118 +568,25 @@ class Router(Struct):
yield relay
return
def mk_paper_ep():
nonlocal brokermod, exec_mode
async with open_brokerd_dialog(
brokermod=brokermod,
portal=portal,
exec_mode=exec_mode,
fqme=fqme,
loglevel=loglevel,
# for logging purposes
brokermod = paper
# for paper mode we need to mock this trades response feed
# so we load bidir stream to a new sub-actor running
# a paper-simulator clearing engine.
# load the paper trading engine
exec_mode = 'paper'
log.info(f'{broker}: Entering `paper` trading mode')
# load the paper trading engine as a subactor of this emsd
# actor to simulate the real IPC load it'll have when also
# pulling data from feeds
return paper.open_paperboi(
fqme=fqme,
loglevel=loglevel,
)
trades_endpoint = getattr(brokermod, 'trades_dialogue', None)
if (
trades_endpoint is not None
or exec_mode != 'paper'
) as (
brokerd_stream,
pp_msg_table,
accounts,
):
# open live brokerd trades endpoint
open_trades_endpoint = portal.open_context(
trades_endpoint,
loglevel=loglevel,
)
else:
exec_mode: str = 'paper'
@acm
async def maybe_open_paper_ep():
if exec_mode == 'paper':
async with mk_paper_ep() as msg:
yield msg
return
# open trades-dialog endpoint with backend broker
async with open_trades_endpoint as msg:
ctx, first = msg
# runtime indication that the backend can't support live
# order ctrl yet, so boot the paperboi B0
if first == 'paper':
async with mk_paper_ep() as msg:
yield msg
return
else:
# working live ep case B)
yield msg
return
positions: list[BrokerdPosition]
accounts: tuple[str]
async with (
maybe_open_paper_ep() as (
brokerd_ctx,
(positions, accounts),
),
brokerd_ctx.open_stream() as brokerd_trades_stream,
):
# XXX: really we only want one stream per `emsd`
# actor to relay global `brokerd` order events
# unless we're going to expect each backend to
# relay only orders affiliated with a particular
# ``trades_dialogue()`` session (seems annoying
# for implementers). So, here we cache the relay
# task and instead of running multiple tasks
# (which will result in multiples of the same
# msg being relayed for each EMS client) we just
# register each client stream to this single
# relay loop in the dialog table.
# begin processing order events from the target
# brokerd backend by receiving order submission
# response messages, normalizing them to EMS
# messages and relaying back to the piker order
# client set.
# locally cache and track positions per account with
# a nested table of msgs:
# tuple(brokername, acctid) ->
# (fqme: str ->
# `BrokerdPosition`)
# create a new relay and sync it's state according
# to brokerd-backend reported position msgs.
relay = TradesRelay(
brokerd_stream=brokerd_trades_stream,
positions={},
accounts=accounts,
brokerd_stream=brokerd_stream,
positions=pp_msg_table,
accounts=tuple(accounts),
)
for msg in positions:
msg = BrokerdPosition(**msg)
log.info(
f'loading pp for {brokermod.__name__}:\n'
f'{pformat(msg.to_dict())}',
)
# TODO: state any mismatch here?
account = msg.account
assert account in accounts
relay.positions.setdefault(
(broker, account),
{},
)[msg.symbol] = msg
self.relays[broker] = relay
# this context should block here indefinitely until
@ -550,6 +618,8 @@ class Router(Struct):
indefinitely.
'''
from ..data.feed import maybe_open_feed
async with (
maybe_open_feed(
[fqme],