diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index af99e9ad..c18125f4 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -1127,6 +1127,12 @@ async def load_aio_clients( # careful. timeout=connect_timeout, ) + # create and cache client + client = Client(ib) + + # update all actor-global caches + log.info(f"Caching client for {sockaddr}") + _client_cache[sockaddr] = client break except ( @@ -1150,21 +1156,9 @@ async def load_aio_clients( log.warning( f'Failed to connect on {port} for {i} time, retrying...') - # create and cache client - client = Client(ib) - # Pre-collect all accounts available for this # connection and map account names to this client # instance. - pps = ib.positions() - if pps: - for pp in pps: - accounts_found[ - accounts_def.inverse[pp.account] - ] = client - - # if there are accounts without positions we should still - # register them for this client for value in ib.accountValues(): acct_number = value.account @@ -1185,10 +1179,6 @@ async def load_aio_clients( f'{pformat(accounts_found)}' ) - # update all actor-global caches - log.info(f"Caching client for {sockaddr}") - _client_cache[sockaddr] = client - # XXX: why aren't we just updating this directy above # instead of using the intermediary `accounts_found`? _accounts2clients.update(accounts_found) @@ -1245,7 +1235,6 @@ async def open_client_proxies() -> tuple[ ]: async with ( tractor.trionics.maybe_open_context( - # acm_func=open_client_proxies, acm_func=tractor.to_asyncio.open_channel_from, kwargs={'target': load_clients_for_trio}, diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 9f384166..c2f03a4f 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -18,6 +18,7 @@ Order and trades endpoints for use with ``piker``'s EMS. """ from __future__ import annotations +from bisect import insort from contextlib import ExitStack from dataclasses import asdict from functools import partial @@ -36,8 +37,6 @@ from trio_typing import TaskStatus import tractor from ib_insync.contract import ( Contract, - # Option, - # Forex, ) from ib_insync.order import ( Trade, @@ -357,11 +356,24 @@ async def update_and_audit_msgs( # presume we're at least not more in the shit then we # thought. if diff: + reverse_split_ratio = pikersize / ibsize + split_ratio = 1/reverse_split_ratio + + if split_ratio >= reverse_split_ratio: + entry = f'split_ratio = {int(split_ratio)}' + else: + entry = f'split_ratio = 1/{int(reverse_split_ratio)}' + raise ValueError( f'POSITION MISMATCH ib <-> piker ledger:\n' f'ib: {ibppmsg}\n' f'piker: {msg}\n' - 'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?' + f'reverse_split_ratio: {reverse_split_ratio}\n' + f'split_ratio: {split_ratio}\n\n' + 'FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?\n\n' + 'If you are expecting a (reverse) split in this ' + 'instrument you should probably put the following ' + f'in the `pps.toml` section:\n{entry}' ) msg.size = ibsize @@ -394,11 +406,12 @@ async def update_and_audit_msgs( avg_price=p.ppu, ) if validate and p.size: - raise ValueError( - f'UNEXPECTED POSITION ib <-> piker ledger:\n' + # raise ValueError( + log.error( + f'UNEXPECTED POSITION says ib:\n' f'piker: {msg}\n' 'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?\n' - 'MAYBE THEY LIQUIDATED YOU BRO!??!' + 'THEY LIQUIDATED YOU OR YOUR MISSING LEDGER RECORDS!?' ) msgs.append(msg) @@ -423,7 +436,6 @@ async def trades_dialogue( # deliver positions to subscriber before anything else all_positions = [] accounts = set() - clients: list[tuple[Client, trio.MemoryReceiveChannel]] = [] acctids = set() cids2pps: dict[str, BrokerdPosition] = {} @@ -450,8 +462,8 @@ async def trades_dialogue( with ( ExitStack() as lstack, ): + # load ledgers and pps for all detected client-proxies for account, proxy in proxies.items(): - assert account in accounts_def accounts.add(account) acctid = account.strip('ib.') @@ -466,6 +478,7 @@ async def trades_dialogue( open_pps('ib', acctid) ) + for account, proxy in proxies.items(): client = aioclients[account] # process pp value reported from ib's system. we only use these @@ -473,28 +486,29 @@ async def trades_dialogue( # the so called (bs) "FIFO" style which more or less results in # a price that's not useful for traders who want to not lose # money.. xb - # for client in aioclients.values(): for pos in client.positions(): # collect all ib-pp reported positions so that we can be # sure know which positions to update from the ledger if # any are missing from the ``pps.toml`` bsuid, msg = pack_position(pos) + acctid = msg.account = accounts_def.inverse[msg.account] acctid = acctid.strip('ib.') cids2pps[(acctid, bsuid)] = msg assert msg.account in accounts, ( f'Position for unknown account: {msg.account}') + ledger = ledgers[acctid] table = tables[acctid] + pp = table.pps.get(bsuid) if ( not pp or pp.size != msg.size ): trans = norm_trade_records(ledger) - updated = table.update_from_trans(trans) - pp = updated[bsuid] + table.update_from_trans(trans) # update trades ledgers for all accounts from connected # api clients which report trades for **this session**. @@ -521,9 +535,34 @@ async def trades_dialogue( trans = trans_by_acct.get(acctid) if trans: table.update_from_trans(trans) - updated = table.update_from_trans(trans) - assert msg.size == pp.size, 'WTF' + # XXX: not sure exactly why it wouldn't be in + # the updated output (maybe this is a bug?) but + # if you create a pos from TWS and then load it + # from the api trades it seems we get a key + # error from ``update[bsuid]`` ? + pp = table.pps.get(bsuid) + if not pp: + log.error( + f'The contract id for {msg} may have ' + f'changed to {bsuid}\nYou may need to ' + 'adjust your ledger for this, skipping ' + 'for now.' + ) + continue + + # XXX: not sure exactly why it wouldn't be in + # the updated output (maybe this is a bug?) but + # if you create a pos from TWS and then load it + # from the api trades it seems we get a key + # error from ``update[bsuid]`` ? + pp = table.pps[bsuid] + if msg.size != pp.size: + log.error( + 'Position mismatch {pp.symbol.front_fqsn()}:\n' + f'ib: {msg.size}\n' + f'piker: {pp.size}\n' + ) active_pps, closed_pps = table.dump_active() @@ -558,6 +597,7 @@ async def trades_dialogue( # proxy wrapper for starting trade event stream async def open_trade_event_stream( + client: Client, task_status: TaskStatus[ trio.abc.ReceiveChannel ] = trio.TASK_STATUS_IGNORED, @@ -575,18 +615,25 @@ async def trades_dialogue( ctx.open_stream() as ems_stream, trio.open_nursery() as n, ): - trade_event_stream = await n.start(open_trade_event_stream) - clients.append((client, trade_event_stream)) - # start order request handler **before** local trades - # event loop - n.start_soon(handle_order_requests, ems_stream, accounts_def) + for client in set(aioclients.values()): + trade_event_stream = await n.start( + open_trade_event_stream, + client, + ) - # allocate event relay tasks for each client connection - for client, stream in clients: + # start order request handler **before** local trades + # event loop + n.start_soon( + handle_order_requests, + ems_stream, + accounts_def, + ) + + # allocate event relay tasks for each client connection n.start_soon( deliver_trade_events, - stream, + trade_event_stream, ems_stream, accounts_def, cids2pps, @@ -661,6 +708,22 @@ async def emit_pp_update( await ems_stream.send(msg) +_statuses: dict[str, str] = { + 'cancelled': 'canceled', + 'submitted': 'open', + + # XXX: just pass these through? it duplicates actual fill events other + # then the case where you the `.remaining == 0` case which is our + # 'closed'` case. + # 'filled': 'pending', + # 'pendingsubmit': 'pending', + + # TODO: see a current ``ib_insync`` issue around this: + # https://github.com/erdewit/ib_insync/issues/363 + 'inactive': 'pending', +} + + async def deliver_trade_events( trade_event_stream: trio.MemoryReceiveChannel, @@ -914,11 +977,13 @@ def norm_trade_records( ledger into our standard record format. ''' - records: dict[str, Transaction] = {} + records: list[Transaction] = [] for tid, record in ledger.items(): conid = record.get('conId') or record['conid'] - comms = record.get('commission') or -1*record['ibCommission'] + comms = record.get('commission') + if comms is None: + comms = -1*record['ibCommission'] price = record.get('price') or record['tradePrice'] # the api doesn't do the -/+ on the quantity for you but flex @@ -991,19 +1056,22 @@ def norm_trade_records( # should already have entries if the pps are still open, in # which case, we can pull the fqsn from that table (see # `trades_dialogue()` above). - - records[tid] = Transaction( - fqsn=fqsn, - tid=tid, - size=size, - price=price, - cost=comms, - dt=dt, - expiry=expiry, - bsuid=conid, + insort( + records, + Transaction( + fqsn=fqsn, + tid=tid, + size=size, + price=price, + cost=comms, + dt=dt, + expiry=expiry, + bsuid=conid, + ), + key=lambda t: t.dt ) - return records + return {r.tid: r for r in records} def trades_to_ledger_entries( diff --git a/piker/pp.py b/piker/pp.py index 456c2c4f..8fdaaa4d 100644 --- a/piker/pp.py +++ b/piker/pp.py @@ -134,11 +134,14 @@ class Position(Struct): # unique backend symbol id bsuid: str + split_ratio: Optional[int] = None + # ordered record of known constituent trade messages clears: dict[ Union[str, int, Status], # trade id dict[str, Any], # transaction history summaries ] = {} + first_clear_dt: Optional[datetime] = None expiry: Optional[datetime] = None @@ -159,6 +162,12 @@ class Position(Struct): clears = d.pop('clears') expiry = d.pop('expiry') + if self.split_ratio is None: + d.pop('split_ratio') + + # should be obvious from clears/event table + d.pop('first_clear_dt') + # TODO: we need to figure out how to have one top level # listing venue here even when the backend isn't providing # it via the trades ledger.. @@ -166,16 +175,14 @@ class Position(Struct): s = d.pop('symbol') fqsn = s.front_fqsn() - size = d.pop('size') - ppu = d.pop('ppu') - d['size'], d['ppu'] = self.audit_sizing(size, ppu) - if self.expiry is None: d.pop('expiry', None) elif expiry: d['expiry'] = str(expiry) toml_clears_list = [] + + # reverse sort so latest clears are at top of section? for tid, data in sorted( list(clears.items()), @@ -184,7 +191,8 @@ class Position(Struct): ): inline_table = toml.TomlDecoder().get_empty_inline_table() - inline_table['dt'] = data['dt'] + # serialize datetime to parsable `str` + inline_table['dt'] = str(data['dt']) # insert optional clear fields in column order for k in ['ppu', 'accum_size']: @@ -203,35 +211,51 @@ class Position(Struct): return fqsn, d - def audit_sizing( - self, - size: Optional[float] = None, - ppu: Optional[float] = None, - - ) -> tuple[float, float]: + def ensure_state(self) -> None: ''' - Audit either the `.size` and `.ppu` values or equvialent - passed in values against the clears table calculations and - return the calc-ed values if they differ and log warnings to - console. + Audit either the `.size` and `.ppu` local instance vars against + the clears table calculations and return the calc-ed values if + they differ and log warnings to console. ''' - size = size or self.size - ppu = ppu or self.ppu + clears = list(self.clears.values()) + self.first_clear_dt = min(list(entry['dt'] for entry in clears)) + last_clear = clears[-1] + csize = self.calc_size() - cppu = self.calc_ppu() + accum = last_clear['accum_size'] + if not self.expired(): + if ( + csize != accum + and csize != round(accum * self.split_ratio or 1) + ): + raise ValueError(f'Size mismatch: {csize}') + else: + assert csize == 0, 'Contract is expired but non-zero size?' - if size != csize: - log.warning(f'size != calculated size: {size} != {csize}') - size = csize - - if ppu != cppu: + if self.size != csize: log.warning( - f'ppu != calculated ppu: {ppu} != {cppu}' + 'Position state mismatch:\n' + f'{self.size} => {csize}' ) - ppu = cppu + self.size = csize - return size, ppu + cppu = self.calc_ppu() + ppu = last_clear['ppu'] + if ( + cppu != ppu + and self.split_ratio is not None + # handle any split info entered (for now) manually by user + and cppu != (ppu / self.split_ratio) + ): + raise ValueError(f'PPU mismatch: {cppu}') + + if self.ppu != cppu: + log.warning( + 'Position state mismatch:\n' + f'{self.ppu} => {cppu}' + ) + self.ppu = cppu def update_from_msg( self, @@ -384,12 +408,40 @@ class Position(Struct): asize_h.append(accum_size) ppu_h.append(ppu_h[-1]) - return ppu_h[-1] if ppu_h else 0 + final_ppu = ppu_h[-1] if ppu_h else 0 + + # handle any split info entered (for now) manually by user + if self.split_ratio is not None: + final_ppu /= self.split_ratio + + return final_ppu + + def expired(self) -> bool: + ''' + Predicate which checks if the contract/instrument is past its expiry. + + ''' + return bool(self.expiry) and self.expiry < now() def calc_size(self) -> float: + ''' + Calculate the unit size of this position in the destination + asset using the clears/trade event table; zero if expired. + + ''' size: float = 0 + + # time-expired pps (normally derivatives) are "closed" + # and have a zero size. + if self.expired(): + return 0 + for tid, entry in self.clears.items(): size += entry['size'] + + if self.split_ratio is not None: + size = round(size * self.split_ratio) + return size def minimize_clears( @@ -434,20 +486,24 @@ class Position(Struct): 'cost': t.cost, 'price': t.price, 'size': t.size, - 'dt': str(t.dt), + 'dt': t.dt, } # TODO: compute these incrementally instead # of re-looping through each time resulting in O(n**2) - # behaviour.. - # compute these **after** adding the entry - # in order to make the recurrence relation math work - # inside ``.calc_size()``. + # behaviour..? + + # NOTE: we compute these **after** adding the entry in order to + # make the recurrence relation math work inside + # ``.calc_size()``. self.size = clear['accum_size'] = self.calc_size() self.ppu = clear['ppu'] = self.calc_ppu() return clear + def sugest_split(self) -> float: + ... + class PpTable(Struct): @@ -484,16 +540,22 @@ class PpTable(Struct): expiry=t.expiry, ) ) + clears = pp.clears + if clears: + first_clear_dt = pp.first_clear_dt - # don't do updates for ledger records we already have - # included in the current pps state. - if t.tid in pp.clears: - # NOTE: likely you'll see repeats of the same - # ``Transaction`` passed in here if/when you are restarting - # a ``brokerd.ib`` where the API will re-report trades from - # the current session, so we need to make sure we don't - # "double count" these in pp calculations. - continue + # don't do updates for ledger records we already have + # included in the current pps state. + if ( + t.tid in clears + or first_clear_dt and t.dt < first_clear_dt + ): + # NOTE: likely you'll see repeats of the same + # ``Transaction`` passed in here if/when you are restarting + # a ``brokerd.ib`` where the API will re-report trades from + # the current session, so we need to make sure we don't + # "double count" these in pp calculations. + continue # update clearing table pp.add_clear(t) @@ -501,7 +563,7 @@ class PpTable(Struct): # minimize clears tables and update sizing. for bsuid, pp in updated.items(): - pp.size, pp.ppu = pp.audit_sizing() + pp.ensure_state() return updated @@ -534,7 +596,7 @@ class PpTable(Struct): # if bsuid == qqqbsuid: # breakpoint() - pp.size, pp.ppu = pp.audit_sizing() + pp.ensure_state() if ( # "net-zero" is a "closed" position @@ -574,6 +636,7 @@ class PpTable(Struct): # keep the minimal amount of clears that make up this # position since the last net-zero state. pos.minimize_clears() + pos.ensure_state() # serialize to pre-toml form fqsn, asdict = pos.to_pretoml() @@ -835,19 +898,35 @@ def open_pps( # index clears entries in "object" form by tid in a top # level dict instead of a list (as is presented in our # ``pps.toml``). - pp = pp_objs.get(bsuid) - if pp: - clears = pp.clears - else: - clears = {} + clears = pp_objs.setdefault(bsuid, {}) + + # TODO: should be make a ``Struct`` for clear/event entries? + # convert "clear events table" from the toml config (list of + # a dicts) and load it into object form for use in position + # processing of new clear events. + trans: list[Transaction] = [] for clears_table in clears_list: tid = clears_table.pop('tid') + dtstr = clears_table['dt'] + dt = pendulum.parse(dtstr) + clears_table['dt'] = dt + trans.append(Transaction( + fqsn=bsuid, + bsuid=bsuid, + tid=tid, + size=clears_table['size'], + price=clears_table['price'], + cost=clears_table['cost'], + dt=dt, + )) clears[tid] = clears_table size = entry['size'] + # TODO: remove but, handle old field name for now ppu = entry.get('ppu', entry.get('be_price', 0)) + split_ratio = entry.get('split_ratio') expiry = entry.get('expiry') if expiry: @@ -857,19 +936,21 @@ def open_pps( Symbol.from_fqsn(fqsn, info={}), size=size, ppu=ppu, + split_ratio=split_ratio, expiry=expiry, bsuid=entry['bsuid'], - - # XXX: super critical, we need to be sure to include - # all pps.toml clears to avoid reusing clears that were - # already included in the current incremental update - # state, since today's records may have already been - # processed! - clears=clears, ) + # XXX: super critical, we need to be sure to include + # all pps.toml clears to avoid reusing clears that were + # already included in the current incremental update + # state, since today's records may have already been + # processed! + for t in trans: + pp.add_clear(t) + # audit entries loaded from toml - pp.size, pp.ppu = pp.audit_sizing() + pp.ensure_state() try: yield table