diff --git a/piker/brokers/core.py b/piker/brokers/core.py index fac9d72c..e199ecd2 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -3,6 +3,7 @@ Core broker-daemon tasks and API. """ import time import inspect +import json from functools import partial import socket from types import ModuleType @@ -67,11 +68,55 @@ async def wait_for_network(get_quotes, sleep=1): await trio.sleep(sleep) +class Disconnect(trio.Cancelled): + "Stream was closed" + + +class StreamQueue: + """Stream wrapped as a queue that delivers json serialized "packets" + delimited by ``delim``. + """ + def __init__(self, stream, delim=b'\n'): + self.stream = stream + self._delim = delim + self.peer = stream.socket.getpeername() + + async def get(self): + """Get a packet from the underlying stream. + """ + delim = self._delim + buff = b'' + while True: + data = await self.stream.receive_some(2**10) + log.trace(f"Data is {data}") + if data == b'': + raise Disconnect("Stream connection was closed") + buff += data + if delim in buff: + try: + return json.loads(buff) + except json.decoder.JSONDecodeError: + log.exception("Failed to process JSON packet:") + continue + + async def put(self, data): + return await self.stream.send_all(json.dumps(data).encode() + b'\n') + + async def __aiter__(self): + return self + + async def __anext__(self): + try: + return await self.get() + except Disconnect: + raise StopAsyncIteration + + async def poll_tickers( client: 'Client', quoter: AsyncContextManager, tickers: [str], - q: trio.Queue, + queue: StreamQueue, rate: int = 5, # delay between quote requests diff_cached: bool = True, # only deliver "new" quotes to the queue ) -> None: @@ -85,6 +130,11 @@ async def poll_tickers( _cache = {} # ticker to quote caching async with quoter(client, tickers) as get_quotes: + # run a first quote smoke test filtering out any bad tickers + first_quotes_dict = await get_quotes(tickers) + # FIXME: oh god it's so hideous + tickers[:] = list(first_quotes_dict.keys())[:] + while True: # use an event here to trigger exit? prequote_start = time.time() @@ -118,7 +168,7 @@ async def poll_tickers( payload[symbol] = quote if payload: - q.put_nowait(payload) + await queue.put(payload) req_time = round(postquote_start - prequote_start, 3) proc_time = round(time.time() - postquote_start, 3) @@ -132,3 +182,47 @@ async def poll_tickers( else: log.debug(f"Sleeping for {delay}") await trio.sleep(delay) + + +async def _handle_subs( + queue, + stream2tickers, + nursery, + task_status=trio.TASK_STATUS_IGNORED +): + """Handle quote stream subscriptions. + """ + async with queue.stream: + async for tickers in queue: + task_status.started(tickers) + log.info(f"{queue.peer} subscribed for tickers {tickers}") + stream2tickers[queue.peer] = tickers + else: + log.info(f"{queue.peer} was disconnected") + nursery.cancel_scope.cancel() + + +async def _daemon_main(brokermod): + """Main entry point for the piker daemon. + """ + stream2tickers = {} + async with brokermod.get_client() as client: + + async def start_quoter(stream): + queue = StreamQueue(stream) # wrap in a shabby queue-like api + log.debug(f"Accepted new connection from {queue.peer}") + + # spawn request handler + async with trio.open_nursery() as nursery: + await nursery.start( + _handle_subs, queue, stream2tickers, nursery) + nursery.start_soon( + poll_tickers, client, brokermod.quoter, + stream2tickers[queue.peer], queue + ) + + async with trio.open_nursery() as nursery: + listeners = await nursery.start( + partial(trio.serve_tcp, start_quoter, 1616, host='127.0.0.1') + ) + log.debug(f"Spawned {listeners}")