Drop old stream test, rename stock quote func

kivy_mainline_and_py3.8
Tyler Goodlet 2018-11-11 21:44:46 -05:00
parent c8cb5a2fdc
commit 2b1818ba33
2 changed files with 29 additions and 36 deletions

View File

@ -6,7 +6,7 @@ import inspect
from functools import partial from functools import partial
import socket import socket
from types import ModuleType from types import ModuleType
from typing import Coroutine, Callable from typing import Coroutine, Callable, List, Dict, Any
import trio import trio
import tractor import tractor
@ -19,14 +19,21 @@ log = get_logger('broker.core')
async def api(brokermod: ModuleType, methname: str, **kwargs) -> dict: async def api(brokermod: ModuleType, methname: str, **kwargs) -> dict:
"""Make (proxy through) an api call by name and return its result. """Make (proxy through) a broker API call by name and return its result.
""" """
async with brokermod.get_client() as client: async with brokermod.get_client() as client:
meth = getattr(client.api, methname, None) meth = getattr(client.api, methname, None)
if meth is None:
log.warning(
"Couldn't find API method {methname} looking up on client")
meth = getattr(client, methname, None)
if meth is None: if meth is None:
log.error(f"No api method `{methname}` could be found?") log.error(f"No api method `{methname}` could be found?")
return return
elif not kwargs:
if not kwargs:
# verify kwargs requirements are met # verify kwargs requirements are met
sig = inspect.signature(meth) sig = inspect.signature(meth)
if sig.parameters: if sig.parameters:
@ -38,7 +45,10 @@ async def api(brokermod: ModuleType, methname: str, **kwargs) -> dict:
return await meth(**kwargs) return await meth(**kwargs)
async def quote(brokermod: ModuleType, tickers: [str]) -> dict: async def stocks_quote(
brokermod: ModuleType,
tickers: List[str]
) -> Dict[str, Dict[str, Any]]:
"""Return quotes dict for ``tickers``. """Return quotes dict for ``tickers``.
""" """
async with brokermod.get_client() as client: async with brokermod.get_client() as client:
@ -74,7 +84,7 @@ async def wait_for_network(net_func: Callable, sleep: int = 1) -> dict:
async def stream_quotes( async def stream_quotes(
brokermod: ModuleType, brokermod: ModuleType,
get_quotes: Coroutine, get_quotes: Coroutine,
tickers2chans: {str: tractor.Channel}, tickers2chans: Dict[str, tractor.Channel],
rate: int = 5, # delay between quote requests rate: int = 5, # delay between quote requests
diff_cached: bool = True, # only deliver "new" quotes to the queue diff_cached: bool = True, # only deliver "new" quotes to the queue
cid: str = None, cid: str = None,
@ -82,8 +92,8 @@ async def stream_quotes(
"""Stream quotes for a sequence of tickers at the given ``rate`` """Stream quotes for a sequence of tickers at the given ``rate``
per second. per second.
A broker-client ``quoter`` async context manager must be provided which A stock-broker client ``get_quotes()`` async context manager must be
returns an async quote function. provided which returns an async quote retrieval function.
""" """
broker_limit = getattr(brokermod, '_rate_limit', float('inf')) broker_limit = getattr(brokermod, '_rate_limit', float('inf'))
if broker_limit < rate: if broker_limit < rate:
@ -133,7 +143,7 @@ async def stream_quotes(
{'yield': {}, 'cid': cid} {'yield': {}, 'cid': cid}
)['yield'][symbol] = quote )['yield'][symbol] = quote
# deliver to each subscriber # deliver to each subscriber (fan out)
if chan_payloads: if chan_payloads:
for chan, payload in chan_payloads.items(): for chan, payload in chan_payloads.items():
try: try:
@ -147,6 +157,7 @@ async def stream_quotes(
for chanset in tickers2chans.values(): for chanset in tickers2chans.values():
chanset.discard((chan, cid)) chanset.discard((chan, cid))
# latency monitoring
req_time = round(postquote_start - prequote_start, 3) req_time = round(postquote_start - prequote_start, 3)
proc_time = round(time.time() - postquote_start, 3) proc_time = round(time.time() - postquote_start, 3)
tot = req_time + proc_time tot = req_time + proc_time
@ -164,8 +175,7 @@ async def stream_quotes(
async def get_cached_client(broker, tickers): async def get_cached_client(broker, tickers):
"""Get the current actor's cached broker client if available or create a """Get or create the current actor's cached broker client.
new one.
""" """
# check if a cached client is in the local actor's statespace # check if a cached client is in the local actor's statespace
clients = tractor.current_actor().statespace.setdefault('clients', {}) clients = tractor.current_actor().statespace.setdefault('clients', {})
@ -232,6 +242,8 @@ async def smoke_quote(get_quotes, tickers, broker):
def modify_quote_stream(broker, tickers, chan=None, cid=None): def modify_quote_stream(broker, tickers, chan=None, cid=None):
"""Absolute symbol subscription list for each quote stream. """Absolute symbol subscription list for each quote stream.
Effectively a consumer subscription api.
""" """
log.info(f"{chan} changed symbol subscription to {tickers}") log.info(f"{chan} changed symbol subscription to {tickers}")
ss = tractor.current_actor().statespace ss = tractor.current_actor().statespace
@ -261,7 +273,8 @@ async def start_quote_stream(
chan: tractor.Channel = None, chan: tractor.Channel = None,
cid: str = None, cid: str = None,
) -> None: ) -> None:
"""Handle per-broker quote stream subscriptions. """Handle per-broker quote stream subscriptions using a "lazy" pub-sub
pattern.
Spawns new quoter tasks for each broker backend on-demand. Spawns new quoter tasks for each broker backend on-demand.
Since most brokers seems to support batch quote requests we Since most brokers seems to support batch quote requests we
@ -286,7 +299,7 @@ async def start_quote_stream(
log.info(f"Subscribing with existing `{broker}` daemon") log.info(f"Subscribing with existing `{broker}` daemon")
tickers2chans = broker2tickersubs[broker] tickers2chans = broker2tickersubs[broker]
# do a smoke quote (not this mutates the input list and filters out bad # do a smoke quote (note this mutates the input list and filters out bad
# symbols for now) # symbols for now)
payload = await smoke_quote(get_quotes, tickers, broker) payload = await smoke_quote(get_quotes, tickers, broker)
# push initial smoke quote response for client initialization # push initial smoke quote response for client initialization
@ -296,11 +309,9 @@ async def start_quote_stream(
modify_quote_stream(broker, tickers, chan=chan, cid=cid) modify_quote_stream(broker, tickers, chan=chan, cid=cid)
try: try:
if broker not in dtasks: # no quoter task yet if broker not in dtasks:
# task should begin on the next checkpoint/iteration # no quoter task yet so start a daemon task
# with trio.open_cancel_scope(shield=True):
log.info(f"Spawning quoter task for {brokermod.name}") log.info(f"Spawning quoter task for {brokermod.name}")
# await actor._root_nursery.start(partial(
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
nursery.start_soon(partial( nursery.start_soon(partial(
stream_quotes, brokermod, get_quotes, tickers2chans, stream_quotes, brokermod, get_quotes, tickers2chans,
@ -325,21 +336,3 @@ async def start_quote_stream(
log.info(f"No more subscriptions for {broker}") log.info(f"No more subscriptions for {broker}")
broker2tickersubs.pop(broker, None) broker2tickersubs.pop(broker, None)
dtasks.discard(broker) dtasks.discard(broker)
async def _test_price_stream(broker, symbols, *, chan=None, cid=None):
"""Test function for initial tractor draft.
"""
brokermod = get_brokermod(broker)
client_cntxmng = brokermod.get_client()
client = await client_cntxmng.__aenter__()
get_quotes = await brokermod.quoter(client, symbols)
log.info(f"Spawning quoter task for {brokermod.name}")
assert chan
tickers2chans = {}.fromkeys(symbols, {(chan, cid), })
async with trio.open_nursery() as nursery:
nursery.start_soon(
partial(
stream_quotes, brokermod, get_quotes, tickers2chans, cid=cid)
)

View File

@ -90,7 +90,7 @@ def api(meth, kwargs, loglevel, broker, keys):
help='Broker backend to use') help='Broker backend to use')
@click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--loglevel', '-l', default='warning', help='Logging level')
@click.option('--df-output', '-df', flag_value=True, @click.option('--df-output', '-df', flag_value=True,
help='Ouput in `pandas.DataFrame` format') help='Output in `pandas.DataFrame` format')
@click.argument('tickers', nargs=-1, required=True) @click.argument('tickers', nargs=-1, required=True)
def quote(loglevel, broker, tickers, df_output): def quote(loglevel, broker, tickers, df_output):
"""Retreive symbol quotes on the console in either json or dataframe """Retreive symbol quotes on the console in either json or dataframe
@ -98,7 +98,7 @@ def quote(loglevel, broker, tickers, df_output):
""" """
brokermod = get_brokermod(broker) brokermod = get_brokermod(broker)
get_console_log(loglevel) get_console_log(loglevel)
quotes = trio.run(partial(core.quote, brokermod, tickers)) quotes = trio.run(partial(core.stocks_quote, brokermod, tickers))
if not quotes: if not quotes:
log.error(f"No quotes could be found for {tickers}?") log.error(f"No quotes could be found for {tickers}?")
return return