Drop nesting level from emsd's pp cacheing, adjust order mode
parent
076c167d6e
commit
7442d68ecf
|
@ -289,7 +289,11 @@ class TradesRelay:
|
||||||
brokerd_dialogue: tractor.MsgStream
|
brokerd_dialogue: tractor.MsgStream
|
||||||
|
|
||||||
# map of symbols to dicts of accounts to pp msgs
|
# map of symbols to dicts of accounts to pp msgs
|
||||||
positions: dict[str, dict[str, BrokerdPosition]]
|
positions: dict[
|
||||||
|
# brokername, acctid
|
||||||
|
tuple[str, str],
|
||||||
|
list[BrokerdPosition],
|
||||||
|
]
|
||||||
|
|
||||||
# allowed account names
|
# allowed account names
|
||||||
accounts: tuple[str]
|
accounts: tuple[str]
|
||||||
|
@ -461,18 +465,24 @@ async def open_brokerd_trades_dialogue(
|
||||||
# normalizing them to EMS messages and relaying back to
|
# normalizing them to EMS messages and relaying back to
|
||||||
# the piker order client set.
|
# the piker order client set.
|
||||||
|
|
||||||
# locally cache and track positions per account.
|
# locally cache and track positions per account with
|
||||||
|
# a table of (brokername, acctid) -> `BrokerdPosition`
|
||||||
|
# msgs.
|
||||||
pps = {}
|
pps = {}
|
||||||
for msg in positions:
|
for msg in positions:
|
||||||
log.info(f'loading pp: {msg}')
|
log.info(f'loading pp: {msg}')
|
||||||
|
|
||||||
account = msg['account']
|
account = msg['account']
|
||||||
|
|
||||||
|
# TODO: better value error for this which
|
||||||
|
# dumps the account and message and states the
|
||||||
|
# mismatch..
|
||||||
assert account in accounts
|
assert account in accounts
|
||||||
|
|
||||||
pps.setdefault(
|
pps.setdefault(
|
||||||
f'{msg["symbol"]}.{broker}',
|
(broker, account),
|
||||||
{}
|
[],
|
||||||
)[account] = msg
|
).append(msg)
|
||||||
|
|
||||||
relay = TradesRelay(
|
relay = TradesRelay(
|
||||||
brokerd_dialogue=brokerd_trades_stream,
|
brokerd_dialogue=brokerd_trades_stream,
|
||||||
|
@ -578,11 +588,9 @@ async def translate_and_relay_brokerd_events(
|
||||||
|
|
||||||
relay.positions.setdefault(
|
relay.positions.setdefault(
|
||||||
# NOTE: translate to a FQSN!
|
# NOTE: translate to a FQSN!
|
||||||
f'{sym}.{broker}',
|
(broker, sym),
|
||||||
{}
|
[]
|
||||||
).setdefault(
|
).append(pos_msg)
|
||||||
pos_msg['account'], {}
|
|
||||||
).update(pos_msg)
|
|
||||||
|
|
||||||
# fan-out-relay position msgs immediately by
|
# fan-out-relay position msgs immediately by
|
||||||
# broadcasting updates on all client streams
|
# broadcasting updates on all client streams
|
||||||
|
@ -635,8 +643,8 @@ async def translate_and_relay_brokerd_events(
|
||||||
# something is out of order, we don't have an oid for
|
# something is out of order, we don't have an oid for
|
||||||
# this broker-side message.
|
# this broker-side message.
|
||||||
log.error(
|
log.error(
|
||||||
'Unknown oid:{oid} for msg:\n'
|
f'Unknown oid: {oid} for msg:\n'
|
||||||
f'{pformat(brokerd_msg)}'
|
f'{pformat(brokerd_msg)}\n'
|
||||||
'Unable to relay message to client side!?'
|
'Unable to relay message to client side!?'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1088,15 +1096,12 @@ async def _emsd_main(
|
||||||
|
|
||||||
brokerd_stream = relay.brokerd_dialogue # .clone()
|
brokerd_stream = relay.brokerd_dialogue # .clone()
|
||||||
|
|
||||||
# flatten out collected pps from brokerd for delivery
|
|
||||||
pp_msgs = {
|
|
||||||
fqsn: list(pps.values())
|
|
||||||
for fqsn, pps in relay.positions.items()
|
|
||||||
}
|
|
||||||
|
|
||||||
# signal to client that we're started and deliver
|
# signal to client that we're started and deliver
|
||||||
# all known pps and accounts for this ``brokerd``.
|
# all known pps and accounts for this ``brokerd``.
|
||||||
await ems_ctx.started((pp_msgs, list(relay.accounts)))
|
await ems_ctx.started((
|
||||||
|
relay.positions,
|
||||||
|
list(relay.accounts),
|
||||||
|
))
|
||||||
|
|
||||||
# establish 2-way stream with requesting order-client and
|
# establish 2-way stream with requesting order-client and
|
||||||
# begin handling inbound order requests and updates
|
# begin handling inbound order requests and updates
|
||||||
|
|
|
@ -579,9 +579,9 @@ async def open_order_mode(
|
||||||
providers=symbol.brokers
|
providers=symbol.brokers
|
||||||
)
|
)
|
||||||
|
|
||||||
# XXX: ``brokerd`` delivers a set of account names that it allows
|
# XXX: ``brokerd`` delivers a set of account names that it
|
||||||
# use of but the user also can define the accounts they'd like
|
# allows use of but the user also can define the accounts they'd
|
||||||
# to use, in order, in their `brokers.toml` file.
|
# like to use, in order, in their `brokers.toml` file.
|
||||||
accounts = {}
|
accounts = {}
|
||||||
for name in brokerd_accounts:
|
for name in brokerd_accounts:
|
||||||
# ensure name is in ``brokers.toml``
|
# ensure name is in ``brokers.toml``
|
||||||
|
@ -594,10 +594,21 @@ async def open_order_mode(
|
||||||
iter(accounts.keys())
|
iter(accounts.keys())
|
||||||
) if accounts else 'paper'
|
) if accounts else 'paper'
|
||||||
|
|
||||||
|
# Pack position messages by account, should only be one-to-one.
|
||||||
# NOTE: requires the backend exactly specifies
|
# NOTE: requires the backend exactly specifies
|
||||||
# the expected symbol key in its positions msg.
|
# the expected symbol key in its positions msg.
|
||||||
pp_msgs = position_msgs.get(symkey, ())
|
pps_by_account = {}
|
||||||
pps_by_account = {msg['account']: msg for msg in pp_msgs}
|
for (broker, acctid), msgs in position_msgs.items():
|
||||||
|
for msg in msgs:
|
||||||
|
|
||||||
|
sym = msg['symbol']
|
||||||
|
if (
|
||||||
|
sym == symkey or
|
||||||
|
# mega-UGH, i think we need to fix the FQSN stuff sooner
|
||||||
|
# then later..
|
||||||
|
sym == symkey.removesuffix(f'.{broker}')
|
||||||
|
):
|
||||||
|
pps_by_account[acctid] = msg
|
||||||
|
|
||||||
# update pp trackers with data relayed from ``brokerd``.
|
# update pp trackers with data relayed from ``brokerd``.
|
||||||
for account_name in accounts:
|
for account_name in accounts:
|
||||||
|
|
Loading…
Reference in New Issue