Port `ib` broker machinery to new ctx mngr pp api

This drops the use of `pp.update_pps_conf()` (and friends) and instead
moves to using the context style `open_trade_ledger()` and `open_pps()`
managers for faster pp msg gen due to delayed file writing (which was
the main source update latency).

In order to make this work with potentially multiple accounts this also
uses an exit stack which loads each ledger / `pps.toml` into an account
id mapped `dict`; a POC for likely how we should implement some higher
level position manager api.
ib_pps_upgrade
Tyler Goodlet 2022-07-21 10:06:18 -04:00
parent 666587991a
commit 30ff793a22
1 changed files with 195 additions and 199 deletions

View File

@ -18,6 +18,7 @@ Order and trades endpoints for use with ``piker``'s EMS.
"""
from __future__ import annotations
from contextlib import ExitStack
from dataclasses import asdict
from functools import partial
from pprint import pformat
@ -35,8 +36,8 @@ from trio_typing import TaskStatus
import tractor
from ib_insync.contract import (
Contract,
Option,
Forex,
# Option,
# Forex,
)
from ib_insync.order import (
Trade,
@ -47,11 +48,18 @@ from ib_insync.objects import (
Execution,
CommissionReport,
)
from ib_insync.objects import Position
from ib_insync.objects import Position as IbPosition
import pendulum
from piker import config
from piker import pp
from piker.pp import (
# update_pps_conf,
Position,
Transaction,
open_trade_ledger,
open_pps,
PpTable,
)
from piker.log import get_console_log
from piker.clearing._messages import (
BrokerdOrder,
@ -66,7 +74,8 @@ from piker.data._source import Symbol
from .api import (
_accounts2clients,
# _adhoc_futes_set,
_adhoc_symbol_map,
con2fqsn,
# _adhoc_symbol_map,
log,
get_config,
open_client_proxies,
@ -76,49 +85,12 @@ from .api import (
def pack_position(
pos: Position
pos: IbPosition
) -> dict[str, Any]:
con = pos.contract
if isinstance(con, Option):
# TODO: option symbol parsing and sane display:
symbol = con.localSymbol.replace(' ', '')
else:
# TODO: lookup fqsn even for derivs.
symbol = con.symbol.lower()
# TODO: probably write a mofo exchange mapper routine since ib
# can't get it's shit together like, ever.
# try our best to figure out the exchange / venue
exch = (con.primaryExchange or con.exchange).lower()
if not exch:
if isinstance(con, Forex):
# bc apparently it's not in the contract obj?
exch = 'idealfx'
else:
# for wtv cucked reason some futes don't show their
# exchange (like CL.NYMEX) ...
entry = _adhoc_symbol_map.get(
con.symbol or con.localSymbol
)
if entry:
meta, kwargs = entry
cid = meta.get('conId')
if cid:
assert con.conId == meta['conId']
exch = meta['exchange']
assert exch, f'No clue:\n {con}'
fqsn = '.'.join((symbol, exch))
expiry = con.lastTradeDateOrContractMonth
if expiry:
fqsn += f'.{expiry}'
fqsn, calc_price = con2fqsn(con)
# TODO: options contracts into a sane format..
return (
@ -305,12 +277,10 @@ async def update_ledger_from_api_trades(
client: Union[Client, MethodProxy],
) -> tuple[
dict[str, pp.Transaction],
dict[str, Transaction],
dict[str, dict],
]:
conf = get_config()
# XXX; ERRGGG..
# pack in the "primary/listing exchange" value from a
# contract lookup since it seems this isn't available by
@ -331,39 +301,34 @@ async def update_ledger_from_api_trades(
entry['listingExchange'] = pexch
conf = get_config()
entries = trades_to_ledger_entries(
conf['accounts'].inverse,
trade_entries,
)
# write recent session's trades to the user's (local) ledger file.
records: dict[str, pp.Transactions] = {}
# normalize recent session's trades to the `Transaction` type
trans_by_acct: dict[str, dict[str, Transaction]] = {}
for acctid, trades_by_id in entries.items():
# normalize to transaction form
records[acctid] = norm_trade_records(trades_by_id)
trans_by_acct[acctid] = norm_trade_records(trades_by_id)
return records, entries
return trans_by_acct, entries
async def update_and_audit_msgs(
acctid: str, # no `ib.` prefix is required!
pps: list[pp.Position],
pps: list[Position],
cids2pps: dict[tuple[str, int], BrokerdPosition],
validate: bool = False,
) -> list[BrokerdPosition]:
msgs: list[BrokerdPosition] = []
# pps: dict[int, pp.Position] = {}
for p in pps:
bsuid = p.bsuid
# build trade-session-actor local table
# of pps from unique symbol ids.
# pps[bsuid] = p
# retreive equivalent ib reported position message
# for comparison/audit versus the piker equivalent
# breakeven pp calcs.
@ -479,7 +444,7 @@ async def trades_dialogue(
client = aioclients[account]
async def open_stream(
async def open_trade_event_stream(
task_status: TaskStatus[
trio.abc.ReceiveChannel
] = trio.TASK_STATUS_IGNORED,
@ -493,7 +458,7 @@ async def trades_dialogue(
task_status.started(trade_event_stream)
await trio.sleep_forever()
trade_event_stream = await nurse.start(open_stream)
trade_event_stream = await nurse.start(open_trade_event_stream)
clients.append((client, trade_event_stream))
@ -501,122 +466,147 @@ async def trades_dialogue(
accounts.add(account)
cids2pps: dict[str, BrokerdPosition] = {}
update_records: dict[str, bidict] = {}
# process pp value reported from ib's system. we only use these
# to cross-check sizing since average pricing on their end uses
# the so called (bs) "FIFO" style which more or less results in
# a price that's not useful for traders who want to not lose
# money.. xb
for client in aioclients.values():
for pos in client.positions():
cid, msg = pack_position(pos)
acctid = msg.account = accounts_def.inverse[msg.account]
acctid = acctid.strip('ib.')
cids2pps[(acctid, cid)] = msg
assert msg.account in accounts, (
f'Position for unknown account: {msg.account}')
# collect all ib-pp reported positions so that we can be
# sure know which positions to update from the ledger if
# any are missing from the ``pps.toml``
update_records.setdefault(acctid, bidict())[cid] = msg.symbol
# update trades ledgers for all accounts from
# connected api clients which report trades for **this session**.
new_trades = {}
for account, proxy in proxies.items():
trades = await proxy.trades()
(
records_by_acct,
ledger_entries,
) = await update_ledger_from_api_trades(
trades,
proxy,
)
new_trades.update(records_by_acct)
for acctid, trans in new_trades.items():
for t in trans:
bsuid = t.bsuid
if bsuid in update_records:
assert update_records[bsuid] == t.fqsn
else:
update_records.setdefault(acctid, bidict())[bsuid] = t.fqsn
# load all positions from `pps.toml`, cross check with ib's
# positions data, and relay re-formatted pps as msgs to the ems.
# __2 cases__:
# - new trades have taken place this session that we want to
# always reprocess indempotently,
# - no new trades yet but we want to reload and audit any
# positions reported by ib's sys that may not yet be in
# piker's ``pps.toml`` state-file.
for acctid, to_update in update_records.items():
trans = new_trades.get(acctid)
active, closed = pp.update_pps_conf(
'ib',
acctid,
trade_records=trans,
ledger_reload=to_update,
)
for pps in [active, closed]:
msgs = await update_and_audit_msgs(
acctid,
pps.values(),
cids2pps,
validate=True,
)
all_positions.extend(msg for msg in msgs)
if not all_positions and cids2pps:
raise RuntimeError(
'Positions reported by ib but not found in `pps.toml`!?\n'
f'{pformat(cids2pps)}'
)
# log.info(f'Loaded {len(trades)} from this session')
# TODO: write trades to local ``trades.toml``
# - use above per-session trades data and write to local file
# - get the "flex reports" working and pull historical data and
# also save locally.
await ctx.started((
all_positions,
tuple(name for name in accounts_def if name in accounts),
))
# TODO: maybe just write on teardown?
# we might also want to delegate a specific actor for
# ledger writing / reading for speed?
# write ledger with all new trades **AFTER** we've updated the
# `pps.toml` from the original ledger state!
for acctid, trades_by_id in ledger_entries.items():
with pp.open_trade_ledger('ib', acctid) as ledger:
ledger.update(trades_by_id)
async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
# Open a trade ledgers stack for appending trade records over
# multiple accounts.
# TODO: we probably want to generalize this into a "ledgers" api..
ledgers: dict[str, dict] = {}
tables: dict[str, PpTable] = {}
with (
ExitStack() as lstack,
):
# start order request handler **before** local trades event loop
n.start_soon(handle_order_requests, ems_stream, accounts_def)
# allocate event relay tasks for each client connection
for client, stream in clients:
n.start_soon(
deliver_trade_events,
stream,
ems_stream,
accounts_def,
cids2pps,
proxies,
# process pp value reported from ib's system. we only use these
# to cross-check sizing since average pricing on their end uses
# the so called (bs) "FIFO" style which more or less results in
# a price that's not useful for traders who want to not lose
# money.. xb
acctids = set()
for client in aioclients.values():
for pos in client.positions():
# collect all ib-pp reported positions so that we can be
# sure know which positions to update from the ledger if
# any are missing from the ``pps.toml``
cid, msg = pack_position(pos)
acctid = msg.account = accounts_def.inverse[msg.account]
acctid = acctid.strip('ib.')
acctids.add(acctid)
cids2pps[(acctid, cid)] = msg
assert msg.account in accounts, (
f'Position for unknown account: {msg.account}')
for acctid in acctids:
# open ledger and pptable wrapper for each
# detected account.
ledgers[acctid] = lstack.enter_context(
open_trade_ledger('ib', acctid)
)
tables[acctid] = lstack.enter_context(
open_pps('ib', acctid)
)
# block until cancelled
await trio.sleep_forever()
# update trades ledgers for all accounts from
# connected api clients which report trades for **this session**.
for account, proxy in proxies.items():
trades = await proxy.trades()
(
trans_by_acct,
ready_for_ledger_entries,
) = await update_ledger_from_api_trades(
trades,
proxy,
)
acctid = acctid.strip('ib.')
ledgers[acctid].update(ready_for_ledger_entries)
# WTF, yet again this key error is getting ignored?!?!
# tables[acctid].update_from_trans(trans_by_acct[account])
# this causes a hang..
# - marketstored tries to kill container, cant,
# - ctrl-c makes pikerd get stuck...
# assert 0
trans = trans_by_acct.get(acctid)
if trans:
tables[acctid].update_from_trans(trans)
# load all positions from `pps.toml`, cross check with ib's
# positions data, and relay re-formatted pps as msgs to the ems.
# __2 cases__:
# - new trades have taken place this session that we want to
# always reprocess indempotently,
# - no new trades yet but we want to reload and audit any
# positions reported by ib's sys that may not yet be in
# piker's ``pps.toml`` state-file.
for acctid in acctids:
table = tables[acctid]
_, closed_pps = table.dump_active('ib')
active_pps = table.pps
for pps in [active_pps, closed_pps]:
msgs = await update_and_audit_msgs(
acctid,
pps.values(),
cids2pps,
validate=True,
)
all_positions.extend(msg for msg in msgs)
if not all_positions and cids2pps:
raise RuntimeError(
'Positions reported by ib but not found in `pps.toml`!?\n'
f'{pformat(cids2pps)}'
)
# log.info(f'Loaded {len(trades)} from this session')
# TODO: write trades to local ``trades.toml``
# - use above per-session trades data and write to local file
# - get the "flex reports" working and pull historical data and
# also save locally.
await ctx.started((
all_positions,
tuple(name for name in accounts_def if name in accounts),
))
# TODO: maybe just write on teardown?
# we might also want to delegate a specific actor for
# ledger writing / reading for speed?
# write ledger with all new trades **AFTER** we've updated the
# `pps.toml` from the original ledger state!
for acctid, trades_by_id in ready_for_ledger_entries.items():
ledgers[acctid].update(trades_by_id)
async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
):
# start order request handler **before** local trades
# event loop
n.start_soon(handle_order_requests, ems_stream, accounts_def)
# allocate event relay tasks for each client connection
for client, stream in clients:
n.start_soon(
deliver_trade_events,
stream,
ems_stream,
accounts_def,
cids2pps,
proxies,
ledgers,
tables,
)
# block until cancelled
await trio.sleep_forever()
async def emit_pp_update(
@ -626,44 +616,44 @@ async def emit_pp_update(
proxies: dict,
cids2pps: dict,
ledgers,
tables,
) -> None:
# compute and relay incrementally updated piker pp
acctid = accounts_def.inverse[trade_entry['execution']['acctNumber']]
proxy = proxies[acctid]
acctname = acctid.strip('ib.')
records_by_acct, ledger_entries = await update_ledger_from_api_trades(
acctid = acctid.strip('ib.')
(
records_by_acct,
ready_for_ledger_entries,
) = await update_ledger_from_api_trades(
[trade_entry],
proxy,
)
records = records_by_acct[acctname]
r = records[0]
trans = records_by_acct[acctid]
r = list(trans.values())[0]
# update and load all positions from `pps.toml`, cross check with
# ib's positions data, and relay re-formatted pps as msgs to the
# ems. we report both the open and closed updates in one map since
# for incremental update we may have just fully closed a pp and need
# to relay that msg as well!
active, closed = pp.update_pps_conf(
'ib',
acctname,
trade_records=records,
ledger_reload={r.bsuid: r.fqsn},
)
table = tables[acctid]
table.update_from_trans(trans)
_, closed = table.dump_active('ib')
active = table.pps
# NOTE: write ledger with all new trades **AFTER** we've updated the
# `pps.toml` from the original ledger state!
for acctid, trades_by_id in ledger_entries.items():
with pp.open_trade_ledger('ib', acctid) as ledger:
ledger.update(trades_by_id)
# NOTE: update ledger with all new trades
for acctid, trades_by_id in ready_for_ledger_entries.items():
ledger = ledgers[acctid]
ledger.update(trades_by_id)
# generate pp msgs and cross check with ib's positions data, relay
# re-formatted pps as msgs to the ems.
for pos in filter(
bool,
[active.get(r.bsuid), closed.get(r.bsuid)]
):
msgs = await update_and_audit_msgs(
acctname,
acctid,
[pos],
cids2pps,
@ -685,6 +675,9 @@ async def deliver_trade_events(
cids2pps: dict[tuple[str, str], BrokerdPosition],
proxies: dict[str, MethodProxy],
ledgers,
tables,
) -> None:
'''
Format and relay all trade events for a given client to emsd.
@ -834,6 +827,8 @@ async def deliver_trade_events(
accounts_def,
proxies,
cids2pps,
ledgers,
tables,
)
case 'cost':
@ -866,6 +861,8 @@ async def deliver_trade_events(
accounts_def,
proxies,
cids2pps,
ledgers,
tables,
)
case 'error':
@ -916,14 +913,13 @@ async def deliver_trade_events(
def norm_trade_records(
ledger: dict[str, Any],
) -> list[pp.Transaction]:
) -> list[Transaction]:
'''
Normalize a flex report or API retrieved executions
ledger into our standard record format.
'''
records: list[pp.Transaction] = []
records: dict[str, Transaction] = {}
for tid, record in ledger.items():
conid = record.get('conId') or record['conid']
@ -1001,7 +997,7 @@ def norm_trade_records(
# which case, we can pull the fqsn from that table (see
# `trades_dialogue()` above).
records.append(pp.Transaction(
records[tid] = Transaction(
fqsn=fqsn,
tid=tid,
size=size,
@ -1010,7 +1006,7 @@ def norm_trade_records(
dt=dt,
expiry=expiry,
bsuid=conid,
))
)
return records
@ -1151,7 +1147,7 @@ def load_flex_trades(
ledgers = {}
for acctid, trades_by_id in trades_by_account.items():
with pp.open_trade_ledger('ib', acctid) as ledger:
with open_trade_ledger('ib', acctid) as ledger:
ledger.update(trades_by_id)
ledgers[acctid] = ledger