ib: use flex report datetime sort

Since `open_trade_ledger()` now requires a sort we pass in a combo of
the std `pendulum.parse()` for API records and a custom flex parser for
flex entries pulled offline.

Add special handling for `MktPair.src` such that when it's a fiat (like
it should always be for most legacy assets) we try to get the fqme
without that `.src` token (i.e. not mnqusd) to avoid breaking
roundtripping of live feed requests (due to new symbology) as well as
the current tsdb table key set..

Do a wholesale renaming of fqsn -> fqme in most of the rest of the
backend modules.
master
Tyler Goodlet 2023-05-22 09:41:44 -04:00
parent 53003618cb
commit 60a6f3269c
5 changed files with 58 additions and 28 deletions

View File

@ -35,6 +35,10 @@ from piker.accounting import (
def parse_flex_dt( def parse_flex_dt(
record: str, record: str,
) -> pendulum.datetime: ) -> pendulum.datetime:
'''
Parse stupid flex record datetime stamps for the `dateTime` field..
'''
date, ts = record.split(';') date, ts = record.split(';')
dt = pendulum.parse(date) dt = pendulum.parse(date)
ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}' ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'

View File

@ -170,6 +170,10 @@ async def vnc_click_hack(
def i3ipc_xdotool_manual_click_hack() -> None: def i3ipc_xdotool_manual_click_hack() -> None:
i3 = i3ipc.Connection() i3 = i3ipc.Connection()
# TODO: might be worth offering some kinda api for grabbing
# the window id from the pid?
# https://stackoverflow.com/a/2250879
t = i3.get_tree() t = i3.get_tree()
orig_win_id = t.find_focused().window orig_win_id = t.find_focused().window

View File

@ -14,11 +14,10 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
""" '''
``ib`` core API client machinery; mostly sane wrapping around Core API client machinery; mostly sane/useful wrapping around `ib_insync`..
``ib_insync``.
""" '''
from __future__ import annotations from __future__ import annotations
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
@ -1450,6 +1449,14 @@ class MethodProxy:
while not chan.closed(): while not chan.closed():
# send through method + ``kwargs: dict`` as pair # send through method + ``kwargs: dict`` as pair
msg = await chan.receive() msg = await chan.receive()
# TODO: implement reconnect functionality like
# in our `.data._web_bs.NoBsWs`
# try:
# msg = await chan.receive()
# except ConnectionError:
# self.reset()
# print(f'NEXT MSG: {msg}') # print(f'NEXT MSG: {msg}')
# TODO: py3.10 ``match:`` syntax B) # TODO: py3.10 ``match:`` syntax B)

View File

@ -60,6 +60,7 @@ from piker.accounting import (
Position, Position,
Transaction, Transaction,
open_trade_ledger, open_trade_ledger,
iter_by_dt,
open_pps, open_pps,
PpTable, PpTable,
) )
@ -434,9 +435,9 @@ async def update_and_audit_msgs(
if validate and p.size: if validate and p.size:
# raise ValueError( # raise ValueError(
log.error( log.error(
f'UNEXPECTED POSITION says IB:\n' f'UNEXPECTED POSITION says IB => {msg.symbol}\n'
'Maybe they LIQUIDATED YOU or are missing ledger txs?\n' 'Maybe they LIQUIDATED YOU or are missing ledger entries?\n'
f'PIKER:\n{pikerfmtmsg}\n\n' f'{pikerfmtmsg}\n\n'
) )
msgs.append(msg) msgs.append(msg)
@ -581,6 +582,13 @@ async def trades_dialogue(
open_trade_ledger( open_trade_ledger(
'ib', 'ib',
acctid, acctid,
tx_sort=partial(
iter_by_dt,
parsers={
'dateTime': parse_flex_dt,
'datetime': pendulum.parse,
},
),
) )
) )
@ -654,7 +662,7 @@ async def trades_dialogue(
# update position table with latest ledger from all # update position table with latest ledger from all
# gathered transactions: ledger file + api records. # gathered transactions: ledger file + api records.
trans = norm_trade_records(ledger) trans: dict[str, Transaction] = norm_trade_records(ledger)
table.update_from_trans(trans) table.update_from_trans(trans)
# process pp value reported from ib's system. we only # process pp value reported from ib's system. we only

View File

@ -120,9 +120,7 @@ async def open_data_client() -> MethodProxy:
@acm @acm
async def open_history_client( async def open_history_client(
fqme: str, mkt: MktPair,
# mkt: MktPair | None = None,
) -> tuple[Callable, int]: ) -> tuple[Callable, int]:
''' '''
@ -130,7 +128,7 @@ async def open_history_client(
that takes in ``pendulum.datetime`` and returns ``numpy`` arrays. that takes in ``pendulum.datetime`` and returns ``numpy`` arrays.
''' '''
# TODO: # TODO: mostly meta-data processing to drive shm and tsdb storage..
# - add logic to handle tradable hours and only grab # - add logic to handle tradable hours and only grab
# valid bars in the range? # valid bars in the range?
# - we want to avoid overrunning the underlying shm array buffer and # - we want to avoid overrunning the underlying shm array buffer and
@ -139,12 +137,21 @@ async def open_history_client(
# the shm size will be driven by user config and available sys # the shm size will be driven by user config and available sys
# memory. # memory.
# IB's internal symbology does not expect the "source asset" in
# the "symbol name", what we call the "market name". This is
# common in most legacy market brokers since it's presumed that
# given a certain stock exchange, listed assets are traded
# "from" a particular source fiat, normally something like USD.
if (
mkt.src
and mkt.src.atype == 'fiat'
):
fqme: str = mkt.get_bs_fqme(without_src=True)
else:
fqme = mkt.bs_fqme
async with open_data_client() as proxy: async with open_data_client() as proxy:
# TODO: maybe strip the `MktPair.src: Asset` key here?
# see the comment below..
# if mkt is not None:
# fqme: str = mkt.fqme.remove(mkt.src.name)
max_timeout: float = 2. max_timeout: float = 2.
mean: float = 0 mean: float = 0
@ -156,7 +163,7 @@ async def open_history_client(
'idealpro' not in fqme 'idealpro' not in fqme
): ):
try: try:
head_dt = await proxy.get_head_time(fqsn=fqme) head_dt = await proxy.get_head_time(fqme=fqme)
except RequestError: except RequestError:
head_dt = None head_dt = None
@ -310,7 +317,7 @@ _failed_resets: int = 0
async def get_bars( async def get_bars(
proxy: MethodProxy, proxy: MethodProxy,
fqsn: str, fqme: str,
timeframe: int, timeframe: int,
# blank to start which tells ib to look up the latest datum # blank to start which tells ib to look up the latest datum
@ -354,7 +361,7 @@ async def get_bars(
while _failed_resets < max_failed_resets: while _failed_resets < max_failed_resets:
try: try:
out = await proxy.bars( out = await proxy.bars(
fqsn=fqsn, fqme=fqme,
end_dt=end_dt, end_dt=end_dt,
sample_period_s=timeframe, sample_period_s=timeframe,
@ -380,7 +387,7 @@ async def get_bars(
continue continue
if bars_array is None: if bars_array is None:
raise SymbolNotFound(fqsn) raise SymbolNotFound(fqme)
first_dt = pendulum.from_timestamp( first_dt = pendulum.from_timestamp(
bars[0].date.timestamp()) bars[0].date.timestamp())
@ -411,7 +418,7 @@ async def get_bars(
if 'No market data permissions for' in msg: if 'No market data permissions for' in msg:
# TODO: signalling for no permissions searches # TODO: signalling for no permissions searches
raise NoData( raise NoData(
f'Symbol: {fqsn}', f'Symbol: {fqme}',
) )
elif ( elif (
@ -437,7 +444,7 @@ async def get_bars(
if nodatas_count >= max_nodatas: if nodatas_count >= max_nodatas:
raise DataUnavailable( raise DataUnavailable(
f'Presuming {fqsn} has no further history ' f'Presuming {fqme} has no further history '
f'after {max_nodatas} tries..' f'after {max_nodatas} tries..'
) )
@ -701,7 +708,7 @@ def normalize(
# check for special contract types # check for special contract types
con = ticker.contract con = ticker.contract
fqsn, calc_price = con2fqsn(con) fqme, calc_price = con2fqsn(con)
# convert named tuples to dicts so we send usable keys # convert named tuples to dicts so we send usable keys
new_ticks = [] new_ticks = []
@ -731,9 +738,9 @@ def normalize(
# serialize for transport # serialize for transport
data = asdict(ticker) data = asdict(ticker)
# generate fqsn with possible specialized suffix # generate fqme with possible specialized suffix
# for derivatives, note the lowercase. # for derivatives, note the lowercase.
data['symbol'] = data['fqsn'] = fqsn data['symbol'] = data['fqme'] = fqme
# convert named tuples to dicts for transport # convert named tuples to dicts for transport
tbts = data.get('tickByTicks') tbts = data.get('tickByTicks')
@ -1002,9 +1009,9 @@ async def stream_quotes(
# last = time.time() # last = time.time()
async for ticker in stream: async for ticker in stream:
quote = normalize(ticker) quote = normalize(ticker)
fqsn = quote['fqsn'] fqme = quote['fqme']
# print(f'sending {fqsn}:\n{quote}') # print(f'sending {fqme}:\n{quote}')
await send_chan.send({fqsn: quote}) await send_chan.send({fqme: quote})
# ugh, clear ticks since we've consumed them # ugh, clear ticks since we've consumed them
ticker.ticks = [] ticker.ticks = []