From 60a6f3269c81221f7bc730d0d58215c0c4dd6a84 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 22 May 2023 09:41:44 -0400 Subject: [PATCH] ib: use flex report datetime sort Since `open_trade_ledger()` now requires a sort we pass in a combo of the std `pendulum.parse()` for API records and a custom flex parser for flex entries pulled offline. Add special handling for `MktPair.src` such that when it's a fiat (like it should always be for most legacy assets) we try to get the fqme without that `.src` token (i.e. not mnqusd) to avoid breaking roundtripping of live feed requests (due to new symbology) as well as the current tsdb table key set.. Do a wholesale renaming of fqsn -> fqme in most of the rest of the backend modules. --- piker/brokers/ib/_flex_reports.py | 4 +++ piker/brokers/ib/_util.py | 4 +++ piker/brokers/ib/api.py | 15 +++++++--- piker/brokers/ib/broker.py | 16 ++++++++--- piker/brokers/ib/feed.py | 47 ++++++++++++++++++------------- 5 files changed, 58 insertions(+), 28 deletions(-) diff --git a/piker/brokers/ib/_flex_reports.py b/piker/brokers/ib/_flex_reports.py index d26e0e3f..2f34d037 100644 --- a/piker/brokers/ib/_flex_reports.py +++ b/piker/brokers/ib/_flex_reports.py @@ -35,6 +35,10 @@ from piker.accounting import ( def parse_flex_dt( record: str, ) -> pendulum.datetime: + ''' + Parse stupid flex record datetime stamps for the `dateTime` field.. + + ''' date, ts = record.split(';') dt = pendulum.parse(date) ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}' diff --git a/piker/brokers/ib/_util.py b/piker/brokers/ib/_util.py index 4c3bbb34..585ea18d 100644 --- a/piker/brokers/ib/_util.py +++ b/piker/brokers/ib/_util.py @@ -170,6 +170,10 @@ async def vnc_click_hack( def i3ipc_xdotool_manual_click_hack() -> None: i3 = i3ipc.Connection() + + # TODO: might be worth offering some kinda api for grabbing + # the window id from the pid? + # https://stackoverflow.com/a/2250879 t = i3.get_tree() orig_win_id = t.find_focused().window diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index e9a9dc49..e64f085f 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -14,11 +14,10 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -""" -``ib`` core API client machinery; mostly sane wrapping around -``ib_insync``. +''' +Core API client machinery; mostly sane/useful wrapping around `ib_insync`.. -""" +''' from __future__ import annotations from contextlib import ( asynccontextmanager as acm, @@ -1450,6 +1449,14 @@ class MethodProxy: while not chan.closed(): # send through method + ``kwargs: dict`` as pair msg = await chan.receive() + + # TODO: implement reconnect functionality like + # in our `.data._web_bs.NoBsWs` + # try: + # msg = await chan.receive() + # except ConnectionError: + # self.reset() + # print(f'NEXT MSG: {msg}') # TODO: py3.10 ``match:`` syntax B) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 2f4cdb78..25a82ef3 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -60,6 +60,7 @@ from piker.accounting import ( Position, Transaction, open_trade_ledger, + iter_by_dt, open_pps, PpTable, ) @@ -434,9 +435,9 @@ async def update_and_audit_msgs( if validate and p.size: # raise ValueError( log.error( - f'UNEXPECTED POSITION says IB:\n' - 'Maybe they LIQUIDATED YOU or are missing ledger txs?\n' - f'PIKER:\n{pikerfmtmsg}\n\n' + f'UNEXPECTED POSITION says IB => {msg.symbol}\n' + 'Maybe they LIQUIDATED YOU or are missing ledger entries?\n' + f'{pikerfmtmsg}\n\n' ) msgs.append(msg) @@ -581,6 +582,13 @@ async def trades_dialogue( open_trade_ledger( 'ib', acctid, + tx_sort=partial( + iter_by_dt, + parsers={ + 'dateTime': parse_flex_dt, + 'datetime': pendulum.parse, + }, + ), ) ) @@ -654,7 +662,7 @@ async def trades_dialogue( # update position table with latest ledger from all # gathered transactions: ledger file + api records. - trans = norm_trade_records(ledger) + trans: dict[str, Transaction] = norm_trade_records(ledger) table.update_from_trans(trans) # process pp value reported from ib's system. we only diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index eacfca7b..28db4eee 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -120,9 +120,7 @@ async def open_data_client() -> MethodProxy: @acm async def open_history_client( - fqme: str, - - # mkt: MktPair | None = None, + mkt: MktPair, ) -> tuple[Callable, int]: ''' @@ -130,7 +128,7 @@ async def open_history_client( that takes in ``pendulum.datetime`` and returns ``numpy`` arrays. ''' - # TODO: + # TODO: mostly meta-data processing to drive shm and tsdb storage.. # - add logic to handle tradable hours and only grab # valid bars in the range? # - we want to avoid overrunning the underlying shm array buffer and @@ -139,12 +137,21 @@ async def open_history_client( # the shm size will be driven by user config and available sys # memory. + # IB's internal symbology does not expect the "source asset" in + # the "symbol name", what we call the "market name". This is + # common in most legacy market brokers since it's presumed that + # given a certain stock exchange, listed assets are traded + # "from" a particular source fiat, normally something like USD. + if ( + mkt.src + and mkt.src.atype == 'fiat' + ): + fqme: str = mkt.get_bs_fqme(without_src=True) + else: + fqme = mkt.bs_fqme + async with open_data_client() as proxy: - # TODO: maybe strip the `MktPair.src: Asset` key here? - # see the comment below.. - # if mkt is not None: - # fqme: str = mkt.fqme.remove(mkt.src.name) max_timeout: float = 2. mean: float = 0 @@ -156,7 +163,7 @@ async def open_history_client( 'idealpro' not in fqme ): try: - head_dt = await proxy.get_head_time(fqsn=fqme) + head_dt = await proxy.get_head_time(fqme=fqme) except RequestError: head_dt = None @@ -310,7 +317,7 @@ _failed_resets: int = 0 async def get_bars( proxy: MethodProxy, - fqsn: str, + fqme: str, timeframe: int, # blank to start which tells ib to look up the latest datum @@ -354,7 +361,7 @@ async def get_bars( while _failed_resets < max_failed_resets: try: out = await proxy.bars( - fqsn=fqsn, + fqme=fqme, end_dt=end_dt, sample_period_s=timeframe, @@ -380,7 +387,7 @@ async def get_bars( continue if bars_array is None: - raise SymbolNotFound(fqsn) + raise SymbolNotFound(fqme) first_dt = pendulum.from_timestamp( bars[0].date.timestamp()) @@ -411,7 +418,7 @@ async def get_bars( if 'No market data permissions for' in msg: # TODO: signalling for no permissions searches raise NoData( - f'Symbol: {fqsn}', + f'Symbol: {fqme}', ) elif ( @@ -437,7 +444,7 @@ async def get_bars( if nodatas_count >= max_nodatas: raise DataUnavailable( - f'Presuming {fqsn} has no further history ' + f'Presuming {fqme} has no further history ' f'after {max_nodatas} tries..' ) @@ -701,7 +708,7 @@ def normalize( # check for special contract types con = ticker.contract - fqsn, calc_price = con2fqsn(con) + fqme, calc_price = con2fqsn(con) # convert named tuples to dicts so we send usable keys new_ticks = [] @@ -731,9 +738,9 @@ def normalize( # serialize for transport data = asdict(ticker) - # generate fqsn with possible specialized suffix + # generate fqme with possible specialized suffix # for derivatives, note the lowercase. - data['symbol'] = data['fqsn'] = fqsn + data['symbol'] = data['fqme'] = fqme # convert named tuples to dicts for transport tbts = data.get('tickByTicks') @@ -1002,9 +1009,9 @@ async def stream_quotes( # last = time.time() async for ticker in stream: quote = normalize(ticker) - fqsn = quote['fqsn'] - # print(f'sending {fqsn}:\n{quote}') - await send_chan.send({fqsn: quote}) + fqme = quote['fqme'] + # print(f'sending {fqme}:\n{quote}') + await send_chan.send({fqme: quote}) # ugh, clear ticks since we've consumed them ticker.ticks = []