Port position calcs to new ctx mngr apis and drop multi-loop madness

ib_native_data_hack
Tyler Goodlet 2022-07-22 15:14:24 -04:00
parent ad458e3fcd
commit e1d57e8a8a
1 changed files with 98 additions and 93 deletions

View File

@ -53,7 +53,6 @@ import pendulum
from piker import config from piker import config
from piker.pp import ( from piker.pp import (
# update_pps_conf,
Position, Position,
Transaction, Transaction,
open_trade_ledger, open_trade_ledger,
@ -426,6 +425,8 @@ async def trades_dialogue(
all_positions = [] all_positions = []
accounts = set() accounts = set()
clients: list[tuple[Client, trio.MemoryReceiveChannel]] = [] clients: list[tuple[Client, trio.MemoryReceiveChannel]] = []
acctids = set()
cids2pps: dict[str, BrokerdPosition] = {}
# TODO: this causes a massive tractor bug when you run marketstored # TODO: this causes a massive tractor bug when you run marketstored
# with ``--tsdb``... you should get: # with ``--tsdb``... you should get:
@ -435,37 +436,13 @@ async def trades_dialogue(
# - hitting final control-c to kill daemon will lead to hang # - hitting final control-c to kill daemon will lead to hang
# assert 0 # assert 0
# TODO: just write on teardown?
# we might also want to delegate a specific actor for
# ledger writing / reading for speed?
async with ( async with (
trio.open_nursery() as nurse, trio.open_nursery() as nurse,
open_client_proxies() as (proxies, aioclients), open_client_proxies() as (proxies, aioclients),
): ):
for account, proxy in proxies.items():
client = aioclients[account]
async def open_trade_event_stream(
task_status: TaskStatus[
trio.abc.ReceiveChannel
] = trio.TASK_STATUS_IGNORED,
):
# each api client has a unique event stream
async with tractor.to_asyncio.open_channel_from(
recv_trade_updates,
client=client,
) as (first, trade_event_stream):
task_status.started(trade_event_stream)
await trio.sleep_forever()
trade_event_stream = await nurse.start(open_trade_event_stream)
clients.append((client, trade_event_stream))
assert account in accounts_def
accounts.add(account)
cids2pps: dict[str, BrokerdPosition] = {}
# Open a trade ledgers stack for appending trade records over # Open a trade ledgers stack for appending trade records over
# multiple accounts. # multiple accounts.
# TODO: we probably want to generalize this into a "ledgers" api.. # TODO: we probably want to generalize this into a "ledgers" api..
@ -474,73 +451,106 @@ async def trades_dialogue(
with ( with (
ExitStack() as lstack, ExitStack() as lstack,
): ):
for account, proxy in proxies.items():
# process pp value reported from ib's system. we only use these acctid = account.strip('ib.')
# to cross-check sizing since average pricing on their end uses acctids.add(acctid)
# the so called (bs) "FIFO" style which more or less results in
# a price that's not useful for traders who want to not lose
# money.. xb
acctids = set()
for client in aioclients.values():
for pos in client.positions():
# collect all ib-pp reported positions so that we can be
# sure know which positions to update from the ledger if
# any are missing from the ``pps.toml``
cid, msg = pack_position(pos)
acctid = msg.account = accounts_def.inverse[msg.account]
acctid = acctid.strip('ib.')
acctids.add(acctid)
cids2pps[(acctid, cid)] = msg
assert msg.account in accounts, (
f'Position for unknown account: {msg.account}')
for acctid in acctids:
# open ledger and pptable wrapper for each # open ledger and pptable wrapper for each
# detected account. # detected account.
ledgers[acctid] = lstack.enter_context( ledger = ledgers[acctid] = lstack.enter_context(
open_trade_ledger('ib', acctid) open_trade_ledger('ib', acctid)
) )
tables[acctid] = lstack.enter_context( table = tables[acctid] = lstack.enter_context(
open_pps('ib', acctid) open_pps('ib', acctid)
) )
# update trades ledgers for all accounts from client = aioclients[account]
# connected api clients which report trades for **this session**.
for account, proxy in proxies.items():
async def open_trade_event_stream(
task_status: TaskStatus[
trio.abc.ReceiveChannel
] = trio.TASK_STATUS_IGNORED,
):
# each api client has a unique event stream
async with tractor.to_asyncio.open_channel_from(
recv_trade_updates,
client=client,
) as (first, trade_event_stream):
task_status.started(trade_event_stream)
await trio.sleep_forever()
trade_event_stream = await nurse.start(open_trade_event_stream)
clients.append((client, trade_event_stream))
assert account in accounts_def
accounts.add(account)
# update trades ledgers for all accounts from connected
# api clients which report trades for **this session**.
trades = await proxy.trades() trades = await proxy.trades()
( (
trans_by_acct, trans_by_acct,
ready_for_ledger_entries, api_ready_for_ledger_entries,
) = await update_ledger_from_api_trades( ) = await update_ledger_from_api_trades(
trades, trades,
proxy, proxy,
) )
acctid = account.strip('ib.') # if new trades are detected from the API, prepare
ledger = ledgers[acctid] # them for the ledger file and update the pptable.
ledger.update(ready_for_ledger_entries[acctid]) if api_ready_for_ledger_entries:
trade_entries = api_ready_for_ledger_entries[acctid]
ledger.update(trade_entries)
trans = trans_by_acct.get(acctid)
if trans:
table.update_from_trans(trans)
trans = trans_by_acct.get(acctid) # process pp value reported from ib's system. we only use these
if trans: # to cross-check sizing since average pricing on their end uses
tables[acctid].update_from_trans(trans) # the so called (bs) "FIFO" style which more or less results in
# a price that's not useful for traders who want to not lose
# money.. xb
# for client in aioclients.values():
for pos in client.positions():
# load all positions from `pps.toml`, cross check with ib's # collect all ib-pp reported positions so that we can be
# positions data, and relay re-formatted pps as msgs to the ems. # sure know which positions to update from the ledger if
# __2 cases__: # any are missing from the ``pps.toml``
# - new trades have taken place this session that we want to bsuid, msg = pack_position(pos)
# always reprocess indempotently, acctid = msg.account = accounts_def.inverse[msg.account]
# - no new trades yet but we want to reload and audit any acctid = acctid.strip('ib.')
# positions reported by ib's sys that may not yet be in cids2pps[(acctid, bsuid)] = msg
# piker's ``pps.toml`` state-file. assert msg.account in accounts, (
for acctid in acctids: f'Position for unknown account: {msg.account}')
table = tables[acctid]
pp = table.pps.get(bsuid)
if (
not pp
or pp.size != msg.size
):
trans = norm_trade_records(ledger)
updated = table.update_from_trans(trans)
pp = updated[bsuid]
assert msg.size == pp.size, 'WTF'
# TODO: figure out why these don't match?
# assert pp.calc_be_price() == pp.be_price
table = tables[acctid]
_, closed_pps = table.dump_active('ib') _, closed_pps = table.dump_active('ib')
active_pps = table.pps active_pps = table.pps
# load all positions from `pps.toml`, cross check with
# ib's positions data, and relay re-formatted pps as
# msgs to the ems.
# __2 cases__:
# - new trades have taken place this session that we want to
# always reprocess indempotently,
# - no new trades yet but we want to reload and audit any
# positions reported by ib's sys that may not yet be in
# piker's ``pps.toml`` state-file.
for pps in [active_pps, closed_pps]: for pps in [active_pps, closed_pps]:
msgs = await update_and_audit_msgs( msgs = await update_and_audit_msgs(
acctid, acctid,
@ -556,24 +566,14 @@ async def trades_dialogue(
f'{pformat(cids2pps)}' f'{pformat(cids2pps)}'
) )
# log.info(f'Loaded {len(trades)} from this session')
# TODO: write trades to local ``trades.toml``
# - use above per-session trades data and write to local file
# - get the "flex reports" working and pull historical data and
# also save locally.
await ctx.started(( await ctx.started((
all_positions, all_positions,
tuple(name for name in accounts_def if name in accounts), tuple(name for name in accounts_def if name in accounts),
)) ))
# TODO: maybe just write on teardown?
# we might also want to delegate a specific actor for
# ledger writing / reading for speed?
# write ledger with all new trades **AFTER** we've updated the # write ledger with all new trades **AFTER** we've updated the
# `pps.toml` from the original ledger state! # `pps.toml` from the original ledger state!
for acctid, trades_by_id in ready_for_ledger_entries.items(): for acctid, trades_by_id in api_ready_for_ledger_entries.items():
ledgers[acctid].update(trades_by_id) ledgers[acctid].update(trades_by_id)
async with ( async with (
@ -621,7 +621,7 @@ async def emit_pp_update(
acctid = acctid.strip('ib.') acctid = acctid.strip('ib.')
( (
records_by_acct, records_by_acct,
ready_for_ledger_entries, api_ready_for_ledger_entries,
) = await update_ledger_from_api_trades( ) = await update_ledger_from_api_trades(
[trade_entry], [trade_entry],
proxy, proxy,
@ -635,7 +635,7 @@ async def emit_pp_update(
active = table.pps active = table.pps
# NOTE: update ledger with all new trades # NOTE: update ledger with all new trades
for acctid, trades_by_id in ready_for_ledger_entries.items(): for acctid, trades_by_id in api_ready_for_ledger_entries.items():
ledger = ledgers[acctid] ledger = ledgers[acctid]
ledger.update(trades_by_id) ledger.update(trades_by_id)
@ -1128,8 +1128,7 @@ def load_flex_trades(
trade_entries = report.extract('Trade') trade_entries = report.extract('Trade')
ln = len(trade_entries) ln = len(trade_entries)
# log.info(f'Loaded {ln} trades from flex query') log.info(f'Loaded {ln} trades from flex query')
print(f'Loaded {ln} trades from flex query')
trades_by_account = trades_to_ledger_entries( trades_by_account = trades_to_ledger_entries(
# get reverse map to user account names # get reverse map to user account names
@ -1138,14 +1137,20 @@ def load_flex_trades(
source_type='flex', source_type='flex',
) )
ledgers = {} for acctid in trades_by_account:
for acctid, trades_by_id in trades_by_account.items(): trades_by_id = trades_by_account[acctid]
with open_trade_ledger('ib', acctid) as ledger: with open_trade_ledger('ib', acctid) as ledger_dict:
ledger.update(trades_by_id) tid_delta = set(trades_by_id) - set(ledger_dict)
log.info(
'New trades detected\n'
f'{pformat(tid_delta)}'
)
if tid_delta:
ledger_dict.update(
{tid: trades_by_id[tid] for tid in tid_delta}
)
ledgers[acctid] = ledger return ledger_dict
return ledgers
if __name__ == '__main__': if __name__ == '__main__':