Merge pull request #347 from pikers/pps_postmortem

Pps postmortem
ib_rt_pp_update_hotfix
goodboy 2022-07-04 15:28:27 -04:00 committed by GitHub
commit 479ad1bb15
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 56 additions and 39 deletions

View File

@ -20,15 +20,10 @@ Interactive Brokers API backend.
Sub-modules within break into the core functionalities: Sub-modules within break into the core functionalities:
- ``broker.py`` part for orders / trading endpoints - ``broker.py`` part for orders / trading endpoints
- ``data.py`` for real-time data feed endpoints - ``feed.py`` for real-time data feed endpoints
- ``api.py`` for the core API machinery which is ``trio``-ized
- ``client.py`` for the core API machinery which is ``trio``-ized
wrapping around ``ib_insync``. wrapping around ``ib_insync``.
- ``report.py`` for the hackery to build manual pp calcs
to avoid ib's absolute bullshit FIFO style position
tracking..
""" """
from .api import ( from .api import (
get_client, get_client,

View File

@ -296,8 +296,7 @@ async def update_ledger_from_api_trades(
client: Union[Client, MethodProxy], client: Union[Client, MethodProxy],
) -> dict[str, pp.Transaction]: ) -> dict[str, pp.Transaction]:
# construct piker pps from trade ledger, underneath using
# LIFO style breakeven pricing calcs.
conf = get_config() conf = get_config()
# XXX; ERRGGG.. # XXX; ERRGGG..

View File

@ -289,7 +289,11 @@ class TradesRelay:
brokerd_dialogue: tractor.MsgStream brokerd_dialogue: tractor.MsgStream
# map of symbols to dicts of accounts to pp msgs # map of symbols to dicts of accounts to pp msgs
positions: dict[str, dict[str, BrokerdPosition]] positions: dict[
# brokername, acctid
tuple[str, str],
list[BrokerdPosition],
]
# allowed account names # allowed account names
accounts: tuple[str] accounts: tuple[str]
@ -461,18 +465,24 @@ async def open_brokerd_trades_dialogue(
# normalizing them to EMS messages and relaying back to # normalizing them to EMS messages and relaying back to
# the piker order client set. # the piker order client set.
# locally cache and track positions per account. # locally cache and track positions per account with
# a table of (brokername, acctid) -> `BrokerdPosition`
# msgs.
pps = {} pps = {}
for msg in positions: for msg in positions:
log.info(f'loading pp: {msg}') log.info(f'loading pp: {msg}')
account = msg['account'] account = msg['account']
# TODO: better value error for this which
# dumps the account and message and states the
# mismatch..
assert account in accounts assert account in accounts
pps.setdefault( pps.setdefault(
f'{msg["symbol"]}.{broker}', (broker, account),
{} [],
)[account] = msg ).append(msg)
relay = TradesRelay( relay = TradesRelay(
brokerd_dialogue=brokerd_trades_stream, brokerd_dialogue=brokerd_trades_stream,
@ -578,11 +588,9 @@ async def translate_and_relay_brokerd_events(
relay.positions.setdefault( relay.positions.setdefault(
# NOTE: translate to a FQSN! # NOTE: translate to a FQSN!
f'{sym}.{broker}', (broker, sym),
{} []
).setdefault( ).append(pos_msg)
pos_msg['account'], {}
).update(pos_msg)
# fan-out-relay position msgs immediately by # fan-out-relay position msgs immediately by
# broadcasting updates on all client streams # broadcasting updates on all client streams
@ -635,8 +643,8 @@ async def translate_and_relay_brokerd_events(
# something is out of order, we don't have an oid for # something is out of order, we don't have an oid for
# this broker-side message. # this broker-side message.
log.error( log.error(
'Unknown oid:{oid} for msg:\n' f'Unknown oid: {oid} for msg:\n'
f'{pformat(brokerd_msg)}' f'{pformat(brokerd_msg)}\n'
'Unable to relay message to client side!?' 'Unable to relay message to client side!?'
) )
@ -1088,15 +1096,12 @@ async def _emsd_main(
brokerd_stream = relay.brokerd_dialogue # .clone() brokerd_stream = relay.brokerd_dialogue # .clone()
# flatten out collected pps from brokerd for delivery
pp_msgs = {
fqsn: list(pps.values())
for fqsn, pps in relay.positions.items()
}
# signal to client that we're started and deliver # signal to client that we're started and deliver
# all known pps and accounts for this ``brokerd``. # all known pps and accounts for this ``brokerd``.
await ems_ctx.started((pp_msgs, list(relay.accounts))) await ems_ctx.started((
relay.positions,
list(relay.accounts),
))
# establish 2-way stream with requesting order-client and # establish 2-way stream with requesting order-client and
# begin handling inbound order requests and updates # begin handling inbound order requests and updates

View File

@ -285,15 +285,14 @@ class Position(Struct):
all transactions before the last net zero size to avoid all transactions before the last net zero size to avoid
unecessary history irrelevant to the current pp state. unecessary history irrelevant to the current pp state.
''' '''
size: float = 0 size: float = self.size
clears_since_zero: deque[tuple(str, dict)] = deque() clears_since_zero: deque[tuple(str, dict)] = deque()
# scan for the last "net zero" position by # scan for the last "net zero" position by
# iterating clears in reverse. # iterating clears in reverse.
for tid, clear in reversed(self.clears.items()): for tid, clear in reversed(self.clears.items()):
size += clear['size'] size -= clear['size']
clears_since_zero.appendleft((tid, clear)) clears_since_zero.appendleft((tid, clear))
if size == 0: if size == 0:
@ -358,8 +357,6 @@ def update_pps(
# track clearing data # track clearing data
pp.update(r) pp.update(r)
assert len(set(pp.clears)) == len(pp.clears)
return pps return pps
@ -632,7 +629,12 @@ def load_pps_from_toml(
# index clears entries in "object" form by tid in a top # index clears entries in "object" form by tid in a top
# level dict instead of a list (as is presented in our # level dict instead of a list (as is presented in our
# ``pps.toml``). # ``pps.toml``).
clears = {} pp = pp_objs.get(bsuid)
if pp:
clears = pp.clears
else:
clears = {}
for clears_table in clears_list: for clears_table in clears_list:
tid = clears_table.pop('tid') tid = clears_table.pop('tid')
clears[tid] = clears_table clears[tid] = clears_table
@ -716,6 +718,11 @@ def update_pps_conf(
for bsuid in list(pp_objs): for bsuid in list(pp_objs):
pp = pp_objs[bsuid] pp = pp_objs[bsuid]
# XXX: debug hook for size mismatches
# if bsuid == 447767096:
# breakpoint()
pp.minimize_clears() pp.minimize_clears()
if ( if (

View File

@ -579,9 +579,9 @@ async def open_order_mode(
providers=symbol.brokers providers=symbol.brokers
) )
# XXX: ``brokerd`` delivers a set of account names that it allows # XXX: ``brokerd`` delivers a set of account names that it
# use of but the user also can define the accounts they'd like # allows use of but the user also can define the accounts they'd
# to use, in order, in their `brokers.toml` file. # like to use, in order, in their `brokers.toml` file.
accounts = {} accounts = {}
for name in brokerd_accounts: for name in brokerd_accounts:
# ensure name is in ``brokers.toml`` # ensure name is in ``brokers.toml``
@ -594,10 +594,21 @@ async def open_order_mode(
iter(accounts.keys()) iter(accounts.keys())
) if accounts else 'paper' ) if accounts else 'paper'
# Pack position messages by account, should only be one-to-one.
# NOTE: requires the backend exactly specifies # NOTE: requires the backend exactly specifies
# the expected symbol key in its positions msg. # the expected symbol key in its positions msg.
pp_msgs = position_msgs.get(symkey, ()) pps_by_account = {}
pps_by_account = {msg['account']: msg for msg in pp_msgs} for (broker, acctid), msgs in position_msgs.items():
for msg in msgs:
sym = msg['symbol']
if (
sym == symkey or
# mega-UGH, i think we need to fix the FQSN stuff sooner
# then later..
sym == symkey.removesuffix(f'.{broker}')
):
pps_by_account[acctid] = msg
# update pp trackers with data relayed from ``brokerd``. # update pp trackers with data relayed from ``brokerd``.
for account_name in accounts: for account_name in accounts: