Fix multi-account positioning and order tracking..

This seems to have been broken in refactoring from commit 279c899de5
which was never tested against multiple accounts/clients.

The fix is 2 part:
- position tables are now correctly loaded ahead of time and used by
  account for each connected client in processing of ledgers and
  existing positions.
- a task for each API client is started (as implemented prior) so that
  we actually get status updates for every client used for submissions.

Further we add a bit of code using `bisect.insort()` to normalize
ledgers to a datetime sorted list records (though pretty sure the `dict`
transform ruins it?) in an effort to avoid issues with ledger
transaction processing with previously minimized `Position.clears`
tables, which should (but might not?) avoid incorporating clear events
prior to the last "net-zero" positioning state.
doin_the_splits
Tyler Goodlet 2022-08-17 13:05:02 -04:00
parent 73d2e7716f
commit 0fb07670d2
1 changed files with 34 additions and 22 deletions

View File

@ -18,6 +18,7 @@ Order and trades endpoints for use with ``piker``'s EMS.
""" """
from __future__ import annotations from __future__ import annotations
from bisect import insort
from contextlib import ExitStack from contextlib import ExitStack
from dataclasses import asdict from dataclasses import asdict
from functools import partial from functools import partial
@ -435,7 +436,6 @@ async def trades_dialogue(
# deliver positions to subscriber before anything else # deliver positions to subscriber before anything else
all_positions = [] all_positions = []
accounts = set() accounts = set()
clients: list[tuple[Client, trio.MemoryReceiveChannel]] = []
acctids = set() acctids = set()
cids2pps: dict[str, BrokerdPosition] = {} cids2pps: dict[str, BrokerdPosition] = {}
@ -486,7 +486,6 @@ async def trades_dialogue(
# the so called (bs) "FIFO" style which more or less results in # the so called (bs) "FIFO" style which more or less results in
# a price that's not useful for traders who want to not lose # a price that's not useful for traders who want to not lose
# money.. xb # money.. xb
# for client in aioclients.values():
for pos in client.positions(): for pos in client.positions():
# collect all ib-pp reported positions so that we can be # collect all ib-pp reported positions so that we can be
@ -500,7 +499,9 @@ async def trades_dialogue(
assert msg.account in accounts, ( assert msg.account in accounts, (
f'Position for unknown account: {msg.account}') f'Position for unknown account: {msg.account}')
ledger = ledgers[acctid]
table = tables[acctid] table = tables[acctid]
pp = table.pps.get(bsuid) pp = table.pps.get(bsuid)
if ( if (
not pp not pp
@ -596,6 +597,7 @@ async def trades_dialogue(
# proxy wrapper for starting trade event stream # proxy wrapper for starting trade event stream
async def open_trade_event_stream( async def open_trade_event_stream(
client: Client,
task_status: TaskStatus[ task_status: TaskStatus[
trio.abc.ReceiveChannel trio.abc.ReceiveChannel
] = trio.TASK_STATUS_IGNORED, ] = trio.TASK_STATUS_IGNORED,
@ -613,18 +615,25 @@ async def trades_dialogue(
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,
trio.open_nursery() as n, trio.open_nursery() as n,
): ):
trade_event_stream = await n.start(open_trade_event_stream)
clients.append((client, trade_event_stream)) for client in aioclients.values():
trade_event_stream = await n.start(
open_trade_event_stream,
client,
)
# start order request handler **before** local trades # start order request handler **before** local trades
# event loop # event loop
n.start_soon(handle_order_requests, ems_stream, accounts_def) n.start_soon(
handle_order_requests,
ems_stream,
accounts_def,
)
# allocate event relay tasks for each client connection # allocate event relay tasks for each client connection
for client, stream in clients:
n.start_soon( n.start_soon(
deliver_trade_events, deliver_trade_events,
stream, trade_event_stream,
ems_stream, ems_stream,
accounts_def, accounts_def,
cids2pps, cids2pps,
@ -968,7 +977,7 @@ def norm_trade_records(
ledger into our standard record format. ledger into our standard record format.
''' '''
records: dict[str, Transaction] = {} records: list[Transaction] = []
for tid, record in ledger.items(): for tid, record in ledger.items():
conid = record.get('conId') or record['conid'] conid = record.get('conId') or record['conid']
@ -1047,8 +1056,9 @@ def norm_trade_records(
# should already have entries if the pps are still open, in # should already have entries if the pps are still open, in
# which case, we can pull the fqsn from that table (see # which case, we can pull the fqsn from that table (see
# `trades_dialogue()` above). # `trades_dialogue()` above).
insort(
records[tid] = Transaction( records,
Transaction(
fqsn=fqsn, fqsn=fqsn,
tid=tid, tid=tid,
size=size, size=size,
@ -1057,9 +1067,11 @@ def norm_trade_records(
dt=dt, dt=dt,
expiry=expiry, expiry=expiry,
bsuid=conid, bsuid=conid,
),
key=lambda t: t.dt
) )
return records return {r.tid: r for r in records}
def trades_to_ledger_entries( def trades_to_ledger_entries(