Merge pull request #204 from pikers/ib_adhoc_derivs

Ib adhoc derivs search
pause_feeds_on_sym_switch
goodboy 2021-09-02 12:57:12 -04:00 committed by GitHub
commit ad174c5c21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 171 additions and 70 deletions

View File

@ -283,7 +283,7 @@ async def maybe_spawn_daemon(
lock = Brokerd.locks[service_name]
await lock.acquire()
# attach to existing brokerd if possible
# attach to existing daemon by name if possible
async with tractor.find_actor(service_name) as portal:
if portal is not None:
lock.release()

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@ -25,7 +25,10 @@ from contextlib import asynccontextmanager
from dataclasses import asdict
from datetime import datetime
from functools import partial
from typing import List, Dict, Any, Tuple, Optional, AsyncIterator
from typing import (
Any, Optional,
AsyncIterator, Awaitable,
)
import asyncio
from pprint import pformat
import inspect
@ -171,6 +174,7 @@ _adhoc_futes_set = {
# equities
'nq.globex',
'mnq.globex',
'es.globex',
'mes.globex',
@ -178,8 +182,20 @@ _adhoc_futes_set = {
'brr.cmecrypto',
'ethusdrr.cmecrypto',
# agriculture
'he.globex', # lean hogs
'le.globex', # live cattle (geezers)
'gf.globex', # feeder cattle (younguns)
# raw
'lb.globex', # random len lumber
# metals
'xauusd.cmdty',
'xauusd.cmdty', # gold spot
'gc.nymex',
'mgc.nymex',
'xagusd.cmdty', # silver spot
}
# exchanges we don't support at the moment due to not knowing
@ -210,8 +226,8 @@ class Client:
self.ib.RaiseRequestErrors = True
# contract cache
self._contracts: Dict[str, Contract] = {}
self._feeds: Dict[str, trio.abc.SendChannel] = {}
self._contracts: dict[str, Contract] = {}
self._feeds: dict[str, trio.abc.SendChannel] = {}
# NOTE: the ib.client here is "throttled" to 45 rps by default
@ -226,7 +242,7 @@ class Client:
period_count: int = int(2e3), # <- max per 1s sample query
is_paid_feed: bool = False, # placeholder
) -> List[Dict[str, Any]]:
) -> list[dict[str, Any]]:
"""Retreive OHLCV bars for a symbol over a range to the present.
"""
bars_kwargs = {'whatToShow': 'TRADES'}
@ -279,12 +295,47 @@ class Client:
df = ibis.util.df(bars)
return bars, from_df(df)
async def con_deats(
self,
contracts: list[Contract],
) -> dict[str, ContractDetails]:
futs = []
for con in contracts:
if con.primaryExchange not in _exch_skip_list:
futs.append(self.ib.reqContractDetailsAsync(con))
# batch request all details
results = await asyncio.gather(*futs)
# XXX: if there is more then one entry in the details list
details = {}
for details_set in results:
# then the contract is so called "ambiguous".
for d in details_set:
con = d.contract
unique_sym = f'{con.symbol}.{con.primaryExchange}'
as_dict = asdict(d)
# nested dataclass we probably don't need and that
# won't IPC serialize
as_dict.pop('secIdList')
details[unique_sym] = as_dict
return details
async def search_stocks(
self,
pattern: str,
get_details: bool = False,
# how many contracts to search "up to"
upto: int = 3,
) -> Dict[str, ContractDetails]:
) -> dict[str, ContractDetails]:
"""Search for stocks matching provided ``str`` pattern.
Return a dictionary of ``upto`` entries worth of contract details.
@ -292,37 +343,22 @@ class Client:
descriptions = await self.ib.reqMatchingSymbolsAsync(pattern)
if descriptions is not None:
descrs = descriptions[:upto]
futs = []
for d in descriptions:
con = d.contract
if con.primaryExchange not in _exch_skip_list:
futs.append(self.ib.reqContractDetailsAsync(con))
if get_details:
return await self.con_deats([d.contract for d in descrs])
# batch request all details
results = await asyncio.gather(*futs)
# XXX: if there is more then one entry in the details list
details = {}
for details_set in results:
# then the contract is so called "ambiguous".
for d in details_set:
else:
results = {}
for d in descrs:
con = d.contract
unique_sym = f'{con.symbol}.{con.primaryExchange}'
as_dict = asdict(d)
# nested dataclass we probably don't need and that
# won't IPC serialize
as_dict.pop('secIdList')
details[unique_sym] = as_dict
if len(details) == upto:
return details
return details
# sometimes there's a weird extra suffix returned
# from search?
exch = con.primaryExchange.rsplit('.')[0]
unique_sym = f'{con.symbol}.{exch}'
results[unique_sym] = {}
return results
else:
return {}
@ -332,20 +368,12 @@ class Client:
# how many contracts to search "up to"
upto: int = 3,
asdicts: bool = True,
) -> Dict[str, ContractDetails]:
) -> dict[str, ContractDetails]:
# TODO add search though our adhoc-locally defined symbol set
# for futes/cmdtys/
return await self.search_stocks(pattern, upto, asdicts)
async def search_futes(
self,
pattern: str,
# how many contracts to search "up to"
upto: int = 3,
asdicts: bool = True,
) -> Dict[str, ContractDetails]:
raise NotImplementedError
return await self.search_stocks(pattern, upto, get_details=True)
async def get_cont_fute(
self,
@ -371,7 +399,7 @@ class Client:
# ``wrapper.starTicker()`` currently cashes ticker instances
# which means getting a singel quote will potentially look up
# a quote for a ticker that it already streaming and thus run
# into state clobbering (eg. List: Ticker.ticks). It probably
# into state clobbering (eg. list: Ticker.ticks). It probably
# makes sense to try this once we get the pub-sub working on
# individual symbols...
@ -483,6 +511,7 @@ class Client:
price: float,
action: str,
size: int,
account: str = '', # if blank the "default" tws account is used
# XXX: by default 0 tells ``ib_insync`` methods that there is no
# existing order so ask the client to create a new one (which it
@ -505,6 +534,7 @@ class Client:
Order(
orderId=reqid or 0, # stupid api devs..
action=action.upper(), # BUY/SELL
account=account,
orderType='LMT',
lmtPrice=price,
totalQuantity=size,
@ -556,7 +586,7 @@ class Client:
else:
item = ('status', obj)
log.info(f'eventkit event -> {eventkit_obj}: {item}')
log.info(f'eventkit event ->\n{pformat(item)}')
try:
to_trio.send_nowait(item)
@ -630,7 +660,7 @@ class Client:
async def positions(
self,
account: str = '',
) -> List[Position]:
) -> list[Position]:
"""
Retrieve position info for ``account``.
"""
@ -688,6 +718,10 @@ async def _aio_get_client(
# grab first cached client
client = list(_client_cache.values())[0]
if not client.ib.isConnected():
# we have a stale client to re-allocate
raise KeyError
yield client
except (KeyError, IndexError):
@ -767,7 +801,6 @@ async def _aio_run_client_method(
kwargs['to_trio'] = to_trio
log.runtime(f'Running {meth}({kwargs})')
return await async_meth(**kwargs)
@ -1034,12 +1067,12 @@ asset_type_map = {
}
_quote_streams: Dict[str, trio.abc.ReceiveStream] = {}
_quote_streams: dict[str, trio.abc.ReceiveStream] = {}
async def _setup_quote_stream(
symbol: str,
opts: Tuple[int] = ('375', '233', '236'),
opts: tuple[int] = ('375', '233', '236'),
contract: Optional[Contract] = None,
) -> None:
"""Stream a ticker using the std L1 api.
@ -1075,6 +1108,11 @@ async def _setup_quote_stream(
# decouple broadcast mem chan
_quote_streams.pop(symbol, None)
# except trio.WouldBlock:
# # for slow debugging purposes to avoid clobbering prompt
# # with log msgs
# pass
ticker.updateEvent.connect(push)
return from_aio
@ -1110,13 +1148,13 @@ async def start_aio_quote_stream(
async def stream_quotes(
send_chan: trio.abc.SendChannel,
symbols: List[str],
symbols: list[str],
shm: ShmArray,
feed_is_live: trio.Event,
loglevel: str = None,
# startup sync
task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED,
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None:
"""Stream symbol quotes.
@ -1247,7 +1285,7 @@ async def stream_quotes(
# last = time.time()
def pack_position(pos: Position) -> Dict[str, Any]:
def pack_position(pos: Position) -> dict[str, Any]:
con = pos.contract
if isinstance(con, Option):
@ -1329,7 +1367,7 @@ async def trades_dialogue(
ctx: tractor.Context,
loglevel: str = None,
) -> AsyncIterator[Dict[str, Any]]:
) -> AsyncIterator[dict[str, Any]]:
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
@ -1362,12 +1400,35 @@ async def trades_dialogue(
# ib-gw goes down? Not sure exactly how that's happening looking
# at the eventkit code above but we should probably handle it...
async for event_name, item in ib_trade_events_stream:
print(f' ib sending {item}')
# XXX: begin normalization of nonsense ib_insync internal
# object-state tracking representations...
# TODO: templating the ib statuses in comparison with other
# brokers is likely the way to go:
# https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313
# short list:
# - PendingSubmit
# - PendingCancel
# - PreSubmitted (simulated orders)
# - ApiCancelled (cancelled by client before submission
# to routing)
# - Cancelled
# - Filled
# - Inactive (reject or cancelled but not by trader)
# XXX: here's some other sucky cases from the api
# - short-sale but securities haven't been located, in this
# case we should probably keep the order in some kind of
# weird state or cancel it outright?
# status='PendingSubmit', message=''),
# status='Cancelled', message='Error 404,
# reqId 1550: Order held while securities are located.'),
# status='PreSubmitted', message='')],
if event_name == 'status':
# XXX: begin normalization of nonsense ib_insync internal
# object-state tracking representations...
# unwrap needed data from ib_insync internal types
trade: Trade = item
status: OrderStatus = trade.orderStatus
@ -1378,10 +1439,13 @@ async def trades_dialogue(
reqid=trade.order.orderId,
time_ns=time.time_ns(), # cuz why not
# everyone doin camel case..
status=status.status.lower(), # force lower case
filled=status.filled,
reason=status.whyHeld,
# this seems to not be necessarily up to date in the
# execDetails event.. so we have to send it here I guess?
remaining=status.remaining,
@ -1500,6 +1564,12 @@ async def open_symbol_search(
if not pattern or pattern.isspace():
log.warning('empty pattern received, skipping..')
# TODO: *BUG* if nothing is returned here the client
# side will cache a null set result and not showing
# anything to the use on re-searches when this query
# timed out. We probably need a special "timeout" msg
# or something...
# XXX: this unblocks the far end search task which may
# hold up a multi-search nursery block
await stream.send({})
@ -1507,22 +1577,53 @@ async def open_symbol_search(
continue
log.debug(f'searching for {pattern}')
# await tractor.breakpoint()
last = time.time()
results = await _trio_run_client_method(
method='search_stocks',
pattern=pattern,
upto=5,
)
log.debug(f'got results {results.keys()}')
log.debug("fuzzy matching")
matches = fuzzy.extractBests(
last = time.time()
# async batch search using api stocks endpoint and module
# defined adhoc symbol set.
stock_results = []
async def stash_results(target: Awaitable[list]):
stock_results.extend(await target)
async with trio.open_nursery() as sn:
sn.start_soon(
stash_results,
_trio_run_client_method(
method='search_stocks',
pattern=pattern,
upto=5,
)
)
# trigger async request
await trio.sleep(0)
# match against our ad-hoc set immediately
adhoc_matches = fuzzy.extractBests(
pattern,
list(_adhoc_futes_set),
score_cutoff=90,
)
log.info(f'fuzzy matched adhocs: {adhoc_matches}')
adhoc_match_results = {}
if adhoc_matches:
# TODO: do we need to pull contract details?
adhoc_match_results = {i[0]: {} for i in adhoc_matches}
log.debug(f'fuzzy matching stocks {stock_results}')
stock_matches = fuzzy.extractBests(
pattern,
results,
stock_results,
score_cutoff=50,
)
matches = {item[2]: item[0] for item in matches}
matches = adhoc_match_results | {
item[0]: {} for item in stock_matches
}
# TODO: we used to deliver contract details
# {item[2]: item[0] for item in stock_matches}
log.debug(f"sending matches: {matches.keys()}")
await stream.send(matches)