Compare commits
No commits in common. "e1d57e8a8a9dfd1143e601efeddd08a460d03e5f" and "a3d46f713e9d15115af80b021354d85569d8e9c4" have entirely different histories.
e1d57e8a8a
...
a3d46f713e
|
@ -18,7 +18,6 @@ Order and trades endpoints for use with ``piker``'s EMS.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
from contextlib import ExitStack
|
|
||||||
from dataclasses import asdict
|
from dataclasses import asdict
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
|
@ -36,8 +35,8 @@ from trio_typing import TaskStatus
|
||||||
import tractor
|
import tractor
|
||||||
from ib_insync.contract import (
|
from ib_insync.contract import (
|
||||||
Contract,
|
Contract,
|
||||||
# Option,
|
Option,
|
||||||
# Forex,
|
Forex,
|
||||||
)
|
)
|
||||||
from ib_insync.order import (
|
from ib_insync.order import (
|
||||||
Trade,
|
Trade,
|
||||||
|
@ -48,17 +47,11 @@ from ib_insync.objects import (
|
||||||
Execution,
|
Execution,
|
||||||
CommissionReport,
|
CommissionReport,
|
||||||
)
|
)
|
||||||
from ib_insync.objects import Position as IbPosition
|
from ib_insync.objects import Position
|
||||||
import pendulum
|
import pendulum
|
||||||
|
|
||||||
from piker import config
|
from piker import config
|
||||||
from piker.pp import (
|
from piker import pp
|
||||||
Position,
|
|
||||||
Transaction,
|
|
||||||
open_trade_ledger,
|
|
||||||
open_pps,
|
|
||||||
PpTable,
|
|
||||||
)
|
|
||||||
from piker.log import get_console_log
|
from piker.log import get_console_log
|
||||||
from piker.clearing._messages import (
|
from piker.clearing._messages import (
|
||||||
BrokerdOrder,
|
BrokerdOrder,
|
||||||
|
@ -73,8 +66,7 @@ from piker.data._source import Symbol
|
||||||
from .api import (
|
from .api import (
|
||||||
_accounts2clients,
|
_accounts2clients,
|
||||||
# _adhoc_futes_set,
|
# _adhoc_futes_set,
|
||||||
con2fqsn,
|
_adhoc_symbol_map,
|
||||||
# _adhoc_symbol_map,
|
|
||||||
log,
|
log,
|
||||||
get_config,
|
get_config,
|
||||||
open_client_proxies,
|
open_client_proxies,
|
||||||
|
@ -84,12 +76,49 @@ from .api import (
|
||||||
|
|
||||||
|
|
||||||
def pack_position(
|
def pack_position(
|
||||||
pos: IbPosition
|
pos: Position
|
||||||
|
|
||||||
) -> dict[str, Any]:
|
) -> dict[str, Any]:
|
||||||
|
|
||||||
con = pos.contract
|
con = pos.contract
|
||||||
fqsn, calc_price = con2fqsn(con)
|
|
||||||
|
if isinstance(con, Option):
|
||||||
|
# TODO: option symbol parsing and sane display:
|
||||||
|
symbol = con.localSymbol.replace(' ', '')
|
||||||
|
|
||||||
|
else:
|
||||||
|
# TODO: lookup fqsn even for derivs.
|
||||||
|
symbol = con.symbol.lower()
|
||||||
|
|
||||||
|
# TODO: probably write a mofo exchange mapper routine since ib
|
||||||
|
# can't get it's shit together like, ever.
|
||||||
|
|
||||||
|
# try our best to figure out the exchange / venue
|
||||||
|
exch = (con.primaryExchange or con.exchange).lower()
|
||||||
|
if not exch:
|
||||||
|
|
||||||
|
if isinstance(con, Forex):
|
||||||
|
# bc apparently it's not in the contract obj?
|
||||||
|
exch = 'idealfx'
|
||||||
|
|
||||||
|
else:
|
||||||
|
# for wtv cucked reason some futes don't show their
|
||||||
|
# exchange (like CL.NYMEX) ...
|
||||||
|
entry = _adhoc_symbol_map.get(
|
||||||
|
con.symbol or con.localSymbol
|
||||||
|
)
|
||||||
|
if entry:
|
||||||
|
meta, kwargs = entry
|
||||||
|
cid = meta.get('conId')
|
||||||
|
if cid:
|
||||||
|
assert con.conId == meta['conId']
|
||||||
|
exch = meta['exchange']
|
||||||
|
|
||||||
|
assert exch, f'No clue:\n {con}'
|
||||||
|
fqsn = '.'.join((symbol, exch))
|
||||||
|
|
||||||
|
expiry = con.lastTradeDateOrContractMonth
|
||||||
|
if expiry:
|
||||||
|
fqsn += f'.{expiry}'
|
||||||
|
|
||||||
# TODO: options contracts into a sane format..
|
# TODO: options contracts into a sane format..
|
||||||
return (
|
return (
|
||||||
|
@ -276,10 +305,12 @@ async def update_ledger_from_api_trades(
|
||||||
client: Union[Client, MethodProxy],
|
client: Union[Client, MethodProxy],
|
||||||
|
|
||||||
) -> tuple[
|
) -> tuple[
|
||||||
dict[str, Transaction],
|
dict[str, pp.Transaction],
|
||||||
dict[str, dict],
|
dict[str, dict],
|
||||||
]:
|
]:
|
||||||
|
|
||||||
|
conf = get_config()
|
||||||
|
|
||||||
# XXX; ERRGGG..
|
# XXX; ERRGGG..
|
||||||
# pack in the "primary/listing exchange" value from a
|
# pack in the "primary/listing exchange" value from a
|
||||||
# contract lookup since it seems this isn't available by
|
# contract lookup since it seems this isn't available by
|
||||||
|
@ -300,33 +331,39 @@ async def update_ledger_from_api_trades(
|
||||||
|
|
||||||
entry['listingExchange'] = pexch
|
entry['listingExchange'] = pexch
|
||||||
|
|
||||||
conf = get_config()
|
|
||||||
entries = trades_to_ledger_entries(
|
entries = trades_to_ledger_entries(
|
||||||
conf['accounts'].inverse,
|
conf['accounts'].inverse,
|
||||||
trade_entries,
|
trade_entries,
|
||||||
)
|
)
|
||||||
# normalize recent session's trades to the `Transaction` type
|
|
||||||
trans_by_acct: dict[str, dict[str, Transaction]] = {}
|
# write recent session's trades to the user's (local) ledger file.
|
||||||
|
records: dict[str, pp.Transactions] = {}
|
||||||
|
|
||||||
for acctid, trades_by_id in entries.items():
|
for acctid, trades_by_id in entries.items():
|
||||||
# normalize to transaction form
|
# normalize to transaction form
|
||||||
trans_by_acct[acctid] = norm_trade_records(trades_by_id)
|
records[acctid] = norm_trade_records(trades_by_id)
|
||||||
|
|
||||||
return trans_by_acct, entries
|
return records, entries
|
||||||
|
|
||||||
|
|
||||||
async def update_and_audit_msgs(
|
async def update_and_audit_msgs(
|
||||||
acctid: str, # no `ib.` prefix is required!
|
acctid: str, # no `ib.` prefix is required!
|
||||||
pps: list[Position],
|
pps: list[pp.Position],
|
||||||
cids2pps: dict[tuple[str, int], BrokerdPosition],
|
cids2pps: dict[tuple[str, int], BrokerdPosition],
|
||||||
validate: bool = False,
|
validate: bool = False,
|
||||||
|
|
||||||
) -> list[BrokerdPosition]:
|
) -> list[BrokerdPosition]:
|
||||||
|
|
||||||
msgs: list[BrokerdPosition] = []
|
msgs: list[BrokerdPosition] = []
|
||||||
|
# pps: dict[int, pp.Position] = {}
|
||||||
|
|
||||||
for p in pps:
|
for p in pps:
|
||||||
bsuid = p.bsuid
|
bsuid = p.bsuid
|
||||||
|
|
||||||
|
# build trade-session-actor local table
|
||||||
|
# of pps from unique symbol ids.
|
||||||
|
# pps[bsuid] = p
|
||||||
|
|
||||||
# retreive equivalent ib reported position message
|
# retreive equivalent ib reported position message
|
||||||
# for comparison/audit versus the piker equivalent
|
# for comparison/audit versus the piker equivalent
|
||||||
# breakeven pp calcs.
|
# breakeven pp calcs.
|
||||||
|
@ -425,8 +462,6 @@ async def trades_dialogue(
|
||||||
all_positions = []
|
all_positions = []
|
||||||
accounts = set()
|
accounts = set()
|
||||||
clients: list[tuple[Client, trio.MemoryReceiveChannel]] = []
|
clients: list[tuple[Client, trio.MemoryReceiveChannel]] = []
|
||||||
acctids = set()
|
|
||||||
cids2pps: dict[str, BrokerdPosition] = {}
|
|
||||||
|
|
||||||
# TODO: this causes a massive tractor bug when you run marketstored
|
# TODO: this causes a massive tractor bug when you run marketstored
|
||||||
# with ``--tsdb``... you should get:
|
# with ``--tsdb``... you should get:
|
||||||
|
@ -436,170 +471,152 @@ async def trades_dialogue(
|
||||||
# - hitting final control-c to kill daemon will lead to hang
|
# - hitting final control-c to kill daemon will lead to hang
|
||||||
# assert 0
|
# assert 0
|
||||||
|
|
||||||
# TODO: just write on teardown?
|
|
||||||
# we might also want to delegate a specific actor for
|
|
||||||
# ledger writing / reading for speed?
|
|
||||||
async with (
|
async with (
|
||||||
trio.open_nursery() as nurse,
|
trio.open_nursery() as nurse,
|
||||||
open_client_proxies() as (proxies, aioclients),
|
open_client_proxies() as (proxies, aioclients),
|
||||||
):
|
):
|
||||||
# Open a trade ledgers stack for appending trade records over
|
for account, proxy in proxies.items():
|
||||||
# multiple accounts.
|
|
||||||
# TODO: we probably want to generalize this into a "ledgers" api..
|
|
||||||
ledgers: dict[str, dict] = {}
|
|
||||||
tables: dict[str, PpTable] = {}
|
|
||||||
with (
|
|
||||||
ExitStack() as lstack,
|
|
||||||
):
|
|
||||||
for account, proxy in proxies.items():
|
|
||||||
|
|
||||||
acctid = account.strip('ib.')
|
client = aioclients[account]
|
||||||
acctids.add(acctid)
|
|
||||||
|
|
||||||
# open ledger and pptable wrapper for each
|
async def open_stream(
|
||||||
# detected account.
|
task_status: TaskStatus[
|
||||||
ledger = ledgers[acctid] = lstack.enter_context(
|
trio.abc.ReceiveChannel
|
||||||
open_trade_ledger('ib', acctid)
|
] = trio.TASK_STATUS_IGNORED,
|
||||||
)
|
|
||||||
table = tables[acctid] = lstack.enter_context(
|
|
||||||
open_pps('ib', acctid)
|
|
||||||
)
|
|
||||||
|
|
||||||
client = aioclients[account]
|
|
||||||
|
|
||||||
async def open_trade_event_stream(
|
|
||||||
task_status: TaskStatus[
|
|
||||||
trio.abc.ReceiveChannel
|
|
||||||
] = trio.TASK_STATUS_IGNORED,
|
|
||||||
):
|
|
||||||
# each api client has a unique event stream
|
|
||||||
async with tractor.to_asyncio.open_channel_from(
|
|
||||||
recv_trade_updates,
|
|
||||||
client=client,
|
|
||||||
) as (first, trade_event_stream):
|
|
||||||
|
|
||||||
task_status.started(trade_event_stream)
|
|
||||||
await trio.sleep_forever()
|
|
||||||
|
|
||||||
trade_event_stream = await nurse.start(open_trade_event_stream)
|
|
||||||
clients.append((client, trade_event_stream))
|
|
||||||
|
|
||||||
assert account in accounts_def
|
|
||||||
accounts.add(account)
|
|
||||||
|
|
||||||
# update trades ledgers for all accounts from connected
|
|
||||||
# api clients which report trades for **this session**.
|
|
||||||
trades = await proxy.trades()
|
|
||||||
(
|
|
||||||
trans_by_acct,
|
|
||||||
api_ready_for_ledger_entries,
|
|
||||||
) = await update_ledger_from_api_trades(
|
|
||||||
trades,
|
|
||||||
proxy,
|
|
||||||
)
|
|
||||||
|
|
||||||
# if new trades are detected from the API, prepare
|
|
||||||
# them for the ledger file and update the pptable.
|
|
||||||
if api_ready_for_ledger_entries:
|
|
||||||
trade_entries = api_ready_for_ledger_entries[acctid]
|
|
||||||
ledger.update(trade_entries)
|
|
||||||
trans = trans_by_acct.get(acctid)
|
|
||||||
if trans:
|
|
||||||
table.update_from_trans(trans)
|
|
||||||
|
|
||||||
# process pp value reported from ib's system. we only use these
|
|
||||||
# to cross-check sizing since average pricing on their end uses
|
|
||||||
# the so called (bs) "FIFO" style which more or less results in
|
|
||||||
# a price that's not useful for traders who want to not lose
|
|
||||||
# money.. xb
|
|
||||||
# for client in aioclients.values():
|
|
||||||
for pos in client.positions():
|
|
||||||
|
|
||||||
# collect all ib-pp reported positions so that we can be
|
|
||||||
# sure know which positions to update from the ledger if
|
|
||||||
# any are missing from the ``pps.toml``
|
|
||||||
bsuid, msg = pack_position(pos)
|
|
||||||
acctid = msg.account = accounts_def.inverse[msg.account]
|
|
||||||
acctid = acctid.strip('ib.')
|
|
||||||
cids2pps[(acctid, bsuid)] = msg
|
|
||||||
assert msg.account in accounts, (
|
|
||||||
f'Position for unknown account: {msg.account}')
|
|
||||||
|
|
||||||
table = tables[acctid]
|
|
||||||
pp = table.pps.get(bsuid)
|
|
||||||
if (
|
|
||||||
not pp
|
|
||||||
or pp.size != msg.size
|
|
||||||
):
|
|
||||||
trans = norm_trade_records(ledger)
|
|
||||||
updated = table.update_from_trans(trans)
|
|
||||||
pp = updated[bsuid]
|
|
||||||
assert msg.size == pp.size, 'WTF'
|
|
||||||
|
|
||||||
# TODO: figure out why these don't match?
|
|
||||||
# assert pp.calc_be_price() == pp.be_price
|
|
||||||
|
|
||||||
_, closed_pps = table.dump_active('ib')
|
|
||||||
active_pps = table.pps
|
|
||||||
|
|
||||||
# load all positions from `pps.toml`, cross check with
|
|
||||||
# ib's positions data, and relay re-formatted pps as
|
|
||||||
# msgs to the ems.
|
|
||||||
# __2 cases__:
|
|
||||||
# - new trades have taken place this session that we want to
|
|
||||||
# always reprocess indempotently,
|
|
||||||
# - no new trades yet but we want to reload and audit any
|
|
||||||
# positions reported by ib's sys that may not yet be in
|
|
||||||
# piker's ``pps.toml`` state-file.
|
|
||||||
for pps in [active_pps, closed_pps]:
|
|
||||||
msgs = await update_and_audit_msgs(
|
|
||||||
acctid,
|
|
||||||
pps.values(),
|
|
||||||
cids2pps,
|
|
||||||
validate=True,
|
|
||||||
)
|
|
||||||
all_positions.extend(msg for msg in msgs)
|
|
||||||
|
|
||||||
if not all_positions and cids2pps:
|
|
||||||
raise RuntimeError(
|
|
||||||
'Positions reported by ib but not found in `pps.toml`!?\n'
|
|
||||||
f'{pformat(cids2pps)}'
|
|
||||||
)
|
|
||||||
|
|
||||||
await ctx.started((
|
|
||||||
all_positions,
|
|
||||||
tuple(name for name in accounts_def if name in accounts),
|
|
||||||
))
|
|
||||||
|
|
||||||
# write ledger with all new trades **AFTER** we've updated the
|
|
||||||
# `pps.toml` from the original ledger state!
|
|
||||||
for acctid, trades_by_id in api_ready_for_ledger_entries.items():
|
|
||||||
ledgers[acctid].update(trades_by_id)
|
|
||||||
|
|
||||||
async with (
|
|
||||||
ctx.open_stream() as ems_stream,
|
|
||||||
trio.open_nursery() as n,
|
|
||||||
):
|
):
|
||||||
# start order request handler **before** local trades
|
# each api client has a unique event stream
|
||||||
# event loop
|
async with tractor.to_asyncio.open_channel_from(
|
||||||
n.start_soon(handle_order_requests, ems_stream, accounts_def)
|
recv_trade_updates,
|
||||||
|
client=client,
|
||||||
|
) as (first, trade_event_stream):
|
||||||
|
|
||||||
# allocate event relay tasks for each client connection
|
task_status.started(trade_event_stream)
|
||||||
for client, stream in clients:
|
await trio.sleep_forever()
|
||||||
n.start_soon(
|
|
||||||
deliver_trade_events,
|
|
||||||
stream,
|
|
||||||
ems_stream,
|
|
||||||
accounts_def,
|
|
||||||
cids2pps,
|
|
||||||
proxies,
|
|
||||||
|
|
||||||
ledgers,
|
trade_event_stream = await nurse.start(open_stream)
|
||||||
tables,
|
|
||||||
)
|
|
||||||
|
|
||||||
# block until cancelled
|
clients.append((client, trade_event_stream))
|
||||||
await trio.sleep_forever()
|
|
||||||
|
assert account in accounts_def
|
||||||
|
accounts.add(account)
|
||||||
|
|
||||||
|
cids2pps: dict[str, BrokerdPosition] = {}
|
||||||
|
update_records: dict[str, bidict] = {}
|
||||||
|
|
||||||
|
# process pp value reported from ib's system. we only use these
|
||||||
|
# to cross-check sizing since average pricing on their end uses
|
||||||
|
# the so called (bs) "FIFO" style which more or less results in
|
||||||
|
# a price that's not useful for traders who want to not lose
|
||||||
|
# money.. xb
|
||||||
|
for client in aioclients.values():
|
||||||
|
for pos in client.positions():
|
||||||
|
|
||||||
|
cid, msg = pack_position(pos)
|
||||||
|
acctid = msg.account = accounts_def.inverse[msg.account]
|
||||||
|
acctid = acctid.strip('ib.')
|
||||||
|
cids2pps[(acctid, cid)] = msg
|
||||||
|
assert msg.account in accounts, (
|
||||||
|
f'Position for unknown account: {msg.account}')
|
||||||
|
|
||||||
|
# collect all ib-pp reported positions so that we can be
|
||||||
|
# sure know which positions to update from the ledger if
|
||||||
|
# any are missing from the ``pps.toml``
|
||||||
|
update_records.setdefault(acctid, bidict())[cid] = msg.symbol
|
||||||
|
|
||||||
|
# update trades ledgers for all accounts from
|
||||||
|
# connected api clients which report trades for **this session**.
|
||||||
|
new_trades = {}
|
||||||
|
for account, proxy in proxies.items():
|
||||||
|
trades = await proxy.trades()
|
||||||
|
(
|
||||||
|
records_by_acct,
|
||||||
|
ledger_entries,
|
||||||
|
) = await update_ledger_from_api_trades(
|
||||||
|
trades,
|
||||||
|
proxy,
|
||||||
|
)
|
||||||
|
new_trades.update(records_by_acct)
|
||||||
|
|
||||||
|
for acctid, trans in new_trades.items():
|
||||||
|
for t in trans:
|
||||||
|
bsuid = t.bsuid
|
||||||
|
if bsuid in update_records:
|
||||||
|
assert update_records[bsuid] == t.fqsn
|
||||||
|
else:
|
||||||
|
update_records.setdefault(acctid, bidict())[bsuid] = t.fqsn
|
||||||
|
|
||||||
|
# load all positions from `pps.toml`, cross check with ib's
|
||||||
|
# positions data, and relay re-formatted pps as msgs to the ems.
|
||||||
|
# __2 cases__:
|
||||||
|
# - new trades have taken place this session that we want to
|
||||||
|
# always reprocess indempotently,
|
||||||
|
# - no new trades yet but we want to reload and audit any
|
||||||
|
# positions reported by ib's sys that may not yet be in
|
||||||
|
# piker's ``pps.toml`` state-file.
|
||||||
|
for acctid, to_update in update_records.items():
|
||||||
|
trans = new_trades.get(acctid)
|
||||||
|
active, closed = pp.update_pps_conf(
|
||||||
|
'ib',
|
||||||
|
acctid,
|
||||||
|
trade_records=trans,
|
||||||
|
ledger_reload=to_update,
|
||||||
|
)
|
||||||
|
for pps in [active, closed]:
|
||||||
|
msgs = await update_and_audit_msgs(
|
||||||
|
acctid,
|
||||||
|
pps.values(),
|
||||||
|
cids2pps,
|
||||||
|
validate=True,
|
||||||
|
)
|
||||||
|
all_positions.extend(msg for msg in msgs)
|
||||||
|
|
||||||
|
if not all_positions and cids2pps:
|
||||||
|
raise RuntimeError(
|
||||||
|
'Positions reported by ib but not found in `pps.toml`!?\n'
|
||||||
|
f'{pformat(cids2pps)}'
|
||||||
|
)
|
||||||
|
|
||||||
|
# log.info(f'Loaded {len(trades)} from this session')
|
||||||
|
# TODO: write trades to local ``trades.toml``
|
||||||
|
# - use above per-session trades data and write to local file
|
||||||
|
# - get the "flex reports" working and pull historical data and
|
||||||
|
# also save locally.
|
||||||
|
|
||||||
|
await ctx.started((
|
||||||
|
all_positions,
|
||||||
|
tuple(name for name in accounts_def if name in accounts),
|
||||||
|
))
|
||||||
|
|
||||||
|
# TODO: maybe just write on teardown?
|
||||||
|
# we might also want to delegate a specific actor for
|
||||||
|
# ledger writing / reading for speed?
|
||||||
|
|
||||||
|
# write ledger with all new trades **AFTER** we've updated the
|
||||||
|
# `pps.toml` from the original ledger state!
|
||||||
|
for acctid, trades_by_id in ledger_entries.items():
|
||||||
|
with pp.open_trade_ledger('ib', acctid) as ledger:
|
||||||
|
ledger.update(trades_by_id)
|
||||||
|
|
||||||
|
async with (
|
||||||
|
ctx.open_stream() as ems_stream,
|
||||||
|
trio.open_nursery() as n,
|
||||||
|
):
|
||||||
|
# start order request handler **before** local trades event loop
|
||||||
|
n.start_soon(handle_order_requests, ems_stream, accounts_def)
|
||||||
|
|
||||||
|
# allocate event relay tasks for each client connection
|
||||||
|
for client, stream in clients:
|
||||||
|
n.start_soon(
|
||||||
|
deliver_trade_events,
|
||||||
|
stream,
|
||||||
|
ems_stream,
|
||||||
|
accounts_def,
|
||||||
|
cids2pps,
|
||||||
|
proxies,
|
||||||
|
)
|
||||||
|
|
||||||
|
# block until cancelled
|
||||||
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
async def emit_pp_update(
|
async def emit_pp_update(
|
||||||
|
@ -609,44 +626,44 @@ async def emit_pp_update(
|
||||||
proxies: dict,
|
proxies: dict,
|
||||||
cids2pps: dict,
|
cids2pps: dict,
|
||||||
|
|
||||||
ledgers,
|
|
||||||
tables,
|
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
# compute and relay incrementally updated piker pp
|
# compute and relay incrementally updated piker pp
|
||||||
acctid = accounts_def.inverse[trade_entry['execution']['acctNumber']]
|
acctid = accounts_def.inverse[trade_entry['execution']['acctNumber']]
|
||||||
proxy = proxies[acctid]
|
proxy = proxies[acctid]
|
||||||
|
|
||||||
acctid = acctid.strip('ib.')
|
acctname = acctid.strip('ib.')
|
||||||
(
|
records_by_acct, ledger_entries = await update_ledger_from_api_trades(
|
||||||
records_by_acct,
|
|
||||||
api_ready_for_ledger_entries,
|
|
||||||
) = await update_ledger_from_api_trades(
|
|
||||||
[trade_entry],
|
[trade_entry],
|
||||||
proxy,
|
proxy,
|
||||||
)
|
)
|
||||||
trans = records_by_acct[acctid]
|
records = records_by_acct[acctname]
|
||||||
r = list(trans.values())[0]
|
r = records[0]
|
||||||
|
|
||||||
table = tables[acctid]
|
# update and load all positions from `pps.toml`, cross check with
|
||||||
table.update_from_trans(trans)
|
# ib's positions data, and relay re-formatted pps as msgs to the
|
||||||
_, closed = table.dump_active('ib')
|
# ems. we report both the open and closed updates in one map since
|
||||||
active = table.pps
|
# for incremental update we may have just fully closed a pp and need
|
||||||
|
# to relay that msg as well!
|
||||||
|
active, closed = pp.update_pps_conf(
|
||||||
|
'ib',
|
||||||
|
acctname,
|
||||||
|
trade_records=records,
|
||||||
|
ledger_reload={r.bsuid: r.fqsn},
|
||||||
|
)
|
||||||
|
|
||||||
# NOTE: update ledger with all new trades
|
# NOTE: write ledger with all new trades **AFTER** we've updated the
|
||||||
for acctid, trades_by_id in api_ready_for_ledger_entries.items():
|
# `pps.toml` from the original ledger state!
|
||||||
ledger = ledgers[acctid]
|
for acctid, trades_by_id in ledger_entries.items():
|
||||||
ledger.update(trades_by_id)
|
with pp.open_trade_ledger('ib', acctid) as ledger:
|
||||||
|
ledger.update(trades_by_id)
|
||||||
|
|
||||||
# generate pp msgs and cross check with ib's positions data, relay
|
|
||||||
# re-formatted pps as msgs to the ems.
|
|
||||||
for pos in filter(
|
for pos in filter(
|
||||||
bool,
|
bool,
|
||||||
[active.get(r.bsuid), closed.get(r.bsuid)]
|
[active.get(r.bsuid), closed.get(r.bsuid)]
|
||||||
):
|
):
|
||||||
msgs = await update_and_audit_msgs(
|
msgs = await update_and_audit_msgs(
|
||||||
acctid,
|
acctname,
|
||||||
[pos],
|
[pos],
|
||||||
cids2pps,
|
cids2pps,
|
||||||
|
|
||||||
|
@ -668,9 +685,6 @@ async def deliver_trade_events(
|
||||||
cids2pps: dict[tuple[str, str], BrokerdPosition],
|
cids2pps: dict[tuple[str, str], BrokerdPosition],
|
||||||
proxies: dict[str, MethodProxy],
|
proxies: dict[str, MethodProxy],
|
||||||
|
|
||||||
ledgers,
|
|
||||||
tables,
|
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Format and relay all trade events for a given client to emsd.
|
Format and relay all trade events for a given client to emsd.
|
||||||
|
@ -820,8 +834,6 @@ async def deliver_trade_events(
|
||||||
accounts_def,
|
accounts_def,
|
||||||
proxies,
|
proxies,
|
||||||
cids2pps,
|
cids2pps,
|
||||||
ledgers,
|
|
||||||
tables,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
case 'cost':
|
case 'cost':
|
||||||
|
@ -854,8 +866,6 @@ async def deliver_trade_events(
|
||||||
accounts_def,
|
accounts_def,
|
||||||
proxies,
|
proxies,
|
||||||
cids2pps,
|
cids2pps,
|
||||||
ledgers,
|
|
||||||
tables,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
case 'error':
|
case 'error':
|
||||||
|
@ -906,13 +916,14 @@ async def deliver_trade_events(
|
||||||
def norm_trade_records(
|
def norm_trade_records(
|
||||||
ledger: dict[str, Any],
|
ledger: dict[str, Any],
|
||||||
|
|
||||||
) -> list[Transaction]:
|
) -> list[pp.Transaction]:
|
||||||
'''
|
'''
|
||||||
Normalize a flex report or API retrieved executions
|
Normalize a flex report or API retrieved executions
|
||||||
ledger into our standard record format.
|
ledger into our standard record format.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
records: dict[str, Transaction] = {}
|
records: list[pp.Transaction] = []
|
||||||
|
|
||||||
for tid, record in ledger.items():
|
for tid, record in ledger.items():
|
||||||
|
|
||||||
conid = record.get('conId') or record['conid']
|
conid = record.get('conId') or record['conid']
|
||||||
|
@ -990,7 +1001,7 @@ def norm_trade_records(
|
||||||
# which case, we can pull the fqsn from that table (see
|
# which case, we can pull the fqsn from that table (see
|
||||||
# `trades_dialogue()` above).
|
# `trades_dialogue()` above).
|
||||||
|
|
||||||
records[tid] = Transaction(
|
records.append(pp.Transaction(
|
||||||
fqsn=fqsn,
|
fqsn=fqsn,
|
||||||
tid=tid,
|
tid=tid,
|
||||||
size=size,
|
size=size,
|
||||||
|
@ -999,7 +1010,7 @@ def norm_trade_records(
|
||||||
dt=dt,
|
dt=dt,
|
||||||
expiry=expiry,
|
expiry=expiry,
|
||||||
bsuid=conid,
|
bsuid=conid,
|
||||||
)
|
))
|
||||||
|
|
||||||
return records
|
return records
|
||||||
|
|
||||||
|
@ -1128,7 +1139,8 @@ def load_flex_trades(
|
||||||
|
|
||||||
trade_entries = report.extract('Trade')
|
trade_entries = report.extract('Trade')
|
||||||
ln = len(trade_entries)
|
ln = len(trade_entries)
|
||||||
log.info(f'Loaded {ln} trades from flex query')
|
# log.info(f'Loaded {ln} trades from flex query')
|
||||||
|
print(f'Loaded {ln} trades from flex query')
|
||||||
|
|
||||||
trades_by_account = trades_to_ledger_entries(
|
trades_by_account = trades_to_ledger_entries(
|
||||||
# get reverse map to user account names
|
# get reverse map to user account names
|
||||||
|
@ -1137,20 +1149,14 @@ def load_flex_trades(
|
||||||
source_type='flex',
|
source_type='flex',
|
||||||
)
|
)
|
||||||
|
|
||||||
for acctid in trades_by_account:
|
ledgers = {}
|
||||||
trades_by_id = trades_by_account[acctid]
|
for acctid, trades_by_id in trades_by_account.items():
|
||||||
with open_trade_ledger('ib', acctid) as ledger_dict:
|
with pp.open_trade_ledger('ib', acctid) as ledger:
|
||||||
tid_delta = set(trades_by_id) - set(ledger_dict)
|
ledger.update(trades_by_id)
|
||||||
log.info(
|
|
||||||
'New trades detected\n'
|
|
||||||
f'{pformat(tid_delta)}'
|
|
||||||
)
|
|
||||||
if tid_delta:
|
|
||||||
ledger_dict.update(
|
|
||||||
{tid: trades_by_id[tid] for tid in tid_delta}
|
|
||||||
)
|
|
||||||
|
|
||||||
return ledger_dict
|
ledgers[acctid] = ledger
|
||||||
|
|
||||||
|
return ledgers
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
|
|
@ -301,13 +301,7 @@ async def get_bars(
|
||||||
else:
|
else:
|
||||||
|
|
||||||
log.warning('Sending CONNECTION RESET')
|
log.warning('Sending CONNECTION RESET')
|
||||||
res = await data_reset_hack(reset_type='connection')
|
await data_reset_hack(reset_type='connection')
|
||||||
if not res:
|
|
||||||
log.warning(
|
|
||||||
'NO VNC DETECTED!\n'
|
|
||||||
'Manually press ctrl-alt-f on your IB java app'
|
|
||||||
)
|
|
||||||
# break
|
|
||||||
|
|
||||||
with trio.move_on_after(timeout) as cs:
|
with trio.move_on_after(timeout) as cs:
|
||||||
for name, ev in [
|
for name, ev in [
|
||||||
|
@ -576,6 +570,7 @@ def normalize(
|
||||||
|
|
||||||
# check for special contract types
|
# check for special contract types
|
||||||
con = ticker.contract
|
con = ticker.contract
|
||||||
|
|
||||||
fqsn, calc_price = con2fqsn(con)
|
fqsn, calc_price = con2fqsn(con)
|
||||||
|
|
||||||
# convert named tuples to dicts so we send usable keys
|
# convert named tuples to dicts so we send usable keys
|
||||||
|
@ -847,10 +842,7 @@ async def data_reset_hack(
|
||||||
client.mouse.click()
|
client.mouse.click()
|
||||||
client.keyboard.press('Ctrl', 'Alt', key) # keys are stacked
|
client.keyboard.press('Ctrl', 'Alt', key) # keys are stacked
|
||||||
|
|
||||||
try:
|
await tractor.to_asyncio.run_task(vnc_click_hack)
|
||||||
await tractor.to_asyncio.run_task(vnc_click_hack)
|
|
||||||
except OSError:
|
|
||||||
return False
|
|
||||||
|
|
||||||
# we don't really need the ``xdotool`` approach any more B)
|
# we don't really need the ``xdotool`` approach any more B)
|
||||||
return True
|
return True
|
||||||
|
@ -865,24 +857,15 @@ async def open_symbol_search(
|
||||||
# TODO: load user defined symbol set locally for fast search?
|
# TODO: load user defined symbol set locally for fast search?
|
||||||
await ctx.started({})
|
await ctx.started({})
|
||||||
|
|
||||||
# async with open_data_client() as proxy:
|
async with open_data_client() as proxy:
|
||||||
async with (
|
|
||||||
open_client_proxies() as (proxies, clients),
|
|
||||||
):
|
|
||||||
async with ctx.open_stream() as stream:
|
async with ctx.open_stream() as stream:
|
||||||
|
|
||||||
# await tractor.breakpoint()
|
|
||||||
proxy = proxies['ib.algopaper']
|
|
||||||
|
|
||||||
last = time.time()
|
last = time.time()
|
||||||
|
|
||||||
async for pattern in stream:
|
async for pattern in stream:
|
||||||
log.info(f'received {pattern}')
|
log.debug(f'received {pattern}')
|
||||||
now = time.time()
|
now = time.time()
|
||||||
|
|
||||||
# this causes tractor hang...
|
|
||||||
# assert 0
|
|
||||||
|
|
||||||
assert pattern, 'IB can not accept blank search pattern'
|
assert pattern, 'IB can not accept blank search pattern'
|
||||||
|
|
||||||
# throttle search requests to no faster then 1Hz
|
# throttle search requests to no faster then 1Hz
|
||||||
|
@ -910,7 +893,7 @@ async def open_symbol_search(
|
||||||
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
log.info(f'searching for {pattern}')
|
log.debug(f'searching for {pattern}')
|
||||||
|
|
||||||
last = time.time()
|
last = time.time()
|
||||||
|
|
||||||
|
@ -921,25 +904,17 @@ async def open_symbol_search(
|
||||||
async def stash_results(target: Awaitable[list]):
|
async def stash_results(target: Awaitable[list]):
|
||||||
stock_results.extend(await target)
|
stock_results.extend(await target)
|
||||||
|
|
||||||
for i in range(10):
|
async with trio.open_nursery() as sn:
|
||||||
with trio.move_on_after(3) as cs:
|
sn.start_soon(
|
||||||
async with trio.open_nursery() as sn:
|
stash_results,
|
||||||
sn.start_soon(
|
proxy.search_symbols(
|
||||||
stash_results,
|
pattern=pattern,
|
||||||
proxy.search_symbols(
|
upto=5,
|
||||||
pattern=pattern,
|
),
|
||||||
upto=5,
|
)
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
# trigger async request
|
# trigger async request
|
||||||
await trio.sleep(0)
|
await trio.sleep(0)
|
||||||
|
|
||||||
if cs.cancelled_caught:
|
|
||||||
log.warning(f'Search timeout? {proxy._aio_ns.ib.client}')
|
|
||||||
continue
|
|
||||||
else:
|
|
||||||
break
|
|
||||||
|
|
||||||
# # match against our ad-hoc set immediately
|
# # match against our ad-hoc set immediately
|
||||||
# adhoc_matches = fuzzy.extractBests(
|
# adhoc_matches = fuzzy.extractBests(
|
||||||
|
|
Loading…
Reference in New Issue