diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 112e935e..98c3a979 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -45,7 +45,9 @@ from ib_insync.objects import Position import ib_insync as ibis from ib_insync.wrapper import Wrapper from ib_insync.client import Client as ib_Client +from fuzzywuzzy import process as fuzzy +from .api import open_cached_client from ..log import get_logger, get_console_log from .._daemon import maybe_spawn_brokerd from ..data._source import from_df @@ -322,7 +324,7 @@ class Client: sym, exch = symbol.upper().rsplit('.', maxsplit=1) except ValueError: # likely there's an embedded `.` for a forex pair - await tractor.breakpoint() + breakpoint() # futes if exch in ('GLOBEX', 'NYMEX', 'CME', 'CMECRYPTO'): @@ -350,10 +352,13 @@ class Client: if exch in ('PURE', 'TSE'): # non-yankee currency = 'CAD' - if exch in ('PURE', 'TSE'): - # stupid ib... - primaryExchange = exch - exch = 'SMART' + # stupid ib... + primaryExchange = exch + exch = 'SMART' + + else: + exch = 'SMART' + primaryExchange = exch con = ibis.Stock( symbol=sym, @@ -994,23 +999,31 @@ async def stream_quotes( } } + con = first_ticker.contract + + # should be real volume for this contract by default + calc_price = False + # check for special contract types if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): - suffix = 'exchange' - # should be real volume for this contract - calc_price = False + + suffix = con.primaryExchange + if not suffix: + suffix = con.exchange + else: # commodities and forex don't have an exchange name and # no real volume so we have to calculate the price - suffix = 'secType' + suffix = con.secType + # no real volume on this tract calc_price = True - # pass first quote asap quote = normalize(first_ticker, calc_price=calc_price) con = quote['contract'] - topic = '.'.join((con['symbol'], con[suffix])).lower() + topic = '.'.join((con['symbol'], suffix)).lower() quote['symbol'] = topic + # pass first quote asap first_quote = {topic: quote} # ugh, clear ticks since we've consumed them @@ -1022,50 +1035,50 @@ async def stream_quotes( task_status.started((init_msgs, first_quote)) if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): - suffix = 'exchange' - calc_price = False # should be real volume for contract + # suffix = 'exchange' + # calc_price = False # should be real volume for contract # wait for real volume on feed (trading might be closed) - async with aclosing(stream): + while True: - async for ticker in stream: - - # for a real volume contract we rait for the first - # "real" trade to take place - if not calc_price and not ticker.rtTime: - # spin consuming tickers until we get a real market datum - log.debug(f"New unsent ticker: {ticker}") - continue - else: - log.debug("Received first real volume tick") - # ugh, clear ticks since we've consumed them - # (ahem, ib_insync is truly stateful trash) - ticker.ticks = [] - - # XXX: this works because we don't use - # ``aclosing()`` above? - break - - # tell caller quotes are now coming in live - feed_is_live.set() - - async for ticker in stream: - - # print(ticker.vwap) - quote = normalize( - ticker, - calc_price=calc_price - ) - - con = quote['contract'] - topic = '.'.join((con['symbol'], con[suffix])).lower() - quote['symbol'] = topic - - await send_chan.send({topic: quote}) + ticker = await stream.receive() + # for a real volume contract we rait for the first + # "real" trade to take place + if not calc_price and not ticker.rtTime: + # spin consuming tickers until we get a real market datum + log.debug(f"New unsent ticker: {ticker}") + continue + else: + log.debug("Received first real volume tick") # ugh, clear ticks since we've consumed them + # (ahem, ib_insync is truly stateful trash) ticker.ticks = [] + # XXX: this works because we don't use + # ``aclosing()`` above? + break + + # tell caller quotes are now coming in live + feed_is_live.set() + + async with aclosing(stream): + async for ticker in stream: + + # print(ticker.vwap) + quote = normalize( + ticker, + calc_price=calc_price + ) + + # con = quote['contract'] + # topic = '.'.join((con['symbol'], suffix)).lower() + quote['symbol'] = topic + await send_chan.send({topic: quote}) + + # ugh, clear ticks since we've consumed them + ticker.ticks = [] + def pack_position(pos: Position) -> Dict[str, Any]: con = pos.contract @@ -1183,3 +1196,33 @@ async def stream_trades( continue yield {'local_trades': (event_name, msg)} + + +@tractor.context +async def open_symbol_search( + ctx: tractor.Context, +) -> Client: + async with open_cached_client('ib') as client: + + # load all symbols locally for fast search + await ctx.started({}) + + async with ctx.open_stream() as stream: + + async for pattern in stream: + + if not pattern: + # will get error on empty request + continue + + results = await client.search_stocks(pattern=pattern, upto=5) + + matches = fuzzy.extractBests( + pattern, + results, + score_cutoff=50, + ) + await stream.send( + {item[2]: item[0] + for item in matches} + )