Add symbol search to ib

symbol_search
Tyler Goodlet 2021-05-10 10:23:08 -04:00
parent 82a8e0a7b6
commit ef1b0911f3
1 changed files with 91 additions and 48 deletions

View File

@ -45,7 +45,9 @@ from ib_insync.objects import Position
import ib_insync as ibis import ib_insync as ibis
from ib_insync.wrapper import Wrapper from ib_insync.wrapper import Wrapper
from ib_insync.client import Client as ib_Client from ib_insync.client import Client as ib_Client
from fuzzywuzzy import process as fuzzy
from .api import open_cached_client
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from .._daemon import maybe_spawn_brokerd from .._daemon import maybe_spawn_brokerd
from ..data._source import from_df from ..data._source import from_df
@ -322,7 +324,7 @@ class Client:
sym, exch = symbol.upper().rsplit('.', maxsplit=1) sym, exch = symbol.upper().rsplit('.', maxsplit=1)
except ValueError: except ValueError:
# likely there's an embedded `.` for a forex pair # likely there's an embedded `.` for a forex pair
await tractor.breakpoint() breakpoint()
# futes # futes
if exch in ('GLOBEX', 'NYMEX', 'CME', 'CMECRYPTO'): if exch in ('GLOBEX', 'NYMEX', 'CME', 'CMECRYPTO'):
@ -350,10 +352,13 @@ class Client:
if exch in ('PURE', 'TSE'): # non-yankee if exch in ('PURE', 'TSE'): # non-yankee
currency = 'CAD' currency = 'CAD'
if exch in ('PURE', 'TSE'): # stupid ib...
# stupid ib... primaryExchange = exch
primaryExchange = exch exch = 'SMART'
exch = 'SMART'
else:
exch = 'SMART'
primaryExchange = exch
con = ibis.Stock( con = ibis.Stock(
symbol=sym, symbol=sym,
@ -994,23 +999,31 @@ async def stream_quotes(
} }
} }
con = first_ticker.contract
# should be real volume for this contract by default
calc_price = False
# check for special contract types # check for special contract types
if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex):
suffix = 'exchange'
# should be real volume for this contract suffix = con.primaryExchange
calc_price = False if not suffix:
suffix = con.exchange
else: else:
# commodities and forex don't have an exchange name and # commodities and forex don't have an exchange name and
# no real volume so we have to calculate the price # no real volume so we have to calculate the price
suffix = 'secType' suffix = con.secType
# no real volume on this tract
calc_price = True calc_price = True
# pass first quote asap
quote = normalize(first_ticker, calc_price=calc_price) quote = normalize(first_ticker, calc_price=calc_price)
con = quote['contract'] con = quote['contract']
topic = '.'.join((con['symbol'], con[suffix])).lower() topic = '.'.join((con['symbol'], suffix)).lower()
quote['symbol'] = topic quote['symbol'] = topic
# pass first quote asap
first_quote = {topic: quote} first_quote = {topic: quote}
# ugh, clear ticks since we've consumed them # ugh, clear ticks since we've consumed them
@ -1022,50 +1035,50 @@ async def stream_quotes(
task_status.started((init_msgs, first_quote)) task_status.started((init_msgs, first_quote))
if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex):
suffix = 'exchange' # suffix = 'exchange'
calc_price = False # should be real volume for contract # calc_price = False # should be real volume for contract
# wait for real volume on feed (trading might be closed) # wait for real volume on feed (trading might be closed)
async with aclosing(stream): while True:
async for ticker in stream: ticker = await stream.receive()
# for a real volume contract we rait for the first
# "real" trade to take place
if not calc_price and not ticker.rtTime:
# spin consuming tickers until we get a real market datum
log.debug(f"New unsent ticker: {ticker}")
continue
else:
log.debug("Received first real volume tick")
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is truly stateful trash)
ticker.ticks = []
# XXX: this works because we don't use
# ``aclosing()`` above?
break
# tell caller quotes are now coming in live
feed_is_live.set()
async for ticker in stream:
# print(ticker.vwap)
quote = normalize(
ticker,
calc_price=calc_price
)
con = quote['contract']
topic = '.'.join((con['symbol'], con[suffix])).lower()
quote['symbol'] = topic
await send_chan.send({topic: quote})
# for a real volume contract we rait for the first
# "real" trade to take place
if not calc_price and not ticker.rtTime:
# spin consuming tickers until we get a real market datum
log.debug(f"New unsent ticker: {ticker}")
continue
else:
log.debug("Received first real volume tick")
# ugh, clear ticks since we've consumed them # ugh, clear ticks since we've consumed them
# (ahem, ib_insync is truly stateful trash)
ticker.ticks = [] ticker.ticks = []
# XXX: this works because we don't use
# ``aclosing()`` above?
break
# tell caller quotes are now coming in live
feed_is_live.set()
async with aclosing(stream):
async for ticker in stream:
# print(ticker.vwap)
quote = normalize(
ticker,
calc_price=calc_price
)
# con = quote['contract']
# topic = '.'.join((con['symbol'], suffix)).lower()
quote['symbol'] = topic
await send_chan.send({topic: quote})
# ugh, clear ticks since we've consumed them
ticker.ticks = []
def pack_position(pos: Position) -> Dict[str, Any]: def pack_position(pos: Position) -> Dict[str, Any]:
con = pos.contract con = pos.contract
@ -1183,3 +1196,33 @@ async def stream_trades(
continue continue
yield {'local_trades': (event_name, msg)} yield {'local_trades': (event_name, msg)}
@tractor.context
async def open_symbol_search(
ctx: tractor.Context,
) -> Client:
async with open_cached_client('ib') as client:
# load all symbols locally for fast search
await ctx.started({})
async with ctx.open_stream() as stream:
async for pattern in stream:
if not pattern:
# will get error on empty request
continue
results = await client.search_stocks(pattern=pattern, upto=5)
matches = fuzzy.extractBests(
pattern,
results,
score_cutoff=50,
)
await stream.send(
{item[2]: item[0]
for item in matches}
)