Every ticker-chan subscription must include a caller id

kivy_mainline_and_py3.8
Tyler Goodlet 2018-07-05 15:23:38 -04:00
parent 5383dd6446
commit e395845ddb
1 changed files with 55 additions and 40 deletions

View File

@ -86,7 +86,6 @@ async def stream_quotes(
A broker-client ``quoter`` async context manager must be provided which A broker-client ``quoter`` async context manager must be provided which
returns an async quote function. returns an async quote function.
""" """
packet = {'yield': {}, 'cid': cid}
broker_limit = getattr(brokermod, '_rate_limit', float('inf')) broker_limit = getattr(brokermod, '_rate_limit', float('inf'))
if broker_limit < rate: if broker_limit < rate:
rate = broker_limit rate = broker_limit
@ -123,13 +122,18 @@ async def stream_quotes(
log.info( log.info(
f"New quote {quote['symbol']}:\n{new}") f"New quote {quote['symbol']}:\n{new}")
_cache[symbol] = quote _cache[symbol] = quote
for chan in tickers2chans[symbol]: for chan, cid in tickers2chans.get(symbol, set()):
chan_payloads.setdefault( chan_payloads.setdefault(
chan, packet.copy())['yield'][symbol] = quote chan,
{'yield': {}, 'cid': cid}
)['yield'][symbol] = quote
else: else:
for chan in tickers2chans[symbol]: for chan, cid in tickers2chans[symbol]:
chan_payloads.setdefault( chan_payloads.setdefault(
chan, packet.copy())['yield'][symbol] = quote chan,
{'yield': {}, 'cid': cid}
)['yield'][symbol] = quote
# deliver to each subscriber # deliver to each subscriber
if chan_payloads: if chan_payloads:
for chan, payload in chan_payloads.items(): for chan, payload in chan_payloads.items():
@ -140,9 +144,9 @@ async def stream_quotes(
trio.ClosedStreamError, ConnectionResetError, trio.ClosedStreamError, ConnectionResetError,
ConnectionRefusedError, ConnectionRefusedError,
): ):
log.warn(f"{chan.raddr} went down?") log.warn(f"{chan} went down?")
for chanset in tickers2chans.values(): for chanset in tickers2chans.values():
chanset.discard(chan) chanset.discard((chan, cid))
req_time = round(postquote_start - prequote_start, 3) req_time = round(postquote_start - prequote_start, 3)
proc_time = round(time.time() - postquote_start, 3) proc_time = round(time.time() - postquote_start, 3)
@ -157,6 +161,8 @@ async def stream_quotes(
log.debug(f"Sleeping for {delay}") log.debug(f"Sleeping for {delay}")
await trio.sleep(delay) await trio.sleep(delay)
log.info(f"Terminating stream quoter task for {brokermod.name}")
async def get_cached_client(broker, tickers): async def get_cached_client(broker, tickers):
"""Get the current actor's cached broker client if available or create a """Get the current actor's cached broker client if available or create a
@ -164,9 +170,9 @@ async def get_cached_client(broker, tickers):
""" """
# check if a cached client is in the local actor's statespace # check if a cached client is in the local actor's statespace
clients = tractor.current_actor().statespace.setdefault('clients', {}) clients = tractor.current_actor().statespace.setdefault('clients', {})
if clients: try:
return clients[broker] return clients[broker]
else: except KeyError:
log.info(f"Creating new client for broker {broker}") log.info(f"Creating new client for broker {broker}")
brokermod = get_brokermod(broker) brokermod = get_brokermod(broker)
# TODO: move to AsyncExitStack in 3.7 # TODO: move to AsyncExitStack in 3.7
@ -188,7 +194,7 @@ async def symbol_data(broker, tickers):
async def smoke_quote(get_quotes, tickers, broker): async def smoke_quote(get_quotes, tickers, broker):
"""Do an initial "smoke" request for symbols in ``tickers`` filtering """Do an initial "smoke" request for symbols in ``tickers`` filtering
oout any symbols not supported by the broker queried in the call to out any symbols not supported by the broker queried in the call to
``get_quotes()``. ``get_quotes()``.
""" """
# TODO: trim out with #37 # TODO: trim out with #37
@ -234,14 +240,18 @@ def modify_quote_stream(broker, tickers, chan=None, cid=None):
tickers2chans = broker2tickersubs.get(broker) tickers2chans = broker2tickersubs.get(broker)
# update map from each symbol to requesting client's chan # update map from each symbol to requesting client's chan
for ticker in tickers: for ticker in tickers:
chanset = tickers2chans.setdefault(ticker, set()) tickers2chans.setdefault(ticker, set()).add((chan, cid))
if chan not in chanset:
chanset.add(chan)
for ticker in filter(lambda ticker: ticker not in tickers, tickers2chans): for ticker in filter(
lambda ticker: ticker not in tickers, tickers2chans.copy()
):
chanset = tickers2chans.get(ticker) chanset = tickers2chans.get(ticker)
if chanset and chan in chanset: if chanset:
chanset.discard(chan) chanset.discard((chan, cid))
if not chanset:
# pop empty sets which will trigger bg quoter task termination
tickers2chans.pop(ticker)
async def start_quote_stream( async def start_quote_stream(
@ -281,31 +291,36 @@ async def start_quote_stream(
# update map from each symbol to requesting client's chan # update map from each symbol to requesting client's chan
modify_quote_stream(broker, tickers, chan=chan, cid=cid) modify_quote_stream(broker, tickers, chan=chan, cid=cid)
if broker not in dtasks: # no quoter task yet try:
# task should begin on the next checkpoint/iteration if broker not in dtasks: # no quoter task yet
log.info(f"Spawning quoter task for {brokermod.name}") # task should begin on the next checkpoint/iteration
async with trio.open_nursery() as nursery: # with trio.open_cancel_scope(shield=True):
nursery.start_soon(partial( log.info(f"Spawning quoter task for {brokermod.name}")
stream_quotes, brokermod, get_quotes, tickers2chans, # await actor._root_nursery.start(partial(
cid=cid) async with trio.open_nursery() as nursery:
) nursery.start_soon(partial(
dtasks.add(broker) stream_quotes, brokermod, get_quotes, tickers2chans,
# unblocks when no more symbols subscriptions exist and the cid=cid)
# quote streamer task terminates (usually because another call )
# was made to `modify_quoter` to unsubscribe from streaming dtasks.add(broker)
# symbols)
# if there are truly no more subscriptions with this broker # unblocks when no more symbols subscriptions exist and the
# drop from broker subs dict # quote streamer task terminates (usually because another call
if not any(tickers2chans.values()): # was made to `modify_quoter` to unsubscribe from streaming
log.info(f"No more subscriptions for {broker}") # symbols)
broker2tickersubs.pop(broker, None) log.info(f"Terminated quoter task for {brokermod.name}")
dtasks.discard(broker)
# TODO: move to AsyncExitStack in 3.7 # TODO: move to AsyncExitStack in 3.7
for _, _, cntxmng, _ in clients.values(): for _, _, cntxmng, _ in clients.values():
# FIXME: yes I know there's no error handling.. # FIXME: yes I know there's no error handling..
await cntxmng.__aexit__(None, None, None) await cntxmng.__aexit__(None, None, None)
finally:
# if there are truly no more subscriptions with this broker
# drop from broker subs dict
if not any(tickers2chans.values()):
log.info(f"No more subscriptions for {broker}")
broker2tickersubs.pop(broker, None)
dtasks.discard(broker)
async def _test_price_stream(broker, symbols, *, chan=None, cid=None): async def _test_price_stream(broker, symbols, *, chan=None, cid=None):
@ -317,7 +332,7 @@ async def _test_price_stream(broker, symbols, *, chan=None, cid=None):
get_quotes = await brokermod.quoter(client, symbols) get_quotes = await brokermod.quoter(client, symbols)
log.info(f"Spawning quoter task for {brokermod.name}") log.info(f"Spawning quoter task for {brokermod.name}")
assert chan assert chan
tickers2chans = {}.fromkeys(symbols, {chan, }) tickers2chans = {}.fromkeys(symbols, {(chan, cid), })
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
nursery.start_soon( nursery.start_soon(