Merge pull request #336 from pikers/lifo_pps_ib

LIFO/"breakeven" pps for `ib`
ib_rt_pp_update_hotfix
goodboy 2022-06-29 10:07:56 -04:00 committed by GitHub
commit d5bc43e8dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1695 additions and 341 deletions

View File

@ -38,7 +38,10 @@ from .feed import (
open_symbol_search,
stream_quotes,
)
from .broker import trades_dialogue
from .broker import (
trades_dialogue,
norm_trade_records,
)
__all__ = [
'get_client',

View File

@ -38,15 +38,21 @@ import time
from types import SimpleNamespace
from bidict import bidict
import trio
import tractor
from tractor import to_asyncio
import ib_insync as ibis
from ib_insync.wrapper import RequestError
from ib_insync.contract import Contract, ContractDetails
from ib_insync.order import Order
from ib_insync.ticker import Ticker
from ib_insync.objects import Position
import ib_insync as ibis
from ib_insync.objects import (
Position,
Fill,
Execution,
CommissionReport,
)
from ib_insync.wrapper import Wrapper
from ib_insync.client import Client as ib_Client
import numpy as np
@ -155,30 +161,23 @@ class NonShittyIB(ibis.IB):
self.client.apiEnd += self.disconnectedEvent
# map of symbols to contract ids
_adhoc_cmdty_data_map = {
# https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924
# NOTE: some cmdtys/metals don't have trade data like gold/usd:
# https://groups.io/g/twsapi/message/44174
'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}),
}
_futes_venues = (
'GLOBEX',
'NYMEX',
'CME',
'CMECRYPTO',
'COMEX',
'CMDTY', # special name case..
)
_adhoc_futes_set = {
# equities
'nq.globex',
'mnq.globex',
'mnq.globex', # micro
'es.globex',
'mes.globex',
'mes.globex', # micro
# cypto$
'brr.cmecrypto',
@ -195,20 +194,46 @@ _adhoc_futes_set = {
# metals
'xauusd.cmdty', # gold spot
'gc.nymex',
'mgc.nymex',
'mgc.nymex', # micro
# oil & gas
'cl.nymex',
'xagusd.cmdty', # silver spot
'ni.nymex', # silver futes
'qi.comex', # mini-silver futes
}
# map of symbols to contract ids
_adhoc_symbol_map = {
# https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924
# NOTE: some cmdtys/metals don't have trade data like gold/usd:
# https://groups.io/g/twsapi/message/44174
'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}),
}
for qsn in _adhoc_futes_set:
sym, venue = qsn.split('.')
assert venue.upper() in _futes_venues, f'{venue}'
_adhoc_symbol_map[sym.upper()] = (
{'exchange': venue},
{},
)
# exchanges we don't support at the moment due to not knowing
# how to do symbol-contract lookup correctly likely due
# to not having the data feeds subscribed.
_exch_skip_list = {
'ASX', # aussie stocks
'MEXI', # mexican stocks
'VALUE', # no idea
# no idea
'VALUE',
'FUNDSERV',
'SWB2',
}
# https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924
@ -261,27 +286,29 @@ class Client:
# NOTE: the ib.client here is "throttled" to 45 rps by default
async def trades(
self,
# api_only: bool = False,
async def trades(self) -> dict[str, Any]:
'''
Return list of trade-fills from current session in ``dict``.
) -> dict[str, Any]:
# orders = await self.ib.reqCompletedOrdersAsync(
# apiOnly=api_only
# )
fills = await self.ib.reqExecutionsAsync()
norm_fills = []
'''
fills: list[Fill] = self.ib.fills()
norm_fills: list[dict] = []
for fill in fills:
fill = fill._asdict() # namedtuple
for key, val in fill.copy().items():
if isinstance(val, Contract):
for key, val in fill.items():
match val:
case Contract() | Execution() | CommissionReport():
fill[key] = asdict(val)
norm_fills.append(fill)
return norm_fills
async def orders(self) -> list[Order]:
return await self.ib.reqAllOpenOrdersAsync(
apiOnly=False,
)
async def bars(
self,
fqsn: str,
@ -483,6 +510,14 @@ class Client:
return con
async def get_con(
self,
conid: int,
) -> Contract:
return await self.ib.qualifyContractsAsync(
ibis.Contract(conId=conid)
)
async def find_contract(
self,
pattern: str,
@ -553,7 +588,7 @@ class Client:
# commodities
elif exch == 'CMDTY': # eg. XAUUSD.CMDTY
con_kwargs, bars_kwargs = _adhoc_cmdty_data_map[sym]
con_kwargs, bars_kwargs = _adhoc_symbol_map[sym]
con = ibis.Commodity(**con_kwargs)
con.bars_kwargs = bars_kwargs
@ -811,10 +846,23 @@ _scan_ignore: set[tuple[str, int]] = set()
def get_config() -> dict[str, Any]:
conf, path = config.load()
conf, path = config.load('brokers')
section = conf.get('ib')
accounts = section.get('accounts')
if not accounts:
raise ValueError(
'brokers.toml -> `ib.accounts` must be defined\n'
f'location: {path}'
)
names = list(accounts.keys())
accts = section['accounts'] = bidict(accounts)
log.info(
f'brokers.toml defines {len(accts)} accounts: '
f'{pformat(names)}'
)
if section is None:
log.warning(f'No config section found for ib in {path}')
return {}
@ -990,7 +1038,7 @@ async def load_aio_clients(
for acct, client in _accounts2clients.items():
log.info(f'Disconnecting {acct}@{client}')
client.ib.disconnect()
_client_cache.pop((host, port))
_client_cache.pop((host, port), None)
async def load_clients_for_trio(
@ -1019,9 +1067,6 @@ async def load_clients_for_trio(
await asyncio.sleep(float('inf'))
_proxies: dict[str, MethodProxy] = {}
@acm
async def open_client_proxies() -> tuple[
dict[str, MethodProxy],
@ -1044,13 +1089,14 @@ async def open_client_proxies() -> tuple[
if cache_hit:
log.info(f'Re-using cached clients: {clients}')
proxies = {}
for acct_name, client in clients.items():
proxy = await stack.enter_async_context(
open_client_proxy(client),
)
_proxies[acct_name] = proxy
proxies[acct_name] = proxy
yield _proxies, clients
yield proxies, clients
def get_preferred_data_client(
@ -1199,11 +1245,13 @@ async def open_client_proxy(
event_table = {}
async with (
to_asyncio.open_channel_from(
open_aio_client_method_relay,
client=client,
event_consumers=event_table,
) as (first, chan),
trio.open_nursery() as relay_n,
):

View File

@ -26,8 +26,10 @@ from typing import (
Any,
Optional,
AsyncIterator,
Union,
)
from bidict import bidict
import trio
from trio_typing import TaskStatus
import tractor
@ -42,10 +44,13 @@ from ib_insync.order import (
from ib_insync.objects import (
Fill,
Execution,
CommissionReport,
)
from ib_insync.objects import Position
import pendulum
from piker import config
from piker import pp
from piker.log import get_console_log
from piker.clearing._messages import (
BrokerdOrder,
@ -56,13 +61,16 @@ from piker.clearing._messages import (
BrokerdFill,
BrokerdError,
)
from piker.data._source import Symbol
from .api import (
_accounts2clients,
_adhoc_futes_set,
# _adhoc_futes_set,
_adhoc_symbol_map,
log,
get_config,
open_client_proxies,
Client,
MethodProxy,
)
@ -80,29 +88,39 @@ def pack_position(
# TODO: lookup fqsn even for derivs.
symbol = con.symbol.lower()
# try our best to figure out the exchange / venue
exch = (con.primaryExchange or con.exchange).lower()
symkey = '.'.join((symbol, exch))
if not exch:
# attempt to lookup the symbol from our
# hacked set..
for sym in _adhoc_futes_set:
if symbol in sym:
symkey = sym
break
# for wtv cucked reason some futes don't show their
# exchange (like CL.NYMEX) ...
entry = _adhoc_symbol_map.get(
con.symbol or con.localSymbol
)
if entry:
meta, kwargs = entry
cid = meta.get('conId')
if cid:
assert con.conId == meta['conId']
exch = meta['exchange']
assert exch, f'No clue:\n {con}'
fqsn = '.'.join((symbol, exch))
expiry = con.lastTradeDateOrContractMonth
if expiry:
symkey += f'.{expiry}'
fqsn += f'.{expiry}'
# TODO: options contracts into a sane format..
return BrokerdPosition(
return (
con.conId,
BrokerdPosition(
broker='ib',
account=pos.account,
symbol=symkey,
symbol=fqsn,
currency=con.currency,
size=float(pos.position),
avg_price=float(pos.avgCost) / float(con.multiplier or 1.0),
),
)
@ -205,19 +223,35 @@ async def recv_trade_updates(
# sync with trio task
to_trio.send_nowait(None)
def push_tradesies(eventkit_obj, obj, fill=None):
"""Push events to trio task.
def push_tradesies(
eventkit_obj,
obj,
fill: Optional[Fill] = None,
report: Optional[CommissionReport] = None,
):
'''
Push events to trio task.
"""
if fill is not None:
'''
match eventkit_obj.name():
case 'orderStatusEvent':
item = ('status', obj)
case 'commissionReportEvent':
assert report
item = ('cost', report)
case 'execDetailsEvent':
# execution details event
item = ('fill', (obj, fill))
elif eventkit_obj.name() == 'positionEvent':
case 'positionEvent':
item = ('position', obj)
else:
item = ('status', obj)
case _:
log.error(f'Error unknown event {obj}')
return
log.info(f'eventkit event ->\n{pformat(item)}')
@ -233,15 +267,15 @@ async def recv_trade_updates(
'execDetailsEvent', # all "fill" updates
'positionEvent', # avg price updates per symbol per account
# 'commissionReportEvent',
# XXX: ugh, it is a separate event from IB and it's
# emitted as follows:
# self.ib.commissionReportEvent.emit(trade, fill, report)
'commissionReportEvent',
# XXX: not sure yet if we need these
# 'updatePortfolioEvent',
# XXX: these all seem to be weird ib_insync intrernal
# XXX: these all seem to be weird ib_insync internal
# events that we probably don't care that much about
# given the internal design is wonky af..
# 'newOrderEvent',
@ -257,6 +291,149 @@ async def recv_trade_updates(
await client.ib.disconnectedEvent
async def update_ledger_from_api_trades(
trade_entries: list[dict[str, Any]],
client: Union[Client, MethodProxy],
) -> dict[str, pp.Transaction]:
# construct piker pps from trade ledger, underneath using
# LIFO style breakeven pricing calcs.
conf = get_config()
# XXX; ERRGGG..
# pack in the "primary/listing exchange" value from a
# contract lookup since it seems this isn't available by
# default from the `.fills()` method endpoint...
for entry in trade_entries:
condict = entry['contract']
conid = condict['conId']
pexch = condict['primaryExchange']
if not pexch:
cons = await client.get_con(conid=conid)
if cons:
con = cons[0]
pexch = con.primaryExchange or con.exchange
else:
# for futes it seems like the primary is always empty?
pexch = condict['exchange']
entry['listingExchange'] = pexch
entries = trades_to_ledger_entries(
conf['accounts'].inverse,
trade_entries,
)
# write recent session's trades to the user's (local) ledger file.
records: dict[str, pp.Transactions] = {}
for acctid, trades_by_id in entries.items():
with pp.open_trade_ledger('ib', acctid) as ledger:
ledger.update(trades_by_id)
# normalize to transaction form
records[acctid] = norm_trade_records(trades_by_id)
return records
async def update_and_audit_msgs(
acctid: str, # no `ib.` prefix is required!
pps: list[pp.Position],
cids2pps: dict[tuple[str, int], BrokerdPosition],
validate: bool = False,
) -> list[BrokerdPosition]:
msgs: list[BrokerdPosition] = []
# pps: dict[int, pp.Position] = {}
for p in pps:
bsuid = p.bsuid
# build trade-session-actor local table
# of pps from unique symbol ids.
# pps[bsuid] = p
# retreive equivalent ib reported position message
# for comparison/audit versus the piker equivalent
# breakeven pp calcs.
ibppmsg = cids2pps.get((acctid, bsuid))
if ibppmsg:
msg = BrokerdPosition(
broker='ib',
# XXX: ok so this is annoying, we're relaying
# an account name with the backend suffix prefixed
# but when reading accounts from ledgers we don't
# need it and/or it's prefixed in the section
# table..
account=ibppmsg.account,
# XXX: the `.ib` is stripped..?
symbol=ibppmsg.symbol,
currency=ibppmsg.currency,
size=p.size,
avg_price=p.be_price,
)
msgs.append(msg)
if validate:
ibsize = ibppmsg.size
pikersize = msg.size
diff = pikersize - ibsize
# if ib reports a lesser pp it's not as bad since we can
# presume we're at least not more in the shit then we
# thought.
if diff:
raise ValueError(
f'POSITION MISMATCH ib <-> piker ledger:\n'
f'ib: {ibppmsg}\n'
f'piker: {msg}\n'
'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?'
)
msg.size = ibsize
if ibppmsg.avg_price != msg.avg_price:
# TODO: make this a "propoganda" log level?
log.warning(
'The mega-cucks at IB want you to believe with their '
f'"FIFO" positioning for {msg.symbol}:\n'
f'"ib" mega-cucker avg price: {ibppmsg.avg_price}\n'
f'piker, LIFO breakeven PnL price: {msg.avg_price}'
)
else:
# make brand new message
msg = BrokerdPosition(
broker='ib',
# XXX: ok so this is annoying, we're relaying
# an account name with the backend suffix prefixed
# but when reading accounts from ledgers we don't
# need it and/or it's prefixed in the section
# table.. we should just strip this from the message
# right since `.broker` is already included?
account=f'ib.{acctid}',
# XXX: the `.ib` is stripped..?
symbol=p.symbol.front_fqsn(),
# currency=ibppmsg.currency,
size=p.size,
avg_price=p.be_price,
)
if validate and p.size:
raise ValueError(
f'UNEXPECTED POSITION ib <-> piker ledger:\n'
f'piker: {msg}\n'
'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?'
)
msgs.append(msg)
return msgs
@tractor.context
async def trades_dialogue(
@ -277,6 +454,14 @@ async def trades_dialogue(
accounts = set()
clients: list[tuple[Client, trio.MemoryReceiveChannel]] = []
# TODO: this causes a massive tractor bug when you run marketstored
# with ``--tsdb``... you should get:
# - first error the assertion
# - chart should get that error and die
# - pikerd goes to debugger again from trio nursery multi-error
# - hitting final control-c to kill daemon will lead to hang
# assert 0
async with (
trio.open_nursery() as nurse,
open_client_proxies() as (proxies, aioclients),
@ -306,22 +491,79 @@ async def trades_dialogue(
assert account in accounts_def
accounts.add(account)
cids2pps: dict[str, BrokerdPosition] = {}
update_records: dict[str, bidict] = {}
# process pp value reported from ib's system. we only use these
# to cross-check sizing since average pricing on their end uses
# the so called (bs) "FIFO" style which more or less results in
# a price that's not useful for traders who want to not lose
# money.. xb
for client in aioclients.values():
for pos in client.positions():
msg = pack_position(pos)
msg.account = accounts_def.inverse[msg.account]
cid, msg = pack_position(pos)
acctid = msg.account = accounts_def.inverse[msg.account]
acctid = acctid.strip('ib.')
cids2pps[(acctid, cid)] = msg
assert msg.account in accounts, (
f'Position for unknown account: {msg.account}')
all_positions.append(msg.dict())
# collect all ib-pp reported positions so that we can be
# sure know which positions to update from the ledger if
# any are missing from the ``pps.toml``
update_records.setdefault(acctid, bidict())[cid] = msg.symbol
trades: list[dict] = []
for proxy in proxies.values():
trades.append(await proxy.trades())
# update trades ledgers for all accounts from
# connected api clients which report trades for **this session**.
new_trades = {}
for account, proxy in proxies.items():
trades = await proxy.trades()
new_trades.update(await update_ledger_from_api_trades(
trades,
proxy,
))
log.info(f'Loaded {len(trades)} from this session')
for acctid, trans in new_trades.items():
for t in trans:
bsuid = t.bsuid
if bsuid in update_records:
assert update_records[bsuid] == t.fqsn
else:
update_records.setdefault(acctid, bidict())[bsuid] = t.fqsn
# load all positions from `pps.toml`, cross check with ib's
# positions data, and relay re-formatted pps as msgs to the ems.
# __2 cases__:
# - new trades have taken place this session that we want to
# always reprocess indempotently,
# - no new trades yet but we want to reload and audit any
# positions reported by ib's sys that may not yet be in
# piker's ``pps.toml`` state-file.
for acctid, to_update in update_records.items():
trans = new_trades.get(acctid)
active, closed = pp.update_pps_conf(
'ib',
acctid,
trade_records=trans,
ledger_reload=to_update,
)
for pps in [active, closed]:
msgs = await update_and_audit_msgs(
acctid,
pps.values(),
cids2pps,
validate=True,
)
all_positions.extend(msg.dict() for msg in msgs)
if not all_positions and cids2pps:
raise RuntimeError(
'Positions reported by ib but not found in `pps.toml`!?\n'
f'{pformat(cids2pps)}'
)
# log.info(f'Loaded {len(trades)} from this session')
# TODO: write trades to local ``trades.toml``
# - use above per-session trades data and write to local file
# - get the "flex reports" working and pull historical data and
@ -345,32 +587,89 @@ async def trades_dialogue(
deliver_trade_events,
stream,
ems_stream,
accounts_def
accounts_def,
cids2pps,
proxies,
)
# block until cancelled
await trio.sleep_forever()
async def emit_pp_update(
ems_stream: tractor.MsgStream,
trade_entry: dict,
accounts_def: bidict,
proxies: dict,
cids2pps: dict,
) -> None:
# compute and relay incrementally updated piker pp
acctid = accounts_def.inverse[trade_entry['execution']['acctNumber']]
proxy = proxies[acctid]
acctname = acctid.strip('ib.')
records = (await update_ledger_from_api_trades(
[trade_entry],
proxy,
))[acctname]
r = records[0]
# update and load all positions from `pps.toml`, cross check with
# ib's positions data, and relay re-formatted pps as msgs to the
# ems. we report both the open and closed updates in one map since
# for incremental update we may have just fully closed a pp and need
# to relay that msg as well!
active, closed = pp.update_pps_conf(
'ib',
acctname,
trade_records=records,
ledger_reload={r.bsuid: r.fqsn},
)
for pos in filter(
bool,
[active.get(r.bsuid), closed.get(r.bsuid)]
):
msgs = await update_and_audit_msgs(
acctname,
[pos],
cids2pps,
# ib pp event might not have arrived yet
validate=False,
)
if msgs:
msg = msgs[0]
break
await ems_stream.send(msg.dict())
async def deliver_trade_events(
trade_event_stream: trio.MemoryReceiveChannel,
ems_stream: tractor.MsgStream,
accounts_def: dict[str, str],
accounts_def: dict[str, str], # eg. `'ib.main'` -> `'DU999999'`
cids2pps: dict[tuple[str, str], BrokerdPosition],
proxies: dict[str, MethodProxy],
) -> None:
'''Format and relay all trade events for a given client to the EMS.
'''
Format and relay all trade events for a given client to emsd.
'''
action_map = {'BOT': 'buy', 'SLD': 'sell'}
ids2fills: dict[str, dict] = {}
# TODO: for some reason we can receive a ``None`` here when the
# ib-gw goes down? Not sure exactly how that's happening looking
# at the eventkit code above but we should probably handle it...
async for event_name, item in trade_event_stream:
log.info(f'ib sending {event_name}:\n{pformat(item)}')
match event_name:
# TODO: templating the ib statuses in comparison with other
# brokers is likely the way to go:
# https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313
@ -394,7 +693,7 @@ async def deliver_trade_events(
# reqId 1550: Order held while securities are located.'),
# status='PreSubmitted', message='')],
if event_name == 'status':
case 'status':
# XXX: begin normalization of nonsense ib_insync internal
# object-state tracking representations...
@ -423,8 +722,9 @@ async def deliver_trade_events(
broker_details={'name': 'ib'},
)
await ems_stream.send(msg.dict())
elif event_name == 'fill':
case 'fill':
# for wtv reason this is a separate event type
# from IB, not sure why it's needed other then for extra
@ -438,17 +738,35 @@ async def deliver_trade_events(
# https://www.python.org/dev/peps/pep-0526/#global-and-local-variable-annotations
trade: Trade
fill: Fill
# TODO: maybe we can use matching to better handle these cases.
trade, fill = item
execu: Execution = fill.execution
execid = execu.execId
# TODO: normalize out commissions details?
details = {
# TODO:
# - normalize out commissions details?
# - this is the same as the unpacking loop above in
# ``trades_to_ledger_entries()`` no?
trade_entry = ids2fills.setdefault(execid, {})
cost_already_rx = bool(trade_entry)
# if the costs report was already received this
# should be not empty right?
comms = fill.commissionReport.commission
if cost_already_rx:
assert comms
trade_entry.update(
{
'contract': asdict(fill.contract),
'execution': asdict(fill.execution),
'commissions': asdict(fill.commissionReport),
'broker_time': execu.time, # supposedly server fill time
'commissionReport': asdict(fill.commissionReport),
# supposedly server fill time?
'broker_time': execu.time,
'name': 'ib',
}
)
msg = BrokerdFill(
# should match the value returned from `.submit_limit()`
@ -459,14 +777,60 @@ async def deliver_trade_events(
size=execu.shares,
price=execu.price,
broker_details=details,
broker_details=trade_entry,
# XXX: required by order mode currently
broker_time=details['broker_time'],
broker_time=trade_entry['broker_time'],
)
await ems_stream.send(msg.dict())
elif event_name == 'error':
# 2 cases:
# - fill comes first or
# - comms report comes first
comms = fill.commissionReport.commission
if comms:
# UGHHH since the commision report object might be
# filled in **after** we already serialized to dict..
# def need something better for all this.
trade_entry.update(
{'commissionReport': asdict(fill.commissionReport)}
)
if comms or cost_already_rx:
# only send a pp update once we have a cost report
await emit_pp_update(
ems_stream,
trade_entry,
accounts_def,
proxies,
cids2pps,
)
case 'cost':
cr: CommissionReport = item
execid = cr.execId
trade_entry = ids2fills.setdefault(execid, {})
fill_already_rx = bool(trade_entry)
# no fill msg has arrived yet so just fill out the
# cost report for now and when the fill arrives a pp
# msg can be emitted.
trade_entry.update(
{'commissionReport': asdict(cr)}
)
if fill_already_rx:
await emit_pp_update(
ems_stream,
trade_entry,
accounts_def,
proxies,
cids2pps,
)
case 'error':
err: dict = item
# f$#$% gawd dammit insync..
@ -480,13 +844,15 @@ async def deliver_trade_events(
# TODO: what schema for this msg if we're going to make it
# portable across all backends?
# msg = BrokerdError(**err)
continue
elif event_name == 'position':
msg = pack_position(item)
msg.account = accounts_def.inverse[msg.account]
case 'position':
elif event_name == 'event':
cid, msg = pack_position(item)
# acctid = msg.account = accounts_def.inverse[msg.account]
# cuck ib and it's shitty fifo sys for pps!
# await ems_stream.send(msg.dict())
case 'event':
# it's either a general system status event or an external
# trade event?
@ -498,8 +864,6 @@ async def deliver_trade_events(
# if getattr(msg, 'reqid', 0) < -1:
# log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
continue
# msg.reqid = 'tws-' + str(-1 * reqid)
# mark msg as from "external system"
@ -507,19 +871,200 @@ async def deliver_trade_events(
# considering multiplayer/group trades tracking
# msg.broker_details['external_src'] = 'tws'
# XXX: we always serialize to a dict for msgpack
# translations, ideally we can move to an msgspec (or other)
# encoder # that can be enabled in ``tractor`` ahead of
# time so we can pass through the message types directly.
await ems_stream.send(msg.dict())
case _:
log.error(f'WTF: {event_name}: {item}')
def norm_trade_records(
ledger: dict[str, Any],
) -> list[pp.Transaction]:
'''
Normalize a flex report or API retrieved executions
ledger into our standard record format.
'''
records: list[pp.Transaction] = []
for tid, record in ledger.items():
conid = record.get('conId') or record['conid']
comms = record.get('commission') or -1*record['ibCommission']
price = record.get('price') or record['tradePrice']
# the api doesn't do the -/+ on the quantity for you but flex
# records do.. are you fucking serious ib...!?
size = record.get('quantity') or record['shares'] * {
'BOT': 1,
'SLD': -1,
}[record['side']]
exch = record['exchange']
lexch = record.get('listingExchange')
suffix = lexch or exch
symbol = record['symbol']
# likely an opts contract record from a flex report..
# TODO: no idea how to parse ^ the strike part from flex..
# (00010000 any, or 00007500 tsla, ..)
# we probably must do the contract lookup for this?
if ' ' in symbol or '--' in exch:
underlying, _, tail = symbol.partition(' ')
suffix = exch = 'opt'
expiry = tail[:6]
# otype = tail[6]
# strike = tail[7:]
print(f'skipping opts contract {symbol}')
continue
# timestamping is way different in API records
date = record.get('date')
if not date:
# probably a flex record with a wonky non-std timestamp..
date, ts = record['dateTime'].split(';')
dt = pendulum.parse(date)
ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
tsdt = pendulum.parse(ts)
dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)
else:
# epoch_dt = pendulum.from_timestamp(record.get('time'))
dt = pendulum.parse(date)
# special handling of symbol extraction from
# flex records using some ad-hoc schema parsing.
instr = record.get('assetCategory')
if instr == 'FUT':
symbol = record['description'][:3]
# try to build out piker fqsn from record.
expiry = record.get(
'lastTradeDateOrContractMonth') or record.get('expiry')
if expiry:
expiry = str(expiry).strip(' ')
suffix = f'{exch}.{expiry}'
expiry = pendulum.parse(expiry)
fqsn = Symbol.from_fqsn(
fqsn=f'{symbol}.{suffix}.ib',
info={},
).front_fqsn().rstrip('.ib')
# NOTE: for flex records the normal fields for defining an fqsn
# sometimes won't be available so we rely on two approaches for
# the "reverse lookup" of piker style fqsn keys:
# - when dealing with API trade records received from
# `IB.trades()` we do a contract lookup at he time of processing
# - when dealing with flex records, it is assumed the record
# is at least a day old and thus the TWS position reporting system
# should already have entries if the pps are still open, in
# which case, we can pull the fqsn from that table (see
# `trades_dialogue()` above).
records.append(pp.Transaction(
fqsn=fqsn,
tid=tid,
size=size,
price=price,
cost=comms,
dt=dt,
expiry=expiry,
bsuid=conid,
))
return records
def trades_to_ledger_entries(
accounts: bidict,
trade_entries: list[object],
source_type: str = 'api',
) -> dict:
'''
Convert either of API execution objects or flex report
entry objects into ``dict`` form, pretty much straight up
without modification.
'''
trades_by_account = {}
for t in trade_entries:
if source_type == 'flex':
entry = t.__dict__
# XXX: LOL apparently ``toml`` has a bug
# where a section key error will show up in the write
# if you leave a table key as an `int`? So i guess
# cast to strs for all keys..
# oddly for some so-called "BookTrade" entries
# this field seems to be blank, no cuckin clue.
# trade['ibExecID']
tid = str(entry.get('ibExecID') or entry['tradeID'])
# date = str(entry['tradeDate'])
# XXX: is it going to cause problems if a account name
# get's lost? The user should be able to find it based
# on the actual exec history right?
acctid = accounts[str(entry['accountId'])]
elif source_type == 'api':
# NOTE: example of schema we pull from the API client.
# {
# 'commissionReport': CommissionReport(...
# 'contract': {...
# 'execution': Execution(...
# 'time': 1654801166.0
# }
# flatten all sub-dicts and values into one top level entry.
entry = {}
for section, val in t.items():
match section:
case 'contract' | 'execution' | 'commissionReport':
# sub-dict cases
entry.update(val)
case 'time':
# ib has wack ns timestamps, or is that us?
continue
case _:
entry[section] = val
tid = str(entry['execId'])
dt = pendulum.from_timestamp(entry['time'])
# TODO: why isn't this showing seconds in the str?
entry['date'] = str(dt)
acctid = accounts[entry['acctNumber']]
if not tid:
# this is likely some kind of internal adjustment
# transaction, likely one of the following:
# - an expiry event that will show a "book trade" indicating
# some adjustment to cash balances: zeroing or itm settle.
# - a manual cash balance position adjustment likely done by
# the user from the accounts window in TWS where they can
# manually set the avg price and size:
# https://api.ibkr.com/lib/cstools/faq/web1/index.html#/tag/DTWS_ADJ_AVG_COST
log.warning(f'Skipping ID-less ledger entry:\n{pformat(entry)}')
continue
trades_by_account.setdefault(
acctid, {}
)[tid] = entry
return trades_by_account
def load_flex_trades(
path: Optional[str] = None,
) -> dict[str, str]:
) -> dict[str, Any]:
from pprint import pprint
from ib_insync import flexreport, util
conf = get_config()
@ -555,36 +1100,38 @@ def load_flex_trades(
report = flexreport.FlexReport(path=path)
trade_entries = report.extract('Trade')
trades = {
# XXX: LOL apparently ``toml`` has a bug
# where a section key error will show up in the write
# if you leave this as an ``int``?
str(t.__dict__['tradeID']): t.__dict__
for t in trade_entries
}
ln = len(trade_entries)
# log.info(f'Loaded {ln} trades from flex query')
print(f'Loaded {ln} trades from flex query')
ln = len(trades)
log.info(f'Loaded {ln} trades from flex query')
trades_by_account = trades_to_ledger_entries(
# get reverse map to user account names
conf['accounts'].inverse,
trade_entries,
source_type='flex',
)
trades_by_account = {}
for tid, trade in trades.items():
trades_by_account.setdefault(
# oddly for some so-called "BookTrade" entries
# this field seems to be blank, no cuckin clue.
# trade['ibExecID']
str(trade['accountId']), {}
)[tid] = trade
ledgers = {}
for acctid, trades_by_id in trades_by_account.items():
with pp.open_trade_ledger('ib', acctid) as ledger:
ledger.update(trades_by_id)
section = {'ib': trades_by_account}
pprint(section)
ledgers[acctid] = ledger
# TODO: load the config first and append in
# the new trades loaded here..
try:
config.write(section, 'trades')
except KeyError:
import pdbpp; pdbpp.set_trace() # noqa
return ledgers
if __name__ == '__main__':
import sys
import os
args = sys.argv
if len(args) > 1:
args = args[1:]
for arg in args:
path = os.path.abspath(arg)
load_flex_trades(path=path)
else:
# expect brokers.toml to have an entry and
# pull from the web service.
load_flex_trades()

View File

@ -217,8 +217,8 @@ async def get_bars(
)
elif (
err.code == 162
and 'HMDS query returned no data' in err.message
err.code == 162 and
'HMDS query returned no data' in err.message
):
# XXX: this is now done in the storage mgmt layer
# and we shouldn't implicitly decrement the frame dt
@ -237,6 +237,13 @@ async def get_bars(
frame_size=2000,
)
# elif (
# err.code == 162 and
# 'Trading TWS session is connected from a different IP address' in err.message
# ):
# log.warning("ignoring ip address warning")
# continue
elif _pacing in msg:
log.warning(
@ -909,17 +916,17 @@ async def open_symbol_search(
# trigger async request
await trio.sleep(0)
# match against our ad-hoc set immediately
adhoc_matches = fuzzy.extractBests(
pattern,
list(_adhoc_futes_set),
score_cutoff=90,
)
log.info(f'fuzzy matched adhocs: {adhoc_matches}')
adhoc_match_results = {}
if adhoc_matches:
# TODO: do we need to pull contract details?
adhoc_match_results = {i[0]: {} for i in adhoc_matches}
# # match against our ad-hoc set immediately
# adhoc_matches = fuzzy.extractBests(
# pattern,
# list(_adhoc_futes_set),
# score_cutoff=90,
# )
# log.info(f'fuzzy matched adhocs: {adhoc_matches}')
# adhoc_match_results = {}
# if adhoc_matches:
# # TODO: do we need to pull contract details?
# adhoc_match_results = {i[0]: {} for i in adhoc_matches}
log.debug(f'fuzzy matching stocks {stock_results}')
stock_matches = fuzzy.extractBests(
@ -928,7 +935,8 @@ async def open_symbol_search(
score_cutoff=50,
)
matches = adhoc_match_results | {
# matches = adhoc_match_results | {
matches = {
item[0]: {} for item in stock_matches
}
# TODO: we used to deliver contract details

View File

@ -23,53 +23,10 @@ from typing import Optional
from bidict import bidict
from pydantic import BaseModel, validator
# from msgspec import Struct
from ..data._source import Symbol
from ._messages import BrokerdPosition, Status
class Position(BaseModel):
'''
Basic pp (personal position) model with attached fills history.
This type should be IPC wire ready?
'''
symbol: Symbol
# last size and avg entry price
size: float
avg_price: float # TODO: contextual pricing
# ordered record of known constituent trade messages
fills: list[Status] = []
def update_from_msg(
self,
msg: BrokerdPosition,
) -> None:
# XXX: better place to do this?
symbol = self.symbol
lot_size_digits = symbol.lot_size_digits
avg_price, size = (
round(msg['avg_price'], ndigits=symbol.tick_size_digits),
round(msg['size'], ndigits=lot_size_digits),
)
self.avg_price = avg_price
self.size = size
@property
def dsize(self) -> float:
'''
The "dollar" size of the pp, normally in trading (fiat) unit
terms.
'''
return self.avg_price * self.size
from ..pp import Position
_size_units = bidict({
@ -173,7 +130,7 @@ class Allocator(BaseModel):
l_sub_pp = self.units_limit - abs_live_size
elif size_unit == 'currency':
live_cost_basis = abs_live_size * live_pp.avg_price
live_cost_basis = abs_live_size * live_pp.be_price
slot_size = currency_per_slot / price
l_sub_pp = (self.currency_limit - live_cost_basis) / price
@ -205,7 +162,7 @@ class Allocator(BaseModel):
if size_unit == 'currency':
# compute the "projected" limit's worth of units at the
# current pp (weighted) price:
slot_size = currency_per_slot / live_pp.avg_price
slot_size = currency_per_slot / live_pp.be_price
else:
slot_size = u_per_slot
@ -244,7 +201,12 @@ class Allocator(BaseModel):
if order_size < slot_size:
# compute a fractional slots size to display
slots_used = self.slots_used(
Position(symbol=sym, size=order_size, avg_price=price)
Position(
symbol=sym,
size=order_size,
be_price=price,
bsuid=sym,
)
)
return {
@ -271,8 +233,8 @@ class Allocator(BaseModel):
abs_pp_size = abs(pp.size)
if self.size_unit == 'currency':
# live_currency_size = size or (abs_pp_size * pp.avg_price)
live_currency_size = abs_pp_size * pp.avg_price
# live_currency_size = size or (abs_pp_size * pp.be_price)
live_currency_size = abs_pp_size * pp.be_price
prop = live_currency_size / self.currency_limit
else:
@ -342,7 +304,7 @@ def mk_allocator(
# if the current position is already greater then the limit
# settings, increase the limit to the current position
if alloc.size_unit == 'currency':
startup_size = startup_pp.size * startup_pp.avg_price
startup_size = startup_pp.size * startup_pp.be_price
if startup_size > alloc.currency_limit:
alloc.currency_limit = round(startup_size, ndigits=2)

View File

@ -258,6 +258,6 @@ class BrokerdPosition(BaseModel):
broker: str
account: str
symbol: str
currency: str
size: float
avg_price: float
currency: str = ''

View File

@ -31,6 +31,8 @@ import tractor
from dataclasses import dataclass
from .. import data
from ..data._source import Symbol
from ..pp import Position
from ..data._normalize import iterticks
from ..data._source import unpack_fqsn
from ..log import get_logger
@ -257,29 +259,14 @@ class PaperBoi:
)
)
# "avg position price" calcs
# TODO: eventually it'd be nice to have a small set of routines
# to do this stuff from a sequence of cleared orders to enable
# so called "contextual positions".
new_size = size + pp_msg.size
# old size minus the new size gives us size differential with
# +ve -> increase in pp size
# -ve -> decrease in pp size
size_diff = abs(new_size) - abs(pp_msg.size)
if new_size == 0:
pp_msg.avg_price = 0
elif size_diff > 0:
# only update the "average position price" when the position
# size increases not when it decreases (i.e. the position is
# being made smaller)
pp_msg.avg_price = (
abs(size) * price + pp_msg.avg_price * abs(pp_msg.size)
) / abs(new_size)
pp_msg.size = new_size
# delegate update to `.pp.Position.lifo_update()`
pp = Position(
Symbol(key=symbol),
size=pp_msg.size,
be_price=pp_msg.avg_price,
bsuid=symbol,
)
pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price)
await self.ems_trades_stream.send(pp_msg.dict())
@ -390,7 +377,8 @@ async def handle_order_requests(
account = request_msg['account']
if account != 'paper':
log.error(
'This is a paper account, only a `paper` selection is valid'
'This is a paper account,'
' only a `paper` selection is valid'
)
await ems_order_stream.send(BrokerdError(
oid=request_msg['oid'],
@ -464,7 +452,7 @@ async def trades_dialogue(
# TODO: load paper positions per broker from .toml config file
# and pass as symbol to position data mapping: ``dict[str, dict]``
# await ctx.started(all_positions)
await ctx.started(({}, {'paper',}))
await ctx.started(({}, ['paper']))
async with (
ctx.open_stream() as ems_stream,

View File

@ -83,9 +83,9 @@ def pikerd(loglevel, host, tl, pdb, tsdb):
)
log.info(
f'`marketstore` up!\n'
f'`marketstored` pid: {pid}\n'
f'docker container id: {cid}\n'
f'`marketstored` up!\n'
f'pid: {pid}\n'
f'container id: {cid[:12]}\n'
f'config: {pformat(config)}'
)

View File

@ -21,6 +21,7 @@ Broker configuration mgmt.
import platform
import sys
import os
from os import path
from os.path import dirname
import shutil
from typing import Optional
@ -111,6 +112,7 @@ if _parent_user:
_conf_names: set[str] = {
'brokers',
'pps',
'trades',
'watchlists',
}
@ -147,19 +149,21 @@ def get_conf_path(
conf_name: str = 'brokers',
) -> str:
"""Return the default config path normally under
``~/.config/piker`` on linux.
'''
Return the top-level default config path normally under
``~/.config/piker`` on linux for a given ``conf_name``, the config
name.
Contains files such as:
- brokers.toml
- pp.toml
- watchlists.toml
- trades.toml
# maybe coming soon ;)
- signals.toml
- strats.toml
"""
'''
assert conf_name in _conf_names
fn = _conf_fn_w_ext(conf_name)
return os.path.join(
@ -173,7 +177,7 @@ def repodir():
Return the abspath to the repo directory.
'''
dirpath = os.path.abspath(
dirpath = path.abspath(
# we're 3 levels down in **this** module file
dirname(dirname(os.path.realpath(__file__)))
)
@ -182,7 +186,9 @@ def repodir():
def load(
conf_name: str = 'brokers',
path: str = None
path: str = None,
**tomlkws,
) -> (dict, str):
'''
@ -190,6 +196,7 @@ def load(
'''
path = path or get_conf_path(conf_name)
if not os.path.isfile(path):
fn = _conf_fn_w_ext(conf_name)
@ -202,8 +209,11 @@ def load(
# if one exists.
if os.path.isfile(template):
shutil.copyfile(template, path)
else:
with open(path, 'w'):
pass # touch
config = toml.load(path)
config = toml.load(path, **tomlkws)
log.debug(f"Read config file {path}")
return config, path
@ -212,6 +222,7 @@ def write(
config: dict, # toml config as dict
name: str = 'brokers',
path: str = None,
**toml_kwargs,
) -> None:
''''
@ -235,11 +246,14 @@ def write(
f"{path}"
)
with open(path, 'w') as cf:
return toml.dump(config, cf)
return toml.dump(
config,
cf,
**toml_kwargs,
)
def load_accounts(
providers: Optional[list[str]] = None
) -> bidict[str, Optional[str]]:

View File

@ -23,7 +23,7 @@ import decimal
from bidict import bidict
import numpy as np
from pydantic import BaseModel
from msgspec import Struct
# from numba import from_dtype
@ -126,7 +126,7 @@ def unpack_fqsn(fqsn: str) -> tuple[str, str, str]:
)
class Symbol(BaseModel):
class Symbol(Struct):
'''
I guess this is some kinda container thing for dealing with
all the different meta-data formats from brokers?
@ -152,9 +152,7 @@ class Symbol(BaseModel):
info: dict[str, Any],
suffix: str = '',
# XXX: like wtf..
# ) -> 'Symbol':
) -> None:
) -> Symbol:
tick_size = info.get('price_tick_size', 0.01)
lot_tick_size = info.get('lot_tick_size', 0.0)
@ -175,9 +173,7 @@ class Symbol(BaseModel):
fqsn: str,
info: dict[str, Any],
# XXX: like wtf..
# ) -> 'Symbol':
) -> None:
) -> Symbol:
broker, key, suffix = unpack_fqsn(fqsn)
return cls.from_broker_info(
broker,
@ -240,7 +236,7 @@ class Symbol(BaseModel):
'''
tokens = self.tokens()
fqsn = '.'.join(tokens)
fqsn = '.'.join(map(str.lower, tokens))
return fqsn
def iterfqsns(self) -> list[str]:

781
piker/pp.py 100644
View File

@ -0,0 +1,781 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Personal/Private position parsing, calculating, summarizing in a way
that doesn't try to cuk most humans who prefer to not lose their moneys..
(looking at you `ib` and dirt-bird friends)
'''
from collections import deque
from contextlib import contextmanager as cm
# from pprint import pformat
import os
from os import path
from math import copysign
import re
import time
from typing import (
Any,
Optional,
Union,
)
from msgspec import Struct
import pendulum
from pendulum import datetime, now
import tomli
import toml
from . import config
from .brokers import get_brokermod
from .clearing._messages import BrokerdPosition, Status
from .data._source import Symbol
from .log import get_logger
log = get_logger(__name__)
@cm
def open_trade_ledger(
broker: str,
account: str,
) -> str:
'''
Indempotently create and read in a trade log file from the
``<configuration_dir>/ledgers/`` directory.
Files are named per broker account of the form
``<brokername>_<accountname>.toml``. The ``accountname`` here is the
name as defined in the user's ``brokers.toml`` config.
'''
ldir = path.join(config._config_dir, 'ledgers')
if not path.isdir(ldir):
os.makedirs(ldir)
fname = f'trades_{broker}_{account}.toml'
tradesfile = path.join(ldir, fname)
if not path.isfile(tradesfile):
log.info(
f'Creating new local trades ledger: {tradesfile}'
)
with open(tradesfile, 'w') as cf:
pass # touch
with open(tradesfile, 'rb') as cf:
start = time.time()
ledger = tomli.load(cf)
print(f'Ledger load took {time.time() - start}s')
cpy = ledger.copy()
try:
yield cpy
finally:
if cpy != ledger:
# TODO: show diff output?
# https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries
print(f'Updating ledger for {tradesfile}:\n')
ledger.update(cpy)
# we write on close the mutated ledger data
with open(tradesfile, 'w') as cf:
return toml.dump(ledger, cf)
class Transaction(Struct):
# TODO: should this be ``.to`` (see below)?
fqsn: str
tid: Union[str, int] # unique transaction id
size: float
price: float
cost: float # commisions or other additional costs
dt: datetime
expiry: Optional[datetime] = None
# optional key normally derived from the broker
# backend which ensures the instrument-symbol this record
# is for is truly unique.
bsuid: Optional[Union[str, int]] = None
# optional fqsn for the source "asset"/money symbol?
# from: Optional[str] = None
class Position(Struct):
'''
Basic pp (personal/piker position) model with attached clearing
transaction history.
'''
symbol: Symbol
# can be +ve or -ve for long/short
size: float
# "breakeven price" above or below which pnl moves above and below
# zero for the entirety of the current "trade state".
be_price: float
# unique backend symbol id
bsuid: str
# ordered record of known constituent trade messages
clears: dict[
Union[str, int, Status], # trade id
dict[str, Any], # transaction history summaries
] = {}
expiry: Optional[datetime] = None
def to_dict(self) -> dict:
return {
f: getattr(self, f)
for f in self.__struct_fields__
}
def to_pretoml(self) -> dict:
'''
Prep this position's data contents for export to toml including
re-structuring of the ``.clears`` table to an array of
inline-subtables for better ``pps.toml`` compactness.
'''
d = self.to_dict()
clears = d.pop('clears')
expiry = d.pop('expiry')
if expiry:
d['expiry'] = str(expiry)
clears_list = []
for tid, data in clears.items():
inline_table = toml.TomlDecoder().get_empty_inline_table()
inline_table['tid'] = tid
for k, v in data.items():
inline_table[k] = v
clears_list.append(inline_table)
d['clears'] = clears_list
return d
def update_from_msg(
self,
msg: BrokerdPosition,
) -> None:
# XXX: better place to do this?
symbol = self.symbol
lot_size_digits = symbol.lot_size_digits
be_price, size = (
round(
msg['avg_price'],
ndigits=symbol.tick_size_digits
),
round(
msg['size'],
ndigits=lot_size_digits
),
)
self.be_price = be_price
self.size = size
@property
def dsize(self) -> float:
'''
The "dollar" size of the pp, normally in trading (fiat) unit
terms.
'''
return self.be_price * self.size
def update(
self,
t: Transaction,
) -> None:
self.clears[t.tid] = {
'cost': t.cost,
'price': t.price,
'size': t.size,
'dt': str(t.dt),
}
def lifo_update(
self,
size: float,
price: float,
cost: float = 0,
# TODO: idea: "real LIFO" dynamic positioning.
# - when a trade takes place where the pnl for
# the (set of) trade(s) is below the breakeven price
# it may be that the trader took a +ve pnl on a short(er)
# term trade in the same account.
# - in this case we could recalc the be price to
# be reverted back to it's prior value before the nearest term
# trade was opened.?
# dynamic_breakeven_price: bool = False,
) -> (float, float):
'''
Incremental update using a LIFO-style weighted mean.
'''
# "avg position price" calcs
# TODO: eventually it'd be nice to have a small set of routines
# to do this stuff from a sequence of cleared orders to enable
# so called "contextual positions".
new_size = self.size + size
# old size minus the new size gives us size diff with
# +ve -> increase in pp size
# -ve -> decrease in pp size
size_diff = abs(new_size) - abs(self.size)
if new_size == 0:
self.be_price = 0
elif size_diff > 0:
# XXX: LOFI incremental update:
# only update the "average price" when
# the size increases not when it decreases (i.e. the
# position is being made smaller)
self.be_price = (
# weight of current exec = (size * price) + cost
(abs(size) * price)
+
(copysign(1, new_size) * cost) # transaction cost
+
# weight of existing be price
self.be_price * abs(self.size) # weight of previous pp
) / abs(new_size) # normalized by the new size: weighted mean.
self.size = new_size
return new_size, self.be_price
def minimize_clears(
self,
) -> dict[str, dict]:
'''
Minimize the position's clears entries by removing
all transactions before the last net zero size to avoid
unecessary history irrelevant to the current pp state.
'''
size: float = 0
clears_since_zero: deque[tuple(str, dict)] = deque()
# scan for the last "net zero" position by
# iterating clears in reverse.
for tid, clear in reversed(self.clears.items()):
size += clear['size']
clears_since_zero.appendleft((tid, clear))
if size == 0:
break
self.clears = dict(clears_since_zero)
return self.clears
def update_pps(
records: dict[str, Transaction],
pps: Optional[dict[str, Position]] = None
) -> dict[str, Position]:
'''
Compile a set of positions from a trades ledger.
'''
pps: dict[str, Position] = pps or {}
# lifo update all pps from records
for r in records:
pp = pps.setdefault(
r.bsuid,
# if no existing pp, allocate fresh one.
Position(
Symbol.from_fqsn(
r.fqsn,
info={},
),
size=0.0,
be_price=0.0,
bsuid=r.bsuid,
expiry=r.expiry,
)
)
# don't do updates for ledger records we already have
# included in the current pps state.
if r.tid in pp.clears:
# NOTE: likely you'll see repeats of the same
# ``Transaction`` passed in here if/when you are restarting
# a ``brokerd.ib`` where the API will re-report trades from
# the current session, so we need to make sure we don't
# "double count" these in pp calculations.
continue
# lifo style "breakeven" price calc
pp.lifo_update(
r.size,
r.price,
# include transaction cost in breakeven price
# and presume the worst case of the same cost
# to exit this transaction (even though in reality
# it will be dynamic based on exit stratetgy).
cost=2*r.cost,
)
# track clearing data
pp.update(r)
assert len(set(pp.clears)) == len(pp.clears)
return pps
def load_pps_from_ledger(
brokername: str,
acctname: str,
# post normalization filter on ledger entries to be processed
filter_by: Optional[list[dict]] = None,
) -> dict[str, Position]:
'''
Open a ledger file by broker name and account and read in and
process any trade records into our normalized ``Transaction``
form and then pass these into the position processing routine
and deliver the two dict-sets of the active and closed pps.
'''
with open_trade_ledger(
brokername,
acctname,
) as ledger:
if not ledger:
# null case, no ledger file with content
return {}
brokermod = get_brokermod(brokername)
src_records = brokermod.norm_trade_records(ledger)
if filter_by:
bsuids = set(filter_by)
records = list(filter(lambda r: r.bsuid in bsuids, src_records))
else:
records = src_records
return update_pps(records)
def get_pps(
brokername: str,
acctids: Optional[set[str]] = set(),
) -> dict[str, dict[str, Position]]:
'''
Read out broker-specific position entries from
incremental update file: ``pps.toml``.
'''
conf, path = config.load(
'pps',
# load dicts as inlines to preserve compactness
# _dict=toml.decoder.InlineTableDict,
)
all_active = {}
all_closed = {}
# try to load any ledgers if no section found
bconf, path = config.load('brokers')
accounts = bconf[brokername]['accounts']
for account in accounts:
# TODO: instead of this filter we could
# always send all known pps but just not audit
# them since an active client might not be up?
if (
acctids and
f'{brokername}.{account}' not in acctids
):
continue
active, closed = update_pps_conf(brokername, account)
all_active.setdefault(account, {}).update(active)
all_closed.setdefault(account, {}).update(closed)
return all_active, all_closed
# TODO: instead see if we can hack tomli and tomli-w to do the same:
# - https://github.com/hukkin/tomli
# - https://github.com/hukkin/tomli-w
class PpsEncoder(toml.TomlEncoder):
'''
Special "styled" encoder that makes a ``pps.toml`` redable and
compact by putting `.clears` tables inline and everything else
flat-ish.
'''
separator = ','
def dump_list(self, v):
'''
Dump an inline list with a newline after every element and
with consideration for denoted inline table types.
'''
retval = "[\n"
for u in v:
if isinstance(u, toml.decoder.InlineTableDict):
out = self.dump_inline_table(u)
else:
out = str(self.dump_value(u))
retval += " " + out + "," + "\n"
retval += "]"
return retval
def dump_inline_table(self, section):
"""Preserve inline table in its compact syntax instead of expanding
into subsection.
https://github.com/toml-lang/toml#user-content-inline-table
"""
val_list = []
for k, v in section.items():
# if isinstance(v, toml.decoder.InlineTableDict):
if isinstance(v, dict):
val = self.dump_inline_table(v)
else:
val = str(self.dump_value(v))
val_list.append(k + " = " + val)
retval = "{ " + ", ".join(val_list) + " }"
return retval
def dump_sections(self, o, sup):
retstr = ""
if sup != "" and sup[-1] != ".":
sup += '.'
retdict = self._dict()
arraystr = ""
for section in o:
qsection = str(section)
value = o[section]
if not re.match(r'^[A-Za-z0-9_-]+$', section):
qsection = toml.encoder._dump_str(section)
# arrayoftables = False
if (
self.preserve
and isinstance(value, toml.decoder.InlineTableDict)
):
retstr += (
qsection
+
" = "
+
self.dump_inline_table(o[section])
+
'\n' # only on the final terminating left brace
)
# XXX: this code i'm pretty sure is just blatantly bad
# and/or wrong..
# if isinstance(o[section], list):
# for a in o[section]:
# if isinstance(a, dict):
# arrayoftables = True
# if arrayoftables:
# for a in o[section]:
# arraytabstr = "\n"
# arraystr += "[[" + sup + qsection + "]]\n"
# s, d = self.dump_sections(a, sup + qsection)
# if s:
# if s[0] == "[":
# arraytabstr += s
# else:
# arraystr += s
# while d:
# newd = self._dict()
# for dsec in d:
# s1, d1 = self.dump_sections(d[dsec], sup +
# qsection + "." +
# dsec)
# if s1:
# arraytabstr += ("[" + sup + qsection +
# "." + dsec + "]\n")
# arraytabstr += s1
# for s1 in d1:
# newd[dsec + "." + s1] = d1[s1]
# d = newd
# arraystr += arraytabstr
elif isinstance(value, dict):
retdict[qsection] = o[section]
elif o[section] is not None:
retstr += (
qsection
+
" = "
+
str(self.dump_value(o[section]))
)
# if not isinstance(value, dict):
if not isinstance(value, toml.decoder.InlineTableDict):
# inline tables should not contain newlines:
# https://toml.io/en/v1.0.0#inline-table
retstr += '\n'
else:
raise ValueError(value)
retstr += arraystr
return (retstr, retdict)
def load_pps_from_toml(
brokername: str,
acctid: str,
# XXX: there is an edge case here where we may want to either audit
# the retrieved ``pps.toml`` output or reprocess it since there was
# an error on write on the last attempt to update the state file
# even though the ledger *was* updated. For this cases we allow the
# caller to pass in a symbol set they'd like to reload from the
# underlying ledger to be reprocessed in computing pps state.
reload_records: Optional[dict[str, str]] = None,
update_from_ledger: bool = False,
) -> tuple[dict, dict[str, Position]]:
'''
Load and marshal to objects all pps from either an existing
``pps.toml`` config, or from scratch from a ledger file when
none yet exists.
'''
conf, path = config.load('pps')
brokersection = conf.setdefault(brokername, {})
pps = brokersection.setdefault(acctid, {})
pp_objs = {}
# no pps entry yet for this broker/account so parse any available
# ledgers to build a brand new pps state.
if not pps or update_from_ledger:
pp_objs = load_pps_from_ledger(
brokername,
acctid,
)
# Reload symbol specific ledger entries if requested by the
# caller **AND** none exist in the current pps state table.
elif (
pps and reload_records
):
# no pps entry yet for this broker/account so parse
# any available ledgers to build a pps state.
pp_objs = load_pps_from_ledger(
brokername,
acctid,
filter_by=reload_records,
)
if not pps:
log.warning(
f'No trade history could be loaded for {brokername}:{acctid}'
)
# unmarshal/load ``pps.toml`` config entries into object form.
for fqsn, entry in pps.items():
bsuid = entry['bsuid']
# convert clears sub-tables (only in this form
# for toml re-presentation) back into a master table.
clears_list = entry['clears']
# index clears entries in "object" form by tid in a top
# level dict instead of a list (as is presented in our
# ``pps.toml``).
clears = {}
for clears_table in clears_list:
tid = clears_table.pop('tid')
clears[tid] = clears_table
size = entry['size']
# TODO: an audit system for existing pps entries?
# if not len(clears) == abs(size):
# pp_objs = load_pps_from_ledger(
# brokername,
# acctid,
# filter_by=reload_records,
# )
# reason = 'size <-> len(clears) mismatch'
# raise ValueError(
# '`pps.toml` entry is invalid:\n'
# f'{fqsn}\n'
# f'{pformat(entry)}'
# )
expiry = entry.get('expiry')
if expiry:
expiry = pendulum.parse(expiry)
pp_objs[bsuid] = Position(
Symbol.from_fqsn(fqsn, info={}),
size=size,
be_price=entry['be_price'],
expiry=expiry,
bsuid=entry['bsuid'],
# XXX: super critical, we need to be sure to include
# all pps.toml clears to avoid reusing clears that were
# already included in the current incremental update
# state, since today's records may have already been
# processed!
clears=clears,
)
return conf, pp_objs
def update_pps_conf(
brokername: str,
acctid: str,
trade_records: Optional[list[Transaction]] = None,
ledger_reload: Optional[dict[str, str]] = None,
) -> tuple[
dict[str, Position],
dict[str, Position],
]:
# this maps `.bsuid` values to positions
pp_objs: dict[Union[str, int], Position]
if trade_records and ledger_reload:
for r in trade_records:
ledger_reload[r.bsuid] = r.fqsn
conf, pp_objs = load_pps_from_toml(
brokername,
acctid,
reload_records=ledger_reload,
)
# update all pp objects from any (new) trade records which
# were passed in (aka incremental update case).
if trade_records:
pp_objs = update_pps(
trade_records,
pps=pp_objs,
)
pp_entries = {} # dict-serialize all active pps
# NOTE: newly closed position are also important to report/return
# since a consumer, like an order mode UI ;), might want to react
# based on the closure.
closed_pp_objs: dict[str, Position] = {}
for bsuid in list(pp_objs):
pp = pp_objs[bsuid]
pp.minimize_clears()
if (
pp.size == 0
# drop time-expired positions (normally derivatives)
or (pp.expiry and pp.expiry < now())
):
# if expired the position is closed
pp.size = 0
# position is already closed aka "net zero"
closed_pp = pp_objs.pop(bsuid, None)
if closed_pp:
closed_pp_objs[bsuid] = closed_pp
else:
# serialize to pre-toml form
asdict = pp.to_pretoml()
if pp.expiry is None:
asdict.pop('expiry', None)
# TODO: we need to figure out how to have one top level
# listing venue here even when the backend isn't providing
# it via the trades ledger..
# drop symbol obj in serialized form
s = asdict.pop('symbol')
fqsn = s.front_fqsn()
print(f'Updating active pp: {fqsn}')
# XXX: ugh, it's cuz we push the section under
# the broker name.. maybe we need to rethink this?
brokerless_key = fqsn.rstrip(f'.{brokername}')
pp_entries[brokerless_key] = asdict
conf[brokername][acctid] = pp_entries
# TODO: why tf haven't they already done this for inline tables smh..
enc = PpsEncoder(preserve=True)
# table_bs_type = type(toml.TomlDecoder().get_empty_inline_table())
enc.dump_funcs[toml.decoder.InlineTableDict] = enc.dump_inline_table
config.write(
conf,
'pps',
encoder=enc,
)
# deliver object form of all pps in table to caller
return pp_objs, closed_pp_objs
if __name__ == '__main__':
import sys
args = sys.argv
assert len(args) > 1, 'Specifiy account(s) from `brokers.toml`'
args = args[1:]
for acctid in args:
broker, name = acctid.split('.')
update_pps_conf(broker, name)

View File

@ -19,6 +19,7 @@ Position info and display
"""
from __future__ import annotations
from copy import copy
from dataclasses import dataclass
from functools import partial
from math import floor, copysign
@ -105,8 +106,8 @@ async def update_pnl_from_feed(
# compute and display pnl status
order_mode.pane.pnl_label.format(
pnl=copysign(1, size) * pnl(
# live.avg_price,
order_mode.current_pp.live_pp.avg_price,
# live.be_price,
order_mode.current_pp.live_pp.be_price,
tick['price'],
),
)
@ -356,7 +357,7 @@ class SettingsPane:
# last historical close price
last = feed.shm.array[-1][['close']][0]
pnl_value = copysign(1, size) * pnl(
tracker.live_pp.avg_price,
tracker.live_pp.be_price,
last,
)
@ -476,7 +477,7 @@ class PositionTracker:
self.alloc = alloc
self.startup_pp = startup_pp
self.live_pp = startup_pp.copy()
self.live_pp = copy(startup_pp)
view = chart.getViewBox()
@ -556,7 +557,7 @@ class PositionTracker:
pp = position or self.live_pp
self.update_line(
pp.avg_price,
pp.be_price,
pp.size,
self.chart.linked.symbol.lot_size_digits,
)
@ -570,7 +571,7 @@ class PositionTracker:
self.hide()
else:
self._level_marker.level = pp.avg_price
self._level_marker.level = pp.be_price
# these updates are critical to avoid lag on view/scene changes
self._level_marker.update() # trigger paint

View File

@ -33,10 +33,10 @@ import trio
from PyQt5.QtCore import Qt
from .. import config
from ..pp import Position
from ..clearing._client import open_ems, OrderBook
from ..clearing._allocate import (
mk_allocator,
Position,
)
from ._style import _font
from ..data._source import Symbol
@ -59,7 +59,8 @@ log = get_logger(__name__)
class OrderDialog(BaseModel):
'''Trade dialogue meta-data describing the lifetime
'''
Trade dialogue meta-data describing the lifetime
of an order submission to ``emsd`` from a chart.
'''
@ -87,7 +88,8 @@ def on_level_change_update_next_order_info(
tracker: PositionTracker,
) -> None:
'''A callback applied for each level change to the line
'''
A callback applied for each level change to the line
which will recompute the order size based on allocator
settings. this is assigned inside
``OrderMode.line_from_order()``
@ -604,7 +606,10 @@ async def open_order_mode(
startup_pp = Position(
symbol=symbol,
size=0,
avg_price=0,
be_price=0,
# XXX: BLEH, do we care about this on the client side?
bsuid=symbol,
)
msg = pps_by_account.get(account_name)
if msg:

View File

@ -41,6 +41,7 @@ setup(
},
install_requires=[
'toml',
'tomli', # fastest pure py reader
'click',
'colorlog',
'attrs',