WIP: refactor ib pp load init

pre_overruns_ctxcancelled
Tyler Goodlet 2023-03-22 14:09:23 -04:00
parent 8d7b968c44
commit 116f7fd40f
1 changed files with 163 additions and 113 deletions

View File

@ -59,7 +59,7 @@ from piker.accounting import (
open_pps, open_pps,
PpTable, PpTable,
) )
from piker.log import get_console_log from .._util import get_console_log
from piker.clearing._messages import ( from piker.clearing._messages import (
Order, Order,
Status, Status,
@ -281,18 +281,21 @@ async def recv_trade_updates(
async def update_ledger_from_api_trades( async def update_ledger_from_api_trades(
trade_entries: list[dict[str, Any]], trade_entries: list[dict[str, Any]],
client: Union[Client, MethodProxy], client: Union[Client, MethodProxy],
accounts_def_inv: bidict[str, str],
) -> tuple[ ) -> tuple[
dict[str, Transaction], dict[str, Transaction],
dict[str, dict], dict[str, dict],
]: ]:
# XXX; ERRGGG.. # XXX; ERRGGG..
# pack in the "primary/listing exchange" value from a # pack in the "primary/listing exchange" value from a
# contract lookup since it seems this isn't available by # contract lookup since it seems this isn't available by
# default from the `.fills()` method endpoint... # default from the `.fills()` method endpoint...
for entry in trade_entries: for entry in trade_entries:
condict = entry['contract'] condict = entry['contract']
# print(
# f"{condict['symbol']}: GETTING CONTRACT INFO!\n"
# )
conid = condict['conId'] conid = condict['conId']
pexch = condict['primaryExchange'] pexch = condict['primaryExchange']
@ -310,9 +313,8 @@ async def update_ledger_from_api_trades(
# pack in the ``Contract.secType`` # pack in the ``Contract.secType``
entry['asset_type'] = condict['secType'] entry['asset_type'] = condict['secType']
conf = get_config()
entries = api_trades_to_ledger_entries( entries = api_trades_to_ledger_entries(
conf['accounts'].inverse, accounts_def_inv,
trade_entries, trade_entries,
) )
# normalize recent session's trades to the `Transaction` type # normalize recent session's trades to the `Transaction` type
@ -340,9 +342,16 @@ async def update_and_audit_msgs(
# retreive equivalent ib reported position message # retreive equivalent ib reported position message
# for comparison/audit versus the piker equivalent # for comparison/audit versus the piker equivalent
# breakeven pp calcs. # breakeven pp calcs.
# if (
# acctid == 'reg'
# and bs_mktid == 36285627
# ):
# await tractor.breakpoint()
ibppmsg = cids2pps.get((acctid, bs_mktid)) ibppmsg = cids2pps.get((acctid, bs_mktid))
if ibppmsg: if ibppmsg:
symbol = ibppmsg.symbol
msg = BrokerdPosition( msg = BrokerdPosition(
broker='ib', broker='ib',
@ -353,7 +362,7 @@ async def update_and_audit_msgs(
# table.. # table..
account=ibppmsg.account, account=ibppmsg.account,
# XXX: the `.ib` is stripped..? # XXX: the `.ib` is stripped..?
symbol=ibppmsg.symbol, symbol=symbol,
currency=ibppmsg.currency, currency=ibppmsg.currency,
size=p.size, size=p.size,
avg_price=p.ppu, avg_price=p.ppu,
@ -432,75 +441,17 @@ async def update_and_audit_msgs(
return msgs return msgs
@tractor.context async def aggr_open_orders(
async def trades_dialogue( order_msgs: list[Status],
client: Client,
proxy: MethodProxy,
accounts_def: bidict[str, str],
ctx: tractor.Context, ) -> None:
loglevel: str = None, '''
Collect all open orders from client and fill in `order_msgs: list`.
) -> AsyncIterator[dict[str, Any]]: '''
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
accounts_def = config.load_accounts(['ib'])
global _client_cache
# deliver positions to subscriber before anything else
all_positions = []
accounts = set()
acctids = set()
cids2pps: dict[str, BrokerdPosition] = {}
# TODO: this causes a massive tractor bug when you run marketstored
# with ``--tsdb``... you should get:
# - first error the assertion
# - chart should get that error and die
# - pikerd goes to debugger again from trio nursery multi-error
# - hitting final control-c to kill daemon will lead to hang
# assert 0
# TODO: just write on teardown?
# we might also want to delegate a specific actor for
# ledger writing / reading for speed?
async with (
open_client_proxies() as (proxies, aioclients),
):
# Open a trade ledgers stack for appending trade records over
# multiple accounts.
# TODO: we probably want to generalize this into a "ledgers" api..
ledgers: dict[str, dict] = {}
tables: dict[str, PpTable] = {}
order_msgs: list[Status] = []
with (
ExitStack() as lstack,
):
# load ledgers and pps for all detected client-proxies
for account, proxy in proxies.items():
assert account in accounts_def
accounts.add(account)
acctid = account.strip('ib.')
acctids.add(acctid)
# open ledger and pptable wrapper for each
# detected account.
ledger = ledgers[acctid] = lstack.enter_context(
open_trade_ledger(
'ib',
acctid,
)
)
table = tables[acctid] = lstack.enter_context(
open_pps(
'ib',
acctid,
write_on_exit=True,
)
)
for account, proxy in proxies.items():
client = aioclients[account]
trades: list[Trade] = client.ib.openTrades() trades: list[Trade] = client.ib.openTrades()
for trade in trades: for trade in trades:
order = trade.order order = trade.order
@ -545,44 +496,134 @@ async def trades_dialogue(
) )
order_msgs.append(msg) order_msgs.append(msg)
# process pp value reported from ib's system. we only use these return order_msgs
# to cross-check sizing since average pricing on their end uses
# the so called (bs) "FIFO" style which more or less results in
# a price that's not useful for traders who want to not lose
# money.. xb
for pos in client.positions():
# collect all ib-pp reported positions so that we can be
# sure know which positions to update from the ledger if
# any are missing from the ``pps.toml``
bs_mktid, msg = pack_position(pos)
acctid = msg.account = accounts_def.inverse[msg.account] # proxy wrapper for starting trade event stream
acctid = acctid.strip('ib.') async def open_trade_event_stream(
cids2pps[(acctid, bs_mktid)] = msg client: Client,
assert msg.account in accounts, ( task_status: TaskStatus[
f'Position for unknown account: {msg.account}') trio.abc.ReceiveChannel
] = trio.TASK_STATUS_IGNORED,
ledger = ledgers[acctid]
table = tables[acctid]
pp = table.pps.get(bs_mktid)
if (
not pp
or pp.size != msg.size
): ):
trans = norm_trade_records(ledger) # each api client has a unique event stream
table.update_from_trans(trans) async with tractor.to_asyncio.open_channel_from(
recv_trade_updates,
client=client,
) as (first, trade_event_stream):
task_status.started(trade_event_stream)
await trio.sleep_forever()
@tractor.context
async def trades_dialogue(
ctx: tractor.Context,
loglevel: str = None,
) -> AsyncIterator[dict[str, Any]]:
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
accounts_def = config.load_accounts(['ib'])
global _client_cache
# deliver positions to subscriber before anything else
all_positions = []
accounts = set()
acctids = set()
cids2pps: dict[str, BrokerdPosition] = {}
# TODO: this causes a massive tractor bug when you run marketstored
# with ``--tsdb``... you should get:
# - first error the assertion
# - chart should get that error and die
# - pikerd goes to debugger again from trio nursery multi-error
# - hitting final control-c to kill daemon will lead to hang
# assert 0
# TODO: just write on teardown?
# we might also want to delegate a specific actor for
# ledger writing / reading for speed?
async with (
open_client_proxies() as (
proxies,
aioclients,
),
):
# Open a trade ledgers stack for appending trade records over
# multiple accounts.
# TODO: we probably want to generalize this into a "ledgers" api..
ledgers: dict[str, dict] = {}
tables: dict[str, PpTable] = {}
order_msgs: list[Status] = []
conf = get_config()
accounts_def_inv = conf['accounts'].inverse
with (
ExitStack() as lstack,
):
# load ledgers and pps for all detected client-proxies
for account, proxy in proxies.items():
assert account in accounts_def
accounts.add(account)
acctid = account.strip('ib.')
acctids.add(acctid)
# open ledger and pptable wrapper for each
# detected account.
ledger = ledgers[acctid] = lstack.enter_context(
open_trade_ledger(
'ib',
acctid,
)
)
# load all positions from `pps.toml`, cross check with
# ib's positions data, and relay re-formatted pps as
# msgs to the ems.
# __2 cases__:
# - new trades have taken place this session that we want to
# always reprocess indempotently,
# - no new trades yet but we want to reload and audit any
# positions reported by ib's sys that may not yet be in
# piker's ``pps.toml`` state-file.
tables[acctid] = lstack.enter_context(
open_pps(
'ib',
acctid,
write_on_exit=True,
)
)
for account, proxy in proxies.items():
client = aioclients[account]
# order_msgs is filled in by this helper
await aggr_open_orders(
order_msgs,
client,
proxy,
accounts_def,
)
acctid: str = account.strip('ib.')
ledger: dict = ledgers[acctid]
table: PpTable = tables[acctid]
# update trades ledgers for all accounts from connected # update trades ledgers for all accounts from connected
# api clients which report trades for **this session**. # api clients which report trades for **this session**.
trades = await proxy.trades() trades = await proxy.trades()
if trades:
( (
trans_by_acct, trans_by_acct,
api_to_ledger_entries, api_to_ledger_entries,
) = await update_ledger_from_api_trades( ) = await update_ledger_from_api_trades(
trades, trades,
proxy, proxy,
accounts_def_inv,
) )
# if new trades are detected from the API, prepare # if new trades are detected from the API, prepare
@ -591,9 +632,10 @@ async def trades_dialogue(
trade_entries = api_to_ledger_entries.get(acctid) trade_entries = api_to_ledger_entries.get(acctid)
if trade_entries: if trade_entries:
# write ledger with all new trades **AFTER** # write ledger with all new trades
# we've updated the `pps.toml` from the # **AFTER** we've updated the
# original ledger state! (i.e. this is # `pps.toml` from the original
# ledger state! (i.e. this is
# currently done on exit) # currently done on exit)
ledger.update(trade_entries) ledger.update(trade_entries)
@ -601,6 +643,35 @@ async def trades_dialogue(
if trans: if trans:
table.update_from_trans(trans) table.update_from_trans(trans)
# process pp value reported from ib's system. we only
# use these to cross-check sizing since average pricing
# on their end uses the so called (bs) "FIFO" style
# which more or less results in a price that's not
# useful for traders who want to not lose money.. xb
for pos in client.positions():
# collect all ib-pp reported positions so that we can be
# sure know which positions to update from the ledger if
# any are missing from the ``pps.toml``
bs_mktid, msg = pack_position(pos)
acctid = msg.account = accounts_def.inverse[msg.account]
acctid = acctid.strip('ib.')
cids2pps[(acctid, bs_mktid)] = msg
assert msg.account in accounts, (
f'Position for unknown account: {msg.account}')
ledger: dict = ledgers[acctid]
table: PpTable = tables[acctid]
pp: Position = table.pps.get(bs_mktid)
if (
not pp
or pp.size != msg.size
):
trans = norm_trade_records(ledger)
table.update_from_trans(trans)
# XXX: not sure exactly why it wouldn't be in # XXX: not sure exactly why it wouldn't be in
# the updated output (maybe this is a bug?) but # the updated output (maybe this is a bug?) but
# if you create a pos from TWS and then load it # if you create a pos from TWS and then load it
@ -630,17 +701,12 @@ async def trades_dialogue(
f'piker: {pp.size}\n' f'piker: {pp.size}\n'
) )
# iterate all (newly) updated pps tables for every
# client-account and build out position msgs to deliver to
# EMS.
for acctid, table in tables.items():
active_pps, closed_pps = table.dump_active() active_pps, closed_pps = table.dump_active()
# load all positions from `pps.toml`, cross check with
# ib's positions data, and relay re-formatted pps as
# msgs to the ems.
# __2 cases__:
# - new trades have taken place this session that we want to
# always reprocess indempotently,
# - no new trades yet but we want to reload and audit any
# positions reported by ib's sys that may not yet be in
# piker's ``pps.toml`` state-file.
for pps in [active_pps, closed_pps]: for pps in [active_pps, closed_pps]:
msgs = await update_and_audit_msgs( msgs = await update_and_audit_msgs(
acctid, acctid,
@ -661,22 +727,6 @@ async def trades_dialogue(
tuple(name for name in accounts_def if name in accounts), tuple(name for name in accounts_def if name in accounts),
)) ))
# proxy wrapper for starting trade event stream
async def open_trade_event_stream(
client: Client,
task_status: TaskStatus[
trio.abc.ReceiveChannel
] = trio.TASK_STATUS_IGNORED,
):
# each api client has a unique event stream
async with tractor.to_asyncio.open_channel_from(
recv_trade_updates,
client=client,
) as (first, trade_event_stream):
task_status.started(trade_event_stream)
await trio.sleep_forever()
async with ( async with (
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,
trio.open_nursery() as n, trio.open_nursery() as n,
@ -723,7 +773,7 @@ async def trades_dialogue(
async def emit_pp_update( async def emit_pp_update(
ems_stream: tractor.MsgStream, ems_stream: tractor.MsgStream,
trade_entry: dict, trade_entry: dict,
accounts_def: bidict, accounts_def: bidict[str, str],
proxies: dict, proxies: dict,
cids2pps: dict, cids2pps: dict,
@ -733,16 +783,16 @@ async def emit_pp_update(
) -> None: ) -> None:
# compute and relay incrementally updated piker pp # compute and relay incrementally updated piker pp
acctid = accounts_def.inverse[trade_entry['execution']['acctNumber']] accounts_def_inv: bidict[str, str] = accounts_def.inverse
acctid = accounts_def_inv[trade_entry['execution']['acctNumber']]
proxy = proxies[acctid] proxy = proxies[acctid]
acctid = acctid.strip('ib.')
( (
records_by_acct, records_by_acct,
api_to_ledger_entries, api_to_ledger_entries,
) = await update_ledger_from_api_trades( ) = await update_ledger_from_api_trades(
[trade_entry], [trade_entry],
proxy, proxy,
accounts_def_inv,
) )
trans = records_by_acct[acctid] trans = records_by_acct[acctid]
r = list(trans.values())[0] r = list(trans.values())[0]
@ -1244,7 +1294,7 @@ def parse_flex_dt(
def api_trades_to_ledger_entries( def api_trades_to_ledger_entries(
accounts: bidict, accounts: bidict[str, str],
# TODO: maybe we should just be passing through the # TODO: maybe we should just be passing through the
# ``ib_insync.order.Trade`` instance directly here # ``ib_insync.order.Trade`` instance directly here