Compare commits

..

No commits in common. "b57718077381093ea35397a076cd8f8acef0aafa" and "ab1463d9429b0e8e9708a0dfdbc461cad1cf4b5e" have entirely different histories.

6 changed files with 95 additions and 222 deletions

View File

@ -100,7 +100,7 @@ async def data_reset_hack(
log.warning( log.warning(
no_setup_msg no_setup_msg
+ +
'REQUIRES A `vnc_addrs: array` ENTRY' f'REQUIRES A `vnc_addrs: array` ENTRY'
) )
vnc_host, vnc_port = vnc_sockaddr.get( vnc_host, vnc_port = vnc_sockaddr.get(

View File

@ -287,31 +287,9 @@ class Client:
self.conf = config self.conf = config
# NOTE: the ib.client here is "throttled" to 45 rps by default # NOTE: the ib.client here is "throttled" to 45 rps by default
self.ib: IB = ib self.ib = ib
self.ib.RaiseRequestErrors: bool = True self.ib.RaiseRequestErrors: bool = True
# self._acnt_names: set[str] = {}
self._acnt_names: list[str] = []
@property
def acnts(self) -> list[str]:
# return list(self._acnt_names)
return self._acnt_names
def __repr__(self) -> str:
return (
f'<{type(self).__name__}('
f'ib={self.ib} '
f'acnts={self.acnts}'
# TODO: we need to mask out acnt-#s and other private
# infos if we're going to console this!
# f' |_.conf:\n'
# f' {pformat(self.conf)}\n'
')>'
)
async def get_fills(self) -> list[Fill]: async def get_fills(self) -> list[Fill]:
''' '''
Return list of rents `Fills` from trading session. Return list of rents `Fills` from trading session.
@ -398,21 +376,19 @@ class Client:
# whatToShow='MIDPOINT', # whatToShow='MIDPOINT',
# whatToShow='TRADES', # whatToShow='TRADES',
) )
log.info(
f'REQUESTING {ib_duration_str} worth {bar_size} BARS\n'
f'fqme: {fqme}\n'
f'global _enters: {_enters}\n'
f'kwargs: {pformat(kwargs)}\n'
)
bars = await self.ib.reqHistoricalDataAsync( bars = await self.ib.reqHistoricalDataAsync(
**kwargs, **kwargs,
) )
query_info: str = (
f'REQUESTING IB history BARS\n'
f' ------ - ------\n'
f'dt_duration: {dt_duration}\n'
f'ib_duration_str: {ib_duration_str}\n'
f'bar_size: {bar_size}\n'
f'fqme: {fqme}\n'
f'actor-global _enters: {_enters}\n'
f'kwargs: {pformat(kwargs)}\n'
)
# tail case if no history for range or none prior. # tail case if no history for range or none prior.
if not bars:
# NOTE: there's actually 3 cases here to handle (and # NOTE: there's actually 3 cases here to handle (and
# this should be read alongside the implementation of # this should be read alongside the implementation of
# `.reqHistoricalDataAsync()`): # `.reqHistoricalDataAsync()`):
@ -422,39 +398,33 @@ class Client:
# a weekend, holiday or other non-trading period prior to # a weekend, holiday or other non-trading period prior to
# ``end_dt`` which exceeds the ``duration``, # ``end_dt`` which exceeds the ``duration``,
# - LITERALLY this is the start of the mkt's history! # - LITERALLY this is the start of the mkt's history!
if not bars:
# TODO: figure out wut's going on here.
# TODO: is this handy, a sync requester for tinkering
# with empty frame cases?
# def get_hist():
# return self.ib.reqHistoricalData(**kwargs)
# import pdbp
# pdbp.set_trace()
log.critical( # sync requester for debugging empty frame cases
'STUPID IB SAYS NO HISTORY\n\n' def get_hist():
+ query_info return self.ib.reqHistoricalData(**kwargs)
)
assert get_hist
import pdbp
pdbp.set_trace()
# TODO: we could maybe raise ``NoData`` instead if we
# rewrite the method in the first case?
# right now there's no way to detect a timeout..
return [], np.empty(0), dt_duration return [], np.empty(0), dt_duration
# TODO: we could maybe raise ``NoData`` instead if we
# rewrite the method in the first case? right now there's no
# way to detect a timeout.
log.info(query_info) # NOTE XXX: ensure minimum duration in bars B)
# NOTE XXX: ensure minimum duration in bars? # => we recursively call this method until we get at least
# => recursively call this method until we get at least as # as many bars such that they sum in aggregate to the the
# many bars such that they sum in aggregate to the the
# desired total time (duration) at most. # desired total time (duration) at most.
# XXX XXX XXX
# WHY DID WE EVEN NEED THIS ORIGINALLY!?
# XXX XXX XXX
# - if you query over a gap and get no data # - if you query over a gap and get no data
# that may short circuit the history # that may short circuit the history
if ( if (
# XXX XXX XXX end_dt
# => WHY DID WE EVEN NEED THIS ORIGINALLY!? <= and False
# XXX XXX XXX
False
and end_dt
): ):
nparr: np.ndarray = bars_to_np(bars) nparr: np.ndarray = bars_to_np(bars)
times: np.ndarray = nparr['time'] times: np.ndarray = nparr['time']
@ -957,10 +927,7 @@ class Client:
warnset = True warnset = True
else: else:
log.info( log.info(f'Got first quote for {contract}')
'Got first quote for contract\n'
f'{contract}\n'
)
break break
else: else:
if timeouterr and raise_on_timeout: if timeouterr and raise_on_timeout:
@ -1024,12 +991,8 @@ class Client:
outsideRth=True, outsideRth=True,
optOutSmartRouting=True, optOutSmartRouting=True,
# TODO: need to understand this setting better as
# it pertains to shit ass mms..
routeMarketableToBbo=True, routeMarketableToBbo=True,
designatedLocation='SMART', designatedLocation='SMART',
# TODO: make all orders GTC? # TODO: make all orders GTC?
# https://interactivebrokers.github.io/tws-api/classIBApi_1_1Order.html#a95539081751afb9980f4c6bd1655a6ba # https://interactivebrokers.github.io/tws-api/classIBApi_1_1Order.html#a95539081751afb9980f4c6bd1655a6ba
# goodTillDate=f"yyyyMMdd-HH:mm:ss", # goodTillDate=f"yyyyMMdd-HH:mm:ss",
@ -1157,8 +1120,8 @@ def get_config() -> dict[str, Any]:
names = list(accounts.keys()) names = list(accounts.keys())
accts = section['accounts'] = bidict(accounts) accts = section['accounts'] = bidict(accounts)
log.info( log.info(
f'{path} defines {len(accts)} account aliases:\n' f'brokers.toml defines {len(accts)} accounts: '
f'{pformat(names)}\n' f'{pformat(names)}'
) )
if section is None: if section is None:
@ -1225,7 +1188,7 @@ async def load_aio_clients(
try_ports = list(try_ports.values()) try_ports = list(try_ports.values())
_err = None _err = None
accounts_def: dict[str, str] = config.load_accounts(['ib']) accounts_def = config.load_accounts(['ib'])
ports = try_ports if port is None else [port] ports = try_ports if port is None else [port]
combos = list(itertools.product(hosts, ports)) combos = list(itertools.product(hosts, ports))
accounts_found: dict[str, Client] = {} accounts_found: dict[str, Client] = {}
@ -1264,9 +1227,7 @@ async def load_aio_clients(
client = Client(ib=ib, config=conf) client = Client(ib=ib, config=conf)
# update all actor-global caches # update all actor-global caches
log.runtime( log.info(f"Caching client for {sockaddr}")
f'Connected and caching `Client` @ {sockaddr!r}'
)
_client_cache[sockaddr] = client _client_cache[sockaddr] = client
break break
@ -1281,54 +1242,32 @@ async def load_aio_clients(
OSError, OSError,
) as ce: ) as ce:
_err = ce _err = ce
message: str = ( log.warning(
f'Failed to connect on {host}:{port} after {i} tries with\n' f'Failed to connect on {host}:{port} for {i} time with,\n'
f'{ib.client.apiError.value()!r}\n\n' f'{ib.client.apiError.value()}\n'
'Retrying with a new client id..\n' 'retrying with a new client id..')
)
log.runtime(message)
else:
# XXX report loudly if we never established after all
# re-tries
log.warning(message)
# Pre-collect all accounts available for this # Pre-collect all accounts available for this
# connection and map account names to this client # connection and map account names to this client
# instance. # instance.
for value in ib.accountValues(): for value in ib.accountValues():
acct_number: str = value.account acct_number = value.account
acnt_alias: str = accounts_def.inverse.get(acct_number) entry = accounts_def.inverse.get(acct_number)
if not acnt_alias: if not entry:
# TODO: should we constuct the below reco-ex from
# the existing config content?
_, path = config.load(
conf_name='brokers',
)
raise ValueError( raise ValueError(
'No alias in account section for account!\n' 'No section in brokers.toml for account:'
f'Please add an acnt alias entry to your {path}\n' f' {acct_number}\n'
'For example,\n\n' f'Please add entry to continue using this API client'
'[ib.accounts]\n'
'margin = {accnt_number!r}\n'
'^^^^^^ <- you need this part!\n\n'
'This ensures `piker` will not leak private acnt info '
'to console output by default!\n'
) )
# surjection of account names to operating clients. # surjection of account names to operating clients.
if acnt_alias not in accounts_found: if acct_number not in accounts_found:
accounts_found[acnt_alias] = client accounts_found[entry] = client
# client._acnt_names.add(acnt_alias)
client._acnt_names.append(acnt_alias)
if accounts_found:
log.info( log.info(
f'Loaded accounts for api client\n\n' f'Loaded accounts for client @ {host}:{port}\n'
f'{pformat(accounts_found)}\n' f'{pformat(accounts_found)}'
) )
# XXX: why aren't we just updating this directy above # XXX: why aren't we just updating this directy above
@ -1533,7 +1472,7 @@ async def open_aio_client_method_relay(
msg: tuple[str, dict] | dict | None = await from_trio.get() msg: tuple[str, dict] | dict | None = await from_trio.get()
match msg: match msg:
case None: # termination sentinel case None: # termination sentinel
log.info('asyncio `Client` method-proxy SHUTDOWN!') print('asyncio PROXY-RELAY SHUTDOWN')
break break
case (meth_name, kwargs): case (meth_name, kwargs):

View File

@ -1183,14 +1183,7 @@ async def deliver_trade_events(
pos pos
and fill and fill
): ):
now_cr: CommissionReport = fill.commissionReport assert fill.commissionReport == cr
if (now_cr != cr):
log.warning(
'UhhHh ib updated the commission report mid-fill..?\n'
f'was: {pformat(cr)}\n'
f'now: {pformat(now_cr)}\n'
)
await emit_pp_update( await emit_pp_update(
ems_stream, ems_stream,
accounts_def, accounts_def,

View File

@ -671,8 +671,8 @@ async def _setup_quote_stream(
# making them mostly useless and explains why the scanner # making them mostly useless and explains why the scanner
# is always slow XD # is always slow XD
# '293', # Trade count for day # '293', # Trade count for day
# '294', # Trade rate / minute '294', # Trade rate / minute
# '295', # Vlm rate / minute '295', # Vlm rate / minute
), ),
contract: Contract | None = None, contract: Contract | None = None,
@ -915,13 +915,9 @@ async def stream_quotes(
if first_ticker: if first_ticker:
first_quote: dict = normalize(first_ticker) first_quote: dict = normalize(first_ticker)
log.info(
# TODO: we need a stack-oriented log levels filters for 'Rxed init quote:\n'
# this! f'{pformat(first_quote)}'
# log.info(message, filter={'stack': 'live_feed'}) ?
log.runtime(
'Rxed init quote:\n\n'
f'{pformat(first_quote)}\n'
) )
# NOTE: it might be outside regular trading hours for # NOTE: it might be outside regular trading hours for
@ -973,11 +969,7 @@ async def stream_quotes(
raise_on_timeout=True, raise_on_timeout=True,
) )
first_quote: dict = normalize(first_ticker) first_quote: dict = normalize(first_ticker)
log.info(
# TODO: we need a stack-oriented log levels filters for
# this!
# log.info(message, filter={'stack': 'live_feed'}) ?
log.runtime(
'Rxed init quote:\n' 'Rxed init quote:\n'
f'{pformat(first_quote)}' f'{pformat(first_quote)}'
) )

View File

@ -31,11 +31,7 @@ from typing import (
) )
from bidict import bidict from bidict import bidict
from pendulum import ( import pendulum
DateTime,
parse,
from_timestamp,
)
from ib_insync import ( from ib_insync import (
Contract, Contract,
Commodity, Commodity,
@ -70,11 +66,10 @@ tx_sort: Callable = partial(
iter_by_dt, iter_by_dt,
parsers={ parsers={
'dateTime': parse_flex_dt, 'dateTime': parse_flex_dt,
'datetime': parse, 'datetime': pendulum.parse,
# for some some fucking 2022 and
# XXX: for some some fucking 2022 and # back options records...fuck me.
# back options records.. f@#$ me.. 'date': pendulum.parse,
'date': parse,
} }
) )
@ -94,38 +89,15 @@ def norm_trade(
conid: int = str(record.get('conId') or record['conid']) conid: int = str(record.get('conId') or record['conid'])
bs_mktid: str = str(conid) bs_mktid: str = str(conid)
comms = record.get('commission')
if comms is None:
comms = -1*record['ibCommission']
# NOTE: sometimes weird records (like BTTX?) price = record.get('price') or record['tradePrice']
# have no field for this?
comms: float = -1 * (
record.get('commission')
or record.get('ibCommission')
or 0
)
if not comms:
log.warning(
'No commissions found for record?\n'
f'{pformat(record)}\n'
)
price: float = (
record.get('price')
or record.get('tradePrice')
)
if price is None:
log.warning(
'No `price` field found in record?\n'
'Skipping normalization..\n'
f'{pformat(record)}\n'
)
return None
# the api doesn't do the -/+ on the quantity for you but flex # the api doesn't do the -/+ on the quantity for you but flex
# records do.. are you fucking serious ib...!? # records do.. are you fucking serious ib...!?
size: float|int = ( size = record.get('quantity') or record['shares'] * {
record.get('quantity')
or record['shares']
) * {
'BOT': 1, 'BOT': 1,
'SLD': -1, 'SLD': -1,
}[record['side']] }[record['side']]
@ -156,31 +128,26 @@ def norm_trade(
# otype = tail[6] # otype = tail[6]
# strike = tail[7:] # strike = tail[7:]
log.warning( print(f'skipping opts contract {symbol}')
f'Skipping option contract -> NO SUPPORT YET!\n'
f'{symbol}\n'
)
return None return None
# timestamping is way different in API records # timestamping is way different in API records
dtstr: str = record.get('datetime') dtstr = record.get('datetime')
date: str = record.get('date') date = record.get('date')
flex_dtstr: str = record.get('dateTime') flex_dtstr = record.get('dateTime')
if dtstr or date: if dtstr or date:
dt: DateTime = parse(dtstr or date) dt = pendulum.parse(dtstr or date)
elif flex_dtstr: elif flex_dtstr:
# probably a flex record with a wonky non-std timestamp.. # probably a flex record with a wonky non-std timestamp..
dt: DateTime = parse_flex_dt(record['dateTime']) dt = parse_flex_dt(record['dateTime'])
# special handling of symbol extraction from # special handling of symbol extraction from
# flex records using some ad-hoc schema parsing. # flex records using some ad-hoc schema parsing.
asset_type: str = ( asset_type: str = record.get(
record.get('assetCategory') 'assetCategory'
or record.get('secType') ) or record.get('secType', 'STK')
or 'STK'
)
if (expiry := ( if (expiry := (
record.get('lastTradeDateOrContractMonth') record.get('lastTradeDateOrContractMonth')
@ -390,7 +357,6 @@ def norm_trade_records(
if txn is None: if txn is None:
continue continue
# inject txns sorted by datetime
insort( insort(
records, records,
txn, txn,
@ -439,7 +405,7 @@ def api_trades_to_ledger_entries(
txn_dict[attr_name] = val txn_dict[attr_name] = val
tid = str(txn_dict['execId']) tid = str(txn_dict['execId'])
dt = from_timestamp(txn_dict['time']) dt = pendulum.from_timestamp(txn_dict['time'])
txn_dict['datetime'] = str(dt) txn_dict['datetime'] = str(dt)
acctid = accounts[txn_dict['acctNumber']] acctid = accounts[txn_dict['acctNumber']]

View File

@ -209,10 +209,7 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
break break
ib_client = proxy._aio_ns.ib ib_client = proxy._aio_ns.ib
log.info( log.info(f'Using {ib_client} for symbol search')
f'Using API client for symbol-search\n'
f'{ib_client}\n'
)
last = time.time() last = time.time()
async for pattern in stream: async for pattern in stream:
@ -297,7 +294,7 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
elif stock_results: elif stock_results:
break break
# else: # else:
# await tractor.pause() await tractor.pause()
# # match against our ad-hoc set immediately # # match against our ad-hoc set immediately
# adhoc_matches = fuzzy.extract( # adhoc_matches = fuzzy.extract(
@ -525,21 +522,7 @@ async def get_mkt_info(
venue = con.primaryExchange or con.exchange venue = con.primaryExchange or con.exchange
price_tick: Decimal = Decimal(str(details.minTick)) price_tick: Decimal = Decimal(str(details.minTick))
ib_min_tick_gt_2: Decimal = Decimal('0.01') # price_tick: Decimal = Decimal('0.01')
if (
price_tick < ib_min_tick_gt_2
):
# TODO: we need to add some kinda dynamic rounding sys
# to our MktPair i guess?
# not sure where the logic should sit, but likely inside
# the `.clearing._ems` i suppose...
log.warning(
'IB seems to disallow a min price tick < 0.01 '
'when the price is > 2.0..?\n'
f'Decreasing min tick precision for {fqme} to 0.01'
)
# price_tick = ib_min_tick
# await tractor.pause()
if atype == 'stock': if atype == 'stock':
# XXX: GRRRR they don't support fractional share sizes for # XXX: GRRRR they don't support fractional share sizes for