Compare commits


No commits in common. "e1d57e8a8a9dfd1143e601efeddd08a460d03e5f" and "a3d46f713e9d15115af80b021354d85569d8e9c4" have entirely different histories.

2 changed files with 247 additions and 266 deletions

View File

@@ -18,7 +18,6 @@ Order and trades endpoints for use with ``piker``'s EMS.
 """
 from __future__ import annotations
-from contextlib import ExitStack
 from dataclasses import asdict
 from functools import partial
 from pprint import pformat
@@ -36,8 +35,8 @@ from trio_typing import TaskStatus
 import tractor
 from ib_insync.contract import (
     Contract,
-    # Option,
-    # Forex,
+    Option,
+    Forex,
 )
 from ib_insync.order import (
     Trade,
@@ -48,17 +47,11 @@ from ib_insync.objects import (
     Execution,
     CommissionReport,
 )
-from ib_insync.objects import Position as IbPosition
+from ib_insync.objects import Position
 import pendulum

 from piker import config
-from piker.pp import (
-    Position,
-    Transaction,
-    open_trade_ledger,
-    open_pps,
-    PpTable,
-)
+from piker import pp
 from piker.log import get_console_log
 from piker.clearing._messages import (
     BrokerdOrder,
@@ -73,8 +66,7 @@ from piker.data._source import Symbol
 from .api import (
     _accounts2clients,
     # _adhoc_futes_set,
-    con2fqsn,
-    # _adhoc_symbol_map,
+    _adhoc_symbol_map,
     log,
     get_config,
     open_client_proxies,
@@ -84,12 +76,49 @@ from .api import (

 def pack_position(
-    pos: IbPosition
+    pos: Position

 ) -> dict[str, Any]:

     con = pos.contract
-    fqsn, calc_price = con2fqsn(con)
+
+    if isinstance(con, Option):
+        # TODO: option symbol parsing and sane display:
+        symbol = con.localSymbol.replace(' ', '')
+
+    else:
+        # TODO: lookup fqsn even for derivs.
+        symbol = con.symbol.lower()
+
+    # TODO: probably write a mofo exchange mapper routine since ib
+    # can't get it's shit together like, ever.
+    # try our best to figure out the exchange / venue
+    exch = (con.primaryExchange or con.exchange).lower()
+    if not exch:
+
+        if isinstance(con, Forex):
+            # bc apparently it's not in the contract obj?
+            exch = 'idealfx'
+
+        else:
+            # for wtv cucked reason some futes don't show their
+            # exchange (like CL.NYMEX) ...
+            entry = _adhoc_symbol_map.get(
+                con.symbol or con.localSymbol
+            )
+            if entry:
+                meta, kwargs = entry
+                cid = meta.get('conId')
+                if cid:
+                    assert con.conId == meta['conId']
+                exch = meta['exchange']
+
+    assert exch, f'No clue:\n {con}'
+    fqsn = '.'.join((symbol, exch))
+
+    expiry = con.lastTradeDateOrContractMonth
+    if expiry:
+        fqsn += f'.{expiry}'

     # TODO: options contracts into a sane format..

     return (
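
The hunk above restores inline fqsn assembly in `pack_position()` in place of the `con2fqsn()` helper. For reference, the resulting fqsn ("fully qualified symbol name") is just `<symbol>.<venue>[.<expiry>]`; a minimal standalone sketch of that string format (the contract values are hypothetical, no `ib_insync` needed):

    # sketch only: mirrors the '.'.join((symbol, exch)) + expiry-suffix
    # logic in the hunk above; the CL/NYMEX values are made-up examples.
    def make_fqsn(symbol: str, exch: str, expiry: str = '') -> str:
        fqsn = '.'.join((symbol.lower(), exch.lower()))
        if expiry:
            fqsn += f'.{expiry}'
        return fqsn

    assert make_fqsn('CL', 'NYMEX', '20221121') == 'cl.nymex.20221121'
    assert make_fqsn('EURUSD', 'IDEALFX') == 'eurusd.idealfx'
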
@@ -276,10 +305,12 @@ async def update_ledger_from_api_trades(
     client: Union[Client, MethodProxy],

 ) -> tuple[
-    dict[str, Transaction],
+    dict[str, pp.Transaction],
     dict[str, dict],
 ]:
+    conf = get_config()
+
     # XXX; ERRGGG..
     # pack in the "primary/listing exchange" value from a
     # contract lookup since it seems this isn't available by
@@ -300,33 +331,39 @@ async def update_ledger_from_api_trades(
             entry['listingExchange'] = pexch

-    conf = get_config()
     entries = trades_to_ledger_entries(
         conf['accounts'].inverse,
         trade_entries,
     )

-    # normalize recent session's trades to the `Transaction` type
-    trans_by_acct: dict[str, dict[str, Transaction]] = {}
+    # write recent session's trades to the user's (local) ledger file.
+    records: dict[str, pp.Transactions] = {}
+
     for acctid, trades_by_id in entries.items():
         # normalize to transaction form
-        trans_by_acct[acctid] = norm_trade_records(trades_by_id)
+        records[acctid] = norm_trade_records(trades_by_id)

-    return trans_by_acct, entries
+    return records, entries


 async def update_and_audit_msgs(
     acctid: str,  # no `ib.` prefix is required!
-    pps: list[Position],
+    pps: list[pp.Position],
     cids2pps: dict[tuple[str, int], BrokerdPosition],
     validate: bool = False,

 ) -> list[BrokerdPosition]:

     msgs: list[BrokerdPosition] = []
+    # pps: dict[int, pp.Position] = {}

     for p in pps:
         bsuid = p.bsuid

+        # build trade-session-actor local table
+        # of pps from unique symbol ids.
+        # pps[bsuid] = p
+
         # retreive equivalent ib reported position message
         # for comparison/audit versus the piker equivalent
         # breakeven pp calcs.
@@ -425,8 +462,6 @@ async def trades_dialogue(
     all_positions = []
     accounts = set()
     clients: list[tuple[Client, trio.MemoryReceiveChannel]] = []
-    acctids = set()
-    cids2pps: dict[str, BrokerdPosition] = {}

     # TODO: this causes a massive tractor bug when you run marketstored
     # with ``--tsdb``... you should get:
@@ -436,38 +471,15 @@ async def trades_dialogue(
     # - hitting final control-c to kill daemon will lead to hang
     #   assert 0

-    # TODO: just write on teardown?
-    # we might also want to delegate a specific actor for
-    # ledger writing / reading for speed?
     async with (
         trio.open_nursery() as nurse,
         open_client_proxies() as (proxies, aioclients),
     ):
-        # Open a trade ledgers stack for appending trade records over
-        # multiple accounts.
-        # TODO: we probably want to generalize this into a "ledgers" api..
-        ledgers: dict[str, dict] = {}
-        tables: dict[str, PpTable] = {}
-        with (
-            ExitStack() as lstack,
-        ):
-            for account, proxy in proxies.items():
-                acctid = account.strip('ib.')
-                acctids.add(acctid)
-
-                # open ledger and pptable wrapper for each
-                # detected account.
-                ledger = ledgers[acctid] = lstack.enter_context(
-                    open_trade_ledger('ib', acctid)
-                )
-                table = tables[acctid] = lstack.enter_context(
-                    open_pps('ib', acctid)
-                )
-
-                client = aioclients[account]
+        for account, proxy in proxies.items():
+            client = aioclients[account]

-            async def open_trade_event_stream(
+            async def open_stream(
                 task_status: TaskStatus[
                     trio.abc.ReceiveChannel
                 ] = trio.TASK_STATUS_IGNORED,
@@ -481,77 +493,75 @@ async def trades_dialogue(
                 task_status.started(trade_event_stream)
                 await trio.sleep_forever()

-            trade_event_stream = await nurse.start(open_trade_event_stream)
+            trade_event_stream = await nurse.start(open_stream)

             clients.append((client, trade_event_stream))

             assert account in accounts_def
             accounts.add(account)

-            # update trades ledgers for all accounts from connected
-            # api clients which report trades for **this session**.
-            trades = await proxy.trades()
-            (
-                trans_by_acct,
-                api_ready_for_ledger_entries,
-            ) = await update_ledger_from_api_trades(
-                trades,
-                proxy,
-            )
-
-            # if new trades are detected from the API, prepare
-            # them for the ledger file and update the pptable.
-            if api_ready_for_ledger_entries:
-                trade_entries = api_ready_for_ledger_entries[acctid]
-                ledger.update(trade_entries)
-                trans = trans_by_acct.get(acctid)
-                if trans:
-                    table.update_from_trans(trans)
+        cids2pps: dict[str, BrokerdPosition] = {}
+        update_records: dict[str, bidict] = {}

         # process pp value reported from ib's system. we only use these
         # to cross-check sizing since average pricing on their end uses
         # the so called (bs) "FIFO" style which more or less results in
         # a price that's not useful for traders who want to not lose
         # money.. xb
-        # for client in aioclients.values():
+        for client in aioclients.values():
             for pos in client.positions():
+                cid, msg = pack_position(pos)
+                acctid = msg.account = accounts_def.inverse[msg.account]
+                acctid = acctid.strip('ib.')
+                cids2pps[(acctid, cid)] = msg
+                assert msg.account in accounts, (
+                    f'Position for unknown account: {msg.account}')

                 # collect all ib-pp reported positions so that we can be
                 # sure know which positions to update from the ledger if
                 # any are missing from the ``pps.toml``
-                bsuid, msg = pack_position(pos)
-                acctid = msg.account = accounts_def.inverse[msg.account]
-                acctid = acctid.strip('ib.')
-                cids2pps[(acctid, bsuid)] = msg
-                assert msg.account in accounts, (
-                    f'Position for unknown account: {msg.account}')
+                update_records.setdefault(acctid, bidict())[cid] = msg.symbol

-                table = tables[acctid]
-                pp = table.pps.get(bsuid)
-                if (
-                    not pp
-                    or pp.size != msg.size
-                ):
-                    trans = norm_trade_records(ledger)
-                    updated = table.update_from_trans(trans)
-                    pp = updated[bsuid]
-                    assert msg.size == pp.size, 'WTF'
-
-                # TODO: figure out why these don't match?
-                # assert pp.calc_be_price() == pp.be_price
+        # update trades ledgers for all accounts from
+        # connected api clients which report trades for **this session**.
+        new_trades = {}
+        for account, proxy in proxies.items():
+            trades = await proxy.trades()
+            (
+                records_by_acct,
+                ledger_entries,
+            ) = await update_ledger_from_api_trades(
+                trades,
+                proxy,
+            )
+            new_trades.update(records_by_acct)

-            _, closed_pps = table.dump_active('ib')
-            active_pps = table.pps
+        for acctid, trans in new_trades.items():
+            for t in trans:
+                bsuid = t.bsuid
+                if bsuid in update_records:
+                    assert update_records[bsuid] == t.fqsn
+                else:
+                    update_records.setdefault(acctid, bidict())[bsuid] = t.fqsn

-            # load all positions from `pps.toml`, cross check with
-            # ib's positions data, and relay re-formatted pps as
-            # msgs to the ems.
+        # load all positions from `pps.toml`, cross check with ib's
+        # positions data, and relay re-formatted pps as msgs to the ems.
         # __2 cases__:
         # - new trades have taken place this session that we want to
         #   always reprocess indempotently,
         # - no new trades yet but we want to reload and audit any
         #   positions reported by ib's sys that may not yet be in
         #   piker's ``pps.toml`` state-file.
-            for pps in [active_pps, closed_pps]:
+        for acctid, to_update in update_records.items():
+            trans = new_trades.get(acctid)
+            active, closed = pp.update_pps_conf(
+                'ib',
+                acctid,
+                trade_records=trans,
+                ledger_reload=to_update,
+            )
+            for pps in [active, closed]:
                 msgs = await update_and_audit_msgs(
                     acctid,
                     pps.values(),
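
Both sides of this hunk lean on `bidict` for the account maps (`accounts_def.inverse`, `conf['accounts'].inverse`, and the new `update_records` values). A quick sketch of the two-way lookup that makes those `.inverse` calls work (the account number below is made up):

    from bidict import bidict

    # alias -> ib account number; same table resolves both directions
    accounts_def = bidict({'ib.algopaper': 'DU1234567'})

    assert accounts_def['ib.algopaper'] == 'DU1234567'
    assert accounts_def.inverse['DU1234567'] == 'ib.algopaper'
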
@@ -566,22 +576,32 @@ async def trades_dialogue(
                     f'{pformat(cids2pps)}'
                 )

+        # log.info(f'Loaded {len(trades)} from this session')
+        # TODO: write trades to local ``trades.toml``
+        # - use above per-session trades data and write to local file
+        # - get the "flex reports" working and pull historical data and
+        #   also save locally.
+
         await ctx.started((
             all_positions,
             tuple(name for name in accounts_def if name in accounts),
         ))

+        # TODO: maybe just write on teardown?
+        # we might also want to delegate a specific actor for
+        # ledger writing / reading for speed?
+
         # write ledger with all new trades **AFTER** we've updated the
         # `pps.toml` from the original ledger state!
-        for acctid, trades_by_id in api_ready_for_ledger_entries.items():
-            ledgers[acctid].update(trades_by_id)
+        for acctid, trades_by_id in ledger_entries.items():
+            with pp.open_trade_ledger('ib', acctid) as ledger:
+                ledger.update(trades_by_id)

         async with (
             ctx.open_stream() as ems_stream,
             trio.open_nursery() as n,
         ):
-            # start order request handler **before** local trades
-            # event loop
+            # start order request handler **before** local trades event loop
             n.start_soon(handle_order_requests, ems_stream, accounts_def)

             # allocate event relay tasks for each client connection
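
The write-AFTER ordering above only requires that `pp.open_trade_ledger()` behave as a mapping-like context manager which persists mutations on exit. A rough stand-in showing that implied contract (this is not piker's implementation; it uses JSON instead of TOML to stay stdlib-only):

    import json
    from contextlib import contextmanager
    from pathlib import Path

    @contextmanager
    def open_trade_ledger_sketch(path: Path):
        # load-or-create, hand the caller a plain dict to mutate...
        ledger: dict = json.loads(path.read_text()) if path.exists() else {}
        try:
            yield ledger  # eg. ledger.update(trades_by_id)
        finally:
            # ...and flush everything back to disk on exit.
            path.write_text(json.dumps(ledger, indent=2))
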
@@ -593,9 +613,6 @@ async def trades_dialogue(
                     accounts_def,
                     cids2pps,
                     proxies,
-                    ledgers,
-                    tables,
                 )

             # block until cancelled
@@ -609,44 +626,44 @@ async def emit_pp_update(
     proxies: dict,
     cids2pps: dict,
-    ledgers,
-    tables,

 ) -> None:

     # compute and relay incrementally updated piker pp
     acctid = accounts_def.inverse[trade_entry['execution']['acctNumber']]
     proxy = proxies[acctid]

-    acctid = acctid.strip('ib.')
-    (
-        records_by_acct,
-        api_ready_for_ledger_entries,
-    ) = await update_ledger_from_api_trades(
+    acctname = acctid.strip('ib.')
+    records_by_acct, ledger_entries = await update_ledger_from_api_trades(
         [trade_entry],
         proxy,
     )
-    trans = records_by_acct[acctid]
-    r = list(trans.values())[0]
+    records = records_by_acct[acctname]
+    r = records[0]

-    table = tables[acctid]
-    table.update_from_trans(trans)
-    _, closed = table.dump_active('ib')
-    active = table.pps
+    # update and load all positions from `pps.toml`, cross check with
+    # ib's positions data, and relay re-formatted pps as msgs to the
+    # ems. we report both the open and closed updates in one map since
+    # for incremental update we may have just fully closed a pp and need
+    # to relay that msg as well!
+    active, closed = pp.update_pps_conf(
+        'ib',
+        acctname,
+        trade_records=records,
+        ledger_reload={r.bsuid: r.fqsn},
+    )

-    # NOTE: update ledger with all new trades
-    for acctid, trades_by_id in api_ready_for_ledger_entries.items():
-        ledger = ledgers[acctid]
-        ledger.update(trades_by_id)
+    # NOTE: write ledger with all new trades **AFTER** we've updated the
+    # `pps.toml` from the original ledger state!
+    for acctid, trades_by_id in ledger_entries.items():
+        with pp.open_trade_ledger('ib', acctid) as ledger:
+            ledger.update(trades_by_id)

-    # generate pp msgs and cross check with ib's positions data, relay
-    # re-formatted pps as msgs to the ems.
     for pos in filter(
         bool,
         [active.get(r.bsuid), closed.get(r.bsuid)]
     ):
         msgs = await update_and_audit_msgs(
-            acctid,
+            acctname,
             [pos],
             cids2pps,
@@ -668,9 +685,6 @@ async def deliver_trade_events(
     cids2pps: dict[tuple[str, str], BrokerdPosition],
     proxies: dict[str, MethodProxy],
-    ledgers,
-    tables,

 ) -> None:
     '''
     Format and relay all trade events for a given client to emsd.
@@ -820,8 +834,6 @@ async def deliver_trade_events(
                         accounts_def,
                         proxies,
                         cids2pps,
-                        ledgers,
-                        tables,
                     )

                 case 'cost':
@@ -854,8 +866,6 @@ async def deliver_trade_events(
                         accounts_def,
                         proxies,
                         cids2pps,
-                        ledgers,
-                        tables,
                     )

                 case 'error':
@@ -906,13 +916,14 @@ async def deliver_trade_events(
 def norm_trade_records(
     ledger: dict[str, Any],

-) -> list[Transaction]:
+) -> list[pp.Transaction]:
     '''
     Normalize a flex report or API retrieved executions
     ledger into our standard record format.

     '''
-    records: dict[str, Transaction] = {}
+    records: list[pp.Transaction] = []
+
     for tid, record in ledger.items():
         conid = record.get('conId') or record['conid']
@@ -990,7 +1001,7 @@ def norm_trade_records(
         # which case, we can pull the fqsn from that table (see
         # `trades_dialogue()` above).

-        records[tid] = Transaction(
+        records.append(pp.Transaction(
             fqsn=fqsn,
             tid=tid,
             size=size,
@@ -999,7 +1010,7 @@ def norm_trade_records(
             dt=dt,
             expiry=expiry,
             bsuid=conid,
-        )
+        ))

     return records
@@ -1128,7 +1139,8 @@ def load_flex_trades(
     trade_entries = report.extract('Trade')
     ln = len(trade_entries)
-    log.info(f'Loaded {ln} trades from flex query')
+    # log.info(f'Loaded {ln} trades from flex query')
+    print(f'Loaded {ln} trades from flex query')

     trades_by_account = trades_to_ledger_entries(
         # get reverse map to user account names
@@ -1137,20 +1149,14 @@ def load_flex_trades(
         source_type='flex',
     )

-    for acctid in trades_by_account:
-        trades_by_id = trades_by_account[acctid]
-        with open_trade_ledger('ib', acctid) as ledger_dict:
-            tid_delta = set(trades_by_id) - set(ledger_dict)
-            log.info(
-                'New trades detected\n'
-                f'{pformat(tid_delta)}'
-            )
-            if tid_delta:
-                ledger_dict.update(
-                    {tid: trades_by_id[tid] for tid in tid_delta}
-                )
-
-    return ledger_dict
+    ledgers = {}
+    for acctid, trades_by_id in trades_by_account.items():
+        with pp.open_trade_ledger('ib', acctid) as ledger:
+            ledger.update(trades_by_id)
+        ledgers[acctid] = ledger
+
+    return ledgers


 if __name__ == '__main__':

View File

@@ -301,13 +301,7 @@ async def get_bars(
             else:
                 log.warning('Sending CONNECTION RESET')
-                res = await data_reset_hack(reset_type='connection')
-                if not res:
-                    log.warning(
-                        'NO VNC DETECTED!\n'
-                        'Manually press ctrl-alt-f on your IB java app'
-                    )
-                    # break
+                await data_reset_hack(reset_type='connection')

             with trio.move_on_after(timeout) as cs:
                 for name, ev in [
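
The surviving `trio.move_on_after(timeout) as cs` line is what bounds the wait for the post-reset events; `cs.cancelled_caught` is how callers detect that the deadline expired. A minimal sketch of that pattern:

    import trio

    async def wait_with_deadline(ev: trio.Event, timeout: float) -> bool:
        # run the block under a deadline; trio cancels it if time runs out
        with trio.move_on_after(timeout) as cs:
            await ev.wait()
        # True iff the event fired before the deadline
        return not cs.cancelled_caught
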
@@ -576,6 +570,7 @@ def normalize(
     # check for special contract types
     con = ticker.contract
     fqsn, calc_price = con2fqsn(con)
+
     # convert named tuples to dicts so we send usable keys
@@ -847,10 +842,7 @@ async def data_reset_hack(
         client.mouse.click()
         client.keyboard.press('Ctrl', 'Alt', key)  # keys are stacked

-    try:
-        await tractor.to_asyncio.run_task(vnc_click_hack)
-    except OSError:
-        return False
+    await tractor.to_asyncio.run_task(vnc_click_hack)

     # we don't really need the ``xdotool`` approach any more B)
     return True
@@ -865,24 +857,15 @@ async def open_symbol_search(
     # TODO: load user defined symbol set locally for fast search?
     await ctx.started({})

-    # async with open_data_client() as proxy:
-    async with (
-        open_client_proxies() as (proxies, clients),
-    ):
+    async with open_data_client() as proxy:

         async with ctx.open_stream() as stream:
-            # await tractor.breakpoint()
-            proxy = proxies['ib.algopaper']

             last = time.time()

             async for pattern in stream:
-                log.info(f'received {pattern}')
+                log.debug(f'received {pattern}')
                 now = time.time()

-                # this causes tractor hang...
-                # assert 0
-
                 assert pattern, 'IB can not accept blank search pattern'

                 # throttle search requests to no faster then 1Hz
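
The `last = time.time()` bookkeeping in this function implements the 1Hz throttle mentioned in the context line above: a request is only serviced when at least one full period has elapsed since the last serviced one. A sketch of the idea:

    import time

    _last = 0.0

    def should_service(period: float = 1.0) -> bool:
        # skip requests arriving faster than 1/period Hz
        global _last
        now = time.time()
        if (now - _last) < period:
            return False
        _last = now
        return True
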
@@ -910,7 +893,7 @@ async def open_symbol_search(
                     continue

-                log.info(f'searching for {pattern}')
+                log.debug(f'searching for {pattern}')

                 last = time.time()
@@ -921,8 +904,6 @@ async def open_symbol_search(
                 async def stash_results(target: Awaitable[list]):
                     stock_results.extend(await target)

-                for i in range(10):
-                    with trio.move_on_after(3) as cs:
                 async with trio.open_nursery() as sn:
                     sn.start_soon(
                         stash_results,
@@ -935,12 +916,6 @@ async def open_symbol_search(
                     # trigger async request
                     await trio.sleep(0)

-                    if cs.cancelled_caught:
-                        log.warning(f'Search timeout? {proxy._aio_ns.ib.client}')
-                        continue
-                    else:
-                        break
-
                 # # match against our ad-hoc set immediately
                 # adhoc_matches = fuzzy.extractBests(
                 #     pattern,