WIP: refactor ib pp load init
parent
3e5da64571
commit
c59ec77d9c
|
@ -59,7 +59,7 @@ from piker.accounting import (
|
|||
open_pps,
|
||||
PpTable,
|
||||
)
|
||||
from piker.log import get_console_log
|
||||
from .._util import get_console_log
|
||||
from piker.clearing._messages import (
|
||||
Order,
|
||||
Status,
|
||||
|
@ -281,18 +281,21 @@ async def recv_trade_updates(
|
|||
async def update_ledger_from_api_trades(
|
||||
trade_entries: list[dict[str, Any]],
|
||||
client: Union[Client, MethodProxy],
|
||||
accounts_def_inv: bidict[str, str],
|
||||
|
||||
) -> tuple[
|
||||
dict[str, Transaction],
|
||||
dict[str, dict],
|
||||
]:
|
||||
|
||||
# XXX; ERRGGG..
|
||||
# pack in the "primary/listing exchange" value from a
|
||||
# contract lookup since it seems this isn't available by
|
||||
# default from the `.fills()` method endpoint...
|
||||
for entry in trade_entries:
|
||||
condict = entry['contract']
|
||||
# print(
|
||||
# f"{condict['symbol']}: GETTING CONTRACT INFO!\n"
|
||||
# )
|
||||
conid = condict['conId']
|
||||
pexch = condict['primaryExchange']
|
||||
|
||||
|
@ -310,9 +313,8 @@ async def update_ledger_from_api_trades(
|
|||
# pack in the ``Contract.secType``
|
||||
entry['asset_type'] = condict['secType']
|
||||
|
||||
conf = get_config()
|
||||
entries = api_trades_to_ledger_entries(
|
||||
conf['accounts'].inverse,
|
||||
accounts_def_inv,
|
||||
trade_entries,
|
||||
)
|
||||
# normalize recent session's trades to the `Transaction` type
|
||||
|
@ -340,9 +342,16 @@ async def update_and_audit_msgs(
|
|||
# retreive equivalent ib reported position message
|
||||
# for comparison/audit versus the piker equivalent
|
||||
# breakeven pp calcs.
|
||||
# if (
|
||||
# acctid == 'reg'
|
||||
# and bs_mktid == 36285627
|
||||
# ):
|
||||
# await tractor.breakpoint()
|
||||
|
||||
ibppmsg = cids2pps.get((acctid, bs_mktid))
|
||||
|
||||
if ibppmsg:
|
||||
symbol = ibppmsg.symbol
|
||||
msg = BrokerdPosition(
|
||||
broker='ib',
|
||||
|
||||
|
@ -353,7 +362,7 @@ async def update_and_audit_msgs(
|
|||
# table..
|
||||
account=ibppmsg.account,
|
||||
# XXX: the `.ib` is stripped..?
|
||||
symbol=ibppmsg.symbol,
|
||||
symbol=symbol,
|
||||
currency=ibppmsg.currency,
|
||||
size=p.size,
|
||||
avg_price=p.ppu,
|
||||
|
@ -432,75 +441,17 @@ async def update_and_audit_msgs(
|
|||
return msgs
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def trades_dialogue(
|
||||
async def aggr_open_orders(
|
||||
order_msgs: list[Status],
|
||||
client: Client,
|
||||
proxy: MethodProxy,
|
||||
accounts_def: bidict[str, str],
|
||||
|
||||
ctx: tractor.Context,
|
||||
loglevel: str = None,
|
||||
) -> None:
|
||||
'''
|
||||
Collect all open orders from client and fill in `order_msgs: list`.
|
||||
|
||||
) -> AsyncIterator[dict[str, Any]]:
|
||||
|
||||
# XXX: required to propagate ``tractor`` loglevel to piker logging
|
||||
get_console_log(loglevel or tractor.current_actor().loglevel)
|
||||
|
||||
accounts_def = config.load_accounts(['ib'])
|
||||
|
||||
global _client_cache
|
||||
|
||||
# deliver positions to subscriber before anything else
|
||||
all_positions = []
|
||||
accounts = set()
|
||||
acctids = set()
|
||||
cids2pps: dict[str, BrokerdPosition] = {}
|
||||
|
||||
# TODO: this causes a massive tractor bug when you run marketstored
|
||||
# with ``--tsdb``... you should get:
|
||||
# - first error the assertion
|
||||
# - chart should get that error and die
|
||||
# - pikerd goes to debugger again from trio nursery multi-error
|
||||
# - hitting final control-c to kill daemon will lead to hang
|
||||
# assert 0
|
||||
|
||||
# TODO: just write on teardown?
|
||||
# we might also want to delegate a specific actor for
|
||||
# ledger writing / reading for speed?
|
||||
async with (
|
||||
open_client_proxies() as (proxies, aioclients),
|
||||
):
|
||||
# Open a trade ledgers stack for appending trade records over
|
||||
# multiple accounts.
|
||||
# TODO: we probably want to generalize this into a "ledgers" api..
|
||||
ledgers: dict[str, dict] = {}
|
||||
tables: dict[str, PpTable] = {}
|
||||
order_msgs: list[Status] = []
|
||||
with (
|
||||
ExitStack() as lstack,
|
||||
):
|
||||
# load ledgers and pps for all detected client-proxies
|
||||
for account, proxy in proxies.items():
|
||||
assert account in accounts_def
|
||||
accounts.add(account)
|
||||
acctid = account.strip('ib.')
|
||||
acctids.add(acctid)
|
||||
|
||||
# open ledger and pptable wrapper for each
|
||||
# detected account.
|
||||
ledger = ledgers[acctid] = lstack.enter_context(
|
||||
open_trade_ledger(
|
||||
'ib',
|
||||
acctid,
|
||||
)
|
||||
)
|
||||
table = tables[acctid] = lstack.enter_context(
|
||||
open_pps(
|
||||
'ib',
|
||||
acctid,
|
||||
write_on_exit=True,
|
||||
)
|
||||
)
|
||||
|
||||
for account, proxy in proxies.items():
|
||||
client = aioclients[account]
|
||||
'''
|
||||
trades: list[Trade] = client.ib.openTrades()
|
||||
for trade in trades:
|
||||
order = trade.order
|
||||
|
@ -545,44 +496,134 @@ async def trades_dialogue(
|
|||
)
|
||||
order_msgs.append(msg)
|
||||
|
||||
# process pp value reported from ib's system. we only use these
|
||||
# to cross-check sizing since average pricing on their end uses
|
||||
# the so called (bs) "FIFO" style which more or less results in
|
||||
# a price that's not useful for traders who want to not lose
|
||||
# money.. xb
|
||||
for pos in client.positions():
|
||||
return order_msgs
|
||||
|
||||
# collect all ib-pp reported positions so that we can be
|
||||
# sure know which positions to update from the ledger if
|
||||
# any are missing from the ``pps.toml``
|
||||
bs_mktid, msg = pack_position(pos)
|
||||
|
||||
acctid = msg.account = accounts_def.inverse[msg.account]
|
||||
acctid = acctid.strip('ib.')
|
||||
cids2pps[(acctid, bs_mktid)] = msg
|
||||
assert msg.account in accounts, (
|
||||
f'Position for unknown account: {msg.account}')
|
||||
|
||||
ledger = ledgers[acctid]
|
||||
table = tables[acctid]
|
||||
|
||||
pp = table.pps.get(bs_mktid)
|
||||
if (
|
||||
not pp
|
||||
or pp.size != msg.size
|
||||
# proxy wrapper for starting trade event stream
|
||||
async def open_trade_event_stream(
|
||||
client: Client,
|
||||
task_status: TaskStatus[
|
||||
trio.abc.ReceiveChannel
|
||||
] = trio.TASK_STATUS_IGNORED,
|
||||
):
|
||||
trans = norm_trade_records(ledger)
|
||||
table.update_from_trans(trans)
|
||||
# each api client has a unique event stream
|
||||
async with tractor.to_asyncio.open_channel_from(
|
||||
recv_trade_updates,
|
||||
client=client,
|
||||
) as (first, trade_event_stream):
|
||||
|
||||
task_status.started(trade_event_stream)
|
||||
await trio.sleep_forever()
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def trades_dialogue(
|
||||
|
||||
ctx: tractor.Context,
|
||||
loglevel: str = None,
|
||||
|
||||
) -> AsyncIterator[dict[str, Any]]:
|
||||
|
||||
# XXX: required to propagate ``tractor`` loglevel to piker logging
|
||||
get_console_log(loglevel or tractor.current_actor().loglevel)
|
||||
|
||||
accounts_def = config.load_accounts(['ib'])
|
||||
|
||||
global _client_cache
|
||||
|
||||
# deliver positions to subscriber before anything else
|
||||
all_positions = []
|
||||
accounts = set()
|
||||
acctids = set()
|
||||
cids2pps: dict[str, BrokerdPosition] = {}
|
||||
|
||||
# TODO: this causes a massive tractor bug when you run marketstored
|
||||
# with ``--tsdb``... you should get:
|
||||
# - first error the assertion
|
||||
# - chart should get that error and die
|
||||
# - pikerd goes to debugger again from trio nursery multi-error
|
||||
# - hitting final control-c to kill daemon will lead to hang
|
||||
# assert 0
|
||||
|
||||
# TODO: just write on teardown?
|
||||
# we might also want to delegate a specific actor for
|
||||
# ledger writing / reading for speed?
|
||||
async with (
|
||||
open_client_proxies() as (
|
||||
proxies,
|
||||
aioclients,
|
||||
),
|
||||
):
|
||||
# Open a trade ledgers stack for appending trade records over
|
||||
# multiple accounts.
|
||||
# TODO: we probably want to generalize this into a "ledgers" api..
|
||||
ledgers: dict[str, dict] = {}
|
||||
tables: dict[str, PpTable] = {}
|
||||
order_msgs: list[Status] = []
|
||||
conf = get_config()
|
||||
accounts_def_inv = conf['accounts'].inverse
|
||||
|
||||
with (
|
||||
ExitStack() as lstack,
|
||||
):
|
||||
# load ledgers and pps for all detected client-proxies
|
||||
for account, proxy in proxies.items():
|
||||
assert account in accounts_def
|
||||
accounts.add(account)
|
||||
acctid = account.strip('ib.')
|
||||
acctids.add(acctid)
|
||||
|
||||
# open ledger and pptable wrapper for each
|
||||
# detected account.
|
||||
ledger = ledgers[acctid] = lstack.enter_context(
|
||||
open_trade_ledger(
|
||||
'ib',
|
||||
acctid,
|
||||
)
|
||||
)
|
||||
|
||||
# load all positions from `pps.toml`, cross check with
|
||||
# ib's positions data, and relay re-formatted pps as
|
||||
# msgs to the ems.
|
||||
# __2 cases__:
|
||||
# - new trades have taken place this session that we want to
|
||||
# always reprocess indempotently,
|
||||
# - no new trades yet but we want to reload and audit any
|
||||
# positions reported by ib's sys that may not yet be in
|
||||
# piker's ``pps.toml`` state-file.
|
||||
tables[acctid] = lstack.enter_context(
|
||||
open_pps(
|
||||
'ib',
|
||||
acctid,
|
||||
write_on_exit=True,
|
||||
)
|
||||
)
|
||||
|
||||
for account, proxy in proxies.items():
|
||||
client = aioclients[account]
|
||||
|
||||
# order_msgs is filled in by this helper
|
||||
await aggr_open_orders(
|
||||
order_msgs,
|
||||
client,
|
||||
proxy,
|
||||
accounts_def,
|
||||
)
|
||||
acctid: str = account.strip('ib.')
|
||||
ledger: dict = ledgers[acctid]
|
||||
table: PpTable = tables[acctid]
|
||||
|
||||
# update trades ledgers for all accounts from connected
|
||||
# api clients which report trades for **this session**.
|
||||
trades = await proxy.trades()
|
||||
if trades:
|
||||
(
|
||||
trans_by_acct,
|
||||
api_to_ledger_entries,
|
||||
) = await update_ledger_from_api_trades(
|
||||
trades,
|
||||
proxy,
|
||||
accounts_def_inv,
|
||||
)
|
||||
|
||||
# if new trades are detected from the API, prepare
|
||||
|
@ -591,9 +632,10 @@ async def trades_dialogue(
|
|||
trade_entries = api_to_ledger_entries.get(acctid)
|
||||
|
||||
if trade_entries:
|
||||
# write ledger with all new trades **AFTER**
|
||||
# we've updated the `pps.toml` from the
|
||||
# original ledger state! (i.e. this is
|
||||
# write ledger with all new trades
|
||||
# **AFTER** we've updated the
|
||||
# `pps.toml` from the original
|
||||
# ledger state! (i.e. this is
|
||||
# currently done on exit)
|
||||
ledger.update(trade_entries)
|
||||
|
||||
|
@ -601,6 +643,35 @@ async def trades_dialogue(
|
|||
if trans:
|
||||
table.update_from_trans(trans)
|
||||
|
||||
# process pp value reported from ib's system. we only
|
||||
# use these to cross-check sizing since average pricing
|
||||
# on their end uses the so called (bs) "FIFO" style
|
||||
# which more or less results in a price that's not
|
||||
# useful for traders who want to not lose money.. xb
|
||||
for pos in client.positions():
|
||||
# collect all ib-pp reported positions so that we can be
|
||||
# sure know which positions to update from the ledger if
|
||||
# any are missing from the ``pps.toml``
|
||||
bs_mktid, msg = pack_position(pos)
|
||||
|
||||
acctid = msg.account = accounts_def.inverse[msg.account]
|
||||
acctid = acctid.strip('ib.')
|
||||
cids2pps[(acctid, bs_mktid)] = msg
|
||||
|
||||
assert msg.account in accounts, (
|
||||
f'Position for unknown account: {msg.account}')
|
||||
|
||||
ledger: dict = ledgers[acctid]
|
||||
table: PpTable = tables[acctid]
|
||||
pp: Position = table.pps.get(bs_mktid)
|
||||
|
||||
if (
|
||||
not pp
|
||||
or pp.size != msg.size
|
||||
):
|
||||
trans = norm_trade_records(ledger)
|
||||
table.update_from_trans(trans)
|
||||
|
||||
# XXX: not sure exactly why it wouldn't be in
|
||||
# the updated output (maybe this is a bug?) but
|
||||
# if you create a pos from TWS and then load it
|
||||
|
@ -630,17 +701,12 @@ async def trades_dialogue(
|
|||
f'piker: {pp.size}\n'
|
||||
)
|
||||
|
||||
# iterate all (newly) updated pps tables for every
|
||||
# client-account and build out position msgs to deliver to
|
||||
# EMS.
|
||||
for acctid, table in tables.items():
|
||||
active_pps, closed_pps = table.dump_active()
|
||||
|
||||
# load all positions from `pps.toml`, cross check with
|
||||
# ib's positions data, and relay re-formatted pps as
|
||||
# msgs to the ems.
|
||||
# __2 cases__:
|
||||
# - new trades have taken place this session that we want to
|
||||
# always reprocess indempotently,
|
||||
# - no new trades yet but we want to reload and audit any
|
||||
# positions reported by ib's sys that may not yet be in
|
||||
# piker's ``pps.toml`` state-file.
|
||||
for pps in [active_pps, closed_pps]:
|
||||
msgs = await update_and_audit_msgs(
|
||||
acctid,
|
||||
|
@ -661,22 +727,6 @@ async def trades_dialogue(
|
|||
tuple(name for name in accounts_def if name in accounts),
|
||||
))
|
||||
|
||||
# proxy wrapper for starting trade event stream
|
||||
async def open_trade_event_stream(
|
||||
client: Client,
|
||||
task_status: TaskStatus[
|
||||
trio.abc.ReceiveChannel
|
||||
] = trio.TASK_STATUS_IGNORED,
|
||||
):
|
||||
# each api client has a unique event stream
|
||||
async with tractor.to_asyncio.open_channel_from(
|
||||
recv_trade_updates,
|
||||
client=client,
|
||||
) as (first, trade_event_stream):
|
||||
|
||||
task_status.started(trade_event_stream)
|
||||
await trio.sleep_forever()
|
||||
|
||||
async with (
|
||||
ctx.open_stream() as ems_stream,
|
||||
trio.open_nursery() as n,
|
||||
|
@ -723,7 +773,7 @@ async def trades_dialogue(
|
|||
async def emit_pp_update(
|
||||
ems_stream: tractor.MsgStream,
|
||||
trade_entry: dict,
|
||||
accounts_def: bidict,
|
||||
accounts_def: bidict[str, str],
|
||||
proxies: dict,
|
||||
cids2pps: dict,
|
||||
|
||||
|
@ -733,16 +783,16 @@ async def emit_pp_update(
|
|||
) -> None:
|
||||
|
||||
# compute and relay incrementally updated piker pp
|
||||
acctid = accounts_def.inverse[trade_entry['execution']['acctNumber']]
|
||||
accounts_def_inv: bidict[str, str] = accounts_def.inverse
|
||||
acctid = accounts_def_inv[trade_entry['execution']['acctNumber']]
|
||||
proxy = proxies[acctid]
|
||||
|
||||
acctid = acctid.strip('ib.')
|
||||
(
|
||||
records_by_acct,
|
||||
api_to_ledger_entries,
|
||||
) = await update_ledger_from_api_trades(
|
||||
[trade_entry],
|
||||
proxy,
|
||||
accounts_def_inv,
|
||||
)
|
||||
trans = records_by_acct[acctid]
|
||||
r = list(trans.values())[0]
|
||||
|
@ -1244,7 +1294,7 @@ def parse_flex_dt(
|
|||
|
||||
|
||||
def api_trades_to_ledger_entries(
|
||||
accounts: bidict,
|
||||
accounts: bidict[str, str],
|
||||
|
||||
# TODO: maybe we should just be passing through the
|
||||
# ``ib_insync.order.Trade`` instance directly here
|
||||
|
|
Loading…
Reference in New Issue