Compare commits
5 Commits
a3d46f713e
...
e1d57e8a8a
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | e1d57e8a8a | |
Tyler Goodlet | ad458e3fcd | |
Tyler Goodlet | f26c399ad3 | |
Tyler Goodlet | a741ed3161 | |
Tyler Goodlet | b9fbbeb44e |
|
@ -18,6 +18,7 @@ Order and trades endpoints for use with ``piker``'s EMS.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
from contextlib import ExitStack
|
||||||
from dataclasses import asdict
|
from dataclasses import asdict
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
|
@ -35,8 +36,8 @@ from trio_typing import TaskStatus
|
||||||
import tractor
|
import tractor
|
||||||
from ib_insync.contract import (
|
from ib_insync.contract import (
|
||||||
Contract,
|
Contract,
|
||||||
Option,
|
# Option,
|
||||||
Forex,
|
# Forex,
|
||||||
)
|
)
|
||||||
from ib_insync.order import (
|
from ib_insync.order import (
|
||||||
Trade,
|
Trade,
|
||||||
|
@ -47,11 +48,17 @@ from ib_insync.objects import (
|
||||||
Execution,
|
Execution,
|
||||||
CommissionReport,
|
CommissionReport,
|
||||||
)
|
)
|
||||||
from ib_insync.objects import Position
|
from ib_insync.objects import Position as IbPosition
|
||||||
import pendulum
|
import pendulum
|
||||||
|
|
||||||
from piker import config
|
from piker import config
|
||||||
from piker import pp
|
from piker.pp import (
|
||||||
|
Position,
|
||||||
|
Transaction,
|
||||||
|
open_trade_ledger,
|
||||||
|
open_pps,
|
||||||
|
PpTable,
|
||||||
|
)
|
||||||
from piker.log import get_console_log
|
from piker.log import get_console_log
|
||||||
from piker.clearing._messages import (
|
from piker.clearing._messages import (
|
||||||
BrokerdOrder,
|
BrokerdOrder,
|
||||||
|
@ -66,7 +73,8 @@ from piker.data._source import Symbol
|
||||||
from .api import (
|
from .api import (
|
||||||
_accounts2clients,
|
_accounts2clients,
|
||||||
# _adhoc_futes_set,
|
# _adhoc_futes_set,
|
||||||
_adhoc_symbol_map,
|
con2fqsn,
|
||||||
|
# _adhoc_symbol_map,
|
||||||
log,
|
log,
|
||||||
get_config,
|
get_config,
|
||||||
open_client_proxies,
|
open_client_proxies,
|
||||||
|
@ -76,49 +84,12 @@ from .api import (
|
||||||
|
|
||||||
|
|
||||||
def pack_position(
|
def pack_position(
|
||||||
pos: Position
|
pos: IbPosition
|
||||||
|
|
||||||
) -> dict[str, Any]:
|
) -> dict[str, Any]:
|
||||||
|
|
||||||
con = pos.contract
|
con = pos.contract
|
||||||
|
fqsn, calc_price = con2fqsn(con)
|
||||||
if isinstance(con, Option):
|
|
||||||
# TODO: option symbol parsing and sane display:
|
|
||||||
symbol = con.localSymbol.replace(' ', '')
|
|
||||||
|
|
||||||
else:
|
|
||||||
# TODO: lookup fqsn even for derivs.
|
|
||||||
symbol = con.symbol.lower()
|
|
||||||
|
|
||||||
# TODO: probably write a mofo exchange mapper routine since ib
|
|
||||||
# can't get it's shit together like, ever.
|
|
||||||
|
|
||||||
# try our best to figure out the exchange / venue
|
|
||||||
exch = (con.primaryExchange or con.exchange).lower()
|
|
||||||
if not exch:
|
|
||||||
|
|
||||||
if isinstance(con, Forex):
|
|
||||||
# bc apparently it's not in the contract obj?
|
|
||||||
exch = 'idealfx'
|
|
||||||
|
|
||||||
else:
|
|
||||||
# for wtv cucked reason some futes don't show their
|
|
||||||
# exchange (like CL.NYMEX) ...
|
|
||||||
entry = _adhoc_symbol_map.get(
|
|
||||||
con.symbol or con.localSymbol
|
|
||||||
)
|
|
||||||
if entry:
|
|
||||||
meta, kwargs = entry
|
|
||||||
cid = meta.get('conId')
|
|
||||||
if cid:
|
|
||||||
assert con.conId == meta['conId']
|
|
||||||
exch = meta['exchange']
|
|
||||||
|
|
||||||
assert exch, f'No clue:\n {con}'
|
|
||||||
fqsn = '.'.join((symbol, exch))
|
|
||||||
|
|
||||||
expiry = con.lastTradeDateOrContractMonth
|
|
||||||
if expiry:
|
|
||||||
fqsn += f'.{expiry}'
|
|
||||||
|
|
||||||
# TODO: options contracts into a sane format..
|
# TODO: options contracts into a sane format..
|
||||||
return (
|
return (
|
||||||
|
@ -305,12 +276,10 @@ async def update_ledger_from_api_trades(
|
||||||
client: Union[Client, MethodProxy],
|
client: Union[Client, MethodProxy],
|
||||||
|
|
||||||
) -> tuple[
|
) -> tuple[
|
||||||
dict[str, pp.Transaction],
|
dict[str, Transaction],
|
||||||
dict[str, dict],
|
dict[str, dict],
|
||||||
]:
|
]:
|
||||||
|
|
||||||
conf = get_config()
|
|
||||||
|
|
||||||
# XXX; ERRGGG..
|
# XXX; ERRGGG..
|
||||||
# pack in the "primary/listing exchange" value from a
|
# pack in the "primary/listing exchange" value from a
|
||||||
# contract lookup since it seems this isn't available by
|
# contract lookup since it seems this isn't available by
|
||||||
|
@ -331,39 +300,33 @@ async def update_ledger_from_api_trades(
|
||||||
|
|
||||||
entry['listingExchange'] = pexch
|
entry['listingExchange'] = pexch
|
||||||
|
|
||||||
|
conf = get_config()
|
||||||
entries = trades_to_ledger_entries(
|
entries = trades_to_ledger_entries(
|
||||||
conf['accounts'].inverse,
|
conf['accounts'].inverse,
|
||||||
trade_entries,
|
trade_entries,
|
||||||
)
|
)
|
||||||
|
# normalize recent session's trades to the `Transaction` type
|
||||||
# write recent session's trades to the user's (local) ledger file.
|
trans_by_acct: dict[str, dict[str, Transaction]] = {}
|
||||||
records: dict[str, pp.Transactions] = {}
|
|
||||||
|
|
||||||
for acctid, trades_by_id in entries.items():
|
for acctid, trades_by_id in entries.items():
|
||||||
# normalize to transaction form
|
# normalize to transaction form
|
||||||
records[acctid] = norm_trade_records(trades_by_id)
|
trans_by_acct[acctid] = norm_trade_records(trades_by_id)
|
||||||
|
|
||||||
return records, entries
|
return trans_by_acct, entries
|
||||||
|
|
||||||
|
|
||||||
async def update_and_audit_msgs(
|
async def update_and_audit_msgs(
|
||||||
acctid: str, # no `ib.` prefix is required!
|
acctid: str, # no `ib.` prefix is required!
|
||||||
pps: list[pp.Position],
|
pps: list[Position],
|
||||||
cids2pps: dict[tuple[str, int], BrokerdPosition],
|
cids2pps: dict[tuple[str, int], BrokerdPosition],
|
||||||
validate: bool = False,
|
validate: bool = False,
|
||||||
|
|
||||||
) -> list[BrokerdPosition]:
|
) -> list[BrokerdPosition]:
|
||||||
|
|
||||||
msgs: list[BrokerdPosition] = []
|
msgs: list[BrokerdPosition] = []
|
||||||
# pps: dict[int, pp.Position] = {}
|
|
||||||
|
|
||||||
for p in pps:
|
for p in pps:
|
||||||
bsuid = p.bsuid
|
bsuid = p.bsuid
|
||||||
|
|
||||||
# build trade-session-actor local table
|
|
||||||
# of pps from unique symbol ids.
|
|
||||||
# pps[bsuid] = p
|
|
||||||
|
|
||||||
# retreive equivalent ib reported position message
|
# retreive equivalent ib reported position message
|
||||||
# for comparison/audit versus the piker equivalent
|
# for comparison/audit versus the piker equivalent
|
||||||
# breakeven pp calcs.
|
# breakeven pp calcs.
|
||||||
|
@ -462,6 +425,8 @@ async def trades_dialogue(
|
||||||
all_positions = []
|
all_positions = []
|
||||||
accounts = set()
|
accounts = set()
|
||||||
clients: list[tuple[Client, trio.MemoryReceiveChannel]] = []
|
clients: list[tuple[Client, trio.MemoryReceiveChannel]] = []
|
||||||
|
acctids = set()
|
||||||
|
cids2pps: dict[str, BrokerdPosition] = {}
|
||||||
|
|
||||||
# TODO: this causes a massive tractor bug when you run marketstored
|
# TODO: this causes a massive tractor bug when you run marketstored
|
||||||
# with ``--tsdb``... you should get:
|
# with ``--tsdb``... you should get:
|
||||||
|
@ -471,15 +436,38 @@ async def trades_dialogue(
|
||||||
# - hitting final control-c to kill daemon will lead to hang
|
# - hitting final control-c to kill daemon will lead to hang
|
||||||
# assert 0
|
# assert 0
|
||||||
|
|
||||||
|
# TODO: just write on teardown?
|
||||||
|
# we might also want to delegate a specific actor for
|
||||||
|
# ledger writing / reading for speed?
|
||||||
async with (
|
async with (
|
||||||
trio.open_nursery() as nurse,
|
trio.open_nursery() as nurse,
|
||||||
open_client_proxies() as (proxies, aioclients),
|
open_client_proxies() as (proxies, aioclients),
|
||||||
|
):
|
||||||
|
# Open a trade ledgers stack for appending trade records over
|
||||||
|
# multiple accounts.
|
||||||
|
# TODO: we probably want to generalize this into a "ledgers" api..
|
||||||
|
ledgers: dict[str, dict] = {}
|
||||||
|
tables: dict[str, PpTable] = {}
|
||||||
|
with (
|
||||||
|
ExitStack() as lstack,
|
||||||
):
|
):
|
||||||
for account, proxy in proxies.items():
|
for account, proxy in proxies.items():
|
||||||
|
|
||||||
|
acctid = account.strip('ib.')
|
||||||
|
acctids.add(acctid)
|
||||||
|
|
||||||
|
# open ledger and pptable wrapper for each
|
||||||
|
# detected account.
|
||||||
|
ledger = ledgers[acctid] = lstack.enter_context(
|
||||||
|
open_trade_ledger('ib', acctid)
|
||||||
|
)
|
||||||
|
table = tables[acctid] = lstack.enter_context(
|
||||||
|
open_pps('ib', acctid)
|
||||||
|
)
|
||||||
|
|
||||||
client = aioclients[account]
|
client = aioclients[account]
|
||||||
|
|
||||||
async def open_stream(
|
async def open_trade_event_stream(
|
||||||
task_status: TaskStatus[
|
task_status: TaskStatus[
|
||||||
trio.abc.ReceiveChannel
|
trio.abc.ReceiveChannel
|
||||||
] = trio.TASK_STATUS_IGNORED,
|
] = trio.TASK_STATUS_IGNORED,
|
||||||
|
@ -493,75 +481,77 @@ async def trades_dialogue(
|
||||||
task_status.started(trade_event_stream)
|
task_status.started(trade_event_stream)
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
||||||
trade_event_stream = await nurse.start(open_stream)
|
trade_event_stream = await nurse.start(open_trade_event_stream)
|
||||||
|
|
||||||
clients.append((client, trade_event_stream))
|
clients.append((client, trade_event_stream))
|
||||||
|
|
||||||
assert account in accounts_def
|
assert account in accounts_def
|
||||||
accounts.add(account)
|
accounts.add(account)
|
||||||
|
|
||||||
cids2pps: dict[str, BrokerdPosition] = {}
|
# update trades ledgers for all accounts from connected
|
||||||
update_records: dict[str, bidict] = {}
|
# api clients which report trades for **this session**.
|
||||||
|
trades = await proxy.trades()
|
||||||
|
(
|
||||||
|
trans_by_acct,
|
||||||
|
api_ready_for_ledger_entries,
|
||||||
|
) = await update_ledger_from_api_trades(
|
||||||
|
trades,
|
||||||
|
proxy,
|
||||||
|
)
|
||||||
|
|
||||||
|
# if new trades are detected from the API, prepare
|
||||||
|
# them for the ledger file and update the pptable.
|
||||||
|
if api_ready_for_ledger_entries:
|
||||||
|
trade_entries = api_ready_for_ledger_entries[acctid]
|
||||||
|
ledger.update(trade_entries)
|
||||||
|
trans = trans_by_acct.get(acctid)
|
||||||
|
if trans:
|
||||||
|
table.update_from_trans(trans)
|
||||||
|
|
||||||
# process pp value reported from ib's system. we only use these
|
# process pp value reported from ib's system. we only use these
|
||||||
# to cross-check sizing since average pricing on their end uses
|
# to cross-check sizing since average pricing on their end uses
|
||||||
# the so called (bs) "FIFO" style which more or less results in
|
# the so called (bs) "FIFO" style which more or less results in
|
||||||
# a price that's not useful for traders who want to not lose
|
# a price that's not useful for traders who want to not lose
|
||||||
# money.. xb
|
# money.. xb
|
||||||
for client in aioclients.values():
|
# for client in aioclients.values():
|
||||||
for pos in client.positions():
|
for pos in client.positions():
|
||||||
|
|
||||||
cid, msg = pack_position(pos)
|
|
||||||
acctid = msg.account = accounts_def.inverse[msg.account]
|
|
||||||
acctid = acctid.strip('ib.')
|
|
||||||
cids2pps[(acctid, cid)] = msg
|
|
||||||
assert msg.account in accounts, (
|
|
||||||
f'Position for unknown account: {msg.account}')
|
|
||||||
|
|
||||||
# collect all ib-pp reported positions so that we can be
|
# collect all ib-pp reported positions so that we can be
|
||||||
# sure know which positions to update from the ledger if
|
# sure know which positions to update from the ledger if
|
||||||
# any are missing from the ``pps.toml``
|
# any are missing from the ``pps.toml``
|
||||||
update_records.setdefault(acctid, bidict())[cid] = msg.symbol
|
bsuid, msg = pack_position(pos)
|
||||||
|
acctid = msg.account = accounts_def.inverse[msg.account]
|
||||||
|
acctid = acctid.strip('ib.')
|
||||||
|
cids2pps[(acctid, bsuid)] = msg
|
||||||
|
assert msg.account in accounts, (
|
||||||
|
f'Position for unknown account: {msg.account}')
|
||||||
|
|
||||||
# update trades ledgers for all accounts from
|
table = tables[acctid]
|
||||||
# connected api clients which report trades for **this session**.
|
pp = table.pps.get(bsuid)
|
||||||
new_trades = {}
|
if (
|
||||||
for account, proxy in proxies.items():
|
not pp
|
||||||
trades = await proxy.trades()
|
or pp.size != msg.size
|
||||||
(
|
):
|
||||||
records_by_acct,
|
trans = norm_trade_records(ledger)
|
||||||
ledger_entries,
|
updated = table.update_from_trans(trans)
|
||||||
) = await update_ledger_from_api_trades(
|
pp = updated[bsuid]
|
||||||
trades,
|
assert msg.size == pp.size, 'WTF'
|
||||||
proxy,
|
|
||||||
)
|
|
||||||
new_trades.update(records_by_acct)
|
|
||||||
|
|
||||||
for acctid, trans in new_trades.items():
|
# TODO: figure out why these don't match?
|
||||||
for t in trans:
|
# assert pp.calc_be_price() == pp.be_price
|
||||||
bsuid = t.bsuid
|
|
||||||
if bsuid in update_records:
|
|
||||||
assert update_records[bsuid] == t.fqsn
|
|
||||||
else:
|
|
||||||
update_records.setdefault(acctid, bidict())[bsuid] = t.fqsn
|
|
||||||
|
|
||||||
# load all positions from `pps.toml`, cross check with ib's
|
_, closed_pps = table.dump_active('ib')
|
||||||
# positions data, and relay re-formatted pps as msgs to the ems.
|
active_pps = table.pps
|
||||||
|
|
||||||
|
# load all positions from `pps.toml`, cross check with
|
||||||
|
# ib's positions data, and relay re-formatted pps as
|
||||||
|
# msgs to the ems.
|
||||||
# __2 cases__:
|
# __2 cases__:
|
||||||
# - new trades have taken place this session that we want to
|
# - new trades have taken place this session that we want to
|
||||||
# always reprocess indempotently,
|
# always reprocess indempotently,
|
||||||
# - no new trades yet but we want to reload and audit any
|
# - no new trades yet but we want to reload and audit any
|
||||||
# positions reported by ib's sys that may not yet be in
|
# positions reported by ib's sys that may not yet be in
|
||||||
# piker's ``pps.toml`` state-file.
|
# piker's ``pps.toml`` state-file.
|
||||||
for acctid, to_update in update_records.items():
|
for pps in [active_pps, closed_pps]:
|
||||||
trans = new_trades.get(acctid)
|
|
||||||
active, closed = pp.update_pps_conf(
|
|
||||||
'ib',
|
|
||||||
acctid,
|
|
||||||
trade_records=trans,
|
|
||||||
ledger_reload=to_update,
|
|
||||||
)
|
|
||||||
for pps in [active, closed]:
|
|
||||||
msgs = await update_and_audit_msgs(
|
msgs = await update_and_audit_msgs(
|
||||||
acctid,
|
acctid,
|
||||||
pps.values(),
|
pps.values(),
|
||||||
|
@ -576,32 +566,22 @@ async def trades_dialogue(
|
||||||
f'{pformat(cids2pps)}'
|
f'{pformat(cids2pps)}'
|
||||||
)
|
)
|
||||||
|
|
||||||
# log.info(f'Loaded {len(trades)} from this session')
|
|
||||||
# TODO: write trades to local ``trades.toml``
|
|
||||||
# - use above per-session trades data and write to local file
|
|
||||||
# - get the "flex reports" working and pull historical data and
|
|
||||||
# also save locally.
|
|
||||||
|
|
||||||
await ctx.started((
|
await ctx.started((
|
||||||
all_positions,
|
all_positions,
|
||||||
tuple(name for name in accounts_def if name in accounts),
|
tuple(name for name in accounts_def if name in accounts),
|
||||||
))
|
))
|
||||||
|
|
||||||
# TODO: maybe just write on teardown?
|
|
||||||
# we might also want to delegate a specific actor for
|
|
||||||
# ledger writing / reading for speed?
|
|
||||||
|
|
||||||
# write ledger with all new trades **AFTER** we've updated the
|
# write ledger with all new trades **AFTER** we've updated the
|
||||||
# `pps.toml` from the original ledger state!
|
# `pps.toml` from the original ledger state!
|
||||||
for acctid, trades_by_id in ledger_entries.items():
|
for acctid, trades_by_id in api_ready_for_ledger_entries.items():
|
||||||
with pp.open_trade_ledger('ib', acctid) as ledger:
|
ledgers[acctid].update(trades_by_id)
|
||||||
ledger.update(trades_by_id)
|
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
ctx.open_stream() as ems_stream,
|
ctx.open_stream() as ems_stream,
|
||||||
trio.open_nursery() as n,
|
trio.open_nursery() as n,
|
||||||
):
|
):
|
||||||
# start order request handler **before** local trades event loop
|
# start order request handler **before** local trades
|
||||||
|
# event loop
|
||||||
n.start_soon(handle_order_requests, ems_stream, accounts_def)
|
n.start_soon(handle_order_requests, ems_stream, accounts_def)
|
||||||
|
|
||||||
# allocate event relay tasks for each client connection
|
# allocate event relay tasks for each client connection
|
||||||
|
@ -613,6 +593,9 @@ async def trades_dialogue(
|
||||||
accounts_def,
|
accounts_def,
|
||||||
cids2pps,
|
cids2pps,
|
||||||
proxies,
|
proxies,
|
||||||
|
|
||||||
|
ledgers,
|
||||||
|
tables,
|
||||||
)
|
)
|
||||||
|
|
||||||
# block until cancelled
|
# block until cancelled
|
||||||
|
@ -626,44 +609,44 @@ async def emit_pp_update(
|
||||||
proxies: dict,
|
proxies: dict,
|
||||||
cids2pps: dict,
|
cids2pps: dict,
|
||||||
|
|
||||||
|
ledgers,
|
||||||
|
tables,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
# compute and relay incrementally updated piker pp
|
# compute and relay incrementally updated piker pp
|
||||||
acctid = accounts_def.inverse[trade_entry['execution']['acctNumber']]
|
acctid = accounts_def.inverse[trade_entry['execution']['acctNumber']]
|
||||||
proxy = proxies[acctid]
|
proxy = proxies[acctid]
|
||||||
|
|
||||||
acctname = acctid.strip('ib.')
|
acctid = acctid.strip('ib.')
|
||||||
records_by_acct, ledger_entries = await update_ledger_from_api_trades(
|
(
|
||||||
|
records_by_acct,
|
||||||
|
api_ready_for_ledger_entries,
|
||||||
|
) = await update_ledger_from_api_trades(
|
||||||
[trade_entry],
|
[trade_entry],
|
||||||
proxy,
|
proxy,
|
||||||
)
|
)
|
||||||
records = records_by_acct[acctname]
|
trans = records_by_acct[acctid]
|
||||||
r = records[0]
|
r = list(trans.values())[0]
|
||||||
|
|
||||||
# update and load all positions from `pps.toml`, cross check with
|
table = tables[acctid]
|
||||||
# ib's positions data, and relay re-formatted pps as msgs to the
|
table.update_from_trans(trans)
|
||||||
# ems. we report both the open and closed updates in one map since
|
_, closed = table.dump_active('ib')
|
||||||
# for incremental update we may have just fully closed a pp and need
|
active = table.pps
|
||||||
# to relay that msg as well!
|
|
||||||
active, closed = pp.update_pps_conf(
|
|
||||||
'ib',
|
|
||||||
acctname,
|
|
||||||
trade_records=records,
|
|
||||||
ledger_reload={r.bsuid: r.fqsn},
|
|
||||||
)
|
|
||||||
|
|
||||||
# NOTE: write ledger with all new trades **AFTER** we've updated the
|
# NOTE: update ledger with all new trades
|
||||||
# `pps.toml` from the original ledger state!
|
for acctid, trades_by_id in api_ready_for_ledger_entries.items():
|
||||||
for acctid, trades_by_id in ledger_entries.items():
|
ledger = ledgers[acctid]
|
||||||
with pp.open_trade_ledger('ib', acctid) as ledger:
|
|
||||||
ledger.update(trades_by_id)
|
ledger.update(trades_by_id)
|
||||||
|
|
||||||
|
# generate pp msgs and cross check with ib's positions data, relay
|
||||||
|
# re-formatted pps as msgs to the ems.
|
||||||
for pos in filter(
|
for pos in filter(
|
||||||
bool,
|
bool,
|
||||||
[active.get(r.bsuid), closed.get(r.bsuid)]
|
[active.get(r.bsuid), closed.get(r.bsuid)]
|
||||||
):
|
):
|
||||||
msgs = await update_and_audit_msgs(
|
msgs = await update_and_audit_msgs(
|
||||||
acctname,
|
acctid,
|
||||||
[pos],
|
[pos],
|
||||||
cids2pps,
|
cids2pps,
|
||||||
|
|
||||||
|
@ -685,6 +668,9 @@ async def deliver_trade_events(
|
||||||
cids2pps: dict[tuple[str, str], BrokerdPosition],
|
cids2pps: dict[tuple[str, str], BrokerdPosition],
|
||||||
proxies: dict[str, MethodProxy],
|
proxies: dict[str, MethodProxy],
|
||||||
|
|
||||||
|
ledgers,
|
||||||
|
tables,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Format and relay all trade events for a given client to emsd.
|
Format and relay all trade events for a given client to emsd.
|
||||||
|
@ -834,6 +820,8 @@ async def deliver_trade_events(
|
||||||
accounts_def,
|
accounts_def,
|
||||||
proxies,
|
proxies,
|
||||||
cids2pps,
|
cids2pps,
|
||||||
|
ledgers,
|
||||||
|
tables,
|
||||||
)
|
)
|
||||||
|
|
||||||
case 'cost':
|
case 'cost':
|
||||||
|
@ -866,6 +854,8 @@ async def deliver_trade_events(
|
||||||
accounts_def,
|
accounts_def,
|
||||||
proxies,
|
proxies,
|
||||||
cids2pps,
|
cids2pps,
|
||||||
|
ledgers,
|
||||||
|
tables,
|
||||||
)
|
)
|
||||||
|
|
||||||
case 'error':
|
case 'error':
|
||||||
|
@ -916,14 +906,13 @@ async def deliver_trade_events(
|
||||||
def norm_trade_records(
|
def norm_trade_records(
|
||||||
ledger: dict[str, Any],
|
ledger: dict[str, Any],
|
||||||
|
|
||||||
) -> list[pp.Transaction]:
|
) -> list[Transaction]:
|
||||||
'''
|
'''
|
||||||
Normalize a flex report or API retrieved executions
|
Normalize a flex report or API retrieved executions
|
||||||
ledger into our standard record format.
|
ledger into our standard record format.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
records: list[pp.Transaction] = []
|
records: dict[str, Transaction] = {}
|
||||||
|
|
||||||
for tid, record in ledger.items():
|
for tid, record in ledger.items():
|
||||||
|
|
||||||
conid = record.get('conId') or record['conid']
|
conid = record.get('conId') or record['conid']
|
||||||
|
@ -1001,7 +990,7 @@ def norm_trade_records(
|
||||||
# which case, we can pull the fqsn from that table (see
|
# which case, we can pull the fqsn from that table (see
|
||||||
# `trades_dialogue()` above).
|
# `trades_dialogue()` above).
|
||||||
|
|
||||||
records.append(pp.Transaction(
|
records[tid] = Transaction(
|
||||||
fqsn=fqsn,
|
fqsn=fqsn,
|
||||||
tid=tid,
|
tid=tid,
|
||||||
size=size,
|
size=size,
|
||||||
|
@ -1010,7 +999,7 @@ def norm_trade_records(
|
||||||
dt=dt,
|
dt=dt,
|
||||||
expiry=expiry,
|
expiry=expiry,
|
||||||
bsuid=conid,
|
bsuid=conid,
|
||||||
))
|
)
|
||||||
|
|
||||||
return records
|
return records
|
||||||
|
|
||||||
|
@ -1139,8 +1128,7 @@ def load_flex_trades(
|
||||||
|
|
||||||
trade_entries = report.extract('Trade')
|
trade_entries = report.extract('Trade')
|
||||||
ln = len(trade_entries)
|
ln = len(trade_entries)
|
||||||
# log.info(f'Loaded {ln} trades from flex query')
|
log.info(f'Loaded {ln} trades from flex query')
|
||||||
print(f'Loaded {ln} trades from flex query')
|
|
||||||
|
|
||||||
trades_by_account = trades_to_ledger_entries(
|
trades_by_account = trades_to_ledger_entries(
|
||||||
# get reverse map to user account names
|
# get reverse map to user account names
|
||||||
|
@ -1149,14 +1137,20 @@ def load_flex_trades(
|
||||||
source_type='flex',
|
source_type='flex',
|
||||||
)
|
)
|
||||||
|
|
||||||
ledgers = {}
|
for acctid in trades_by_account:
|
||||||
for acctid, trades_by_id in trades_by_account.items():
|
trades_by_id = trades_by_account[acctid]
|
||||||
with pp.open_trade_ledger('ib', acctid) as ledger:
|
with open_trade_ledger('ib', acctid) as ledger_dict:
|
||||||
ledger.update(trades_by_id)
|
tid_delta = set(trades_by_id) - set(ledger_dict)
|
||||||
|
log.info(
|
||||||
|
'New trades detected\n'
|
||||||
|
f'{pformat(tid_delta)}'
|
||||||
|
)
|
||||||
|
if tid_delta:
|
||||||
|
ledger_dict.update(
|
||||||
|
{tid: trades_by_id[tid] for tid in tid_delta}
|
||||||
|
)
|
||||||
|
|
||||||
ledgers[acctid] = ledger
|
return ledger_dict
|
||||||
|
|
||||||
return ledgers
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
|
|
@ -301,7 +301,13 @@ async def get_bars(
|
||||||
else:
|
else:
|
||||||
|
|
||||||
log.warning('Sending CONNECTION RESET')
|
log.warning('Sending CONNECTION RESET')
|
||||||
await data_reset_hack(reset_type='connection')
|
res = await data_reset_hack(reset_type='connection')
|
||||||
|
if not res:
|
||||||
|
log.warning(
|
||||||
|
'NO VNC DETECTED!\n'
|
||||||
|
'Manually press ctrl-alt-f on your IB java app'
|
||||||
|
)
|
||||||
|
# break
|
||||||
|
|
||||||
with trio.move_on_after(timeout) as cs:
|
with trio.move_on_after(timeout) as cs:
|
||||||
for name, ev in [
|
for name, ev in [
|
||||||
|
@ -570,7 +576,6 @@ def normalize(
|
||||||
|
|
||||||
# check for special contract types
|
# check for special contract types
|
||||||
con = ticker.contract
|
con = ticker.contract
|
||||||
|
|
||||||
fqsn, calc_price = con2fqsn(con)
|
fqsn, calc_price = con2fqsn(con)
|
||||||
|
|
||||||
# convert named tuples to dicts so we send usable keys
|
# convert named tuples to dicts so we send usable keys
|
||||||
|
@ -842,7 +847,10 @@ async def data_reset_hack(
|
||||||
client.mouse.click()
|
client.mouse.click()
|
||||||
client.keyboard.press('Ctrl', 'Alt', key) # keys are stacked
|
client.keyboard.press('Ctrl', 'Alt', key) # keys are stacked
|
||||||
|
|
||||||
|
try:
|
||||||
await tractor.to_asyncio.run_task(vnc_click_hack)
|
await tractor.to_asyncio.run_task(vnc_click_hack)
|
||||||
|
except OSError:
|
||||||
|
return False
|
||||||
|
|
||||||
# we don't really need the ``xdotool`` approach any more B)
|
# we don't really need the ``xdotool`` approach any more B)
|
||||||
return True
|
return True
|
||||||
|
@ -857,15 +865,24 @@ async def open_symbol_search(
|
||||||
# TODO: load user defined symbol set locally for fast search?
|
# TODO: load user defined symbol set locally for fast search?
|
||||||
await ctx.started({})
|
await ctx.started({})
|
||||||
|
|
||||||
async with open_data_client() as proxy:
|
# async with open_data_client() as proxy:
|
||||||
|
async with (
|
||||||
|
open_client_proxies() as (proxies, clients),
|
||||||
|
):
|
||||||
async with ctx.open_stream() as stream:
|
async with ctx.open_stream() as stream:
|
||||||
|
|
||||||
|
# await tractor.breakpoint()
|
||||||
|
proxy = proxies['ib.algopaper']
|
||||||
|
|
||||||
last = time.time()
|
last = time.time()
|
||||||
|
|
||||||
async for pattern in stream:
|
async for pattern in stream:
|
||||||
log.debug(f'received {pattern}')
|
log.info(f'received {pattern}')
|
||||||
now = time.time()
|
now = time.time()
|
||||||
|
|
||||||
|
# this causes tractor hang...
|
||||||
|
# assert 0
|
||||||
|
|
||||||
assert pattern, 'IB can not accept blank search pattern'
|
assert pattern, 'IB can not accept blank search pattern'
|
||||||
|
|
||||||
# throttle search requests to no faster then 1Hz
|
# throttle search requests to no faster then 1Hz
|
||||||
|
@ -893,7 +910,7 @@ async def open_symbol_search(
|
||||||
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
log.debug(f'searching for {pattern}')
|
log.info(f'searching for {pattern}')
|
||||||
|
|
||||||
last = time.time()
|
last = time.time()
|
||||||
|
|
||||||
|
@ -904,6 +921,8 @@ async def open_symbol_search(
|
||||||
async def stash_results(target: Awaitable[list]):
|
async def stash_results(target: Awaitable[list]):
|
||||||
stock_results.extend(await target)
|
stock_results.extend(await target)
|
||||||
|
|
||||||
|
for i in range(10):
|
||||||
|
with trio.move_on_after(3) as cs:
|
||||||
async with trio.open_nursery() as sn:
|
async with trio.open_nursery() as sn:
|
||||||
sn.start_soon(
|
sn.start_soon(
|
||||||
stash_results,
|
stash_results,
|
||||||
|
@ -916,6 +935,12 @@ async def open_symbol_search(
|
||||||
# trigger async request
|
# trigger async request
|
||||||
await trio.sleep(0)
|
await trio.sleep(0)
|
||||||
|
|
||||||
|
if cs.cancelled_caught:
|
||||||
|
log.warning(f'Search timeout? {proxy._aio_ns.ib.client}')
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
|
||||||
# # match against our ad-hoc set immediately
|
# # match against our ad-hoc set immediately
|
||||||
# adhoc_matches = fuzzy.extractBests(
|
# adhoc_matches = fuzzy.extractBests(
|
||||||
# pattern,
|
# pattern,
|
||||||
|
|
Loading…
Reference in New Issue