Expect `accounts: set[str]` startup msg through clearing system

chart_mod_breakup
Tyler Goodlet 2021-09-14 10:36:13 -04:00
parent 75e1bf3f6e
commit b04645aa47
4 changed files with 31 additions and 20 deletions

View File

@ -1469,6 +1469,7 @@ async def trades_dialogue(
# deliver positions to subscriber before anything else
all_positions = []
accounts = set()
clients: list[tuple[Client, trio.MemoryReceiveChannel]] = []
for account, client in _accounts2clients.items():
@ -1484,9 +1485,10 @@ async def trades_dialogue(
for pos in client.positions():
msg = pack_position(pos)
msg.account = accounts_def.inverse[msg.account]
accounts.add(msg.account)
all_positions.append(msg.dict())
await ctx.started(all_positions)
await ctx.started((all_positions, accounts))
async with (
ctx.open_stream() as ems_stream,

View File

@ -210,7 +210,7 @@ async def open_ems(
broker=broker,
symbol=symbol.key,
) as (ctx, positions),
) as (ctx, (positions, accounts)),
# open 2-way trade command stream
ctx.open_stream() as trades_stream,
@ -222,4 +222,4 @@ async def open_ems(
trades_stream
)
yield book, trades_stream, positions
yield book, trades_stream, positions, accounts

View File

@ -268,6 +268,9 @@ class TradesRelay:
# map of symbols to dicts of accounts to pp msgs
positions: dict[str, dict[str, BrokerdPosition]]
# allowed account names
accounts: set[str]
# count of connected ems clients for this ``brokerd``
consumers: int = 0
@ -410,8 +413,7 @@ async def open_brokerd_trades_dialogue(
try:
async with (
open_trades_endpoint as (brokerd_ctx, positions),
open_trades_endpoint as (brokerd_ctx, (positions, accounts,)),
brokerd_ctx.open_stream() as brokerd_trades_stream,
):
@ -433,15 +435,20 @@ async def open_brokerd_trades_dialogue(
# locally cache and track positions per account.
pps = {}
for msg in positions:
account = msg['account']
assert account in accounts
pps.setdefault(
msg['symbol'],
{}
)[msg['account']] = msg
)[account] = msg
relay = TradesRelay(
brokerd_dialogue=brokerd_trades_stream,
positions=pps,
consumers=1
accounts=set(accounts),
consumers=1,
)
_router.relays[broker] = relay
@ -936,11 +943,11 @@ async def _emsd_main(
) -> None:
'''EMS (sub)actor entrypoint providing the
execution management (micro)service which conducts broker
order control on behalf of clients.
order clearing control on behalf of clients.
This is the daemon (child) side routine which starts an EMS runtime
(one per broker-feed) and and begins streaming back alerts from
broker executions/fills.
task (one per broker-feed) and and begins streaming back alerts from
each broker's executions/fills.
``send_order_cmds()`` is called here to execute in a task back in
the actor which started this service (spawned this actor), presuming
@ -964,8 +971,8 @@ async def _emsd_main(
reponse" proxy-broker.
|
- ``process_client_order_cmds()``:
accepts order cmds from requesting piker clients, registers
execs with exec loop
accepts order cmds from requesting clients, registers dark orders and
alerts with clearing loop.
'''
global _router
@ -1015,13 +1022,15 @@ async def _emsd_main(
brokerd_stream = relay.brokerd_dialogue # .clone()
# signal to client that we're started
# TODO: we could eventually send back **all** brokerd
# positions here?
await ems_ctx.started(
{sym: list(pps.values())
for sym, pps in relay.positions.items()}
)
# flatten out collected pps from brokerd for delivery
pp_msgs = {
sym: list(pps.values())
for sym, pps in relay.positions.items()
}
# signal to client that we're started and deliver
# all known pps and accounts for this ``brokerd``.
await ems_ctx.started((pp_msgs, relay.accounts))
# establish 2-way stream with requesting order-client and
# begin handling inbound order requests and updates

View File

@ -463,7 +463,7 @@ async def trades_dialogue(
# TODO: load paper positions per broker from .toml config file
# and pass as symbol to position data mapping: ``dict[str, dict]``
# await ctx.started(all_positions)
await ctx.started({})
await ctx.started(({}, {'paper',}))
async with (
ctx.open_stream() as ems_stream,