Port stream_quotes to tractor ipc protocol

kivy_mainline_and_py3.8
Tyler Goodlet 2018-06-23 14:35:48 -04:00
parent 37eb8a8552
commit fa8418f97f
1 changed files with 15 additions and 18 deletions

View File

@ -75,6 +75,7 @@ async def stream_quotes(
tickers2chans: {str: Channel},
rate: int = 5, # delay between quote requests
diff_cached: bool = True, # only deliver "new" quotes to the queue
cid: str = None,
) -> None:
"""Stream quotes for a sequence of tickers at the given ``rate``
per second.
@ -82,6 +83,7 @@ async def stream_quotes(
A broker-client ``quoter`` async context manager must be provided which
returns an async quote function.
"""
packet = {'yield': {}, 'cid': cid}
broker_limit = getattr(brokermod, '_rate_limit', float('inf'))
if broker_limit < rate:
rate = broker_limit
@ -108,7 +110,7 @@ async def stream_quotes(
quotes = await wait_for_network(partial(get_quotes, tickers))
postquote_start = time.time()
q_payloads = {}
chan_payloads = {}
for symbol, quote in quotes.items():
if diff_cached:
# if cache is enabled then only deliver "new" changes
@ -119,14 +121,15 @@ async def stream_quotes(
f"New quote {quote['symbol']}:\n{new}")
_cache[symbol] = quote
for chan in tickers2chans[symbol]:
q_payloads.setdefault(chan, {})[symbol] = quote
chan_payloads.setdefault(
chan, packet.copy())['yield'][symbol] = quote
else:
for chan in tickers2chans[symbol]:
q_payloads.setdefault(chan, {})[symbol] = quote
chan_payloads.setdefault(
chan, packet.copy())['yield'][symbol] = quote
# deliver to each subscriber
if q_payloads:
for chan, payload in q_payloads.items():
if chan_payloads:
for chan, payload in chan_payloads.items():
try:
await chan.send(payload)
except (
@ -134,7 +137,7 @@ async def stream_quotes(
trio.ClosedStreamError, ConnectionResetError,
ConnectionRefusedError,
):
log.warn(f"{chan.addr} went down?")
log.warn(f"{chan.raddr} went down?")
for qset in tickers2chans.values():
qset.discard(chan)
@ -275,13 +278,13 @@ async def _brokerd_main(host) -> None:
start_quoter, broker2tickersubs, clients,
dtasks, nursery
),
1616, host=host,
1617, host=host,
)
)
log.debug(f"Spawned {listeners}")
async def _test_price_stream(broker, symbols, *, chan=None):
async def _test_price_stream(broker, symbols, *, chan=None, cid=None):
"""Test function for initial tractor draft.
"""
brokermod = get_brokermod(broker)
@ -292,14 +295,8 @@ async def _test_price_stream(broker, symbols, *, chan=None):
assert chan
tickers2chans = {}.fromkeys(symbols, {chan, })
async def terminate_on_None(chan, nursery):
val = await chan.recv()
if val is None:
log.info("Got terminate sentinel!")
nursery.cancel_scope.cancel()
async with trio.open_nursery() as nursery:
nursery.start_soon(
stream_quotes, brokermod, get_quotes, tickers2chans)
# nursery.start_soon(
# terminate_on_None, chan, nursery)
partial(
stream_quotes, brokermod, get_quotes, tickers2chans, cid=cid)
)