From 6856ca207f786ebc03d60a84b44aedeb6adc65dd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 8 Aug 2022 13:47:17 -0400 Subject: [PATCH 01/10] Fix for TWS created position loading --- piker/brokers/ib/broker.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 9f384166..e1b2505e 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -494,8 +494,6 @@ async def trades_dialogue( ): trans = norm_trade_records(ledger) updated = table.update_from_trans(trans) - pp = updated[bsuid] - # update trades ledgers for all accounts from connected # api clients which report trades for **this session**. trades = await proxy.trades() @@ -523,7 +521,18 @@ async def trades_dialogue( table.update_from_trans(trans) updated = table.update_from_trans(trans) - assert msg.size == pp.size, 'WTF' + # XXX: not sure exactly why it wouldn't be in + # the updated output (maybe this is a bug?) but + # if you create a pos from TWS and then load it + # from the api trades it seems we get a key + # error from ``update[bsuid]`` ? + pp = table.pps[bsuid] + if msg.size != pp.size: + log.error( + 'Position mismatch {pp.symbol.front_fqsn()}:\n' + f'ib: {msg.size}\n' + f'piker: {pp.size}\n' + ) active_pps, closed_pps = table.dump_active() From 7bec989eedc7fe7c49a8b13f6d940e3735788d96 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 10 Aug 2022 18:01:41 -0400 Subject: [PATCH 02/10] First try mega-basic stock (reverse) split support with `ib` and `pps.toml` --- piker/brokers/ib/broker.py | 48 +++++++++++++++++++++++++++++++++----- piker/pp.py | 19 ++++++++++++++- 2 files changed, 60 insertions(+), 7 deletions(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index e1b2505e..0538540f 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -36,8 +36,6 @@ from trio_typing import TaskStatus import tractor from ib_insync.contract import ( Contract, - # Option, - # Forex, ) from ib_insync.order import ( Trade, @@ -357,11 +355,24 @@ async def update_and_audit_msgs( # presume we're at least not more in the shit then we # thought. if diff: + reverse_split_ratio = pikersize / ibsize + split_ratio = 1/reverse_split_ratio + + if split_ratio >= reverse_split_ratio: + entry = f'split_ratio = {int(split_ratio)}' + else: + entry = f'split_ratio = 1/{int(reverse_split_ratio)}' + raise ValueError( f'POSITION MISMATCH ib <-> piker ledger:\n' f'ib: {ibppmsg}\n' f'piker: {msg}\n' - 'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?' + f'reverse_split_ratio: {reverse_split_ratio}\n' + f'split_ratio: {split_ratio}\n\n' + 'FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?\n\n' + 'If you are expecting a (reverse) split in this ' + 'instrument you should probably put the following ' + f'in the `pps.toml` section:\n{entry}' ) msg.size = ibsize @@ -480,6 +491,7 @@ async def trades_dialogue( # sure know which positions to update from the ledger if # any are missing from the ``pps.toml`` bsuid, msg = pack_position(pos) + acctid = msg.account = accounts_def.inverse[msg.account] acctid = acctid.strip('ib.') cids2pps[(acctid, bsuid)] = msg @@ -493,7 +505,7 @@ async def trades_dialogue( or pp.size != msg.size ): trans = norm_trade_records(ledger) - updated = table.update_from_trans(trans) + table.update_from_trans(trans) # update trades ledgers for all accounts from connected # api clients which report trades for **this session**. trades = await proxy.trades() @@ -519,14 +531,22 @@ async def trades_dialogue( trans = trans_by_acct.get(acctid) if trans: table.update_from_trans(trans) - updated = table.update_from_trans(trans) # XXX: not sure exactly why it wouldn't be in # the updated output (maybe this is a bug?) but # if you create a pos from TWS and then load it # from the api trades it seems we get a key # error from ``update[bsuid]`` ? - pp = table.pps[bsuid] + pp = table.pps.get(bsuid) + if not pp: + log.error( + f'The contract id for {msg} may have ' + f'changed to {bsuid}\nYou may need to ' + 'adjust your ledger for this, skipping ' + 'for now.' + ) + continue + if msg.size != pp.size: log.error( 'Position mismatch {pp.symbol.front_fqsn()}:\n' @@ -670,6 +690,22 @@ async def emit_pp_update( await ems_stream.send(msg) +_statuses: dict[str, str] = { + 'cancelled': 'canceled', + 'submitted': 'open', + + # XXX: just pass these through? it duplicates actual fill events other + # then the case where you the `.remaining == 0` case which is our + # 'closed'` case. + # 'filled': 'pending', + # 'pendingsubmit': 'pending', + + # TODO: see a current ``ib_insync`` issue around this: + # https://github.com/erdewit/ib_insync/issues/363 + 'inactive': 'pending', +} + + async def deliver_trade_events( trade_event_stream: trio.MemoryReceiveChannel, diff --git a/piker/pp.py b/piker/pp.py index 456c2c4f..0ff8c9d5 100644 --- a/piker/pp.py +++ b/piker/pp.py @@ -134,6 +134,8 @@ class Position(Struct): # unique backend symbol id bsuid: str + split_ratio: Optional[int] = None + # ordered record of known constituent trade messages clears: dict[ Union[str, int, Status], # trade id @@ -159,6 +161,9 @@ class Position(Struct): clears = d.pop('clears') expiry = d.pop('expiry') + if self.split_ratio is None: + d.pop('split_ratio') + # TODO: we need to figure out how to have one top level # listing venue here even when the backend isn't providing # it via the trades ledger.. @@ -384,12 +389,22 @@ class Position(Struct): asize_h.append(accum_size) ppu_h.append(ppu_h[-1]) - return ppu_h[-1] if ppu_h else 0 + final_ppu = ppu_h[-1] if ppu_h else 0 + + # handle any split info entered (for now) manually by user + if self.split_ratio is not None: + final_ppu /= self.split_ratio + + return final_ppu def calc_size(self) -> float: size: float = 0 for tid, entry in self.clears.items(): size += entry['size'] + + if self.split_ratio is not None: + size = round(size * self.split_ratio) + return size def minimize_clears( @@ -848,6 +863,7 @@ def open_pps( size = entry['size'] # TODO: remove but, handle old field name for now ppu = entry.get('ppu', entry.get('be_price', 0)) + split_ratio = entry.get('split_ratio') expiry = entry.get('expiry') if expiry: @@ -857,6 +873,7 @@ def open_pps( Symbol.from_fqsn(fqsn, info={}), size=size, ppu=ppu, + split_ratio=split_ratio, expiry=expiry, bsuid=entry['bsuid'], From 0cf4e07b84e66c5395fc7009a7ea96dfcb4af273 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 15 Aug 2022 10:42:58 -0400 Subject: [PATCH 03/10] Use `datetime` sorting on clears table appends In order to avoid issues with reloading ledger and API trades after an existing `pps.toml` exists we have to make sure we not only avoid duplicate entries but also avoid re-adding entries that would have been removed during a prior call to the `Position.minimize_clears()` filter. The easiest way to do this is to sort on timestamps and avoid adding any record that pre-existed the last net-zero position ledger event that `.minimize_clears()` discarded. In order to implement this it means parsing config file clears table's timestamps into datetime objects for inequality checks and we add a `Position.first_clear_dt` attr for storing this value when managing pps in object form but never store it in the config (since it should be obviously from the sorted clear event table). --- piker/pp.py | 60 +++++++++++++++++++++++++++++++++++------------------ 1 file changed, 40 insertions(+), 20 deletions(-) diff --git a/piker/pp.py b/piker/pp.py index 0ff8c9d5..27ab7af8 100644 --- a/piker/pp.py +++ b/piker/pp.py @@ -141,6 +141,7 @@ class Position(Struct): Union[str, int, Status], # trade id dict[str, Any], # transaction history summaries ] = {} + first_clear_dt: Optional[datetime] = None expiry: Optional[datetime] = None @@ -164,6 +165,9 @@ class Position(Struct): if self.split_ratio is None: d.pop('split_ratio') + # should be obvious from clears/event table + d.pop('first_clear_dt') + # TODO: we need to figure out how to have one top level # listing venue here even when the backend isn't providing # it via the trades ledger.. @@ -189,7 +193,8 @@ class Position(Struct): ): inline_table = toml.TomlDecoder().get_empty_inline_table() - inline_table['dt'] = data['dt'] + # serialize datetime to parsable `str` + inline_table['dt'] = str(data['dt']) # insert optional clear fields in column order for k in ['ppu', 'accum_size']: @@ -236,6 +241,10 @@ class Position(Struct): ) ppu = cppu + self.first_clear_dt = min( + list(entry['dt'] for entry in self.clears.values()) + ) + return size, ppu def update_from_msg( @@ -449,15 +458,16 @@ class Position(Struct): 'cost': t.cost, 'price': t.price, 'size': t.size, - 'dt': str(t.dt), + 'dt': t.dt, } # TODO: compute these incrementally instead # of re-looping through each time resulting in O(n**2) - # behaviour.. - # compute these **after** adding the entry - # in order to make the recurrence relation math work - # inside ``.calc_size()``. + # behaviour..? + + # NOTE: we compute these **after** adding the entry in order to + # make the recurrence relation math work inside + # ``.calc_size()``. self.size = clear['accum_size'] = self.calc_size() self.ppu = clear['ppu'] = self.calc_ppu() @@ -499,16 +509,22 @@ class PpTable(Struct): expiry=t.expiry, ) ) + clears = pp.clears + if clears: + first_clear_dt = pp.first_clear_dt - # don't do updates for ledger records we already have - # included in the current pps state. - if t.tid in pp.clears: - # NOTE: likely you'll see repeats of the same - # ``Transaction`` passed in here if/when you are restarting - # a ``brokerd.ib`` where the API will re-report trades from - # the current session, so we need to make sure we don't - # "double count" these in pp calculations. - continue + # don't do updates for ledger records we already have + # included in the current pps state. + if ( + t.tid in clears + or first_clear_dt and t.dt < first_clear_dt + ): + # NOTE: likely you'll see repeats of the same + # ``Transaction`` passed in here if/when you are restarting + # a ``brokerd.ib`` where the API will re-report trades from + # the current session, so we need to make sure we don't + # "double count" these in pp calculations. + continue # update clearing table pp.add_clear(t) @@ -850,17 +866,21 @@ def open_pps( # index clears entries in "object" form by tid in a top # level dict instead of a list (as is presented in our # ``pps.toml``). - pp = pp_objs.get(bsuid) - if pp: - clears = pp.clears - else: - clears = {} + clears = pp_objs.setdefault(bsuid, {}) + # TODO: should be make a ``Struct`` for clear/event entries? + # convert "clear events table" from the toml config (list of + # a dicts) and load it into object form for use in position + # processing of new clear events. for clears_table in clears_list: tid = clears_table.pop('tid') + dtstr = clears_table['dt'] + dt = pendulum.parse(dtstr) + clears_table['dt'] = dt clears[tid] = clears_table size = entry['size'] + # TODO: remove but, handle old field name for now ppu = entry.get('ppu', entry.get('be_price', 0)) split_ratio = entry.get('split_ratio') From 941a2196b37dcfb056a3949d919ca63ca20ca829 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 17 Aug 2022 10:06:37 -0400 Subject: [PATCH 04/10] Get pos entry from table not `updated: dict` output --- piker/brokers/ib/broker.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 0538540f..4a156078 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -547,6 +547,12 @@ async def trades_dialogue( ) continue + # XXX: not sure exactly why it wouldn't be in + # the updated output (maybe this is a bug?) but + # if you create a pos from TWS and then load it + # from the api trades it seems we get a key + # error from ``update[bsuid]`` ? + pp = table.pps[bsuid] if msg.size != pp.size: log.error( 'Position mismatch {pp.symbol.front_fqsn()}:\n' From 23ba0e5e6971f3e08bc1ad36bfa9b20a53b45252 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 15 Aug 2022 13:35:43 -0400 Subject: [PATCH 05/10] Don't raise on missing position for now, just error log --- piker/brokers/ib/broker.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 4a156078..90f5a2d3 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -405,11 +405,12 @@ async def update_and_audit_msgs( avg_price=p.ppu, ) if validate and p.size: - raise ValueError( - f'UNEXPECTED POSITION ib <-> piker ledger:\n' + # raise ValueError( + log.error( + f'UNEXPECTED POSITION says ib:\n' f'piker: {msg}\n' 'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?\n' - 'MAYBE THEY LIQUIDATED YOU BRO!??!' + 'THEY LIQUIDATED YOU OR YOUR MISSING LEDGER RECORDS!?' ) msgs.append(msg) @@ -506,6 +507,7 @@ async def trades_dialogue( ): trans = norm_trade_records(ledger) table.update_from_trans(trans) + # update trades ledgers for all accounts from connected # api clients which report trades for **this session**. trades = await proxy.trades() From 999ae5a1c6c3f2d4aaa05c5a6119f02a39fbdd1c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 17 Aug 2022 08:49:26 -0400 Subject: [PATCH 06/10] Handle `Position.split_ratio` in state audits This firstly changes `.audit_sizing()` => `.ensure_state()` and makes it return `None` as well as only error when split ratio denoted (via config) positions do not size as expected. Further refinements, - add an `.expired()` predicate method - always return a size of zero from `.calc_size()` on expired assets - load each `pps.toml` entry's clear tabe into `Transaction`s and use `.add_clear()` during from config init. --- piker/pp.py | 120 +++++++++++++++++++++++++++++++++++----------------- 1 file changed, 82 insertions(+), 38 deletions(-) diff --git a/piker/pp.py b/piker/pp.py index 27ab7af8..8fdaaa4d 100644 --- a/piker/pp.py +++ b/piker/pp.py @@ -175,16 +175,14 @@ class Position(Struct): s = d.pop('symbol') fqsn = s.front_fqsn() - size = d.pop('size') - ppu = d.pop('ppu') - d['size'], d['ppu'] = self.audit_sizing(size, ppu) - if self.expiry is None: d.pop('expiry', None) elif expiry: d['expiry'] = str(expiry) toml_clears_list = [] + + # reverse sort so latest clears are at top of section? for tid, data in sorted( list(clears.items()), @@ -213,39 +211,51 @@ class Position(Struct): return fqsn, d - def audit_sizing( - self, - size: Optional[float] = None, - ppu: Optional[float] = None, - - ) -> tuple[float, float]: + def ensure_state(self) -> None: ''' - Audit either the `.size` and `.ppu` values or equvialent - passed in values against the clears table calculations and - return the calc-ed values if they differ and log warnings to - console. + Audit either the `.size` and `.ppu` local instance vars against + the clears table calculations and return the calc-ed values if + they differ and log warnings to console. ''' - size = size or self.size - ppu = ppu or self.ppu + clears = list(self.clears.values()) + self.first_clear_dt = min(list(entry['dt'] for entry in clears)) + last_clear = clears[-1] + csize = self.calc_size() - cppu = self.calc_ppu() + accum = last_clear['accum_size'] + if not self.expired(): + if ( + csize != accum + and csize != round(accum * self.split_ratio or 1) + ): + raise ValueError(f'Size mismatch: {csize}') + else: + assert csize == 0, 'Contract is expired but non-zero size?' - if size != csize: - log.warning(f'size != calculated size: {size} != {csize}') - size = csize - - if ppu != cppu: + if self.size != csize: log.warning( - f'ppu != calculated ppu: {ppu} != {cppu}' + 'Position state mismatch:\n' + f'{self.size} => {csize}' ) - ppu = cppu + self.size = csize - self.first_clear_dt = min( - list(entry['dt'] for entry in self.clears.values()) - ) + cppu = self.calc_ppu() + ppu = last_clear['ppu'] + if ( + cppu != ppu + and self.split_ratio is not None + # handle any split info entered (for now) manually by user + and cppu != (ppu / self.split_ratio) + ): + raise ValueError(f'PPU mismatch: {cppu}') - return size, ppu + if self.ppu != cppu: + log.warning( + 'Position state mismatch:\n' + f'{self.ppu} => {cppu}' + ) + self.ppu = cppu def update_from_msg( self, @@ -406,8 +416,26 @@ class Position(Struct): return final_ppu + def expired(self) -> bool: + ''' + Predicate which checks if the contract/instrument is past its expiry. + + ''' + return bool(self.expiry) and self.expiry < now() + def calc_size(self) -> float: + ''' + Calculate the unit size of this position in the destination + asset using the clears/trade event table; zero if expired. + + ''' size: float = 0 + + # time-expired pps (normally derivatives) are "closed" + # and have a zero size. + if self.expired(): + return 0 + for tid, entry in self.clears.items(): size += entry['size'] @@ -473,6 +501,9 @@ class Position(Struct): return clear + def sugest_split(self) -> float: + ... + class PpTable(Struct): @@ -532,7 +563,7 @@ class PpTable(Struct): # minimize clears tables and update sizing. for bsuid, pp in updated.items(): - pp.size, pp.ppu = pp.audit_sizing() + pp.ensure_state() return updated @@ -565,7 +596,7 @@ class PpTable(Struct): # if bsuid == qqqbsuid: # breakpoint() - pp.size, pp.ppu = pp.audit_sizing() + pp.ensure_state() if ( # "net-zero" is a "closed" position @@ -605,6 +636,7 @@ class PpTable(Struct): # keep the minimal amount of clears that make up this # position since the last net-zero state. pos.minimize_clears() + pos.ensure_state() # serialize to pre-toml form fqsn, asdict = pos.to_pretoml() @@ -872,11 +904,22 @@ def open_pps( # convert "clear events table" from the toml config (list of # a dicts) and load it into object form for use in position # processing of new clear events. + trans: list[Transaction] = [] + for clears_table in clears_list: tid = clears_table.pop('tid') dtstr = clears_table['dt'] dt = pendulum.parse(dtstr) clears_table['dt'] = dt + trans.append(Transaction( + fqsn=bsuid, + bsuid=bsuid, + tid=tid, + size=clears_table['size'], + price=clears_table['price'], + cost=clears_table['cost'], + dt=dt, + )) clears[tid] = clears_table size = entry['size'] @@ -896,17 +939,18 @@ def open_pps( split_ratio=split_ratio, expiry=expiry, bsuid=entry['bsuid'], - - # XXX: super critical, we need to be sure to include - # all pps.toml clears to avoid reusing clears that were - # already included in the current incremental update - # state, since today's records may have already been - # processed! - clears=clears, ) + # XXX: super critical, we need to be sure to include + # all pps.toml clears to avoid reusing clears that were + # already included in the current incremental update + # state, since today's records may have already been + # processed! + for t in trans: + pp.add_clear(t) + # audit entries loaded from toml - pp.size, pp.ppu = pp.audit_sizing() + pp.ensure_state() try: yield table From 73d2e7716f052943d317a294bff6ea576435f2dd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 17 Aug 2022 10:18:43 -0400 Subject: [PATCH 07/10] Pre-loop clients to build out pps tables, handle missing commission field --- piker/brokers/ib/api.py | 1 - piker/brokers/ib/broker.py | 7 +++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index af99e9ad..4616fe52 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -1245,7 +1245,6 @@ async def open_client_proxies() -> tuple[ ]: async with ( tractor.trionics.maybe_open_context( - # acm_func=open_client_proxies, acm_func=tractor.to_asyncio.open_channel_from, kwargs={'target': load_clients_for_trio}, diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 90f5a2d3..d60a9d43 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -462,8 +462,8 @@ async def trades_dialogue( with ( ExitStack() as lstack, ): + # load ledgers and pps for all detected client-proxies for account, proxy in proxies.items(): - assert account in accounts_def accounts.add(account) acctid = account.strip('ib.') @@ -478,6 +478,7 @@ async def trades_dialogue( open_pps('ib', acctid) ) + for account, proxy in proxies.items(): client = aioclients[account] # process pp value reported from ib's system. we only use these @@ -971,7 +972,9 @@ def norm_trade_records( for tid, record in ledger.items(): conid = record.get('conId') or record['conid'] - comms = record.get('commission') or -1*record['ibCommission'] + comms = record.get('commission') + if comms is None: + comms = -1*record['ibCommission'] price = record.get('price') or record['tradePrice'] # the api doesn't do the -/+ on the quantity for you but flex From 0fb07670d2277b9d3f8a695447bba472d9326c03 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 17 Aug 2022 13:05:02 -0400 Subject: [PATCH 08/10] Fix multi-account positioning and order tracking.. This seems to have been broken in refactoring from commit 279c899de53 which was never tested against multiple accounts/clients. The fix is 2 part: - position tables are now correctly loaded ahead of time and used by account for each connected client in processing of ledgers and existing positions. - a task for each API client is started (as implemented prior) so that we actually get status updates for every client used for submissions. Further we add a bit of code using `bisect.insort()` to normalize ledgers to a datetime sorted list records (though pretty sure the `dict` transform ruins it?) in an effort to avoid issues with ledger transaction processing with previously minimized `Position.clears` tables, which should (but might not?) avoid incorporating clear events prior to the last "net-zero" positioning state. --- piker/brokers/ib/broker.py | 56 +++++++++++++++++++++++--------------- 1 file changed, 34 insertions(+), 22 deletions(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index d60a9d43..3303d9dc 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -18,6 +18,7 @@ Order and trades endpoints for use with ``piker``'s EMS. """ from __future__ import annotations +from bisect import insort from contextlib import ExitStack from dataclasses import asdict from functools import partial @@ -435,7 +436,6 @@ async def trades_dialogue( # deliver positions to subscriber before anything else all_positions = [] accounts = set() - clients: list[tuple[Client, trio.MemoryReceiveChannel]] = [] acctids = set() cids2pps: dict[str, BrokerdPosition] = {} @@ -486,7 +486,6 @@ async def trades_dialogue( # the so called (bs) "FIFO" style which more or less results in # a price that's not useful for traders who want to not lose # money.. xb - # for client in aioclients.values(): for pos in client.positions(): # collect all ib-pp reported positions so that we can be @@ -500,7 +499,9 @@ async def trades_dialogue( assert msg.account in accounts, ( f'Position for unknown account: {msg.account}') + ledger = ledgers[acctid] table = tables[acctid] + pp = table.pps.get(bsuid) if ( not pp @@ -596,6 +597,7 @@ async def trades_dialogue( # proxy wrapper for starting trade event stream async def open_trade_event_stream( + client: Client, task_status: TaskStatus[ trio.abc.ReceiveChannel ] = trio.TASK_STATUS_IGNORED, @@ -613,18 +615,25 @@ async def trades_dialogue( ctx.open_stream() as ems_stream, trio.open_nursery() as n, ): - trade_event_stream = await n.start(open_trade_event_stream) - clients.append((client, trade_event_stream)) - # start order request handler **before** local trades - # event loop - n.start_soon(handle_order_requests, ems_stream, accounts_def) + for client in aioclients.values(): + trade_event_stream = await n.start( + open_trade_event_stream, + client, + ) - # allocate event relay tasks for each client connection - for client, stream in clients: + # start order request handler **before** local trades + # event loop + n.start_soon( + handle_order_requests, + ems_stream, + accounts_def, + ) + + # allocate event relay tasks for each client connection n.start_soon( deliver_trade_events, - stream, + trade_event_stream, ems_stream, accounts_def, cids2pps, @@ -968,7 +977,7 @@ def norm_trade_records( ledger into our standard record format. ''' - records: dict[str, Transaction] = {} + records: list[Transaction] = [] for tid, record in ledger.items(): conid = record.get('conId') or record['conid'] @@ -1047,19 +1056,22 @@ def norm_trade_records( # should already have entries if the pps are still open, in # which case, we can pull the fqsn from that table (see # `trades_dialogue()` above). - - records[tid] = Transaction( - fqsn=fqsn, - tid=tid, - size=size, - price=price, - cost=comms, - dt=dt, - expiry=expiry, - bsuid=conid, + insort( + records, + Transaction( + fqsn=fqsn, + tid=tid, + size=size, + price=price, + cost=comms, + dt=dt, + expiry=expiry, + bsuid=conid, + ), + key=lambda t: t.dt ) - return records + return {r.tid: r for r in records} def trades_to_ledger_entries( From f202699c25345b81cc7c784b46c2381d68351eef Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 18 Aug 2022 11:31:18 -0400 Subject: [PATCH 09/10] Fix scan loop: only stash clients that actually connect.. --- piker/brokers/ib/api.py | 22 ++++++---------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index 4616fe52..c18125f4 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -1127,6 +1127,12 @@ async def load_aio_clients( # careful. timeout=connect_timeout, ) + # create and cache client + client = Client(ib) + + # update all actor-global caches + log.info(f"Caching client for {sockaddr}") + _client_cache[sockaddr] = client break except ( @@ -1150,21 +1156,9 @@ async def load_aio_clients( log.warning( f'Failed to connect on {port} for {i} time, retrying...') - # create and cache client - client = Client(ib) - # Pre-collect all accounts available for this # connection and map account names to this client # instance. - pps = ib.positions() - if pps: - for pp in pps: - accounts_found[ - accounts_def.inverse[pp.account] - ] = client - - # if there are accounts without positions we should still - # register them for this client for value in ib.accountValues(): acct_number = value.account @@ -1185,10 +1179,6 @@ async def load_aio_clients( f'{pformat(accounts_found)}' ) - # update all actor-global caches - log.info(f"Caching client for {sockaddr}") - _client_cache[sockaddr] = client - # XXX: why aren't we just updating this directy above # instead of using the intermediary `accounts_found`? _accounts2clients.update(accounts_found) From 52febac6ae5add7de83d2aec290826e88064ff43 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 18 Aug 2022 11:33:47 -0400 Subject: [PATCH 10/10] Facepalm: order-handler tasks are one-to-one with unique clients --- piker/brokers/ib/broker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 3303d9dc..c2f03a4f 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -616,7 +616,7 @@ async def trades_dialogue( trio.open_nursery() as n, ): - for client in aioclients.values(): + for client in set(aioclients.values()): trade_event_stream = await n.start( open_trade_event_stream, client,