From b04645aa47e7020629efd13c228217d181a35e90 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 14 Sep 2021 10:36:13 -0400 Subject: [PATCH] Expect `accounts: set[str]` startup msg through clearing system --- piker/brokers/ib.py | 4 +++- piker/clearing/_client.py | 4 ++-- piker/clearing/_ems.py | 41 ++++++++++++++++++++------------- piker/clearing/_paper_engine.py | 2 +- 4 files changed, 31 insertions(+), 20 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 21946802..51cf4a39 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -1469,6 +1469,7 @@ async def trades_dialogue( # deliver positions to subscriber before anything else all_positions = [] + accounts = set() clients: list[tuple[Client, trio.MemoryReceiveChannel]] = [] for account, client in _accounts2clients.items(): @@ -1484,9 +1485,10 @@ async def trades_dialogue( for pos in client.positions(): msg = pack_position(pos) msg.account = accounts_def.inverse[msg.account] + accounts.add(msg.account) all_positions.append(msg.dict()) - await ctx.started(all_positions) + await ctx.started((all_positions, accounts)) async with ( ctx.open_stream() as ems_stream, diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index a23fdb5e..4f766daf 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -210,7 +210,7 @@ async def open_ems( broker=broker, symbol=symbol.key, - ) as (ctx, positions), + ) as (ctx, (positions, accounts)), # open 2-way trade command stream ctx.open_stream() as trades_stream, @@ -222,4 +222,4 @@ async def open_ems( trades_stream ) - yield book, trades_stream, positions + yield book, trades_stream, positions, accounts diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 397ef48b..583e2509 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -268,6 +268,9 @@ class TradesRelay: # map of symbols to dicts of accounts to pp msgs positions: dict[str, dict[str, BrokerdPosition]] + # allowed account names + accounts: set[str] + # count of connected ems clients for this ``brokerd`` consumers: int = 0 @@ -410,8 +413,7 @@ async def open_brokerd_trades_dialogue( try: async with ( - - open_trades_endpoint as (brokerd_ctx, positions), + open_trades_endpoint as (brokerd_ctx, (positions, accounts,)), brokerd_ctx.open_stream() as brokerd_trades_stream, ): @@ -433,15 +435,20 @@ async def open_brokerd_trades_dialogue( # locally cache and track positions per account. pps = {} for msg in positions: + + account = msg['account'] + assert account in accounts + pps.setdefault( msg['symbol'], {} - )[msg['account']] = msg + )[account] = msg relay = TradesRelay( brokerd_dialogue=brokerd_trades_stream, positions=pps, - consumers=1 + accounts=set(accounts), + consumers=1, ) _router.relays[broker] = relay @@ -936,11 +943,11 @@ async def _emsd_main( ) -> None: '''EMS (sub)actor entrypoint providing the execution management (micro)service which conducts broker - order control on behalf of clients. + order clearing control on behalf of clients. This is the daemon (child) side routine which starts an EMS runtime - (one per broker-feed) and and begins streaming back alerts from - broker executions/fills. + task (one per broker-feed) and and begins streaming back alerts from + each broker's executions/fills. ``send_order_cmds()`` is called here to execute in a task back in the actor which started this service (spawned this actor), presuming @@ -964,8 +971,8 @@ async def _emsd_main( reponse" proxy-broker. | - ``process_client_order_cmds()``: - accepts order cmds from requesting piker clients, registers - execs with exec loop + accepts order cmds from requesting clients, registers dark orders and + alerts with clearing loop. ''' global _router @@ -1015,13 +1022,15 @@ async def _emsd_main( brokerd_stream = relay.brokerd_dialogue # .clone() - # signal to client that we're started - # TODO: we could eventually send back **all** brokerd - # positions here? - await ems_ctx.started( - {sym: list(pps.values()) - for sym, pps in relay.positions.items()} - ) + # flatten out collected pps from brokerd for delivery + pp_msgs = { + sym: list(pps.values()) + for sym, pps in relay.positions.items() + } + + # signal to client that we're started and deliver + # all known pps and accounts for this ``brokerd``. + await ems_ctx.started((pp_msgs, relay.accounts)) # establish 2-way stream with requesting order-client and # begin handling inbound order requests and updates diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 628f58b9..892087c4 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -463,7 +463,7 @@ async def trades_dialogue( # TODO: load paper positions per broker from .toml config file # and pass as symbol to position data mapping: ``dict[str, dict]`` # await ctx.started(all_positions) - await ctx.started({}) + await ctx.started(({}, {'paper',})) async with ( ctx.open_stream() as ems_stream,