Add a quote stream server task

Add a daemon-server task for delivering subscription based
quote streams via json serialized packets wrapped in a queue
interface.
kivy_mainline_and_py3.8
Tyler Goodlet 2018-04-16 02:03:22 -04:00
parent 07c95e4f3f
commit d65bd78f5d
1 changed files with 96 additions and 2 deletions

View File

@ -3,6 +3,7 @@ Core broker-daemon tasks and API.
"""
import time
import inspect
import json
from functools import partial
import socket
from types import ModuleType
@ -67,11 +68,55 @@ async def wait_for_network(get_quotes, sleep=1):
await trio.sleep(sleep)
class Disconnect(trio.Cancelled):
"Stream was closed"
class StreamQueue:
"""Stream wrapped as a queue that delivers json serialized "packets"
delimited by ``delim``.
"""
def __init__(self, stream, delim=b'\n'):
self.stream = stream
self._delim = delim
self.peer = stream.socket.getpeername()
async def get(self):
"""Get a packet from the underlying stream.
"""
delim = self._delim
buff = b''
while True:
data = await self.stream.receive_some(2**10)
log.trace(f"Data is {data}")
if data == b'':
raise Disconnect("Stream connection was closed")
buff += data
if delim in buff:
try:
return json.loads(buff)
except json.decoder.JSONDecodeError:
log.exception("Failed to process JSON packet:")
continue
async def put(self, data):
return await self.stream.send_all(json.dumps(data).encode() + b'\n')
async def __aiter__(self):
return self
async def __anext__(self):
try:
return await self.get()
except Disconnect:
raise StopAsyncIteration
async def poll_tickers(
client: 'Client',
quoter: AsyncContextManager,
tickers: [str],
q: trio.Queue,
queue: StreamQueue,
rate: int = 5, # delay between quote requests
diff_cached: bool = True, # only deliver "new" quotes to the queue
) -> None:
@ -85,6 +130,11 @@ async def poll_tickers(
_cache = {} # ticker to quote caching
async with quoter(client, tickers) as get_quotes:
# run a first quote smoke test filtering out any bad tickers
first_quotes_dict = await get_quotes(tickers)
# FIXME: oh god it's so hideous
tickers[:] = list(first_quotes_dict.keys())[:]
while True: # use an event here to trigger exit?
prequote_start = time.time()
@ -118,7 +168,7 @@ async def poll_tickers(
payload[symbol] = quote
if payload:
q.put_nowait(payload)
await queue.put(payload)
req_time = round(postquote_start - prequote_start, 3)
proc_time = round(time.time() - postquote_start, 3)
@ -132,3 +182,47 @@ async def poll_tickers(
else:
log.debug(f"Sleeping for {delay}")
await trio.sleep(delay)
async def _handle_subs(
queue,
stream2tickers,
nursery,
task_status=trio.TASK_STATUS_IGNORED
):
"""Handle quote stream subscriptions.
"""
async with queue.stream:
async for tickers in queue:
task_status.started(tickers)
log.info(f"{queue.peer} subscribed for tickers {tickers}")
stream2tickers[queue.peer] = tickers
else:
log.info(f"{queue.peer} was disconnected")
nursery.cancel_scope.cancel()
async def _daemon_main(brokermod):
"""Main entry point for the piker daemon.
"""
stream2tickers = {}
async with brokermod.get_client() as client:
async def start_quoter(stream):
queue = StreamQueue(stream) # wrap in a shabby queue-like api
log.debug(f"Accepted new connection from {queue.peer}")
# spawn request handler
async with trio.open_nursery() as nursery:
await nursery.start(
_handle_subs, queue, stream2tickers, nursery)
nursery.start_soon(
poll_tickers, client, brokermod.quoter,
stream2tickers[queue.peer], queue
)
async with trio.open_nursery() as nursery:
listeners = await nursery.start(
partial(trio.serve_tcp, start_quoter, 1616, host='127.0.0.1')
)
log.debug(f"Spawned {listeners}")