diff --git a/piker/brokers/ib/__init__.py b/piker/brokers/ib/__init__.py index 3f6504a1..5c329ecc 100644 --- a/piker/brokers/ib/__init__.py +++ b/piker/brokers/ib/__init__.py @@ -38,7 +38,10 @@ from .feed import ( open_symbol_search, stream_quotes, ) -from .broker import trades_dialogue +from .broker import ( + trades_dialogue, + norm_trade_records, +) __all__ = [ 'get_client', diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index b12e723b..207f56f7 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -38,15 +38,21 @@ import time from types import SimpleNamespace +from bidict import bidict import trio import tractor from tractor import to_asyncio +import ib_insync as ibis from ib_insync.wrapper import RequestError from ib_insync.contract import Contract, ContractDetails from ib_insync.order import Order from ib_insync.ticker import Ticker -from ib_insync.objects import Position -import ib_insync as ibis +from ib_insync.objects import ( + Position, + Fill, + Execution, + CommissionReport, +) from ib_insync.wrapper import Wrapper from ib_insync.client import Client as ib_Client import numpy as np @@ -155,30 +161,23 @@ class NonShittyIB(ibis.IB): self.client.apiEnd += self.disconnectedEvent -# map of symbols to contract ids -_adhoc_cmdty_data_map = { - # https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924 - - # NOTE: some cmdtys/metals don't have trade data like gold/usd: - # https://groups.io/g/twsapi/message/44174 - 'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}), -} - _futes_venues = ( 'GLOBEX', 'NYMEX', 'CME', 'CMECRYPTO', + 'COMEX', + 'CMDTY', # special name case.. ) _adhoc_futes_set = { # equities 'nq.globex', - 'mnq.globex', + 'mnq.globex', # micro 'es.globex', - 'mes.globex', + 'mes.globex', # micro # cypto$ 'brr.cmecrypto', @@ -195,20 +194,46 @@ _adhoc_futes_set = { # metals 'xauusd.cmdty', # gold spot 'gc.nymex', - 'mgc.nymex', + 'mgc.nymex', # micro + + # oil & gas + 'cl.nymex', 'xagusd.cmdty', # silver spot 'ni.nymex', # silver futes 'qi.comex', # mini-silver futes } + +# map of symbols to contract ids +_adhoc_symbol_map = { + # https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924 + + # NOTE: some cmdtys/metals don't have trade data like gold/usd: + # https://groups.io/g/twsapi/message/44174 + 'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}), +} +for qsn in _adhoc_futes_set: + sym, venue = qsn.split('.') + assert venue.upper() in _futes_venues, f'{venue}' + _adhoc_symbol_map[sym.upper()] = ( + {'exchange': venue}, + {}, + ) + + # exchanges we don't support at the moment due to not knowing # how to do symbol-contract lookup correctly likely due # to not having the data feeds subscribed. _exch_skip_list = { + 'ASX', # aussie stocks 'MEXI', # mexican stocks - 'VALUE', # no idea + + # no idea + 'VALUE', + 'FUNDSERV', + 'SWB2', } # https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924 @@ -261,27 +286,29 @@ class Client: # NOTE: the ib.client here is "throttled" to 45 rps by default - async def trades( - self, - # api_only: bool = False, + async def trades(self) -> dict[str, Any]: + ''' + Return list of trade-fills from current session in ``dict``. 
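# ---- editor's sketch (not part of the patch) ----
# Standalone rerun of the ``_adhoc_symbol_map`` construction added
# above: every fqsn in ``_adhoc_futes_set`` is split into symbol and
# venue and keyed by upper-cased symbol. The tiny input set here is
# illustrative only.
_futes_venues = ('GLOBEX', 'NYMEX', 'CMDTY')
_adhoc_futes_set = {'mnq.globex', 'cl.nymex', 'xauusd.cmdty'}

_adhoc_symbol_map: dict[str, tuple[dict, dict]] = {
    # hand-coded conid + feed kwargs for symbols lacking trade data
    'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}),
}
for qsn in _adhoc_futes_set:
    sym, venue = qsn.split('.')
    assert venue.upper() in _futes_venues, f'{venue}'
    # (contract kwargs, bar-request kwargs)
    _adhoc_symbol_map[sym.upper()] = ({'exchange': venue}, {})

assert _adhoc_symbol_map['CL'] == ({'exchange': 'nymex'}, {})
# NB: since 'xauusd.cmdty' is *also* in the futes set, the loop
# clobbers the hand-coded conid entry, which looks unintended given
# the ``_adhoc_symbol_map[sym]`` lookup kept in ``find_contract()``:
assert _adhoc_symbol_map['XAUUSD'] == ({'exchange': 'cmdty'}, {})
# ---- end sketch ----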
- ) -> dict[str, Any]: - - # orders = await self.ib.reqCompletedOrdersAsync( - # apiOnly=api_only - # ) - fills = await self.ib.reqExecutionsAsync() - norm_fills = [] + ''' + fills: list[Fill] = self.ib.fills() + norm_fills: list[dict] = [] for fill in fills: fill = fill._asdict() # namedtuple - for key, val in fill.copy().items(): - if isinstance(val, Contract): - fill[key] = asdict(val) + for key, val in fill.items(): + match val: + case Contract() | Execution() | CommissionReport(): + fill[key] = asdict(val) norm_fills.append(fill) return norm_fills + async def orders(self) -> list[Order]: + return await self.ib.reqAllOpenOrdersAsync( + apiOnly=False, + ) + async def bars( self, fqsn: str, @@ -483,6 +510,14 @@ class Client: return con + async def get_con( + self, + conid: int, + ) -> Contract: + return await self.ib.qualifyContractsAsync( + ibis.Contract(conId=conid) + ) + async def find_contract( self, pattern: str, @@ -553,7 +588,7 @@ class Client: # commodities elif exch == 'CMDTY': # eg. XAUUSD.CMDTY - con_kwargs, bars_kwargs = _adhoc_cmdty_data_map[sym] + con_kwargs, bars_kwargs = _adhoc_symbol_map[sym] con = ibis.Commodity(**con_kwargs) con.bars_kwargs = bars_kwargs @@ -811,10 +846,23 @@ _scan_ignore: set[tuple[str, int]] = set() def get_config() -> dict[str, Any]: - conf, path = config.load() - + conf, path = config.load('brokers') section = conf.get('ib') + accounts = section.get('accounts') + if not accounts: + raise ValueError( + 'brokers.toml -> `ib.accounts` must be defined\n' + f'location: {path}' + ) + + names = list(accounts.keys()) + accts = section['accounts'] = bidict(accounts) + log.info( + f'brokers.toml defines {len(accts)} accounts: ' + f'{pformat(names)}' + ) + if section is None: log.warning(f'No config section found for ib in {path}') return {} @@ -990,7 +1038,7 @@ async def load_aio_clients( for acct, client in _accounts2clients.items(): log.info(f'Disconnecting {acct}@{client}') client.ib.disconnect() - _client_cache.pop((host, port)) + _client_cache.pop((host, port), None) async def load_clients_for_trio( @@ -1019,9 +1067,6 @@ async def load_clients_for_trio( await asyncio.sleep(float('inf')) -_proxies: dict[str, MethodProxy] = {} - - @acm async def open_client_proxies() -> tuple[ dict[str, MethodProxy], @@ -1044,13 +1089,14 @@ async def open_client_proxies() -> tuple[ if cache_hit: log.info(f'Re-using cached clients: {clients}') + proxies = {} for acct_name, client in clients.items(): proxy = await stack.enter_async_context( open_client_proxy(client), ) - _proxies[acct_name] = proxy + proxies[acct_name] = proxy - yield _proxies, clients + yield proxies, clients def get_preferred_data_client( @@ -1199,11 +1245,13 @@ async def open_client_proxy( event_table = {} async with ( + to_asyncio.open_channel_from( open_aio_client_method_relay, client=client, event_consumers=event_table, ) as (first, chan), + trio.open_nursery() as relay_n, ): diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 721b6da8..4cc20b63 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -26,8 +26,10 @@ from typing import ( Any, Optional, AsyncIterator, + Union, ) +from bidict import bidict import trio from trio_typing import TaskStatus import tractor @@ -42,10 +44,13 @@ from ib_insync.order import ( from ib_insync.objects import ( Fill, Execution, + CommissionReport, ) from ib_insync.objects import Position +import pendulum from piker import config +from piker import pp from piker.log import get_console_log from piker.clearing._messages import ( 
BrokerdOrder, @@ -56,13 +61,16 @@ from piker.clearing._messages import ( BrokerdFill, BrokerdError, ) +from piker.data._source import Symbol from .api import ( _accounts2clients, - _adhoc_futes_set, + # _adhoc_futes_set, + _adhoc_symbol_map, log, get_config, open_client_proxies, Client, + MethodProxy, ) @@ -80,29 +88,39 @@ def pack_position( # TODO: lookup fqsn even for derivs. symbol = con.symbol.lower() + # try our best to figure out the exchange / venue exch = (con.primaryExchange or con.exchange).lower() - symkey = '.'.join((symbol, exch)) if not exch: - # attempt to lookup the symbol from our - # hacked set.. - for sym in _adhoc_futes_set: - if symbol in sym: - symkey = sym - break + # for wtv cucked reason some futes don't show their + # exchange (like CL.NYMEX) ... + entry = _adhoc_symbol_map.get( + con.symbol or con.localSymbol + ) + if entry: + meta, kwargs = entry + cid = meta.get('conId') + if cid: + assert con.conId == meta['conId'] + exch = meta['exchange'] + + assert exch, f'No clue:\n {con}' + fqsn = '.'.join((symbol, exch)) expiry = con.lastTradeDateOrContractMonth if expiry: - symkey += f'.{expiry}' + fqsn += f'.{expiry}' # TODO: options contracts into a sane format.. - - return BrokerdPosition( - broker='ib', - account=pos.account, - symbol=symkey, - currency=con.currency, - size=float(pos.position), - avg_price=float(pos.avgCost) / float(con.multiplier or 1.0), + return ( + con.conId, + BrokerdPosition( + broker='ib', + account=pos.account, + symbol=fqsn, + currency=con.currency, + size=float(pos.position), + avg_price=float(pos.avgCost) / float(con.multiplier or 1.0), + ), ) @@ -205,19 +223,35 @@ async def recv_trade_updates( # sync with trio task to_trio.send_nowait(None) - def push_tradesies(eventkit_obj, obj, fill=None): - """Push events to trio task. + def push_tradesies( + eventkit_obj, + obj, + fill: Optional[Fill] = None, + report: Optional[CommissionReport] = None, + ): + ''' + Push events to trio task. - """ - if fill is not None: - # execution details event - item = ('fill', (obj, fill)) + ''' + match eventkit_obj.name(): - elif eventkit_obj.name() == 'positionEvent': - item = ('position', obj) + case 'orderStatusEvent': + item = ('status', obj) - else: - item = ('status', obj) + case 'commissionReportEvent': + assert report + item = ('cost', report) + + case 'execDetailsEvent': + # execution details event + item = ('fill', (obj, fill)) + + case 'positionEvent': + item = ('position', obj) + + case _: + log.error(f'Error unknown event {obj}') + return log.info(f'eventkit event ->\n{pformat(item)}') @@ -233,15 +267,15 @@ async def recv_trade_updates( 'execDetailsEvent', # all "fill" updates 'positionEvent', # avg price updates per symbol per account - # 'commissionReportEvent', # XXX: ugh, it is a separate event from IB and it's # emitted as follows: # self.ib.commissionReportEvent.emit(trade, fill, report) + 'commissionReportEvent', # XXX: not sure yet if we need these # 'updatePortfolioEvent', - # XXX: these all seem to be weird ib_insync intrernal + # XXX: these all seem to be weird ib_insync internal # events that we probably don't care that much about # given the internal design is wonky af.. # 'newOrderEvent', @@ -257,6 +291,149 @@ async def recv_trade_updates( await client.ib.disconnectedEvent +async def update_ledger_from_api_trades( + trade_entries: list[dict[str, Any]], + client: Union[Client, MethodProxy], + +) -> dict[str, pp.Transaction]: + # construct piker pps from trade ledger, underneath using + # LIFO style breakeven pricing calcs. 
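# ---- editor's sketch (not part of the patch) ----
# The ``push_tradesies()`` rewrite above demuxes eventkit callbacks
# purely on the emitting event's name. The dispatch in isolation,
# runnable with a stand-in event type (only ``.name()`` is assumed):
from typing import Any, Optional

class StubEvent:
    def __init__(self, name: str) -> None:
        self._name = name

    def name(self) -> str:
        return self._name

def demux(
    eventkit_obj: StubEvent,
    obj: Any,
    fill: Optional[Any] = None,
    report: Optional[Any] = None,
) -> Optional[tuple]:
    match eventkit_obj.name():
        case 'orderStatusEvent':
            return ('status', obj)
        case 'commissionReportEvent':
            return ('cost', report)
        case 'execDetailsEvent':
            return ('fill', (obj, fill))
        case 'positionEvent':
            return ('position', obj)
        case _:
            return None  # unknown events are logged + dropped above

assert demux(StubEvent('positionEvent'), 'pos') == ('position', 'pos')
assert demux(StubEvent('newOrderEvent'), 'order') is None
# ---- end sketch ----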
+ conf = get_config() + + # XXX; ERRGGG.. + # pack in the "primary/listing exchange" value from a + # contract lookup since it seems this isn't available by + # default from the `.fills()` method endpoint... + for entry in trade_entries: + condict = entry['contract'] + conid = condict['conId'] + pexch = condict['primaryExchange'] + + if not pexch: + cons = await client.get_con(conid=conid) + if cons: + con = cons[0] + pexch = con.primaryExchange or con.exchange + else: + # for futes it seems like the primary is always empty? + pexch = condict['exchange'] + + entry['listingExchange'] = pexch + + entries = trades_to_ledger_entries( + conf['accounts'].inverse, + trade_entries, + ) + + # write recent session's trades to the user's (local) ledger file. + records: dict[str, pp.Transactions] = {} + for acctid, trades_by_id in entries.items(): + with pp.open_trade_ledger('ib', acctid) as ledger: + ledger.update(trades_by_id) + + # normalize to transaction form + records[acctid] = norm_trade_records(trades_by_id) + + return records + + +async def update_and_audit_msgs( + acctid: str, # no `ib.` prefix is required! + pps: list[pp.Position], + cids2pps: dict[tuple[str, int], BrokerdPosition], + validate: bool = False, + +) -> list[BrokerdPosition]: + + msgs: list[BrokerdPosition] = [] + # pps: dict[int, pp.Position] = {} + + for p in pps: + bsuid = p.bsuid + + # build trade-session-actor local table + # of pps from unique symbol ids. + # pps[bsuid] = p + + # retreive equivalent ib reported position message + # for comparison/audit versus the piker equivalent + # breakeven pp calcs. + ibppmsg = cids2pps.get((acctid, bsuid)) + + if ibppmsg: + msg = BrokerdPosition( + broker='ib', + + # XXX: ok so this is annoying, we're relaying + # an account name with the backend suffix prefixed + # but when reading accounts from ledgers we don't + # need it and/or it's prefixed in the section + # table.. + account=ibppmsg.account, + # XXX: the `.ib` is stripped..? + symbol=ibppmsg.symbol, + currency=ibppmsg.currency, + size=p.size, + avg_price=p.be_price, + ) + msgs.append(msg) + + if validate: + ibsize = ibppmsg.size + pikersize = msg.size + diff = pikersize - ibsize + + # if ib reports a lesser pp it's not as bad since we can + # presume we're at least not more in the shit then we + # thought. + if diff: + raise ValueError( + f'POSITION MISMATCH ib <-> piker ledger:\n' + f'ib: {ibppmsg}\n' + f'piker: {msg}\n' + 'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?' + ) + msg.size = ibsize + + if ibppmsg.avg_price != msg.avg_price: + + # TODO: make this a "propoganda" log level? + log.warning( + 'The mega-cucks at IB want you to believe with their ' + f'"FIFO" positioning for {msg.symbol}:\n' + f'"ib" mega-cucker avg price: {ibppmsg.avg_price}\n' + f'piker, LIFO breakeven PnL price: {msg.avg_price}' + ) + + else: + # make brand new message + msg = BrokerdPosition( + broker='ib', + + # XXX: ok so this is annoying, we're relaying + # an account name with the backend suffix prefixed + # but when reading accounts from ledgers we don't + # need it and/or it's prefixed in the section + # table.. we should just strip this from the message + # right since `.broker` is already included? + account=f'ib.{acctid}', + # XXX: the `.ib` is stripped..? + symbol=p.symbol.front_fqsn(), + # currency=ibppmsg.currency, + size=p.size, + avg_price=p.be_price, + ) + if validate and p.size: + raise ValueError( + f'UNEXPECTED POSITION ib <-> piker ledger:\n' + f'piker: {msg}\n' + 'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?' 
+ ) + msgs.append(msg) + + return msgs + + @tractor.context async def trades_dialogue( @@ -277,6 +454,14 @@ async def trades_dialogue( accounts = set() clients: list[tuple[Client, trio.MemoryReceiveChannel]] = [] + # TODO: this causes a massive tractor bug when you run marketstored + # with ``--tsdb``... you should get: + # - first error the assertion + # - chart should get that error and die + # - pikerd goes to debugger again from trio nursery multi-error + # - hitting final control-c to kill daemon will lead to hang + # assert 0 + async with ( trio.open_nursery() as nurse, open_client_proxies() as (proxies, aioclients), @@ -306,22 +491,79 @@ async def trades_dialogue( assert account in accounts_def accounts.add(account) + cids2pps: dict[str, BrokerdPosition] = {} + update_records: dict[str, bidict] = {} + + # process pp value reported from ib's system. we only use these + # to cross-check sizing since average pricing on their end uses + # the so called (bs) "FIFO" style which more or less results in + # a price that's not useful for traders who want to not lose + # money.. xb for client in aioclients.values(): for pos in client.positions(): - msg = pack_position(pos) - msg.account = accounts_def.inverse[msg.account] - + cid, msg = pack_position(pos) + acctid = msg.account = accounts_def.inverse[msg.account] + acctid = acctid.strip('ib.') + cids2pps[(acctid, cid)] = msg assert msg.account in accounts, ( f'Position for unknown account: {msg.account}') - all_positions.append(msg.dict()) + # collect all ib-pp reported positions so that we can be + # sure know which positions to update from the ledger if + # any are missing from the ``pps.toml`` + update_records.setdefault(acctid, bidict())[cid] = msg.symbol - trades: list[dict] = [] - for proxy in proxies.values(): - trades.append(await proxy.trades()) + # update trades ledgers for all accounts from + # connected api clients which report trades for **this session**. + new_trades = {} + for account, proxy in proxies.items(): + trades = await proxy.trades() + new_trades.update(await update_ledger_from_api_trades( + trades, + proxy, + )) - log.info(f'Loaded {len(trades)} from this session') + for acctid, trans in new_trades.items(): + for t in trans: + bsuid = t.bsuid + if bsuid in update_records: + assert update_records[bsuid] == t.fqsn + else: + update_records.setdefault(acctid, bidict())[bsuid] = t.fqsn + + # load all positions from `pps.toml`, cross check with ib's + # positions data, and relay re-formatted pps as msgs to the ems. + # __2 cases__: + # - new trades have taken place this session that we want to + # always reprocess indempotently, + # - no new trades yet but we want to reload and audit any + # positions reported by ib's sys that may not yet be in + # piker's ``pps.toml`` state-file. 
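# ---- editor's sketch (not part of the patch) ----
# ``accounts_def`` is a ``bidict`` so a single table maps account
# aliases to ib account numbers *and* back via ``.inverse``, as used
# throughout ``trades_dialogue()`` above. Account values made up:
from bidict import bidict

accounts_def = bidict({
    'ib.algopaper': 'DU999999',  # hypothetical paper account
    'ib.margin': 'U1111111',     # hypothetical live account
})
assert accounts_def['ib.algopaper'] == 'DU999999'
assert accounts_def.inverse['DU999999'] == 'ib.algopaper'

# NB: the ``acctid.strip('ib.')`` calls above strip a *character
# set*, not a prefix, so any alias starting or ending in 'i', 'b'
# or '.' gets mangled; ``removeprefix()`` is the safe spelling:
assert 'ib.bili'.strip('ib.') == 'l'            # oops
assert 'ib.bili'.removeprefix('ib.') == 'bili'  # py3.9+
# ---- end sketch ----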
+ for acctid, to_update in update_records.items(): + trans = new_trades.get(acctid) + active, closed = pp.update_pps_conf( + 'ib', + acctid, + trade_records=trans, + ledger_reload=to_update, + ) + for pps in [active, closed]: + msgs = await update_and_audit_msgs( + acctid, + pps.values(), + cids2pps, + validate=True, + ) + all_positions.extend(msg.dict() for msg in msgs) + + if not all_positions and cids2pps: + raise RuntimeError( + 'Positions reported by ib but not found in `pps.toml`!?\n' + f'{pformat(cids2pps)}' + ) + + # log.info(f'Loaded {len(trades)} from this session') # TODO: write trades to local ``trades.toml`` # - use above per-session trades data and write to local file # - get the "flex reports" working and pull historical data and @@ -345,181 +587,484 @@ async def trades_dialogue( deliver_trade_events, stream, ems_stream, - accounts_def + accounts_def, + cids2pps, + proxies, ) # block until cancelled await trio.sleep_forever() +async def emit_pp_update( + ems_stream: tractor.MsgStream, + trade_entry: dict, + accounts_def: bidict, + proxies: dict, + cids2pps: dict, + +) -> None: + + # compute and relay incrementally updated piker pp + acctid = accounts_def.inverse[trade_entry['execution']['acctNumber']] + proxy = proxies[acctid] + + acctname = acctid.strip('ib.') + records = (await update_ledger_from_api_trades( + [trade_entry], + proxy, + ))[acctname] + r = records[0] + + # update and load all positions from `pps.toml`, cross check with + # ib's positions data, and relay re-formatted pps as msgs to the + # ems. we report both the open and closed updates in one map since + # for incremental update we may have just fully closed a pp and need + # to relay that msg as well! + active, closed = pp.update_pps_conf( + 'ib', + acctname, + trade_records=records, + ledger_reload={r.bsuid: r.fqsn}, + ) + + for pos in filter( + bool, + [active.get(r.bsuid), closed.get(r.bsuid)] + ): + msgs = await update_and_audit_msgs( + acctname, + [pos], + cids2pps, + + # ib pp event might not have arrived yet + validate=False, + ) + if msgs: + msg = msgs[0] + break + + await ems_stream.send(msg.dict()) + + async def deliver_trade_events( trade_event_stream: trio.MemoryReceiveChannel, ems_stream: tractor.MsgStream, - accounts_def: dict[str, str], + accounts_def: dict[str, str], # eg. `'ib.main'` -> `'DU999999'` + cids2pps: dict[tuple[str, str], BrokerdPosition], + proxies: dict[str, MethodProxy], ) -> None: - '''Format and relay all trade events for a given client to the EMS. + ''' + Format and relay all trade events for a given client to emsd. ''' action_map = {'BOT': 'buy', 'SLD': 'sell'} + ids2fills: dict[str, dict] = {} # TODO: for some reason we can receive a ``None`` here when the # ib-gw goes down? Not sure exactly how that's happening looking # at the eventkit code above but we should probably handle it... 
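# ---- editor's sketch (not part of the patch) ----
# ``emit_pp_update()`` above relays whichever table (active or
# closed) holds the just-updated pp: a fill that fully flattens a
# position lands it in ``closed`` only, yet still must be relayed.
# The selection idiom in isolation (plain values stand in for
# ``pp.Position`` objects):
bsuid = 12345
active: dict[int, str] = {}             # still-open pps
closed: dict[int, str] = {bsuid: 'pp'}  # just went net-zero

msg = None
for pos in filter(bool, [active.get(bsuid), closed.get(bsuid)]):
    msg = pos  # first hit wins, then audit + relay
    break
assert msg == 'pp'
# ---- end sketch ----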
async for event_name, item in trade_event_stream: - log.info(f'ib sending {event_name}:\n{pformat(item)}') - # TODO: templating the ib statuses in comparison with other - # brokers is likely the way to go: - # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313 - # short list: - # - PendingSubmit - # - PendingCancel - # - PreSubmitted (simulated orders) - # - ApiCancelled (cancelled by client before submission - # to routing) - # - Cancelled - # - Filled - # - Inactive (reject or cancelled but not by trader) + match event_name: + # TODO: templating the ib statuses in comparison with other + # brokers is likely the way to go: + # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313 + # short list: + # - PendingSubmit + # - PendingCancel + # - PreSubmitted (simulated orders) + # - ApiCancelled (cancelled by client before submission + # to routing) + # - Cancelled + # - Filled + # - Inactive (reject or cancelled but not by trader) - # XXX: here's some other sucky cases from the api - # - short-sale but securities haven't been located, in this - # case we should probably keep the order in some kind of - # weird state or cancel it outright? + # XXX: here's some other sucky cases from the api + # - short-sale but securities haven't been located, in this + # case we should probably keep the order in some kind of + # weird state or cancel it outright? - # status='PendingSubmit', message=''), - # status='Cancelled', message='Error 404, - # reqId 1550: Order held while securities are located.'), - # status='PreSubmitted', message='')], + # status='PendingSubmit', message=''), + # status='Cancelled', message='Error 404, + # reqId 1550: Order held while securities are located.'), + # status='PreSubmitted', message='')], - if event_name == 'status': + case 'status': - # XXX: begin normalization of nonsense ib_insync internal - # object-state tracking representations... + # XXX: begin normalization of nonsense ib_insync internal + # object-state tracking representations... - # unwrap needed data from ib_insync internal types - trade: Trade = item - status: OrderStatus = trade.orderStatus + # unwrap needed data from ib_insync internal types + trade: Trade = item + status: OrderStatus = trade.orderStatus - # skip duplicate filled updates - we get the deats - # from the execution details event - msg = BrokerdStatus( + # skip duplicate filled updates - we get the deats + # from the execution details event + msg = BrokerdStatus( - reqid=trade.order.orderId, - time_ns=time.time_ns(), # cuz why not - account=accounts_def.inverse[trade.order.account], + reqid=trade.order.orderId, + time_ns=time.time_ns(), # cuz why not + account=accounts_def.inverse[trade.order.account], - # everyone doin camel case.. - status=status.status.lower(), # force lower case + # everyone doin camel case.. + status=status.status.lower(), # force lower case - filled=status.filled, - reason=status.whyHeld, + filled=status.filled, + reason=status.whyHeld, - # this seems to not be necessarily up to date in the - # execDetails event.. so we have to send it here I guess? - remaining=status.remaining, + # this seems to not be necessarily up to date in the + # execDetails event.. so we have to send it here I guess? 
+ remaining=status.remaining, - broker_details={'name': 'ib'}, - ) + broker_details={'name': 'ib'}, + ) + await ems_stream.send(msg.dict()) - elif event_name == 'fill': + case 'fill': - # for wtv reason this is a separate event type - # from IB, not sure why it's needed other then for extra - # complexity and over-engineering :eyeroll:. - # we may just end up dropping these events (or - # translating them to ``Status`` msgs) if we can - # show the equivalent status events are no more latent. + # for wtv reason this is a separate event type + # from IB, not sure why it's needed other then for extra + # complexity and over-engineering :eyeroll:. + # we may just end up dropping these events (or + # translating them to ``Status`` msgs) if we can + # show the equivalent status events are no more latent. - # unpack ib_insync types - # pep-0526 style: - # https://www.python.org/dev/peps/pep-0526/#global-and-local-variable-annotations - trade: Trade - fill: Fill - trade, fill = item - execu: Execution = fill.execution + # unpack ib_insync types + # pep-0526 style: + # https://www.python.org/dev/peps/pep-0526/#global-and-local-variable-annotations + trade: Trade + fill: Fill - # TODO: normalize out commissions details? - details = { - 'contract': asdict(fill.contract), - 'execution': asdict(fill.execution), - 'commissions': asdict(fill.commissionReport), - 'broker_time': execu.time, # supposedly server fill time - 'name': 'ib', - } + # TODO: maybe we can use matching to better handle these cases. + trade, fill = item + execu: Execution = fill.execution + execid = execu.execId - msg = BrokerdFill( - # should match the value returned from `.submit_limit()` - reqid=execu.orderId, - time_ns=time.time_ns(), # cuz why not + # TODO: + # - normalize out commissions details? + # - this is the same as the unpacking loop above in + # ``trades_to_ledger_entries()`` no? + trade_entry = ids2fills.setdefault(execid, {}) + cost_already_rx = bool(trade_entry) - action=action_map[execu.side], - size=execu.shares, - price=execu.price, + # if the costs report was already received this + # should be not empty right? + comms = fill.commissionReport.commission + if cost_already_rx: + assert comms - broker_details=details, - # XXX: required by order mode currently - broker_time=details['broker_time'], + trade_entry.update( + { + 'contract': asdict(fill.contract), + 'execution': asdict(fill.execution), + 'commissionReport': asdict(fill.commissionReport), + # supposedly server fill time? + 'broker_time': execu.time, + 'name': 'ib', + } + ) - ) + msg = BrokerdFill( + # should match the value returned from `.submit_limit()` + reqid=execu.orderId, + time_ns=time.time_ns(), # cuz why not - elif event_name == 'error': + action=action_map[execu.side], + size=execu.shares, + price=execu.price, - err: dict = item + broker_details=trade_entry, + # XXX: required by order mode currently + broker_time=trade_entry['broker_time'], - # f$#$% gawd dammit insync.. - con = err['contract'] - if isinstance(con, Contract): - err['contract'] = asdict(con) + ) + await ems_stream.send(msg.dict()) - if err['reqid'] == -1: - log.error(f'TWS external order error:\n{pformat(err)}') + # 2 cases: + # - fill comes first or + # - comms report comes first + comms = fill.commissionReport.commission + if comms: + # UGHHH since the commision report object might be + # filled in **after** we already serialized to dict.. + # def need something better for all this. 
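# ---- editor's sketch (not part of the patch) ----
# ib emits a fill and its commission report as *separate* events in
# either order; the handler above joins the halves in ``ids2fills``
# keyed by ``execId`` and only emits a pp update once both are in.
# The join logic reduced to its core:
from typing import Optional

ids2fills: dict[str, dict] = {}

def absorb(execid: str, half: dict) -> Optional[dict]:
    '''
    Merge one half (fill deats or commission report) into the
    entry; return it only once the other half already arrived.

    '''
    entry = ids2fills.setdefault(execid, {})
    other_already_rx = bool(entry)
    entry.update(half)
    return entry if other_already_rx else None

# cost report first, then the fill (the reverse also works):
assert absorb('0001', {'commissionReport': {'commission': 1.3}}) is None
done = absorb('0001', {'execution': {'price': 4000.25}})
assert done is not None and set(done) == {'commissionReport', 'execution'}
# ---- end sketch ----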
+ trade_entry.update( + {'commissionReport': asdict(fill.commissionReport)} + ) - # TODO: what schema for this msg if we're going to make it - # portable across all backends? - # msg = BrokerdError(**err) + if comms or cost_already_rx: + # only send a pp update once we have a cost report + await emit_pp_update( + ems_stream, + trade_entry, + accounts_def, + proxies, + cids2pps, + ) + + case 'cost': + + cr: CommissionReport = item + execid = cr.execId + + trade_entry = ids2fills.setdefault(execid, {}) + fill_already_rx = bool(trade_entry) + + # no fill msg has arrived yet so just fill out the + # cost report for now and when the fill arrives a pp + # msg can be emitted. + trade_entry.update( + {'commissionReport': asdict(cr)} + ) + + if fill_already_rx: + await emit_pp_update( + ems_stream, + trade_entry, + accounts_def, + proxies, + cids2pps, + ) + + case 'error': + err: dict = item + + # f$#$% gawd dammit insync.. + con = err['contract'] + if isinstance(con, Contract): + err['contract'] = asdict(con) + + if err['reqid'] == -1: + log.error(f'TWS external order error:\n{pformat(err)}') + + # TODO: what schema for this msg if we're going to make it + # portable across all backends? + # msg = BrokerdError(**err) + + case 'position': + + cid, msg = pack_position(item) + # acctid = msg.account = accounts_def.inverse[msg.account] + # cuck ib and it's shitty fifo sys for pps! + # await ems_stream.send(msg.dict()) + + case 'event': + + # it's either a general system status event or an external + # trade event? + log.info(f"TWS system status: \n{pformat(item)}") + + # TODO: support this again but needs parsing at the callback + # level... + # reqid = item.get('reqid', 0) + # if getattr(msg, 'reqid', 0) < -1: + # log.info(f"TWS triggered trade\n{pformat(msg.dict())}") + + # msg.reqid = 'tws-' + str(-1 * reqid) + + # mark msg as from "external system" + # TODO: probably something better then this.. and start + # considering multiplayer/group trades tracking + # msg.broker_details['external_src'] = 'tws' + + case _: + log.error(f'WTF: {event_name}: {item}') + + +def norm_trade_records( + ledger: dict[str, Any], + +) -> list[pp.Transaction]: + ''' + Normalize a flex report or API retrieved executions + ledger into our standard record format. + + ''' + records: list[pp.Transaction] = [] + + for tid, record in ledger.items(): + + conid = record.get('conId') or record['conid'] + comms = record.get('commission') or -1*record['ibCommission'] + price = record.get('price') or record['tradePrice'] + + # the api doesn't do the -/+ on the quantity for you but flex + # records do.. are you fucking serious ib...!? + size = record.get('quantity') or record['shares'] * { + 'BOT': 1, + 'SLD': -1, + }[record['side']] + + exch = record['exchange'] + lexch = record.get('listingExchange') + + suffix = lexch or exch + symbol = record['symbol'] + + # likely an opts contract record from a flex report.. + # TODO: no idea how to parse ^ the strike part from flex.. + # (00010000 any, or 00007500 tsla, ..) + # we probably must do the contract lookup for this? 
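# ---- editor's sketch (not part of the patch) ----
# ``norm_trade_records()`` above reconciles two input schemas: flex
# rows carry a signed ``quantity`` and a *negative* ``ibCommission``
# while API rows carry unsigned ``shares`` plus a BOT/SLD ``side``.
# The normalization arithmetic on two equivalent made-up rows:
def norm(record: dict) -> tuple[float, float, float]:
    comms = record.get('commission') or -1 * record['ibCommission']
    price = record.get('price') or record['tradePrice']
    size = record.get('quantity') or record['shares'] * {
        'BOT': 1,
        'SLD': -1,
    }[record['side']]
    return size, price, comms

flex_row = {'quantity': -2, 'ibCommission': -1.55, 'tradePrice': 99.5}
api_row = {'shares': 2, 'side': 'SLD', 'price': 99.5, 'commission': 1.55}
assert norm(flex_row) == norm(api_row) == (-2, 99.5, 1.55)

# NB: pendulum datetimes are immutable, so the flex-branch call
# below, ``dt.set(hour=..., ...)``, discards its result; it likely
# wants to be ``dt = dt.set(...)`` (the parse of the 'HH:MM:SS'
# fragment here mirrors the patch's own usage):
import pendulum
dt = pendulum.parse('2022-06-09')
tsdt = pendulum.parse('17:30:15')
dt = dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)
assert dt.hour == 17
# ---- end sketch ----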
+ if ' ' in symbol or '--' in exch: + underlying, _, tail = symbol.partition(' ') + suffix = exch = 'opt' + expiry = tail[:6] + # otype = tail[6] + # strike = tail[7:] + + print(f'skipping opts contract {symbol}') continue - elif event_name == 'position': - msg = pack_position(item) - msg.account = accounts_def.inverse[msg.account] + # timestamping is way different in API records + date = record.get('date') + if not date: + # probably a flex record with a wonky non-std timestamp.. + date, ts = record['dateTime'].split(';') + dt = pendulum.parse(date) + ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}' + tsdt = pendulum.parse(ts) + dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second) - elif event_name == 'event': + else: + # epoch_dt = pendulum.from_timestamp(record.get('time')) + dt = pendulum.parse(date) - # it's either a general system status event or an external - # trade event? - log.info(f"TWS system status: \n{pformat(item)}") + # special handling of symbol extraction from + # flex records using some ad-hoc schema parsing. + instr = record.get('assetCategory') + if instr == 'FUT': + symbol = record['description'][:3] - # TODO: support this again but needs parsing at the callback - # level... - # reqid = item.get('reqid', 0) - # if getattr(msg, 'reqid', 0) < -1: - # log.info(f"TWS triggered trade\n{pformat(msg.dict())}") + # try to build out piker fqsn from record. + expiry = record.get( + 'lastTradeDateOrContractMonth') or record.get('expiry') + if expiry: + expiry = str(expiry).strip(' ') + suffix = f'{exch}.{expiry}' + expiry = pendulum.parse(expiry) + fqsn = Symbol.from_fqsn( + fqsn=f'{symbol}.{suffix}.ib', + info={}, + ).front_fqsn().rstrip('.ib') + + # NOTE: for flex records the normal fields for defining an fqsn + # sometimes won't be available so we rely on two approaches for + # the "reverse lookup" of piker style fqsn keys: + # - when dealing with API trade records received from + # `IB.trades()` we do a contract lookup at he time of processing + # - when dealing with flex records, it is assumed the record + # is at least a day old and thus the TWS position reporting system + # should already have entries if the pps are still open, in + # which case, we can pull the fqsn from that table (see + # `trades_dialogue()` above). + + records.append(pp.Transaction( + fqsn=fqsn, + tid=tid, + size=size, + price=price, + cost=comms, + dt=dt, + expiry=expiry, + bsuid=conid, + )) + + return records + + +def trades_to_ledger_entries( + accounts: bidict, + trade_entries: list[object], + source_type: str = 'api', + +) -> dict: + ''' + Convert either of API execution objects or flex report + entry objects into ``dict`` form, pretty much straight up + without modification. + + ''' + trades_by_account = {} + + for t in trade_entries: + if source_type == 'flex': + entry = t.__dict__ + + # XXX: LOL apparently ``toml`` has a bug + # where a section key error will show up in the write + # if you leave a table key as an `int`? So i guess + # cast to strs for all keys.. + + # oddly for some so-called "BookTrade" entries + # this field seems to be blank, no cuckin clue. + # trade['ibExecID'] + tid = str(entry.get('ibExecID') or entry['tradeID']) + # date = str(entry['tradeDate']) + + # XXX: is it going to cause problems if a account name + # get's lost? The user should be able to find it based + # on the actual exec history right? + acctid = accounts[str(entry['accountId'])] + + elif source_type == 'api': + # NOTE: example of schema we pull from the API client. 
+ # { + # 'commissionReport': CommissionReport(... + # 'contract': {... + # 'execution': Execution(... + # 'time': 1654801166.0 + # } + + # flatten all sub-dicts and values into one top level entry. + entry = {} + for section, val in t.items(): + match section: + case 'contract' | 'execution' | 'commissionReport': + # sub-dict cases + entry.update(val) + + case 'time': + # ib has wack ns timestamps, or is that us? + continue + + case _: + entry[section] = val + + tid = str(entry['execId']) + dt = pendulum.from_timestamp(entry['time']) + # TODO: why isn't this showing seconds in the str? + entry['date'] = str(dt) + acctid = accounts[entry['acctNumber']] + + if not tid: + # this is likely some kind of internal adjustment + # transaction, likely one of the following: + # - an expiry event that will show a "book trade" indicating + # some adjustment to cash balances: zeroing or itm settle. + # - a manual cash balance position adjustment likely done by + # the user from the accounts window in TWS where they can + # manually set the avg price and size: + # https://api.ibkr.com/lib/cstools/faq/web1/index.html#/tag/DTWS_ADJ_AVG_COST + log.warning(f'Skipping ID-less ledger entry:\n{pformat(entry)}') continue - # msg.reqid = 'tws-' + str(-1 * reqid) + trades_by_account.setdefault( + acctid, {} + )[tid] = entry - # mark msg as from "external system" - # TODO: probably something better then this.. and start - # considering multiplayer/group trades tracking - # msg.broker_details['external_src'] = 'tws' - - # XXX: we always serialize to a dict for msgpack - # translations, ideally we can move to an msgspec (or other) - # encoder # that can be enabled in ``tractor`` ahead of - # time so we can pass through the message types directly. - await ems_stream.send(msg.dict()) + return trades_by_account def load_flex_trades( path: Optional[str] = None, -) -> dict[str, str]: +) -> dict[str, Any]: - from pprint import pprint from ib_insync import flexreport, util conf = get_config() @@ -531,10 +1076,10 @@ def load_flex_trades( token = conf.get('flex_token') if not token: raise ValueError( - 'You must specify a ``flex_token`` field in your' - '`brokers.toml` in order load your trade log, see our' - 'intructions for how to set this up here:\n' - 'PUT LINK HERE!' + 'You must specify a ``flex_token`` field in your' + '`brokers.toml` in order load your trade log, see our' + 'intructions for how to set this up here:\n' + 'PUT LINK HERE!' ) qid = conf['flex_trades_query_id'] @@ -555,36 +1100,38 @@ def load_flex_trades( report = flexreport.FlexReport(path=path) trade_entries = report.extract('Trade') - trades = { - # XXX: LOL apparently ``toml`` has a bug - # where a section key error will show up in the write - # if you leave this as an ``int``? - str(t.__dict__['tradeID']): t.__dict__ - for t in trade_entries - } + ln = len(trade_entries) + # log.info(f'Loaded {ln} trades from flex query') + print(f'Loaded {ln} trades from flex query') - ln = len(trades) - log.info(f'Loaded {ln} trades from flex query') + trades_by_account = trades_to_ledger_entries( + # get reverse map to user account names + conf['accounts'].inverse, + trade_entries, + source_type='flex', + ) - trades_by_account = {} - for tid, trade in trades.items(): - trades_by_account.setdefault( - # oddly for some so-called "BookTrade" entries - # this field seems to be blank, no cuckin clue. 
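# ---- editor's sketch (not part of the patch) ----
# The 'api' branch of ``trades_to_ledger_entries()`` above splices
# each fill's nested sub-dicts up into one flat, toml-friendly
# entry. That flatten step in isolation (plain dicts stand in for
# the ib_insync objects):
def flatten(fill: dict) -> dict:
    entry: dict = {}
    for section, val in fill.items():
        match section:
            case 'contract' | 'execution' | 'commissionReport':
                entry.update(val)  # splice keys up one level
            case 'time':
                # top-level stamp dropped; the spliced execution
                # carries its own ``time`` used for ``entry['date']``
                continue
            case _:
                entry[section] = val
    return entry

fill = {
    'contract': {'conId': 12345, 'symbol': 'mnq'},
    'execution': {'execId': '0001', 'price': 4000.25, 'time': 1654801166.0},
    'commissionReport': {'commission': 1.3},
    'time': 1654801166.0,
}
assert flatten(fill)['execId'] == '0001'
assert flatten(fill)['conId'] == 12345
# ---- end sketch ----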
- # trade['ibExecID'] - str(trade['accountId']), {} - )[tid] = trade + ledgers = {} + for acctid, trades_by_id in trades_by_account.items(): + with pp.open_trade_ledger('ib', acctid) as ledger: + ledger.update(trades_by_id) - section = {'ib': trades_by_account} - pprint(section) + ledgers[acctid] = ledger - # TODO: load the config first and append in - # the new trades loaded here.. - try: - config.write(section, 'trades') - except KeyError: - import pdbpp; pdbpp.set_trace() # noqa + return ledgers if __name__ == '__main__': - load_flex_trades() + import sys + import os + + args = sys.argv + if len(args) > 1: + args = args[1:] + for arg in args: + path = os.path.abspath(arg) + load_flex_trades(path=path) + else: + # expect brokers.toml to have an entry and + # pull from the web service. + load_flex_trades() diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 1b2bfb45..b22ddc1b 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -217,8 +217,8 @@ async def get_bars( ) elif ( - err.code == 162 - and 'HMDS query returned no data' in err.message + err.code == 162 and + 'HMDS query returned no data' in err.message ): # XXX: this is now done in the storage mgmt layer # and we shouldn't implicitly decrement the frame dt @@ -237,6 +237,13 @@ async def get_bars( frame_size=2000, ) + # elif ( + # err.code == 162 and + # 'Trading TWS session is connected from a different IP address' in err.message + # ): + # log.warning("ignoring ip address warning") + # continue + elif _pacing in msg: log.warning( @@ -909,17 +916,17 @@ async def open_symbol_search( # trigger async request await trio.sleep(0) - # match against our ad-hoc set immediately - adhoc_matches = fuzzy.extractBests( - pattern, - list(_adhoc_futes_set), - score_cutoff=90, - ) - log.info(f'fuzzy matched adhocs: {adhoc_matches}') - adhoc_match_results = {} - if adhoc_matches: - # TODO: do we need to pull contract details? - adhoc_match_results = {i[0]: {} for i in adhoc_matches} + # # match against our ad-hoc set immediately + # adhoc_matches = fuzzy.extractBests( + # pattern, + # list(_adhoc_futes_set), + # score_cutoff=90, + # ) + # log.info(f'fuzzy matched adhocs: {adhoc_matches}') + # adhoc_match_results = {} + # if adhoc_matches: + # # TODO: do we need to pull contract details? + # adhoc_match_results = {i[0]: {} for i in adhoc_matches} log.debug(f'fuzzy matching stocks {stock_results}') stock_matches = fuzzy.extractBests( @@ -928,7 +935,8 @@ async def open_symbol_search( score_cutoff=50, ) - matches = adhoc_match_results | { + # matches = adhoc_match_results | { + matches = { item[0]: {} for item in stock_matches } # TODO: we used to deliver contract details diff --git a/piker/clearing/_allocate.py b/piker/clearing/_allocate.py index 71d7d9a0..336a9b25 100644 --- a/piker/clearing/_allocate.py +++ b/piker/clearing/_allocate.py @@ -23,53 +23,10 @@ from typing import Optional from bidict import bidict from pydantic import BaseModel, validator +# from msgspec import Struct from ..data._source import Symbol -from ._messages import BrokerdPosition, Status - - -class Position(BaseModel): - ''' - Basic pp (personal position) model with attached fills history. - - This type should be IPC wire ready? - - ''' - symbol: Symbol - - # last size and avg entry price - size: float - avg_price: float # TODO: contextual pricing - - # ordered record of known constituent trade messages - fills: list[Status] = [] - - def update_from_msg( - self, - msg: BrokerdPosition, - - ) -> None: - - # XXX: better place to do this? 
- symbol = self.symbol - - lot_size_digits = symbol.lot_size_digits - avg_price, size = ( - round(msg['avg_price'], ndigits=symbol.tick_size_digits), - round(msg['size'], ndigits=lot_size_digits), - ) - - self.avg_price = avg_price - self.size = size - - @property - def dsize(self) -> float: - ''' - The "dollar" size of the pp, normally in trading (fiat) unit - terms. - - ''' - return self.avg_price * self.size +from ..pp import Position _size_units = bidict({ @@ -173,7 +130,7 @@ class Allocator(BaseModel): l_sub_pp = self.units_limit - abs_live_size elif size_unit == 'currency': - live_cost_basis = abs_live_size * live_pp.avg_price + live_cost_basis = abs_live_size * live_pp.be_price slot_size = currency_per_slot / price l_sub_pp = (self.currency_limit - live_cost_basis) / price @@ -205,7 +162,7 @@ class Allocator(BaseModel): if size_unit == 'currency': # compute the "projected" limit's worth of units at the # current pp (weighted) price: - slot_size = currency_per_slot / live_pp.avg_price + slot_size = currency_per_slot / live_pp.be_price else: slot_size = u_per_slot @@ -244,7 +201,12 @@ class Allocator(BaseModel): if order_size < slot_size: # compute a fractional slots size to display slots_used = self.slots_used( - Position(symbol=sym, size=order_size, avg_price=price) + Position( + symbol=sym, + size=order_size, + be_price=price, + bsuid=sym, + ) ) return { @@ -271,8 +233,8 @@ class Allocator(BaseModel): abs_pp_size = abs(pp.size) if self.size_unit == 'currency': - # live_currency_size = size or (abs_pp_size * pp.avg_price) - live_currency_size = abs_pp_size * pp.avg_price + # live_currency_size = size or (abs_pp_size * pp.be_price) + live_currency_size = abs_pp_size * pp.be_price prop = live_currency_size / self.currency_limit else: @@ -342,7 +304,7 @@ def mk_allocator( # if the current position is already greater then the limit # settings, increase the limit to the current position if alloc.size_unit == 'currency': - startup_size = startup_pp.size * startup_pp.avg_price + startup_size = startup_pp.size * startup_pp.be_price if startup_size > alloc.currency_limit: alloc.currency_limit = round(startup_size, ndigits=2) diff --git a/piker/clearing/_messages.py b/piker/clearing/_messages.py index 1f07f0b0..4bb0be00 100644 --- a/piker/clearing/_messages.py +++ b/piker/clearing/_messages.py @@ -258,6 +258,6 @@ class BrokerdPosition(BaseModel): broker: str account: str symbol: str - currency: str size: float avg_price: float + currency: str = '' diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 99039049..cf580876 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -31,6 +31,8 @@ import tractor from dataclasses import dataclass from .. import data +from ..data._source import Symbol +from ..pp import Position from ..data._normalize import iterticks from ..data._source import unpack_fqsn from ..log import get_logger @@ -257,29 +259,14 @@ class PaperBoi: ) ) - # "avg position price" calcs - # TODO: eventually it'd be nice to have a small set of routines - # to do this stuff from a sequence of cleared orders to enable - # so called "contextual positions". 
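# ---- editor's sketch (not part of the patch) ----
# The paper-engine hunk continuing just below replaces this
# hand-rolled average-price calc with ``pp.Position.lifo_update()``
# (defined in ``piker/pp.py`` further down this diff). Usage in the
# same shape, with made-up numbers:
from piker.data._source import Symbol
from piker.pp import Position

pos = Position(
    Symbol(key='xbtusdt'),
    size=2.0,          # current pp size
    be_price=40000.0,  # current breakeven price
    bsuid='xbtusdt',
)
# a 1-unit buy clears at 43k; size grew so the breakeven price is
# re-weighted: (2 * 40000 + 1 * 43000) / 3 == 41000
new_size, new_be = pos.lifo_update(size=1.0, price=43000.0)
assert (new_size, new_be) == (3.0, 41000.0)
# ---- end sketch ----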
- new_size = size + pp_msg.size - - # old size minus the new size gives us size differential with - # +ve -> increase in pp size - # -ve -> decrease in pp size - size_diff = abs(new_size) - abs(pp_msg.size) - - if new_size == 0: - pp_msg.avg_price = 0 - - elif size_diff > 0: - # only update the "average position price" when the position - # size increases not when it decreases (i.e. the position is - # being made smaller) - pp_msg.avg_price = ( - abs(size) * price + pp_msg.avg_price * abs(pp_msg.size) - ) / abs(new_size) - - pp_msg.size = new_size + # delegate update to `.pp.Position.lifo_update()` + pp = Position( + Symbol(key=symbol), + size=pp_msg.size, + be_price=pp_msg.avg_price, + bsuid=symbol, + ) + pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price) await self.ems_trades_stream.send(pp_msg.dict()) @@ -390,7 +377,8 @@ async def handle_order_requests( account = request_msg['account'] if account != 'paper': log.error( - 'This is a paper account, only a `paper` selection is valid' + 'This is a paper account,' + ' only a `paper` selection is valid' ) await ems_order_stream.send(BrokerdError( oid=request_msg['oid'], @@ -464,7 +452,7 @@ async def trades_dialogue( # TODO: load paper positions per broker from .toml config file # and pass as symbol to position data mapping: ``dict[str, dict]`` # await ctx.started(all_positions) - await ctx.started(({}, {'paper',})) + await ctx.started(({}, ['paper'])) async with ( ctx.open_stream() as ems_stream, diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index e9512322..853860aa 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -83,9 +83,9 @@ def pikerd(loglevel, host, tl, pdb, tsdb): ) log.info( - f'`marketstore` up!\n' - f'`marketstored` pid: {pid}\n' - f'docker container id: {cid}\n' + f'`marketstored` up!\n' + f'pid: {pid}\n' + f'container id: {cid[:12]}\n' f'config: {pformat(config)}' ) diff --git a/piker/config.py b/piker/config.py index d1926dec..c7a7acc9 100644 --- a/piker/config.py +++ b/piker/config.py @@ -21,6 +21,7 @@ Broker configuration mgmt. import platform import sys import os +from os import path from os.path import dirname import shutil from typing import Optional @@ -111,6 +112,7 @@ if _parent_user: _conf_names: set[str] = { 'brokers', + 'pps', 'trades', 'watchlists', } @@ -147,19 +149,21 @@ def get_conf_path( conf_name: str = 'brokers', ) -> str: - """Return the default config path normally under - ``~/.config/piker`` on linux. + ''' + Return the top-level default config path normally under + ``~/.config/piker`` on linux for a given ``conf_name``, the config + name. Contains files such as: - brokers.toml + - pp.toml - watchlists.toml - - trades.toml # maybe coming soon ;) - signals.toml - strats.toml - """ + ''' assert conf_name in _conf_names fn = _conf_fn_w_ext(conf_name) return os.path.join( @@ -173,7 +177,7 @@ def repodir(): Return the abspath to the repo directory. ''' - dirpath = os.path.abspath( + dirpath = path.abspath( # we're 3 levels down in **this** module file dirname(dirname(os.path.realpath(__file__))) ) @@ -182,7 +186,9 @@ def repodir(): def load( conf_name: str = 'brokers', - path: str = None + path: str = None, + + **tomlkws, ) -> (dict, str): ''' @@ -190,6 +196,7 @@ def load( ''' path = path or get_conf_path(conf_name) + if not os.path.isfile(path): fn = _conf_fn_w_ext(conf_name) @@ -202,8 +209,11 @@ def load( # if one exists. 
if os.path.isfile(template): shutil.copyfile(template, path) + else: + with open(path, 'w'): + pass # touch - config = toml.load(path) + config = toml.load(path, **tomlkws) log.debug(f"Read config file {path}") return config, path @@ -212,6 +222,7 @@ def write( config: dict, # toml config as dict name: str = 'brokers', path: str = None, + **toml_kwargs, ) -> None: '''' @@ -235,11 +246,14 @@ def write( f"{path}" ) with open(path, 'w') as cf: - return toml.dump(config, cf) + return toml.dump( + config, + cf, + **toml_kwargs, + ) def load_accounts( - providers: Optional[list[str]] = None ) -> bidict[str, Optional[str]]: diff --git a/piker/data/_source.py b/piker/data/_source.py index 9afcb191..73c218ca 100644 --- a/piker/data/_source.py +++ b/piker/data/_source.py @@ -23,7 +23,7 @@ import decimal from bidict import bidict import numpy as np -from pydantic import BaseModel +from msgspec import Struct # from numba import from_dtype @@ -126,7 +126,7 @@ def unpack_fqsn(fqsn: str) -> tuple[str, str, str]: ) -class Symbol(BaseModel): +class Symbol(Struct): ''' I guess this is some kinda container thing for dealing with all the different meta-data formats from brokers? @@ -152,9 +152,7 @@ class Symbol(BaseModel): info: dict[str, Any], suffix: str = '', - # XXX: like wtf.. - # ) -> 'Symbol': - ) -> None: + ) -> Symbol: tick_size = info.get('price_tick_size', 0.01) lot_tick_size = info.get('lot_tick_size', 0.0) @@ -175,9 +173,7 @@ class Symbol(BaseModel): fqsn: str, info: dict[str, Any], - # XXX: like wtf.. - # ) -> 'Symbol': - ) -> None: + ) -> Symbol: broker, key, suffix = unpack_fqsn(fqsn) return cls.from_broker_info( broker, @@ -240,7 +236,7 @@ class Symbol(BaseModel): ''' tokens = self.tokens() - fqsn = '.'.join(tokens) + fqsn = '.'.join(map(str.lower, tokens)) return fqsn def iterfqsns(self) -> list[str]: diff --git a/piker/pp.py b/piker/pp.py new file mode 100644 index 00000000..0a67d04f --- /dev/null +++ b/piker/pp.py @@ -0,0 +1,781 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License + +# along with this program. If not, see . +''' +Personal/Private position parsing, calculating, summarizing in a way +that doesn't try to cuk most humans who prefer to not lose their moneys.. +(looking at you `ib` and dirt-bird friends) + +''' +from collections import deque +from contextlib import contextmanager as cm +# from pprint import pformat +import os +from os import path +from math import copysign +import re +import time +from typing import ( + Any, + Optional, + Union, +) + +from msgspec import Struct +import pendulum +from pendulum import datetime, now +import tomli +import toml + +from . 
import config +from .brokers import get_brokermod +from .clearing._messages import BrokerdPosition, Status +from .data._source import Symbol +from .log import get_logger + +log = get_logger(__name__) + + +@cm +def open_trade_ledger( + broker: str, + account: str, + +) -> str: + ''' + Indempotently create and read in a trade log file from the + ``/ledgers/`` directory. + + Files are named per broker account of the form + ``_.toml``. The ``accountname`` here is the + name as defined in the user's ``brokers.toml`` config. + + ''' + ldir = path.join(config._config_dir, 'ledgers') + if not path.isdir(ldir): + os.makedirs(ldir) + + fname = f'trades_{broker}_{account}.toml' + tradesfile = path.join(ldir, fname) + + if not path.isfile(tradesfile): + log.info( + f'Creating new local trades ledger: {tradesfile}' + ) + with open(tradesfile, 'w') as cf: + pass # touch + with open(tradesfile, 'rb') as cf: + start = time.time() + ledger = tomli.load(cf) + print(f'Ledger load took {time.time() - start}s') + cpy = ledger.copy() + try: + yield cpy + finally: + if cpy != ledger: + # TODO: show diff output? + # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries + print(f'Updating ledger for {tradesfile}:\n') + ledger.update(cpy) + + # we write on close the mutated ledger data + with open(tradesfile, 'w') as cf: + return toml.dump(ledger, cf) + + +class Transaction(Struct): + # TODO: should this be ``.to`` (see below)? + fqsn: str + + tid: Union[str, int] # unique transaction id + size: float + price: float + cost: float # commisions or other additional costs + dt: datetime + expiry: Optional[datetime] = None + + # optional key normally derived from the broker + # backend which ensures the instrument-symbol this record + # is for is truly unique. + bsuid: Optional[Union[str, int]] = None + + # optional fqsn for the source "asset"/money symbol? + # from: Optional[str] = None + + +class Position(Struct): + ''' + Basic pp (personal/piker position) model with attached clearing + transaction history. + + ''' + symbol: Symbol + + # can be +ve or -ve for long/short + size: float + + # "breakeven price" above or below which pnl moves above and below + # zero for the entirety of the current "trade state". + be_price: float + + # unique backend symbol id + bsuid: str + + # ordered record of known constituent trade messages + clears: dict[ + Union[str, int, Status], # trade id + dict[str, Any], # transaction history summaries + ] = {} + + expiry: Optional[datetime] = None + + def to_dict(self) -> dict: + return { + f: getattr(self, f) + for f in self.__struct_fields__ + } + + def to_pretoml(self) -> dict: + ''' + Prep this position's data contents for export to toml including + re-structuring of the ``.clears`` table to an array of + inline-subtables for better ``pps.toml`` compactness. + + ''' + d = self.to_dict() + clears = d.pop('clears') + expiry = d.pop('expiry') + + if expiry: + d['expiry'] = str(expiry) + + clears_list = [] + + for tid, data in clears.items(): + inline_table = toml.TomlDecoder().get_empty_inline_table() + inline_table['tid'] = tid + + for k, v in data.items(): + inline_table[k] = v + + clears_list.append(inline_table) + + d['clears'] = clears_list + + return d + + def update_from_msg( + self, + msg: BrokerdPosition, + + ) -> None: + + # XXX: better place to do this? 
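# ---- editor's sketch (not part of the patch) ----
# ``open_trade_ledger()`` above is an idempotent read-modify-write
# context manager: mutate the yielded copy and any diff vs. the
# on-disk state is merged and flushed back to
# ``<config-dir>/ledgers/trades_<broker>_<account>.toml`` on exit.
# Hypothetical usage (account name and tid are made up):
from piker.pp import open_trade_ledger

with open_trade_ledger('ib', 'algopaper') as ledger:
    # entries are keyed by unique transaction id (``execId``
    # for ib API fills, ``tradeID`` for flex rows):
    ledger['0000e1a7.62a2315f.01.01'] = {
        'symbol': 'mnq',
        'price': 4000.25,
        'size': 1,
    }
# on exit: ``cpy != ledger`` -> merged and re-dumped via toml
# ---- end sketch ----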
+ symbol = self.symbol + + lot_size_digits = symbol.lot_size_digits + be_price, size = ( + round( + msg['avg_price'], + ndigits=symbol.tick_size_digits + ), + round( + msg['size'], + ndigits=lot_size_digits + ), + ) + + self.be_price = be_price + self.size = size + + @property + def dsize(self) -> float: + ''' + The "dollar" size of the pp, normally in trading (fiat) unit + terms. + + ''' + return self.be_price * self.size + + def update( + self, + t: Transaction, + + ) -> None: + self.clears[t.tid] = { + 'cost': t.cost, + 'price': t.price, + 'size': t.size, + 'dt': str(t.dt), + } + + def lifo_update( + self, + size: float, + price: float, + cost: float = 0, + + # TODO: idea: "real LIFO" dynamic positioning. + # - when a trade takes place where the pnl for + # the (set of) trade(s) is below the breakeven price + # it may be that the trader took a +ve pnl on a short(er) + # term trade in the same account. + # - in this case we could recalc the be price to + # be reverted back to it's prior value before the nearest term + # trade was opened.? + # dynamic_breakeven_price: bool = False, + + ) -> (float, float): + ''' + Incremental update using a LIFO-style weighted mean. + + ''' + # "avg position price" calcs + # TODO: eventually it'd be nice to have a small set of routines + # to do this stuff from a sequence of cleared orders to enable + # so called "contextual positions". + new_size = self.size + size + + # old size minus the new size gives us size diff with + # +ve -> increase in pp size + # -ve -> decrease in pp size + size_diff = abs(new_size) - abs(self.size) + + if new_size == 0: + self.be_price = 0 + + elif size_diff > 0: + # XXX: LOFI incremental update: + # only update the "average price" when + # the size increases not when it decreases (i.e. the + # position is being made smaller) + self.be_price = ( + # weight of current exec = (size * price) + cost + (abs(size) * price) + + + (copysign(1, new_size) * cost) # transaction cost + + + # weight of existing be price + self.be_price * abs(self.size) # weight of previous pp + ) / abs(new_size) # normalized by the new size: weighted mean. + + self.size = new_size + + return new_size, self.be_price + + def minimize_clears( + self, + + ) -> dict[str, dict]: + ''' + Minimize the position's clears entries by removing + all transactions before the last net zero size to avoid + unecessary history irrelevant to the current pp state. + + + ''' + size: float = 0 + clears_since_zero: deque[tuple(str, dict)] = deque() + + # scan for the last "net zero" position by + # iterating clears in reverse. + for tid, clear in reversed(self.clears.items()): + size += clear['size'] + clears_since_zero.appendleft((tid, clear)) + + if size == 0: + break + + self.clears = dict(clears_since_zero) + return self.clears + + +def update_pps( + records: dict[str, Transaction], + pps: Optional[dict[str, Position]] = None + +) -> dict[str, Position]: + ''' + Compile a set of positions from a trades ledger. + + ''' + pps: dict[str, Position] = pps or {} + + # lifo update all pps from records + for r in records: + + pp = pps.setdefault( + r.bsuid, + + # if no existing pp, allocate fresh one. + Position( + Symbol.from_fqsn( + r.fqsn, + info={}, + ), + size=0.0, + be_price=0.0, + bsuid=r.bsuid, + expiry=r.expiry, + ) + ) + + # don't do updates for ledger records we already have + # included in the current pps state. 
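# ---- editor's sketch (not part of the patch) ----
# The weighted mean in ``lifo_update()`` above folds per-clear
# transaction costs into the breakeven price. Worked example for a
# short, where signs matter: ``copysign(1, new_size)`` flips the
# cost term's direction for -ve pps.
from math import copysign

size, be_price = -1.0, 100.0  # existing 1-unit short from $100
clear_size, clear_price, cost = -1.0, 90.0, 0.5

new_size = size + clear_size            # -2.0
assert abs(new_size) - abs(size) > 0    # size increased -> re-weight
be_price = (
    (abs(clear_size) * clear_price)     # 90.0: weight of new clear
    + (copysign(1, new_size) * cost)    # -0.5: costs lower a short's be
    + (be_price * abs(size))            # 100.0: weight of prior pp
) / abs(new_size)
assert be_price == 94.75                # (90 - 0.5 + 100) / 2

# NB: ``update_pps()`` below feeds ``cost=2*r.cost`` so the
# worst-case exit cost is priced into the breakeven up front.
# ---- end sketch ----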
+ if r.tid in pp.clears: + # NOTE: likely you'll see repeats of the same + # ``Transaction`` passed in here if/when you are restarting + # a ``brokerd.ib`` where the API will re-report trades from + # the current session, so we need to make sure we don't + # "double count" these in pp calculations. + continue + + # lifo style "breakeven" price calc + pp.lifo_update( + r.size, + r.price, + + # include transaction cost in breakeven price + # and presume the worst case of the same cost + # to exit this transaction (even though in reality + # it will be dynamic based on exit stratetgy). + cost=2*r.cost, + ) + + # track clearing data + pp.update(r) + + assert len(set(pp.clears)) == len(pp.clears) + + return pps + + +def load_pps_from_ledger( + + brokername: str, + acctname: str, + + # post normalization filter on ledger entries to be processed + filter_by: Optional[list[dict]] = None, + +) -> dict[str, Position]: + ''' + Open a ledger file by broker name and account and read in and + process any trade records into our normalized ``Transaction`` + form and then pass these into the position processing routine + and deliver the two dict-sets of the active and closed pps. + + ''' + with open_trade_ledger( + brokername, + acctname, + ) as ledger: + if not ledger: + # null case, no ledger file with content + return {} + + brokermod = get_brokermod(brokername) + src_records = brokermod.norm_trade_records(ledger) + + if filter_by: + bsuids = set(filter_by) + records = list(filter(lambda r: r.bsuid in bsuids, src_records)) + else: + records = src_records + + return update_pps(records) + + +def get_pps( + brokername: str, + acctids: Optional[set[str]] = set(), + +) -> dict[str, dict[str, Position]]: + ''' + Read out broker-specific position entries from + incremental update file: ``pps.toml``. + + ''' + conf, path = config.load( + 'pps', + # load dicts as inlines to preserve compactness + # _dict=toml.decoder.InlineTableDict, + ) + + all_active = {} + all_closed = {} + + # try to load any ledgers if no section found + bconf, path = config.load('brokers') + accounts = bconf[brokername]['accounts'] + for account in accounts: + + # TODO: instead of this filter we could + # always send all known pps but just not audit + # them since an active client might not be up? + if ( + acctids and + f'{brokername}.{account}' not in acctids + ): + continue + + active, closed = update_pps_conf(brokername, account) + all_active.setdefault(account, {}).update(active) + all_closed.setdefault(account, {}).update(closed) + + return all_active, all_closed + + +# TODO: instead see if we can hack tomli and tomli-w to do the same: +# - https://github.com/hukkin/tomli +# - https://github.com/hukkin/tomli-w +class PpsEncoder(toml.TomlEncoder): + ''' + Special "styled" encoder that makes a ``pps.toml`` redable and + compact by putting `.clears` tables inline and everything else + flat-ish. + + ''' + separator = ',' + + def dump_list(self, v): + ''' + Dump an inline list with a newline after every element and + with consideration for denoted inline table types. + + ''' + retval = "[\n" + for u in v: + if isinstance(u, toml.decoder.InlineTableDict): + out = self.dump_inline_table(u) + else: + out = str(self.dump_value(u)) + + retval += " " + out + "," + "\n" + retval += "]" + return retval + + def dump_inline_table(self, section): + """Preserve inline table in its compact syntax instead of expanding + into subsection. 
+
+        https://github.com/toml-lang/toml#user-content-inline-table
+        '''
+        val_list = []
+        for k, v in section.items():
+            # if isinstance(v, toml.decoder.InlineTableDict):
+            if isinstance(v, dict):
+                val = self.dump_inline_table(v)
+            else:
+                val = str(self.dump_value(v))
+
+            val_list.append(k + " = " + val)
+
+        retval = "{ " + ", ".join(val_list) + " }"
+        return retval
+
+    def dump_sections(self, o, sup):
+        retstr = ""
+        if sup != "" and sup[-1] != ".":
+            sup += '.'
+        retdict = self._dict()
+        arraystr = ""
+        for section in o:
+            qsection = str(section)
+            value = o[section]
+
+            if not re.match(r'^[A-Za-z0-9_-]+$', section):
+                qsection = toml.encoder._dump_str(section)
+
+            # arrayoftables = False
+            if (
+                self.preserve
+                and isinstance(value, toml.decoder.InlineTableDict)
+            ):
+                retstr += (
+                    qsection
+                    +
+                    " = "
+                    +
+                    self.dump_inline_table(o[section])
+                    +
+                    '\n'  # newline after the inline table's closing brace
+                )
+
+            # XXX: this code i'm pretty sure is just blatantly bad
+            # and/or wrong..
+            # if isinstance(o[section], list):
+            #     for a in o[section]:
+            #         if isinstance(a, dict):
+            #             arrayoftables = True
+            # if arrayoftables:
+            #     for a in o[section]:
+            #         arraytabstr = "\n"
+            #         arraystr += "[[" + sup + qsection + "]]\n"
+            #         s, d = self.dump_sections(a, sup + qsection)
+            #         if s:
+            #             if s[0] == "[":
+            #                 arraytabstr += s
+            #             else:
+            #                 arraystr += s
+            #         while d:
+            #             newd = self._dict()
+            #             for dsec in d:
+            #                 s1, d1 = self.dump_sections(d[dsec], sup +
+            #                                             qsection + "." +
+            #                                             dsec)
+            #                 if s1:
+            #                     arraytabstr += ("[" + sup + qsection +
+            #                                     "." + dsec + "]\n")
+            #                     arraytabstr += s1
+            #                 for s1 in d1:
+            #                     newd[dsec + "." + s1] = d1[s1]
+            #             d = newd
+            #             arraystr += arraytabstr
+
+            elif isinstance(value, dict):
+                retdict[qsection] = o[section]
+
+            elif o[section] is not None:
+                retstr += (
+                    qsection
+                    +
+                    " = "
+                    +
+                    str(self.dump_value(o[section]))
+                )
+
+                # if not isinstance(value, dict):
+                if not isinstance(value, toml.decoder.InlineTableDict):
+                    # inline tables should not contain newlines:
+                    # https://toml.io/en/v1.0.0#inline-table
+                    retstr += '\n'
+
+            else:
+                raise ValueError(value)
+
+        retstr += arraystr
+        return (retstr, retdict)
+
+
+def load_pps_from_toml(
+    brokername: str,
+    acctid: str,
+
+    # XXX: there is an edge case here where we may want to either audit
+    # the retrieved ``pps.toml`` output or reprocess it since there was
+    # an error on write on the last attempt to update the state file
+    # even though the ledger *was* updated. For these cases we allow the
+    # caller to pass in a symbol set they'd like to reload from the
+    # underlying ledger to be reprocessed in computing pps state.
+    reload_records: Optional[dict[str, str]] = None,
+    update_from_ledger: bool = False,
+
+) -> tuple[dict, dict[str, Position]]:
+    '''
+    Load and marshal to objects all pps from either an existing
+    ``pps.toml`` config, or from scratch from a ledger file when
+    none yet exists.
+
+    '''
+    conf, path = config.load('pps')
+    brokersection = conf.setdefault(brokername, {})
+    pps = brokersection.setdefault(acctid, {})
+    pp_objs = {}
+
+    # no pps entry yet for this broker/account so parse any available
+    # ledgers to build a brand new pps state.
+    if not pps or update_from_ledger:
+        pp_objs = load_pps_from_ledger(
+            brokername,
+            acctid,
+        )
+
+    # Reload symbol-specific ledger entries if requested by the
+    # caller **AND** pps state already exists for this account.
+    elif (
+        pps and reload_records
+    ):
+        # reprocess just the requested symbols' ledger entries to
+        # rebuild their pps state.
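+        # (eg. passing ``reload_records={<bsuid>: 'mnq.globex'}``
+        # re-reads only that contract's fills from the ledger; keys
+        # are bsuids, values are fqsns.)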
+        pp_objs = load_pps_from_ledger(
+            brokername,
+            acctid,
+            filter_by=reload_records,
+        )
+
+    if not pps:
+        log.warning(
+            f'No trade history could be loaded for {brokername}:{acctid}'
+        )
+
+    # unmarshal/load ``pps.toml`` config entries into object form.
+    for fqsn, entry in pps.items():
+        bsuid = entry['bsuid']
+
+        # convert clears sub-tables (only in this form
+        # for toml re-presentation) back into a master table.
+        clears_list = entry['clears']
+
+        # index clears entries in "object" form by tid in a top
+        # level dict instead of a list (as is presented in our
+        # ``pps.toml``).
+        clears = {}
+        for clears_table in clears_list:
+            tid = clears_table.pop('tid')
+            clears[tid] = clears_table
+
+        size = entry['size']
+
+        # TODO: an audit system for existing pps entries?
+        # if not len(clears) == abs(size):
+        #     pp_objs = load_pps_from_ledger(
+        #         brokername,
+        #         acctid,
+        #         filter_by=reload_records,
+        #     )
+        #     reason = 'size <-> len(clears) mismatch'
+        #     raise ValueError(
+        #         '`pps.toml` entry is invalid:\n'
+        #         f'{fqsn}\n'
+        #         f'{pformat(entry)}'
+        #     )
+
+        expiry = entry.get('expiry')
+        if expiry:
+            expiry = pendulum.parse(expiry)
+
+        pp_objs[bsuid] = Position(
+            Symbol.from_fqsn(fqsn, info={}),
+            size=size,
+            be_price=entry['be_price'],
+            expiry=expiry,
+            bsuid=entry['bsuid'],
+
+            # XXX: super critical, we need to be sure to include
+            # all pps.toml clears to avoid double counting clears that
+            # were already included in the current incremental update
+            # state, since today's records may have already been
+            # processed!
+            clears=clears,
+        )
+
+    return conf, pp_objs
+
+
+def update_pps_conf(
+    brokername: str,
+    acctid: str,
+
+    trade_records: Optional[list[Transaction]] = None,
+    ledger_reload: Optional[dict[str, str]] = None,
+
+) -> tuple[
+    dict[str, Position],
+    dict[str, Position],
+]:
+    '''
+    Update the ``pps.toml`` on-disk state with any new trade records
+    (and/or a ledger reload) and deliver the active and closed
+    position tables keyed by bsuid.
+
+    '''
+    # this maps `.bsuid` values to positions
+    pp_objs: dict[Union[str, int], Position]
+
+    if trade_records and ledger_reload:
+        for r in trade_records:
+            ledger_reload[r.bsuid] = r.fqsn
+
+    conf, pp_objs = load_pps_from_toml(
+        brokername,
+        acctid,
+        reload_records=ledger_reload,
+    )
+
+    # update all pp objects from any (new) trade records which
+    # were passed in (aka incremental update case).
+    if trade_records:
+        pp_objs = update_pps(
+            trade_records,
+            pps=pp_objs,
+        )
+
+    pp_entries = {}  # dict-serialize all active pps
+    # NOTE: newly closed positions are also important to report/return
+    # since a consumer, like an order mode UI ;), might want to react
+    # based on the closure.
+    closed_pp_objs: dict[str, Position] = {}
+
+    for bsuid in list(pp_objs):
+        pp = pp_objs[bsuid]
+        pp.minimize_clears()
+
+        if (
+            pp.size == 0
+
+            # drop time-expired positions (normally derivatives)
+            or (pp.expiry and pp.expiry < now())
+        ):
+            # if expired the position is closed
+            pp.size = 0
+
+            # position is already closed aka "net zero"
+            closed_pp = pp_objs.pop(bsuid, None)
+            if closed_pp:
+                closed_pp_objs[bsuid] = closed_pp
+
+        else:
+            # serialize to pre-toml form
+            asdict = pp.to_pretoml()
+
+            if pp.expiry is None:
+                asdict.pop('expiry', None)
+
+            # TODO: we need to figure out how to have one top level
+            # listing venue here even when the backend isn't providing
+            # it via the trades ledger..
+            # drop symbol obj in serialized form
+            s = asdict.pop('symbol')
+            fqsn = s.front_fqsn()
+            log.info(f'Updating active pp: {fqsn}')
+
+            # XXX: ugh, it's cuz we push the section under
+            # the broker name.. maybe we need to rethink this?
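+            # NOTE: ``str.rstrip()`` strips a *character set*, not a
+            # suffix; eg. 'abb.ib'.rstrip('.ib') gives 'a' and would
+            # mangle the key, hence ``str.removesuffix()`` (py3.9+).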
+            brokerless_key = fqsn.removesuffix(f'.{brokername}')
+
+            pp_entries[brokerless_key] = asdict
+
+    conf[brokername][acctid] = pp_entries
+
+    # TODO: why tf haven't they already done this for inline tables smh..
+    enc = PpsEncoder(preserve=True)
+    # table_bs_type = type(toml.TomlDecoder().get_empty_inline_table())
+    enc.dump_funcs[toml.decoder.InlineTableDict] = enc.dump_inline_table
+
+    config.write(
+        conf,
+        'pps',
+        encoder=enc,
+    )
+
+    # deliver object form of all pps in table to caller
+    return pp_objs, closed_pp_objs
+
+
+if __name__ == '__main__':
+    import sys
+
+    args = sys.argv
+    assert len(args) > 1, 'Specify account(s) from `brokers.toml`'
+    args = args[1:]
+    for acctid in args:
+        broker, name = acctid.split('.')
+        update_pps_conf(broker, name)
diff --git a/piker/ui/_position.py b/piker/ui/_position.py
index 0abb6459..6a1ab01e 100644
--- a/piker/ui/_position.py
+++ b/piker/ui/_position.py
@@ -19,6 +19,7 @@ Position info and display

 """
 from __future__ import annotations
+from copy import copy
 from dataclasses import dataclass
 from functools import partial
 from math import floor, copysign
@@ -105,8 +106,8 @@ async def update_pnl_from_feed(

             # compute and display pnl status
             order_mode.pane.pnl_label.format(
                 pnl=copysign(1, size) * pnl(
-                    # live.avg_price,
-                    order_mode.current_pp.live_pp.avg_price,
+                    # live.be_price,
+                    order_mode.current_pp.live_pp.be_price,
                     tick['price'],
                 ),
             )
@@ -356,7 +357,7 @@ class SettingsPane:
             # last historical close price
             last = feed.shm.array[-1][['close']][0]
             pnl_value = copysign(1, size) * pnl(
-                tracker.live_pp.avg_price,
+                tracker.live_pp.be_price,
                 last,
             )
@@ -476,7 +477,7 @@ class PositionTracker:
         self.alloc = alloc
         self.startup_pp = startup_pp
-        self.live_pp = startup_pp.copy()
+        self.live_pp = copy(startup_pp)

         view = chart.getViewBox()
@@ -556,7 +557,7 @@ class PositionTracker:
         pp = position or self.live_pp
         self.update_line(
-            pp.avg_price,
+            pp.be_price,
             pp.size,
             self.chart.linked.symbol.lot_size_digits,
         )
@@ -570,7 +571,7 @@ class PositionTracker:
             self.hide()

         else:
-            self._level_marker.level = pp.avg_price
+            self._level_marker.level = pp.be_price

             # these updates are critical to avoid lag on view/scene changes
             self._level_marker.update()  # trigger paint
diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py
index a86fe816..f5a85d64 100644
--- a/piker/ui/order_mode.py
+++ b/piker/ui/order_mode.py
@@ -33,10 +33,10 @@ import trio
 from PyQt5.QtCore import Qt

 from .. import config
+from ..pp import Position
 from ..clearing._client import open_ems, OrderBook
 from ..clearing._allocate import (
     mk_allocator,
-    Position,
 )
 from ._style import _font
 from ..data._source import Symbol
@@ -59,7 +59,8 @@ log = get_logger(__name__)

 class OrderDialog(BaseModel):
-    '''Trade dialogue meta-data describing the lifetime
+    '''
+    Trade dialogue meta-data describing the lifetime
     of an order submission to ``emsd`` from a chart.

     '''
@@ -87,7 +88,8 @@ def on_level_change_update_next_order_info(
     tracker: PositionTracker,

 ) -> None:
-    '''A callback applied for each level change to the line
+    '''
+    A callback applied for each level change to the line
     which will recompute the order size based on allocator
     settings. this is assigned inside
     ``OrderMode.line_from_order()``
@@ -604,7 +606,10 @@ async def open_order_mode(
             startup_pp = Position(
                 symbol=symbol,
                 size=0,
-                avg_price=0,
+                be_price=0,
+
+                # XXX: BLEH, do we care about this on the client side?
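+                # (``bsuid`` is the broker-specific unique id, eg. an
+                # ib ``conId``; the chart client has no real one for a
+                # fresh pp so the symbol obj is used as a stand-in.)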
+ bsuid=symbol, ) msg = pps_by_account.get(account_name) if msg: diff --git a/setup.py b/setup.py index a495db2e..9ae98dbb 100755 --- a/setup.py +++ b/setup.py @@ -41,6 +41,7 @@ setup( }, install_requires=[ 'toml', + 'tomli', # fastest pure py reader 'click', 'colorlog', 'attrs',