From f71f986dae35b0ba1c5c7d46cce3f0e360aa9fdc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 7 Jun 2018 00:23:11 -0400 Subject: [PATCH] Use new IPC apis throughout core Move to using `Channel` throughout instead of the `StreamQueue` and add a very basic function for upcoming actor model testing. --- piker/brokers/core.py | 87 ++++++++++++++++++++++++++++--------------- 1 file changed, 56 insertions(+), 31 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index dafe195b..7a05c070 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -11,7 +11,7 @@ from typing import Coroutine, Callable import trio from ..log import get_logger -from ..ipc import StreamQueue +from ..ipc import StreamQueue, Channel from . import get_brokermod log = get_logger('broker.core') @@ -72,7 +72,7 @@ async def wait_for_network(net_func: Callable, sleep: int = 1) -> dict: async def stream_quotes( brokermod: ModuleType, get_quotes: Coroutine, - tickers2qs: {str: StreamQueue}, + tickers2chans: {str: Channel}, rate: int = 5, # delay between quote requests diff_cached: bool = True, # only deliver "new" quotes to the queue ) -> None: @@ -93,11 +93,11 @@ async def stream_quotes( while True: # use an event here to trigger exit? prequote_start = time.time() - if not any(tickers2qs.values()): + if not any(tickers2chans.values()): log.warn(f"No subs left for broker {brokermod.name}, exiting task") break - tickers = list(tickers2qs.keys()) + tickers = list(tickers2chans.keys()) with trio.move_on_after(3) as cancel_scope: quotes = await get_quotes(tickers) @@ -118,25 +118,25 @@ async def stream_quotes( log.info( f"New quote {quote['symbol']}:\n{new}") _cache[symbol] = quote - for queue in tickers2qs[symbol]: - q_payloads.setdefault(queue, {})[symbol] = quote + for chan in tickers2chans[symbol]: + q_payloads.setdefault(chan, {})[symbol] = quote else: - for queue in tickers2qs[symbol]: - q_payloads.setdefault(queue, {})[symbol] = quote + for chan in tickers2chans[symbol]: + q_payloads.setdefault(chan, {})[symbol] = quote # deliver to each subscriber if q_payloads: - for queue, payload in q_payloads.items(): + for chan, payload in q_payloads.items(): try: - await queue.put(payload) + await chan.send(payload) except ( # That's right, anything you can think of... trio.ClosedStreamError, ConnectionResetError, ConnectionRefusedError, ): - log.warn(f"{queue.peer} went down?") - for qset in tickers2qs.values(): - qset.discard(queue) + log.warn(f"{chan.addr} went down?") + for qset in tickers2chans.values(): + qset.discard(chan) req_time = round(postquote_start - prequote_start, 3) proc_time = round(time.time() - postquote_start, 3) @@ -156,7 +156,7 @@ async def start_quoter( broker2tickersubs: dict, clients: dict, dtasks: set, # daemon task registry - nursery: "Nusery", + nursery: 'Nursery', stream: trio.SocketStream, ) -> None: """Handle per-broker quote stream subscriptions. @@ -165,12 +165,13 @@ async def start_quoter( Since most brokers seems to support batch quote requests we limit to one task per process for now. """ - queue = StreamQueue(stream) # wrap in a shabby queue-like api - log.info(f"Accepted new connection from {queue.peer}") - async with queue.stream: - async for broker, tickers in queue: + chan = Channel(stream=stream) + log.info(f"Accepted new connection from {chan.raddr}") + async with chan.squeue.stream: + async for broker, tickers in chan: + tickers = list(tickers) log.info( - f"{queue.peer} subscribed to {broker} for tickers {tickers}") + f"{chan.raddr} subscribed to {broker} for tickers {tickers}") if broker not in broker2tickersubs: brokermod = get_brokermod(broker) @@ -181,11 +182,11 @@ async def start_quoter( get_quotes = await brokermod.quoter(client, tickers) clients[broker] = ( brokermod, client, client_cntxmng, get_quotes) - tickers2qs = broker2tickersubs.setdefault(broker, {}) + tickers2chans = broker2tickersubs.setdefault(broker, {}) else: log.info(f"Subscribing with existing `{broker}` daemon") brokermod, client, _, get_quotes = clients[broker] - tickers2qs = broker2tickersubs[broker] + tickers2chans = broker2tickersubs[broker] # beginning of section to be trimmed out with #37 ################################################# @@ -193,7 +194,7 @@ async def start_quoter( # NOTE: this code is always run for every new client # subscription even when a broker quoter task is already running # since the new client needs to know what symbols are accepted - log.warn(f"Retrieving smoke quote for {queue.peer}") + log.warn(f"Retrieving smoke quote for {chan.raddr}") quotes = await get_quotes(tickers) # report any tickers that aren't returned in the first quote invalid_tickers = set(tickers) - set(quotes) @@ -221,32 +222,32 @@ async def start_quoter( # clients to receive broker specific setup info) sd = await client.symbol_data(tickers) assert sd, "No symbol data could be found?" - await queue.put(sd) + await chan.send(sd) - # update map from each symbol to requesting client's queue + # update map from each symbol to requesting client's chan for ticker in tickers: - tickers2qs.setdefault(ticker, set()).add(queue) + tickers2chans.setdefault(ticker, set()).add(chan) # push initial quotes response for client initialization - await queue.put(payload) + await chan.send(payload) if broker not in dtasks: # no quoter task yet # task should begin on the next checkpoint/iteration log.info(f"Spawning quoter task for {brokermod.name}") nursery.start_soon( - stream_quotes, brokermod, get_quotes, tickers2qs) + stream_quotes, brokermod, get_quotes, tickers2chans) dtasks.add(broker) log.debug("Waiting on subscription request") else: - log.info(f"client @ {queue.peer} disconnected") + log.info(f"client @ {chan.raddr} disconnected") # drop any lingering subscriptions - for ticker, qset in tickers2qs.items(): - qset.discard(queue) + for ticker, qset in tickers2chans.items(): + qset.discard(chan) # if there are no more subscriptions with this broker # drop from broker subs dict - if not any(tickers2qs.values()): + if not any(tickers2chans.values()): log.info(f"No more subscriptions for {broker}") broker2tickersubs.pop(broker, None) dtasks.discard(broker) @@ -278,3 +279,27 @@ async def _brokerd_main(host) -> None: ) ) log.debug(f"Spawned {listeners}") + + +async def _test_price_stream(broker, symbols, *, chan=None): + """Test function for initial tractor draft. + """ + brokermod = get_brokermod(broker) + client_cntxmng = brokermod.get_client() + client = await client_cntxmng.__aenter__() + get_quotes = await brokermod.quoter(client, symbols) + log.info(f"Spawning quoter task for {brokermod.name}") + assert chan + tickers2chans = {}.fromkeys(symbols, {chan, }) + + async def terminate_on_None(chan, nursery): + val = await chan.recv() + if val is None: + log.info("Got terminate sentinel!") + nursery.cancel_scope.cancel() + + async with trio.open_nursery() as nursery: + nursery.start_soon( + stream_quotes, brokermod, get_quotes, tickers2chans) + # nursery.start_soon( + # terminate_on_None, chan, nursery)