Merge pull request #363 from pikers/ib_pps_upgrade

`ib` pps api layer upgrade
pydantic_zombie
goodboy 2022-07-27 14:50:28 -04:00 committed by GitHub
commit e2e66324cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 236 additions and 227 deletions

View File

@ -2,15 +2,19 @@
# start VNC server
x11vnc \
-ncache_cr \
-listen localhost \
-listen 127.0.0.1 \
-allow 127.0.0.1 \
-autoport 3003 \
-no6 \
-noipv6 \
-display :1 \
-bg \
-forever \
-shared \
-logappend /var/log/x11vnc.log \
-bg \
-noipv6 \
-autoport 3003 \
-ncache_cr \
-ncache \
# can't use this because of ``asyncvnc`` issue:
# https://github.com/barneygale/asyncvnc/issues/1
# -passwd 'ibcansmbz'

View File

@ -18,6 +18,7 @@ Order and trades endpoints for use with ``piker``'s EMS.
"""
from __future__ import annotations
from contextlib import ExitStack
from dataclasses import asdict
from functools import partial
from pprint import pformat
@ -35,8 +36,8 @@ from trio_typing import TaskStatus
import tractor
from ib_insync.contract import (
Contract,
Option,
Forex,
# Option,
# Forex,
)
from ib_insync.order import (
Trade,
@ -47,11 +48,17 @@ from ib_insync.objects import (
Execution,
CommissionReport,
)
from ib_insync.objects import Position
from ib_insync.objects import Position as IbPosition
import pendulum
from piker import config
from piker import pp
from piker.pp import (
Position,
Transaction,
open_trade_ledger,
open_pps,
PpTable,
)
from piker.log import get_console_log
from piker.clearing._messages import (
BrokerdOrder,
@ -65,8 +72,7 @@ from piker.clearing._messages import (
from piker.data._source import Symbol
from .api import (
_accounts2clients,
# _adhoc_futes_set,
_adhoc_symbol_map,
con2fqsn,
log,
get_config,
open_client_proxies,
@ -76,49 +82,12 @@ from .api import (
def pack_position(
pos: Position
pos: IbPosition
) -> dict[str, Any]:
con = pos.contract
if isinstance(con, Option):
# TODO: option symbol parsing and sane display:
symbol = con.localSymbol.replace(' ', '')
else:
# TODO: lookup fqsn even for derivs.
symbol = con.symbol.lower()
# TODO: probably write a mofo exchange mapper routine since ib
# can't get it's shit together like, ever.
# try our best to figure out the exchange / venue
exch = (con.primaryExchange or con.exchange).lower()
if not exch:
if isinstance(con, Forex):
# bc apparently it's not in the contract obj?
exch = 'idealfx'
else:
# for wtv cucked reason some futes don't show their
# exchange (like CL.NYMEX) ...
entry = _adhoc_symbol_map.get(
con.symbol or con.localSymbol
)
if entry:
meta, kwargs = entry
cid = meta.get('conId')
if cid:
assert con.conId == meta['conId']
exch = meta['exchange']
assert exch, f'No clue:\n {con}'
fqsn = '.'.join((symbol, exch))
expiry = con.lastTradeDateOrContractMonth
if expiry:
fqsn += f'.{expiry}'
fqsn, calc_price = con2fqsn(con)
# TODO: options contracts into a sane format..
return (
@ -305,12 +274,10 @@ async def update_ledger_from_api_trades(
client: Union[Client, MethodProxy],
) -> tuple[
dict[str, pp.Transaction],
dict[str, Transaction],
dict[str, dict],
]:
conf = get_config()
# XXX; ERRGGG..
# pack in the "primary/listing exchange" value from a
# contract lookup since it seems this isn't available by
@ -331,39 +298,33 @@ async def update_ledger_from_api_trades(
entry['listingExchange'] = pexch
conf = get_config()
entries = trades_to_ledger_entries(
conf['accounts'].inverse,
trade_entries,
)
# write recent session's trades to the user's (local) ledger file.
records: dict[str, pp.Transactions] = {}
# normalize recent session's trades to the `Transaction` type
trans_by_acct: dict[str, dict[str, Transaction]] = {}
for acctid, trades_by_id in entries.items():
# normalize to transaction form
records[acctid] = norm_trade_records(trades_by_id)
trans_by_acct[acctid] = norm_trade_records(trades_by_id)
return records, entries
return trans_by_acct, entries
async def update_and_audit_msgs(
acctid: str, # no `ib.` prefix is required!
pps: list[pp.Position],
pps: list[Position],
cids2pps: dict[tuple[str, int], BrokerdPosition],
validate: bool = False,
) -> list[BrokerdPosition]:
msgs: list[BrokerdPosition] = []
# pps: dict[int, pp.Position] = {}
for p in pps:
bsuid = p.bsuid
# build trade-session-actor local table
# of pps from unique symbol ids.
# pps[bsuid] = p
# retreive equivalent ib reported position message
# for comparison/audit versus the piker equivalent
# breakeven pp calcs.
@ -436,7 +397,8 @@ async def update_and_audit_msgs(
raise ValueError(
f'UNEXPECTED POSITION ib <-> piker ledger:\n'
f'piker: {msg}\n'
'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?'
'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?\n'
'MAYBE THEY LIQUIDATED YOU BRO!??!'
)
msgs.append(msg)
@ -462,6 +424,8 @@ async def trades_dialogue(
all_positions = []
accounts = set()
clients: list[tuple[Client, trio.MemoryReceiveChannel]] = []
acctids = set()
cids2pps: dict[str, BrokerdPosition] = {}
# TODO: this causes a massive tractor bug when you run marketstored
# with ``--tsdb``... you should get:
@ -471,15 +435,129 @@ async def trades_dialogue(
# - hitting final control-c to kill daemon will lead to hang
# assert 0
# TODO: just write on teardown?
# we might also want to delegate a specific actor for
# ledger writing / reading for speed?
async with (
trio.open_nursery() as nurse,
# trio.open_nursery() as nurse,
open_client_proxies() as (proxies, aioclients),
):
for account, proxy in proxies.items():
# Open a trade ledgers stack for appending trade records over
# multiple accounts.
# TODO: we probably want to generalize this into a "ledgers" api..
ledgers: dict[str, dict] = {}
tables: dict[str, PpTable] = {}
with (
ExitStack() as lstack,
):
for account, proxy in proxies.items():
client = aioclients[account]
assert account in accounts_def
accounts.add(account)
acctid = account.strip('ib.')
acctids.add(acctid)
async def open_stream(
# open ledger and pptable wrapper for each
# detected account.
ledger = ledgers[acctid] = lstack.enter_context(
open_trade_ledger('ib', acctid)
)
table = tables[acctid] = lstack.enter_context(
open_pps('ib', acctid)
)
client = aioclients[account]
# process pp value reported from ib's system. we only use these
# to cross-check sizing since average pricing on their end uses
# the so called (bs) "FIFO" style which more or less results in
# a price that's not useful for traders who want to not lose
# money.. xb
# for client in aioclients.values():
for pos in client.positions():
# collect all ib-pp reported positions so that we can be
# sure know which positions to update from the ledger if
# any are missing from the ``pps.toml``
bsuid, msg = pack_position(pos)
acctid = msg.account = accounts_def.inverse[msg.account]
acctid = acctid.strip('ib.')
cids2pps[(acctid, bsuid)] = msg
assert msg.account in accounts, (
f'Position for unknown account: {msg.account}')
table = tables[acctid]
pp = table.pps.get(bsuid)
if (
not pp
or pp.size != msg.size
):
trans = norm_trade_records(ledger)
updated = table.update_from_trans(trans)
pp = updated[bsuid]
# update trades ledgers for all accounts from connected
# api clients which report trades for **this session**.
trades = await proxy.trades()
(
trans_by_acct,
api_to_ledger_entries,
) = await update_ledger_from_api_trades(
trades,
proxy,
)
# if new trades are detected from the API, prepare
# them for the ledger file and update the pptable.
if api_to_ledger_entries:
trade_entries = api_to_ledger_entries[acctid]
# write ledger with all new trades **AFTER**
# we've updated the `pps.toml` from the
# original ledger state! (i.e. this is
# currently done on exit)
ledger.update(trade_entries)
trans = trans_by_acct.get(acctid)
if trans:
table.update_from_trans(trans)
updated = table.update_from_trans(trans)
assert msg.size == pp.size, 'WTF'
active_pps, closed_pps = table.dump_active()
# load all positions from `pps.toml`, cross check with
# ib's positions data, and relay re-formatted pps as
# msgs to the ems.
# __2 cases__:
# - new trades have taken place this session that we want to
# always reprocess indempotently,
# - no new trades yet but we want to reload and audit any
# positions reported by ib's sys that may not yet be in
# piker's ``pps.toml`` state-file.
for pps in [active_pps, closed_pps]:
msgs = await update_and_audit_msgs(
acctid,
pps.values(),
cids2pps,
validate=True,
)
all_positions.extend(msg for msg in msgs)
if not all_positions and cids2pps:
raise RuntimeError(
'Positions reported by ib but not found in `pps.toml`!?\n'
f'{pformat(cids2pps)}'
)
await ctx.started((
all_positions,
tuple(name for name in accounts_def if name in accounts),
))
# proxy wrapper for starting trade event stream
async def open_trade_event_stream(
task_status: TaskStatus[
trio.abc.ReceiveChannel
] = trio.TASK_STATUS_IGNORED,
@ -493,130 +571,36 @@ async def trades_dialogue(
task_status.started(trade_event_stream)
await trio.sleep_forever()
trade_event_stream = await nurse.start(open_stream)
async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
):
trade_event_stream = await n.start(open_trade_event_stream)
clients.append((client, trade_event_stream))
clients.append((client, trade_event_stream))
# start order request handler **before** local trades
# event loop
n.start_soon(handle_order_requests, ems_stream, accounts_def)
assert account in accounts_def
accounts.add(account)
# allocate event relay tasks for each client connection
for client, stream in clients:
n.start_soon(
deliver_trade_events,
stream,
ems_stream,
accounts_def,
cids2pps,
proxies,
cids2pps: dict[str, BrokerdPosition] = {}
update_records: dict[str, bidict] = {}
ledgers,
tables,
)
# process pp value reported from ib's system. we only use these
# to cross-check sizing since average pricing on their end uses
# the so called (bs) "FIFO" style which more or less results in
# a price that's not useful for traders who want to not lose
# money.. xb
for client in aioclients.values():
for pos in client.positions():
# TODO: make this thread-async!
table.write_config()
cid, msg = pack_position(pos)
acctid = msg.account = accounts_def.inverse[msg.account]
acctid = acctid.strip('ib.')
cids2pps[(acctid, cid)] = msg
assert msg.account in accounts, (
f'Position for unknown account: {msg.account}')
# collect all ib-pp reported positions so that we can be
# sure know which positions to update from the ledger if
# any are missing from the ``pps.toml``
update_records.setdefault(acctid, bidict())[cid] = msg.symbol
# update trades ledgers for all accounts from
# connected api clients which report trades for **this session**.
new_trades = {}
for account, proxy in proxies.items():
trades = await proxy.trades()
(
records_by_acct,
ledger_entries,
) = await update_ledger_from_api_trades(
trades,
proxy,
)
new_trades.update(records_by_acct)
for acctid, trans in new_trades.items():
for t in trans:
bsuid = t.bsuid
if bsuid in update_records:
assert update_records[bsuid] == t.fqsn
else:
update_records.setdefault(acctid, bidict())[bsuid] = t.fqsn
# load all positions from `pps.toml`, cross check with ib's
# positions data, and relay re-formatted pps as msgs to the ems.
# __2 cases__:
# - new trades have taken place this session that we want to
# always reprocess indempotently,
# - no new trades yet but we want to reload and audit any
# positions reported by ib's sys that may not yet be in
# piker's ``pps.toml`` state-file.
for acctid, to_update in update_records.items():
trans = new_trades.get(acctid)
active, closed = pp.update_pps_conf(
'ib',
acctid,
trade_records=trans,
ledger_reload=to_update,
)
for pps in [active, closed]:
msgs = await update_and_audit_msgs(
acctid,
pps.values(),
cids2pps,
validate=True,
)
all_positions.extend(msg for msg in msgs)
if not all_positions and cids2pps:
raise RuntimeError(
'Positions reported by ib but not found in `pps.toml`!?\n'
f'{pformat(cids2pps)}'
)
# log.info(f'Loaded {len(trades)} from this session')
# TODO: write trades to local ``trades.toml``
# - use above per-session trades data and write to local file
# - get the "flex reports" working and pull historical data and
# also save locally.
await ctx.started((
all_positions,
tuple(name for name in accounts_def if name in accounts),
))
# TODO: maybe just write on teardown?
# we might also want to delegate a specific actor for
# ledger writing / reading for speed?
# write ledger with all new trades **AFTER** we've updated the
# `pps.toml` from the original ledger state!
for acctid, trades_by_id in ledger_entries.items():
with pp.open_trade_ledger('ib', acctid) as ledger:
ledger.update(trades_by_id)
async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
):
# start order request handler **before** local trades event loop
n.start_soon(handle_order_requests, ems_stream, accounts_def)
# allocate event relay tasks for each client connection
for client, stream in clients:
n.start_soon(
deliver_trade_events,
stream,
ems_stream,
accounts_def,
cids2pps,
proxies,
)
# block until cancelled
await trio.sleep_forever()
# block until cancelled
await trio.sleep_forever()
async def emit_pp_update(
@ -626,44 +610,43 @@ async def emit_pp_update(
proxies: dict,
cids2pps: dict,
ledgers,
tables,
) -> None:
# compute and relay incrementally updated piker pp
acctid = accounts_def.inverse[trade_entry['execution']['acctNumber']]
proxy = proxies[acctid]
acctname = acctid.strip('ib.')
records_by_acct, ledger_entries = await update_ledger_from_api_trades(
acctid = acctid.strip('ib.')
(
records_by_acct,
api_to_ledger_entries,
) = await update_ledger_from_api_trades(
[trade_entry],
proxy,
)
records = records_by_acct[acctname]
r = records[0]
trans = records_by_acct[acctid]
r = list(trans.values())[0]
# update and load all positions from `pps.toml`, cross check with
# ib's positions data, and relay re-formatted pps as msgs to the
# ems. we report both the open and closed updates in one map since
# for incremental update we may have just fully closed a pp and need
# to relay that msg as well!
active, closed = pp.update_pps_conf(
'ib',
acctname,
trade_records=records,
ledger_reload={r.bsuid: r.fqsn},
)
table = tables[acctid]
table.update_from_trans(trans)
active, closed = table.dump_active()
# NOTE: write ledger with all new trades **AFTER** we've updated the
# `pps.toml` from the original ledger state!
for acctid, trades_by_id in ledger_entries.items():
with pp.open_trade_ledger('ib', acctid) as ledger:
ledger.update(trades_by_id)
# NOTE: update ledger with all new trades
for acctid, trades_by_id in api_to_ledger_entries.items():
ledger = ledgers[acctid]
ledger.update(trades_by_id)
# generate pp msgs and cross check with ib's positions data, relay
# re-formatted pps as msgs to the ems.
for pos in filter(
bool,
[active.get(r.bsuid), closed.get(r.bsuid)]
):
msgs = await update_and_audit_msgs(
acctname,
acctid,
[pos],
cids2pps,
@ -672,6 +655,7 @@ async def emit_pp_update(
)
if msgs:
msg = msgs[0]
log.info('Emitting pp msg: {msg}')
break
await ems_stream.send(msg)
@ -685,6 +669,9 @@ async def deliver_trade_events(
cids2pps: dict[tuple[str, str], BrokerdPosition],
proxies: dict[str, MethodProxy],
ledgers,
tables,
) -> None:
'''
Format and relay all trade events for a given client to emsd.
@ -834,6 +821,8 @@ async def deliver_trade_events(
accounts_def,
proxies,
cids2pps,
ledgers,
tables,
)
case 'cost':
@ -866,6 +855,8 @@ async def deliver_trade_events(
accounts_def,
proxies,
cids2pps,
ledgers,
tables,
)
case 'error':
@ -886,6 +877,7 @@ async def deliver_trade_events(
case 'position':
cid, msg = pack_position(item)
log.info(f'New IB position msg: {msg}')
# acctid = msg.account = accounts_def.inverse[msg.account]
# cuck ib and it's shitty fifo sys for pps!
# await ems_stream.send(msg)
@ -916,14 +908,13 @@ async def deliver_trade_events(
def norm_trade_records(
ledger: dict[str, Any],
) -> list[pp.Transaction]:
) -> list[Transaction]:
'''
Normalize a flex report or API retrieved executions
ledger into our standard record format.
'''
records: list[pp.Transaction] = []
records: dict[str, Transaction] = {}
for tid, record in ledger.items():
conid = record.get('conId') or record['conid']
@ -1001,7 +992,7 @@ def norm_trade_records(
# which case, we can pull the fqsn from that table (see
# `trades_dialogue()` above).
records.append(pp.Transaction(
records[tid] = Transaction(
fqsn=fqsn,
tid=tid,
size=size,
@ -1010,7 +1001,7 @@ def norm_trade_records(
dt=dt,
expiry=expiry,
bsuid=conid,
))
)
return records
@ -1139,8 +1130,7 @@ def load_flex_trades(
trade_entries = report.extract('Trade')
ln = len(trade_entries)
# log.info(f'Loaded {ln} trades from flex query')
print(f'Loaded {ln} trades from flex query')
log.info(f'Loaded {ln} trades from flex query')
trades_by_account = trades_to_ledger_entries(
# get reverse map to user account names
@ -1149,14 +1139,20 @@ def load_flex_trades(
source_type='flex',
)
ledgers = {}
for acctid, trades_by_id in trades_by_account.items():
with pp.open_trade_ledger('ib', acctid) as ledger:
ledger.update(trades_by_id)
for acctid in trades_by_account:
trades_by_id = trades_by_account[acctid]
with open_trade_ledger('ib', acctid) as ledger_dict:
tid_delta = set(trades_by_id) - set(ledger_dict)
log.info(
'New trades detected\n'
f'{pformat(tid_delta)}'
)
if tid_delta:
ledger_dict.update(
{tid: trades_by_id[tid] for tid in tid_delta}
)
ledgers[acctid] = ledger
return ledgers
return ledger_dict
if __name__ == '__main__':

View File

@ -301,7 +301,13 @@ async def get_bars(
else:
log.warning('Sending CONNECTION RESET')
await data_reset_hack(reset_type='connection')
res = await data_reset_hack(reset_type='connection')
if not res:
log.warning(
'NO VNC DETECTED!\n'
'Manually press ctrl-alt-f on your IB java app'
)
# break
with trio.move_on_after(timeout) as cs:
for name, ev in [
@ -842,7 +848,10 @@ async def data_reset_hack(
client.mouse.click()
client.keyboard.press('Ctrl', 'Alt', key) # keys are stacked
await tractor.to_asyncio.run_task(vnc_click_hack)
try:
await tractor.to_asyncio.run_task(vnc_click_hack)
except OSError:
return False
# we don't really need the ``xdotool`` approach any more B)
return True