diff --git a/piker/brokers/__init__.py b/piker/brokers/__init__.py index 95eb6f08..a35e4aea 100644 --- a/piker/brokers/__init__.py +++ b/piker/brokers/__init__.py @@ -20,10 +20,6 @@ Broker clients, daemons and general back end machinery. from importlib import import_module from types import ModuleType -# TODO: move to urllib3/requests once supported -import asks -asks.init('trio') - __brokers__ = [ 'binance', 'ib', @@ -45,16 +41,20 @@ __brokers__ = [ def get_brokermod(brokername: str) -> ModuleType: - """Return the imported broker module by name. - """ + ''' + Return the imported broker module by name. + + ''' module = import_module('.' + brokername, 'piker.brokers') # we only allow monkeying because it's for internal keying - module.name = module.__name__.split('.')[-1] + module.name = module.__name__.split('.')[-1] return module def iter_brokermods(): - """Iterate all built-in broker modules. - """ + ''' + Iterate all built-in broker modules. + + ''' for name in __brokers__: yield get_brokermod(name) diff --git a/piker/brokers/data.py b/piker/brokers/data.py index f0a8d367..5183d2c4 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -227,26 +227,28 @@ async def get_cached_feed( @tractor.stream async def start_quote_stream( - ctx: tractor.Context, # marks this as a streaming func + stream: tractor.Context, # marks this as a streaming func broker: str, symbols: List[Any], feed_type: str = 'stock', rate: int = 3, ) -> None: - """Handle per-broker quote stream subscriptions using a "lazy" pub-sub + ''' + Handle per-broker quote stream subscriptions using a "lazy" pub-sub pattern. Spawns new quoter tasks for each broker backend on-demand. Since most brokers seems to support batch quote requests we limit to one task per process (for now). - """ + + ''' # XXX: why do we need this again? get_console_log(tractor.current_actor().loglevel) # pull global vars from local actor symbols = list(symbols) log.info( - f"{ctx.chan.uid} subscribed to {broker} for symbols {symbols}") + f"{stream.chan.uid} subscribed to {broker} for symbols {symbols}") # another actor task may have already created it async with get_cached_feed(broker) as feed: @@ -290,13 +292,13 @@ async def start_quote_stream( assert fquote['displayable'] payload[sym] = fquote - await ctx.send_yield(payload) + await stream.send_yield(payload) await stream_poll_requests( # ``trionics.msgpub`` required kwargs task_name=feed_type, - ctx=ctx, + ctx=stream, topics=symbols, packetizer=feed.mod.packetizer, @@ -319,9 +321,11 @@ async def call_client( class DataFeed: - """Data feed client for streaming symbol data from and making API client calls - to a (remote) ``brokerd`` daemon. - """ + ''' + Data feed client for streaming symbol data from and making API + client calls to a (remote) ``brokerd`` daemon. + + ''' _allowed = ('stock', 'option') def __init__(self, portal, brokermod):