From c6aa867c9bd054944390cec1abb5b1ba61f916a0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 23 Jun 2021 10:09:45 -0400 Subject: [PATCH 1/6] Add more futes, add in order status comments --- piker/_daemon.py | 2 +- piker/brokers/ib.py | 57 ++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 52 insertions(+), 7 deletions(-) diff --git a/piker/_daemon.py b/piker/_daemon.py index e172fe6b..c206a4d2 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -283,7 +283,7 @@ async def maybe_spawn_daemon( lock = Brokerd.locks[service_name] await lock.acquire() - # attach to existing brokerd if possible + # attach to existing daemon by name if possible async with tractor.find_actor(service_name) as portal: if portal is not None: lock.release() diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 45b93416..6abe257a 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) Tyler Goodlet (in stewardship for piker0) +# Copyright (C) Tyler Goodlet (in stewardship for pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -171,6 +171,7 @@ _adhoc_futes_set = { # equities 'nq.globex', 'mnq.globex', + 'es.globex', 'mes.globex', @@ -178,8 +179,20 @@ _adhoc_futes_set = { 'brr.cmecrypto', 'ethusdrr.cmecrypto', + # agriculture + 'he.globex', # lean hogs + 'le.globex', # live cattle (geezers) + 'gf.globex', # feeder cattle (younguns) + + # raw + 'lb.globex', # random len lumber + # metals - 'xauusd.cmdty', + 'xauusd.cmdty', # gold spot + 'gc.nymex', + 'mgc.nymex', + + 'xagusd.cmdty', # silver spot } # exchanges we don't support at the moment due to not knowing @@ -556,7 +569,7 @@ class Client: else: item = ('status', obj) - log.info(f'eventkit event -> {eventkit_obj}: {item}') + log.info(f'eventkit event ->\n{pformat(item)}') try: to_trio.send_nowait(item) @@ -1362,12 +1375,35 @@ async def trades_dialogue( # ib-gw goes down? Not sure exactly how that's happening looking # at the eventkit code above but we should probably handle it... async for event_name, item in ib_trade_events_stream: + print(f' ib sending {item}') - # XXX: begin normalization of nonsense ib_insync internal - # object-state tracking representations... + # TODO: templating the ib statuses in comparison with other + # brokers is likely the way to go: + # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313 + # short list: + # - PendingSubmit + # - PendingCancel + # - PreSubmitted (simulated orders) + # - ApiCancelled (cancelled by client before submission + # to routing) + # - Cancelled + # - Filled + # - Inactive (reject or cancelled but not by trader) + + # XXX: here's some other sucky cases from the api + # - short-sale but securities haven't been located, in this + # case we should probably keep the order in some kind of + # weird state or cancel it outright? + # status='PendingSubmit', message=''), + # status='Cancelled', message='Error 404, + # reqId 1550: Order held while securities are located.'), + # status='PreSubmitted', message='')], if event_name == 'status': + # XXX: begin normalization of nonsense ib_insync internal + # object-state tracking representations... + # unwrap needed data from ib_insync internal types trade: Trade = item status: OrderStatus = trade.orderStatus @@ -1378,10 +1414,13 @@ async def trades_dialogue( reqid=trade.order.orderId, time_ns=time.time_ns(), # cuz why not + + # everyone doin camel case.. status=status.status.lower(), # force lower case filled=status.filled, reason=status.whyHeld, + # this seems to not be necessarily up to date in the # execDetails event.. so we have to send it here I guess? remaining=status.remaining, @@ -1500,6 +1539,12 @@ async def open_symbol_search( if not pattern or pattern.isspace(): log.warning('empty pattern received, skipping..') + # TODO: *BUG* if nothing is returned here the client + # side will cache a null set result and not showing + # anything to the use on re-searches when this query + # timed out. We probably need a special "timeout" msg + # or something... + # XXX: this unblocks the far end search task which may # hold up a multi-search nursery block await stream.send({}) @@ -1507,7 +1552,7 @@ async def open_symbol_search( continue log.debug(f'searching for {pattern}') - # await tractor.breakpoint() + last = time.time() results = await _trio_run_client_method( method='search_stocks', From ffbfd187ada7febe0c967793c9a6f25820acfdf1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 18 Aug 2021 15:15:28 -0400 Subject: [PATCH 2/6] Raise cache miss on a disconnected ib client --- piker/brokers/ib.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 6abe257a..23e71a0b 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -701,6 +701,10 @@ async def _aio_get_client( # grab first cached client client = list(_client_cache.values())[0] + if not client.ib.isConnected(): + # we have a stale client to re-allocate + raise KeyError + yield client except (KeyError, IndexError): @@ -780,7 +784,6 @@ async def _aio_run_client_method( kwargs['to_trio'] = to_trio log.runtime(f'Running {meth}({kwargs})') - return await async_meth(**kwargs) From d940957455fc12510ccfe55317c66136c95f7fd3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 18 Aug 2021 15:17:26 -0400 Subject: [PATCH 3/6] Support account passthrough in `.submit_limit()` --- piker/brokers/ib.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 23e71a0b..277c6c49 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -496,6 +496,7 @@ class Client: price: float, action: str, size: int, + account: str = '', # if blank the "default" tws account is used # XXX: by default 0 tells ``ib_insync`` methods that there is no # existing order so ask the client to create a new one (which it @@ -518,6 +519,7 @@ class Client: Order( orderId=reqid or 0, # stupid api devs.. action=action.upper(), # BUY/SELL + account=account, orderType='LMT', lmtPrice=price, totalQuantity=size, From 3dad779c905d93c47390be48dd7a9d019f0f3965 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 30 Aug 2021 17:53:01 -0400 Subject: [PATCH 4/6] Add commented catch to skip backpressure errors wen debugging --- piker/brokers/ib.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 277c6c49..f081bd22 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -1093,6 +1093,11 @@ async def _setup_quote_stream( # decouple broadcast mem chan _quote_streams.pop(symbol, None) + # except trio.WouldBlock: + # # for slow debugging purposes to avoid clobbering prompt + # # with log msgs + # pass + ticker.updateEvent.connect(push) return from_aio From eb5762d91276303fb74b9bd3afeec5cf710cbb27 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 2 Sep 2021 10:46:20 -0400 Subject: [PATCH 5/6] Add adhoc-symbols search for ib This gives us fast search over a known set of symbols you can't search for with the api such as futures and commodities contracts. Toss in a new client method to lookup contract details `Client.con_deats()` and avoid calling it for now from `.search_stock()` for speed; it seems originally we were doing the 2nd lookup due to weird suffixes in the `.primaryExchange` which we can just discard. --- piker/brokers/ib.py | 144 +++++++++++++++++++++++++++++--------------- 1 file changed, 95 insertions(+), 49 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index f081bd22..89a8fe11 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -25,7 +25,10 @@ from contextlib import asynccontextmanager from dataclasses import asdict from datetime import datetime from functools import partial -from typing import List, Dict, Any, Tuple, Optional, AsyncIterator +from typing import ( + List, Dict, Any, Tuple, Optional, + AsyncIterator, Awaitable, +) import asyncio from pprint import pformat import inspect @@ -292,12 +295,47 @@ class Client: df = ibis.util.df(bars) return bars, from_df(df) + async def con_deats( + self, + contracts: list[Contract], + + ) -> dict[str, ContractDetails]: + + futs = [] + for con in contracts: + if con.primaryExchange not in _exch_skip_list: + futs.append(self.ib.reqContractDetailsAsync(con)) + + # batch request all details + results = await asyncio.gather(*futs) + + # XXX: if there is more then one entry in the details list + details = {} + for details_set in results: + + # then the contract is so called "ambiguous". + for d in details_set: + con = d.contract + unique_sym = f'{con.symbol}.{con.primaryExchange}' + + as_dict = asdict(d) + # nested dataclass we probably don't need and that + # won't IPC serialize + as_dict.pop('secIdList') + + details[unique_sym] = as_dict + + return details + async def search_stocks( self, pattern: str, + + get_details: bool = False, # how many contracts to search "up to" upto: int = 3, - ) -> Dict[str, ContractDetails]: + + ) -> dict[str, ContractDetails]: """Search for stocks matching provided ``str`` pattern. Return a dictionary of ``upto`` entries worth of contract details. @@ -305,37 +343,22 @@ class Client: descriptions = await self.ib.reqMatchingSymbolsAsync(pattern) if descriptions is not None: + descrs = descriptions[:upto] - futs = [] - for d in descriptions: - con = d.contract - if con.primaryExchange not in _exch_skip_list: - futs.append(self.ib.reqContractDetailsAsync(con)) + if get_details: + return await self.con_deats([d.contract for d in descrs]) - # batch request all details - results = await asyncio.gather(*futs) - - # XXX: if there is more then one entry in the details list - details = {} - for details_set in results: - - # then the contract is so called "ambiguous". - for d in details_set: + else: + results = {} + for d in descrs: con = d.contract - unique_sym = f'{con.symbol}.{con.primaryExchange}' - - as_dict = asdict(d) - # nested dataclass we probably don't need and that - # won't IPC serialize - as_dict.pop('secIdList') - - details[unique_sym] = as_dict - - if len(details) == upto: - return details - - return details + # sometimes there's a weird extra suffix returned + # from search? + exch = con.primaryExchange.rsplit('.')[0] + unique_sym = f'{con.symbol}.{exch}' + results[unique_sym] = {} + return results else: return {} @@ -345,20 +368,12 @@ class Client: # how many contracts to search "up to" upto: int = 3, asdicts: bool = True, + ) -> Dict[str, ContractDetails]: # TODO add search though our adhoc-locally defined symbol set # for futes/cmdtys/ - return await self.search_stocks(pattern, upto, asdicts) - - async def search_futes( - self, - pattern: str, - # how many contracts to search "up to" - upto: int = 3, - asdicts: bool = True, - ) -> Dict[str, ContractDetails]: - raise NotImplementedError + return await self.search_stocks(pattern, upto, get_details=True) async def get_cont_fute( self, @@ -1564,20 +1579,51 @@ async def open_symbol_search( log.debug(f'searching for {pattern}') last = time.time() - results = await _trio_run_client_method( - method='search_stocks', - pattern=pattern, - upto=5, - ) - log.debug(f'got results {results.keys()}') - log.debug("fuzzy matching") - matches = fuzzy.extractBests( + # async batch search using api stocks endpoint and module + # defined adhoc symbol set. + stock_results = [] + + async def stash_results(target: Awaitable[list]): + stock_results.extend(await target) + + async with trio.open_nursery() as sn: + sn.start_soon( + stash_results, + _trio_run_client_method( + method='search_stocks', + pattern=pattern, + upto=5, + ) + ) + + # trigger async request + await trio.sleep(0) + + # match against our ad-hoc set immediately + adhoc_matches = fuzzy.extractBests( + pattern, + list(_adhoc_futes_set), + score_cutoff=90, + ) + log.info(f'fuzzy matched adhocs: {adhoc_matches}') + adhoc_match_results = {} + if adhoc_matches: + # TODO: do we need to pull contract details? + adhoc_match_results = {i[0]: {} for i in adhoc_matches} + + log.debug(f'fuzzy matching stocks {stock_results}') + stock_matches = fuzzy.extractBests( pattern, - results, + stock_results, score_cutoff=50, ) - matches = {item[2]: item[0] for item in matches} + matches = adhoc_match_results | { + item[0]: {} for item in stock_matches + } + # TODO: we used to deliver contract details + # {item[2]: item[0] for item in stock_matches} + log.debug(f"sending matches: {matches.keys()}") await stream.send(matches) From d3838c2a8b33dd6c53994225e5ad4d1a66f8d0e7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 2 Sep 2021 12:55:10 -0400 Subject: [PATCH 6/6] Use built-in type generics --- piker/brokers/ib.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 89a8fe11..c8f8a102 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -26,7 +26,7 @@ from dataclasses import asdict from datetime import datetime from functools import partial from typing import ( - List, Dict, Any, Tuple, Optional, + Any, Optional, AsyncIterator, Awaitable, ) import asyncio @@ -226,8 +226,8 @@ class Client: self.ib.RaiseRequestErrors = True # contract cache - self._contracts: Dict[str, Contract] = {} - self._feeds: Dict[str, trio.abc.SendChannel] = {} + self._contracts: dict[str, Contract] = {} + self._feeds: dict[str, trio.abc.SendChannel] = {} # NOTE: the ib.client here is "throttled" to 45 rps by default @@ -242,7 +242,7 @@ class Client: period_count: int = int(2e3), # <- max per 1s sample query is_paid_feed: bool = False, # placeholder - ) -> List[Dict[str, Any]]: + ) -> list[dict[str, Any]]: """Retreive OHLCV bars for a symbol over a range to the present. """ bars_kwargs = {'whatToShow': 'TRADES'} @@ -369,7 +369,7 @@ class Client: upto: int = 3, asdicts: bool = True, - ) -> Dict[str, ContractDetails]: + ) -> dict[str, ContractDetails]: # TODO add search though our adhoc-locally defined symbol set # for futes/cmdtys/ @@ -399,7 +399,7 @@ class Client: # ``wrapper.starTicker()`` currently cashes ticker instances # which means getting a singel quote will potentially look up # a quote for a ticker that it already streaming and thus run - # into state clobbering (eg. List: Ticker.ticks). It probably + # into state clobbering (eg. list: Ticker.ticks). It probably # makes sense to try this once we get the pub-sub working on # individual symbols... @@ -660,7 +660,7 @@ class Client: async def positions( self, account: str = '', - ) -> List[Position]: + ) -> list[Position]: """ Retrieve position info for ``account``. """ @@ -1067,12 +1067,12 @@ asset_type_map = { } -_quote_streams: Dict[str, trio.abc.ReceiveStream] = {} +_quote_streams: dict[str, trio.abc.ReceiveStream] = {} async def _setup_quote_stream( symbol: str, - opts: Tuple[int] = ('375', '233', '236'), + opts: tuple[int] = ('375', '233', '236'), contract: Optional[Contract] = None, ) -> None: """Stream a ticker using the std L1 api. @@ -1148,13 +1148,13 @@ async def start_aio_quote_stream( async def stream_quotes( send_chan: trio.abc.SendChannel, - symbols: List[str], + symbols: list[str], shm: ShmArray, feed_is_live: trio.Event, loglevel: str = None, # startup sync - task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, ) -> None: """Stream symbol quotes. @@ -1285,7 +1285,7 @@ async def stream_quotes( # last = time.time() -def pack_position(pos: Position) -> Dict[str, Any]: +def pack_position(pos: Position) -> dict[str, Any]: con = pos.contract if isinstance(con, Option): @@ -1367,7 +1367,7 @@ async def trades_dialogue( ctx: tractor.Context, loglevel: str = None, -) -> AsyncIterator[Dict[str, Any]]: +) -> AsyncIterator[dict[str, Any]]: # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel)