Compare commits


No commits in common. "e1d57e8a8a9dfd1143e601efeddd08a460d03e5f" and "a3d46f713e9d15115af80b021354d85569d8e9c4" have entirely different histories.

2 changed files with 247 additions and 266 deletions

File 1:

@@ -18,7 +18,6 @@ Order and trades endpoints for use with ``piker``'s EMS.
 """
 from __future__ import annotations
-from contextlib import ExitStack
 from dataclasses import asdict
 from functools import partial
 from pprint import pformat
@@ -36,8 +35,8 @@ from trio_typing import TaskStatus
 import tractor
 from ib_insync.contract import (
     Contract,
-    # Option,
-    # Forex,
+    Option,
+    Forex,
 )
 from ib_insync.order import (
     Trade,
@@ -48,17 +47,11 @@ from ib_insync.objects import (
     Execution,
     CommissionReport,
 )
-from ib_insync.objects import Position as IbPosition
+from ib_insync.objects import Position
 import pendulum
 from piker import config
-from piker.pp import (
-    Position,
-    Transaction,
-    open_trade_ledger,
-    open_pps,
-    PpTable,
-)
+from piker import pp
 from piker.log import get_console_log
 from piker.clearing._messages import (
     BrokerdOrder,
@@ -73,8 +66,7 @@ from piker.data._source import Symbol
 from .api import (
     _accounts2clients,
     # _adhoc_futes_set,
-    con2fqsn,
-    # _adhoc_symbol_map,
+    _adhoc_symbol_map,
     log,
     get_config,
     open_client_proxies,
@@ -84,12 +76,49 @@ from .api import (
 def pack_position(
-    pos: IbPosition
+    pos: Position

 ) -> dict[str, Any]:

     con = pos.contract
-    fqsn, calc_price = con2fqsn(con)
+
+    if isinstance(con, Option):
+        # TODO: option symbol parsing and sane display:
+        symbol = con.localSymbol.replace(' ', '')
+    else:
+        # TODO: lookup fqsn even for derivs.
+        symbol = con.symbol.lower()
+
+    # TODO: probably write a mofo exchange mapper routine since ib
+    # can't get it's shit together like, ever.
+    # try our best to figure out the exchange / venue
+    exch = (con.primaryExchange or con.exchange).lower()
+    if not exch:
+
+        if isinstance(con, Forex):
+            # bc apparently it's not in the contract obj?
+            exch = 'idealfx'
+
+        else:
+            # for wtv cucked reason some futes don't show their
+            # exchange (like CL.NYMEX) ...
+            entry = _adhoc_symbol_map.get(
+                con.symbol or con.localSymbol
+            )
+            if entry:
+                meta, kwargs = entry
+                cid = meta.get('conId')
+                if cid:
+                    assert con.conId == meta['conId']
+                exch = meta['exchange']
+
+    assert exch, f'No clue:\n {con}'
+    fqsn = '.'.join((symbol, exch))
+
+    expiry = con.lastTradeDateOrContractMonth
+    if expiry:
+        fqsn += f'.{expiry}'

     # TODO: options contracts into a sane format..

     return (
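
The plus-side `pack_position()` above boils down to a simple naming scheme: an fqsn ("fully qualified symbol name") is the lowercased symbol and venue joined with dots, with an optional expiry suffix. A standalone sketch of just that assembly step (the helper name here is illustrative, not part of piker):

    # fqsn scheme implemented inline by the plus-side pack_position():
    #   <symbol>.<venue>[.<expiry>]
    def make_fqsn(symbol: str, exch: str, expiry: str = '') -> str:
        fqsn = '.'.join((symbol.lower(), exch.lower()))
        if expiry:
            fqsn += f'.{expiry}'
        return fqsn

    assert make_fqsn('CL', 'NYMEX', '20220819') == 'cl.nymex.20220819'
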
@@ -276,10 +305,12 @@ async def update_ledger_from_api_trades(
     client: Union[Client, MethodProxy],

 ) -> tuple[
-    dict[str, Transaction],
+    dict[str, pp.Transaction],
     dict[str, dict],
 ]:
+    conf = get_config()
+
     # XXX; ERRGGG..
     # pack in the "primary/listing exchange" value from a
     # contract lookup since it seems this isn't available by
@@ -300,33 +331,39 @@ async def update_ledger_from_api_trades(
             entry['listingExchange'] = pexch

-    conf = get_config()
     entries = trades_to_ledger_entries(
         conf['accounts'].inverse,
         trade_entries,
     )

-    # normalize recent session's trades to the `Transaction` type
-    trans_by_acct: dict[str, dict[str, Transaction]] = {}
+    # write recent session's trades to the user's (local) ledger file.
+    records: dict[str, pp.Transactions] = {}

     for acctid, trades_by_id in entries.items():
         # normalize to transaction form
-        trans_by_acct[acctid] = norm_trade_records(trades_by_id)
+        records[acctid] = norm_trade_records(trades_by_id)

-    return trans_by_acct, entries
+    return records, entries


 async def update_and_audit_msgs(
     acctid: str,  # no `ib.` prefix is required!
-    pps: list[Position],
+    pps: list[pp.Position],
     cids2pps: dict[tuple[str, int], BrokerdPosition],
     validate: bool = False,

 ) -> list[BrokerdPosition]:

     msgs: list[BrokerdPosition] = []
+    # pps: dict[int, pp.Position] = {}

     for p in pps:
         bsuid = p.bsuid

+        # build trade-session-actor local table
+        # of pps from unique symbol ids.
+        # pps[bsuid] = p
+
         # retreive equivalent ib reported position message
         # for comparison/audit versus the piker equivalent
         # breakeven pp calcs.
@@ -425,8 +462,6 @@ async def trades_dialogue(
     all_positions = []
     accounts = set()
     clients: list[tuple[Client, trio.MemoryReceiveChannel]] = []
-    acctids = set()
-    cids2pps: dict[str, BrokerdPosition] = {}

     # TODO: this causes a massive tractor bug when you run marketstored
     # with ``--tsdb``... you should get:
@@ -436,170 +471,152 @@ async def trades_dialogue(
     # - hitting final control-c to kill daemon will lead to hang
     # assert 0

-    # TODO: just write on teardown?
-    # we might also want to delegate a specific actor for
-    # ledger writing / reading for speed?

     async with (
         trio.open_nursery() as nurse,
         open_client_proxies() as (proxies, aioclients),
     ):
-        # Open a trade ledgers stack for appending trade records over
-        # multiple accounts.
-        # TODO: we probably want to generalize this into a "ledgers" api..
-        ledgers: dict[str, dict] = {}
-        tables: dict[str, PpTable] = {}
-        with (
-            ExitStack() as lstack,
-        ):
-            for account, proxy in proxies.items():
-                acctid = account.strip('ib.')
-                acctids.add(acctid)
-
-                # open ledger and pptable wrapper for each
-                # detected account.
-                ledger = ledgers[acctid] = lstack.enter_context(
-                    open_trade_ledger('ib', acctid)
-                )
-                table = tables[acctid] = lstack.enter_context(
-                    open_pps('ib', acctid)
-                )
-
-                client = aioclients[account]
-
-                async def open_trade_event_stream(
-                    task_status: TaskStatus[
-                        trio.abc.ReceiveChannel
-                    ] = trio.TASK_STATUS_IGNORED,
-                ):
-                    # each api client has a unique event stream
-                    async with tractor.to_asyncio.open_channel_from(
-                        recv_trade_updates,
-                        client=client,
-                    ) as (first, trade_event_stream):
-                        task_status.started(trade_event_stream)
-                        await trio.sleep_forever()
-
-                trade_event_stream = await nurse.start(open_trade_event_stream)
-                clients.append((client, trade_event_stream))
-
-                assert account in accounts_def
-                accounts.add(account)
-
-                # update trades ledgers for all accounts from connected
-                # api clients which report trades for **this session**.
-                trades = await proxy.trades()
-                (
-                    trans_by_acct,
-                    api_ready_for_ledger_entries,
-                ) = await update_ledger_from_api_trades(
-                    trades,
-                    proxy,
-                )
-
-                # if new trades are detected from the API, prepare
-                # them for the ledger file and update the pptable.
-                if api_ready_for_ledger_entries:
-                    trade_entries = api_ready_for_ledger_entries[acctid]
-                    ledger.update(trade_entries)
-                    trans = trans_by_acct.get(acctid)
-                    if trans:
-                        table.update_from_trans(trans)
-
-                # process pp value reported from ib's system. we only use these
-                # to cross-check sizing since average pricing on their end uses
-                # the so called (bs) "FIFO" style which more or less results in
-                # a price that's not useful for traders who want to not lose
-                # money.. xb
-                # for client in aioclients.values():
-                for pos in client.positions():
-
-                    # collect all ib-pp reported positions so that we can be
-                    # sure know which positions to update from the ledger if
-                    # any are missing from the ``pps.toml``
-                    bsuid, msg = pack_position(pos)
-                    acctid = msg.account = accounts_def.inverse[msg.account]
-                    acctid = acctid.strip('ib.')
-                    cids2pps[(acctid, bsuid)] = msg
-
-                    assert msg.account in accounts, (
-                        f'Position for unknown account: {msg.account}')
-
-                    table = tables[acctid]
-                    pp = table.pps.get(bsuid)
-                    if (
-                        not pp
-                        or pp.size != msg.size
-                    ):
-                        trans = norm_trade_records(ledger)
-                        updated = table.update_from_trans(trans)
-                        pp = updated[bsuid]
-                        assert msg.size == pp.size, 'WTF'
-
-                        # TODO: figure out why these don't match?
-                        # assert pp.calc_be_price() == pp.be_price
-
-                _, closed_pps = table.dump_active('ib')
-                active_pps = table.pps
-
-                # load all positions from `pps.toml`, cross check with
-                # ib's positions data, and relay re-formatted pps as
-                # msgs to the ems.
-                # __2 cases__:
-                # - new trades have taken place this session that we want to
-                #   always reprocess indempotently,
-                # - no new trades yet but we want to reload and audit any
-                #   positions reported by ib's sys that may not yet be in
-                #   piker's ``pps.toml`` state-file.
-                for pps in [active_pps, closed_pps]:
-                    msgs = await update_and_audit_msgs(
-                        acctid,
-                        pps.values(),
-                        cids2pps,
-                        validate=True,
-                    )
-                    all_positions.extend(msg for msg in msgs)
-
-            if not all_positions and cids2pps:
-                raise RuntimeError(
-                    'Positions reported by ib but not found in `pps.toml`!?\n'
-                    f'{pformat(cids2pps)}'
-                )
-
-            await ctx.started((
-                all_positions,
-                tuple(name for name in accounts_def if name in accounts),
-            ))
-
-            # write ledger with all new trades **AFTER** we've updated the
-            # `pps.toml` from the original ledger state!
-            for acctid, trades_by_id in api_ready_for_ledger_entries.items():
-                ledgers[acctid].update(trades_by_id)
+        for account, proxy in proxies.items():
+
+            client = aioclients[account]
+
+            async def open_stream(
+                task_status: TaskStatus[
+                    trio.abc.ReceiveChannel
+                ] = trio.TASK_STATUS_IGNORED,
+            ):
+                # each api client has a unique event stream
+                async with tractor.to_asyncio.open_channel_from(
+                    recv_trade_updates,
+                    client=client,
+                ) as (first, trade_event_stream):
+                    task_status.started(trade_event_stream)
+                    await trio.sleep_forever()
+
+            trade_event_stream = await nurse.start(open_stream)
+            clients.append((client, trade_event_stream))
+            assert account in accounts_def
+            accounts.add(account)
+
+        cids2pps: dict[str, BrokerdPosition] = {}
+        update_records: dict[str, bidict] = {}
+
+        # process pp value reported from ib's system. we only use these
+        # to cross-check sizing since average pricing on their end uses
+        # the so called (bs) "FIFO" style which more or less results in
+        # a price that's not useful for traders who want to not lose
+        # money.. xb
+        for client in aioclients.values():
+            for pos in client.positions():
+                cid, msg = pack_position(pos)
+                acctid = msg.account = accounts_def.inverse[msg.account]
+                acctid = acctid.strip('ib.')
+                cids2pps[(acctid, cid)] = msg
+                assert msg.account in accounts, (
+                    f'Position for unknown account: {msg.account}')
+
+                # collect all ib-pp reported positions so that we can be
+                # sure know which positions to update from the ledger if
+                # any are missing from the ``pps.toml``
+                update_records.setdefault(acctid, bidict())[cid] = msg.symbol
+
+        # update trades ledgers for all accounts from
+        # connected api clients which report trades for **this session**.
+        new_trades = {}
+        for account, proxy in proxies.items():
+            trades = await proxy.trades()
+            (
+                records_by_acct,
+                ledger_entries,
+            ) = await update_ledger_from_api_trades(
+                trades,
+                proxy,
+            )
+            new_trades.update(records_by_acct)
+
+        for acctid, trans in new_trades.items():
+            for t in trans:
+                bsuid = t.bsuid
+                if bsuid in update_records:
+                    assert update_records[bsuid] == t.fqsn
+                else:
+                    update_records.setdefault(acctid, bidict())[bsuid] = t.fqsn
+
+        # load all positions from `pps.toml`, cross check with ib's
+        # positions data, and relay re-formatted pps as msgs to the ems.
+        # __2 cases__:
+        # - new trades have taken place this session that we want to
+        #   always reprocess indempotently,
+        # - no new trades yet but we want to reload and audit any
+        #   positions reported by ib's sys that may not yet be in
+        #   piker's ``pps.toml`` state-file.
+        for acctid, to_update in update_records.items():
+            trans = new_trades.get(acctid)
+            active, closed = pp.update_pps_conf(
+                'ib',
+                acctid,
+                trade_records=trans,
+                ledger_reload=to_update,
+            )
+            for pps in [active, closed]:
+                msgs = await update_and_audit_msgs(
+                    acctid,
+                    pps.values(),
+                    cids2pps,
+                    validate=True,
+                )
+                all_positions.extend(msg for msg in msgs)
+
+        if not all_positions and cids2pps:
+            raise RuntimeError(
+                'Positions reported by ib but not found in `pps.toml`!?\n'
+                f'{pformat(cids2pps)}'
+            )
+
+        # log.info(f'Loaded {len(trades)} from this session')
+        # TODO: write trades to local ``trades.toml``
+        # - use above per-session trades data and write to local file
+        # - get the "flex reports" working and pull historical data and
+        #   also save locally.
+
+        await ctx.started((
+            all_positions,
+            tuple(name for name in accounts_def if name in accounts),
+        ))
+
+        # TODO: maybe just write on teardown?
+        # we might also want to delegate a specific actor for
+        # ledger writing / reading for speed?
+
+        # write ledger with all new trades **AFTER** we've updated the
+        # `pps.toml` from the original ledger state!
+        for acctid, trades_by_id in ledger_entries.items():
+            with pp.open_trade_ledger('ib', acctid) as ledger:
+                ledger.update(trades_by_id)

         async with (
             ctx.open_stream() as ems_stream,
             trio.open_nursery() as n,
         ):
-            # start order request handler **before** local trades
-            # event loop
+            # start order request handler **before** local trades event loop
             n.start_soon(handle_order_requests, ems_stream, accounts_def)

             # allocate event relay tasks for each client connection
             for client, stream in clients:
                 n.start_soon(
                     deliver_trade_events,
                     stream,
                     ems_stream,
                     accounts_def,
                     cids2pps,
                     proxies,
-                    ledgers,
-                    tables,
                 )

             # block until cancelled
             await trio.sleep_forever()
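
The minus side's key structural change in `trades_dialogue()` is holding every account's ledger file and pp table open for the whole dialog via a single `contextlib.ExitStack`, instead of reopening the files around each write as the plus side does. A minimal sketch of that pattern with stand-in context managers (not the actual piker ledger API):

    from contextlib import ExitStack, contextmanager

    @contextmanager
    def open_resource(acctid: str):
        resource = {'acct': acctid}  # stand-in for a ledger/pps-table handle
        try:
            yield resource
        finally:
            resource.clear()         # "write on teardown"

    accounts = ['algopaper', 'margin']
    with ExitStack() as stack:
        handles = {
            acctid: stack.enter_context(open_resource(acctid))
            for acctid in accounts
        }
        # all handles stay open here and are closed together on exit
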
@@ -609,44 +626,44 @@ async def emit_pp_update(
     proxies: dict,
     cids2pps: dict,
-    ledgers,
-    tables,

 ) -> None:

     # compute and relay incrementally updated piker pp
     acctid = accounts_def.inverse[trade_entry['execution']['acctNumber']]
     proxy = proxies[acctid]

-    acctid = acctid.strip('ib.')
-    (
-        records_by_acct,
-        api_ready_for_ledger_entries,
-    ) = await update_ledger_from_api_trades(
+    acctname = acctid.strip('ib.')
+    records_by_acct, ledger_entries = await update_ledger_from_api_trades(
         [trade_entry],
         proxy,
     )
-    trans = records_by_acct[acctid]
-    r = list(trans.values())[0]
+    records = records_by_acct[acctname]
+    r = records[0]

-    table = tables[acctid]
-    table.update_from_trans(trans)
-    _, closed = table.dump_active('ib')
-    active = table.pps
+    # update and load all positions from `pps.toml`, cross check with
+    # ib's positions data, and relay re-formatted pps as msgs to the
+    # ems. we report both the open and closed updates in one map since
+    # for incremental update we may have just fully closed a pp and need
+    # to relay that msg as well!
+    active, closed = pp.update_pps_conf(
+        'ib',
+        acctname,
+        trade_records=records,
+        ledger_reload={r.bsuid: r.fqsn},
+    )

-    # NOTE: update ledger with all new trades
-    for acctid, trades_by_id in api_ready_for_ledger_entries.items():
-        ledger = ledgers[acctid]
-        ledger.update(trades_by_id)
+    # NOTE: write ledger with all new trades **AFTER** we've updated the
+    # `pps.toml` from the original ledger state!
+    for acctid, trades_by_id in ledger_entries.items():
+        with pp.open_trade_ledger('ib', acctid) as ledger:
+            ledger.update(trades_by_id)

     # generate pp msgs and cross check with ib's positions data, relay
     # re-formatted pps as msgs to the ems.
     for pos in filter(
         bool,
         [active.get(r.bsuid), closed.get(r.bsuid)]
     ):
         msgs = await update_and_audit_msgs(
-            acctid,
+            acctname,
             [pos],
             cids2pps,
@@ -668,9 +685,6 @@ async def deliver_trade_events(
     cids2pps: dict[tuple[str, str], BrokerdPosition],
     proxies: dict[str, MethodProxy],
-    ledgers,
-    tables,

 ) -> None:
     '''
     Format and relay all trade events for a given client to emsd.
@@ -820,8 +834,6 @@ async def deliver_trade_events(
                         accounts_def,
                         proxies,
                         cids2pps,
-                        ledgers,
-                        tables,
                     )

             case 'cost':
@@ -854,8 +866,6 @@ async def deliver_trade_events(
                         accounts_def,
                         proxies,
                         cids2pps,
-                        ledgers,
-                        tables,
                     )

             case 'error':
@@ -906,13 +916,14 @@ async def deliver_trade_events(
 def norm_trade_records(
     ledger: dict[str, Any],

-) -> list[Transaction]:
+) -> list[pp.Transaction]:
     '''
     Normalize a flex report or API retrieved executions
     ledger into our standard record format.

     '''
-    records: dict[str, Transaction] = {}
+    records: list[pp.Transaction] = []

     for tid, record in ledger.items():
         conid = record.get('conId') or record['conid']
@@ -990,7 +1001,7 @@ def norm_trade_records(
         # which case, we can pull the fqsn from that table (see
         # `trades_dialogue()` above).

-        records[tid] = Transaction(
+        records.append(pp.Transaction(
             fqsn=fqsn,
             tid=tid,
             size=size,
@@ -999,7 +1010,7 @@ def norm_trade_records(
             dt=dt,
             expiry=expiry,
             bsuid=conid,
-        )
+        ))

     return records
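
The minus side also switches `norm_trade_records()` from appending to a list to keying records by transaction id, which makes reprocessing the same ledger idempotent: a repeated tid overwrites rather than duplicates. A toy illustration with stand-in record dicts:

    def norm(ledger: dict[str, dict]) -> dict[str, dict]:
        records: dict[str, dict] = {}
        for tid, raw in ledger.items():
            records[tid] = {'tid': tid, **raw}  # same tid always collapses
        return records

    ledger = {'t1': {'size': 1}, 't2': {'size': -1}}
    once = norm(ledger)
    twice = {**once, **norm(ledger)}  # merging a re-run changes nothing
    assert twice == once
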
@@ -1128,7 +1139,8 @@ def load_flex_trades(
     trade_entries = report.extract('Trade')
     ln = len(trade_entries)
-    log.info(f'Loaded {ln} trades from flex query')
+    # log.info(f'Loaded {ln} trades from flex query')
+    print(f'Loaded {ln} trades from flex query')

     trades_by_account = trades_to_ledger_entries(
         # get reverse map to user account names
@@ -1137,20 +1149,14 @@ def load_flex_trades(
         source_type='flex',
     )

-    for acctid in trades_by_account:
-        trades_by_id = trades_by_account[acctid]
-        with open_trade_ledger('ib', acctid) as ledger_dict:
-            tid_delta = set(trades_by_id) - set(ledger_dict)
-            log.info(
-                'New trades detected\n'
-                f'{pformat(tid_delta)}'
-            )
-            if tid_delta:
-                ledger_dict.update(
-                    {tid: trades_by_id[tid] for tid in tid_delta}
-                )
-
-    return ledger_dict
+    ledgers = {}
+    for acctid, trades_by_id in trades_by_account.items():
+        with pp.open_trade_ledger('ib', acctid) as ledger:
+            ledger.update(trades_by_id)
+        ledgers[acctid] = ledger
+
+    return ledgers


 if __name__ == '__main__':
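
Note the differing write strategies in `load_flex_trades()`: the plus side unconditionally dumps all extracted flex entries into each ledger, while the minus side first computes which transaction ids are actually new. That delta is a plain set difference over the dict keys, e.g.:

    ledger_dict = {'t1': {'size': 1}}                      # already on disk
    trades_by_id = {'t1': {'size': 1}, 't2': {'size': 2}}  # from the flex report

    tid_delta = set(trades_by_id) - set(ledger_dict)
    if tid_delta:
        ledger_dict.update({tid: trades_by_id[tid] for tid in tid_delta})

    assert set(ledger_dict) == {'t1', 't2'}
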

File 2:

@@ -301,13 +301,7 @@ async def get_bars(
             else:
                 log.warning('Sending CONNECTION RESET')
-                res = await data_reset_hack(reset_type='connection')
-                if not res:
-                    log.warning(
-                        'NO VNC DETECTED!\n'
-                        'Manually press ctrl-alt-f on your IB java app'
-                    )
-                    # break
+                await data_reset_hack(reset_type='connection')

             with trio.move_on_after(timeout) as cs:
                 for name, ev in [
@@ -576,6 +570,7 @@ def normalize(
     # check for special contract types
     con = ticker.contract
+    fqsn, calc_price = con2fqsn(con)

     # convert named tuples to dicts so we send usable keys
@@ -847,10 +842,7 @@ async def data_reset_hack(
             client.mouse.click()
             client.keyboard.press('Ctrl', 'Alt', key)  # keys are stacked

-    try:
-        await tractor.to_asyncio.run_task(vnc_click_hack)
-    except OSError:
-        return False
+    await tractor.to_asyncio.run_task(vnc_click_hack)

     # we don't really need the ``xdotool`` approach any more B)
     return True
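
As with the `get_bars()` hunk earlier, this changes the failure mode: the minus side catches the OSError raised when no VNC server is reachable and returns False so the caller can warn the user to reset manually, while the plus side lets the error propagate. A self-contained sketch of that degrade-to-False shape (stand-in coroutines, not the actual tractor/asyncvnc calls):

    import trio

    async def vnc_click_hack() -> None:
        # stand-in: pretend no VNC server is listening
        raise OSError('connection refused')

    async def data_reset_hack(reset_type: str = 'connection') -> bool:
        try:
            await vnc_click_hack()
        except OSError:
            return False  # caller can warn and fall back to a manual reset
        return True

    assert trio.run(data_reset_hack) is False
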
@@ -865,24 +857,15 @@ async def open_symbol_search(
     # TODO: load user defined symbol set locally for fast search?
     await ctx.started({})

-    # async with open_data_client() as proxy:
-    async with (
-        open_client_proxies() as (proxies, clients),
-    ):
+    async with open_data_client() as proxy:
         async with ctx.open_stream() as stream:
-            # await tractor.breakpoint()
-            proxy = proxies['ib.algopaper']

             last = time.time()

             async for pattern in stream:
-                log.info(f'received {pattern}')
+                log.debug(f'received {pattern}')
                 now = time.time()

-                # this causes tractor hang...
-                # assert 0
-
                 assert pattern, 'IB can not accept blank search pattern'

                 # throttle search requests to no faster then 1Hz
@@ -910,7 +893,7 @@ async def open_symbol_search(
                     continue

-                log.info(f'searching for {pattern}')
+                log.debug(f'searching for {pattern}')

                 last = time.time()
@@ -921,25 +904,17 @@ async def open_symbol_search(
                 async def stash_results(target: Awaitable[list]):
                     stock_results.extend(await target)

-                for i in range(10):
-                    with trio.move_on_after(3) as cs:
-                        async with trio.open_nursery() as sn:
-                            sn.start_soon(
-                                stash_results,
-                                proxy.search_symbols(
-                                    pattern=pattern,
-                                    upto=5,
-                                ),
-                            )
-
-                            # trigger async request
-                            await trio.sleep(0)
-
-                    if cs.cancelled_caught:
-                        log.warning(f'Search timeout? {proxy._aio_ns.ib.client}')
-                        continue
-                    else:
-                        break
+                async with trio.open_nursery() as sn:
+                    sn.start_soon(
+                        stash_results,
+                        proxy.search_symbols(
+                            pattern=pattern,
+                            upto=5,
+                        ),
+                    )
+
+                    # trigger async request
+                    await trio.sleep(0)

                 # # match against our ad-hoc set immediately
                 # adhoc_matches = fuzzy.extractBests(
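
The minus side's search loop above is a bounded-retry pattern: each attempt gets a 3 second budget via `trio.move_on_after()` and the whole thing gives up after 10 tries. Stripped of the ib-specific bits, the shape is roughly (stand-in search coroutine, not the real `proxy.search_symbols()`):

    import trio

    async def search_with_retries() -> list[str]:

        async def fake_search() -> list[str]:
            # stand-in for the remote symbol-search request
            await trio.sleep(0.01)
            return ['aapl.nasdaq']

        results: list[str] = []
        for _ in range(10):
            with trio.move_on_after(3) as cs:
                results.extend(await fake_search())
            if cs.cancelled_caught:
                continue  # timed out; retry
            break
        return results

    assert trio.run(search_with_retries) == ['aapl.nasdaq']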