Fix up some test warnings (summary) spots

loglevel_to_testpikerd
Tyler Goodlet 2023-02-21 10:05:28 -05:00
parent cd3e9b1b2a
commit 2bad692703
2 changed files with 22 additions and 18 deletions

View File

@ -20,10 +20,6 @@ Broker clients, daemons and general back end machinery.
from importlib import import_module from importlib import import_module
from types import ModuleType from types import ModuleType
# TODO: move to urllib3/requests once supported
import asks
asks.init('trio')
__brokers__ = [ __brokers__ = [
'binance', 'binance',
'ib', 'ib',
@ -45,8 +41,10 @@ __brokers__ = [
def get_brokermod(brokername: str) -> ModuleType: def get_brokermod(brokername: str) -> ModuleType:
"""Return the imported broker module by name. '''
""" Return the imported broker module by name.
'''
module = import_module('.' + brokername, 'piker.brokers') module = import_module('.' + brokername, 'piker.brokers')
# we only allow monkeying because it's for internal keying # we only allow monkeying because it's for internal keying
module.name = module.__name__.split('.')[-1] module.name = module.__name__.split('.')[-1]
@ -54,7 +52,9 @@ def get_brokermod(brokername: str) -> ModuleType:
def iter_brokermods(): def iter_brokermods():
"""Iterate all built-in broker modules. '''
""" Iterate all built-in broker modules.
'''
for name in __brokers__: for name in __brokers__:
yield get_brokermod(name) yield get_brokermod(name)

View File

@ -227,26 +227,28 @@ async def get_cached_feed(
@tractor.stream @tractor.stream
async def start_quote_stream( async def start_quote_stream(
ctx: tractor.Context, # marks this as a streaming func stream: tractor.Context, # marks this as a streaming func
broker: str, broker: str,
symbols: List[Any], symbols: List[Any],
feed_type: str = 'stock', feed_type: str = 'stock',
rate: int = 3, rate: int = 3,
) -> None: ) -> None:
"""Handle per-broker quote stream subscriptions using a "lazy" pub-sub '''
Handle per-broker quote stream subscriptions using a "lazy" pub-sub
pattern. pattern.
Spawns new quoter tasks for each broker backend on-demand. Spawns new quoter tasks for each broker backend on-demand.
Since most brokers seems to support batch quote requests we Since most brokers seems to support batch quote requests we
limit to one task per process (for now). limit to one task per process (for now).
"""
'''
# XXX: why do we need this again? # XXX: why do we need this again?
get_console_log(tractor.current_actor().loglevel) get_console_log(tractor.current_actor().loglevel)
# pull global vars from local actor # pull global vars from local actor
symbols = list(symbols) symbols = list(symbols)
log.info( log.info(
f"{ctx.chan.uid} subscribed to {broker} for symbols {symbols}") f"{stream.chan.uid} subscribed to {broker} for symbols {symbols}")
# another actor task may have already created it # another actor task may have already created it
async with get_cached_feed(broker) as feed: async with get_cached_feed(broker) as feed:
@ -290,13 +292,13 @@ async def start_quote_stream(
assert fquote['displayable'] assert fquote['displayable']
payload[sym] = fquote payload[sym] = fquote
await ctx.send_yield(payload) await stream.send_yield(payload)
await stream_poll_requests( await stream_poll_requests(
# ``trionics.msgpub`` required kwargs # ``trionics.msgpub`` required kwargs
task_name=feed_type, task_name=feed_type,
ctx=ctx, ctx=stream,
topics=symbols, topics=symbols,
packetizer=feed.mod.packetizer, packetizer=feed.mod.packetizer,
@ -319,9 +321,11 @@ async def call_client(
class DataFeed: class DataFeed:
"""Data feed client for streaming symbol data from and making API client calls '''
to a (remote) ``brokerd`` daemon. Data feed client for streaming symbol data from and making API
""" client calls to a (remote) ``brokerd`` daemon.
'''
_allowed = ('stock', 'option') _allowed = ('stock', 'option')
def __init__(self, portal, brokermod): def __init__(self, portal, brokermod):