Merge pull request from pikers/pp_relay_hotfix

Ugh, positions relay hotfix
fsp_feeds
goodboy 2021-09-13 08:46:23 -04:00 committed by GitHub
commit 07214f2044
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 21 additions and 9 deletions
piker
brokers
clearing

View File

@ -1333,7 +1333,10 @@ async def stream_quotes(
# last = time.time() # last = time.time()
def pack_position(pos: Position) -> dict[str, Any]: def pack_position(
pos: Position
) -> dict[str, Any]:
con = pos.contract con = pos.contract
if isinstance(con, Option): if isinstance(con, Option):
@ -1465,7 +1468,7 @@ async def trades_dialogue(
global _client_cache global _client_cache
# deliver positions to subscriber before anything else # deliver positions to subscriber before anything else
all_positions = {} all_positions = []
clients: list[tuple[Client, trio.MemoryReceiveChannel]] = [] clients: list[tuple[Client, trio.MemoryReceiveChannel]] = []
for account, client in _accounts2clients.items(): for account, client in _accounts2clients.items():
@ -1480,9 +1483,8 @@ async def trades_dialogue(
for client in _client_cache.values(): for client in _client_cache.values():
for pos in client.positions(): for pos in client.positions():
msg = pack_position(pos) msg = pack_position(pos)
all_positions.setdefault( msg.account = accounts_def.inverse[msg.account]
msg.symbol, [] all_positions.append(msg.dict())
).append(msg.dict())
await ctx.started(all_positions) await ctx.started(all_positions)
@ -1638,6 +1640,7 @@ async def deliver_trade_events(
elif event_name == 'position': elif event_name == 'position':
msg = pack_position(item) msg = pack_position(item)
msg.account = accounts_def.inverse[msg.account]
if getattr(msg, 'reqid', 0) < -1: if getattr(msg, 'reqid', 0) < -1:

View File

@ -430,9 +430,17 @@ async def open_brokerd_trades_dialogue(
# normalizing them to EMS messages and relaying back to # normalizing them to EMS messages and relaying back to
# the piker order client set. # the piker order client set.
# locally cache and track positions per account.
pps = {}
for msg in positions:
pps.setdefault(
msg['symbol'],
{}
)[msg['account']] = msg
relay = TradesRelay( relay = TradesRelay(
brokerd_dialogue=brokerd_trades_stream, brokerd_dialogue=brokerd_trades_stream,
positions=positions, positions=pps,
consumers=1 consumers=1
) )

View File

@ -577,8 +577,8 @@ async def open_order_mode(
for msg in pp_msgs: for msg in pp_msgs:
log.info(f'Loading pp for {symkey}:\n{pformat(msg)}') log.info(f'Loading pp for {symkey}:\n{pformat(msg)}')
account_value = msg.get('account') account_name = msg.get('account')
account_name = accounts.inverse.get(account_value) account_value = accounts.get(account_name)
if not account_name and account_value == 'paper': if not account_name and account_value == 'paper':
account_name = 'paper' account_name = 'paper'
@ -769,8 +769,9 @@ async def process_trades_and_update_ui(
# update order pane widgets # update order pane widgets
mode.pane.update_status_ui(tracker) mode.pane.update_status_ui(tracker)
# display pnl
mode.pane.display_pnl(tracker) mode.pane.display_pnl(tracker)
# short circuit to next msg to avoid # short circuit to next msg to avoid
# unnecessary msg content lookups # unnecessary msg content lookups
continue continue