Use new IPC apis throughout core

Move to using `Channel` throughout instead of the `StreamQueue`
and add a very basic function for upcoming actor model testing.
kivy_mainline_and_py3.8
Tyler Goodlet 2018-06-07 00:23:11 -04:00
parent 2f82db33f4
commit f71f986dae
1 changed files with 56 additions and 31 deletions

View File

@ -11,7 +11,7 @@ from typing import Coroutine, Callable
import trio import trio
from ..log import get_logger from ..log import get_logger
from ..ipc import StreamQueue from ..ipc import StreamQueue, Channel
from . import get_brokermod from . import get_brokermod
log = get_logger('broker.core') log = get_logger('broker.core')
@ -72,7 +72,7 @@ async def wait_for_network(net_func: Callable, sleep: int = 1) -> dict:
async def stream_quotes( async def stream_quotes(
brokermod: ModuleType, brokermod: ModuleType,
get_quotes: Coroutine, get_quotes: Coroutine,
tickers2qs: {str: StreamQueue}, tickers2chans: {str: Channel},
rate: int = 5, # delay between quote requests rate: int = 5, # delay between quote requests
diff_cached: bool = True, # only deliver "new" quotes to the queue diff_cached: bool = True, # only deliver "new" quotes to the queue
) -> None: ) -> None:
@ -93,11 +93,11 @@ async def stream_quotes(
while True: # use an event here to trigger exit? while True: # use an event here to trigger exit?
prequote_start = time.time() prequote_start = time.time()
if not any(tickers2qs.values()): if not any(tickers2chans.values()):
log.warn(f"No subs left for broker {brokermod.name}, exiting task") log.warn(f"No subs left for broker {brokermod.name}, exiting task")
break break
tickers = list(tickers2qs.keys()) tickers = list(tickers2chans.keys())
with trio.move_on_after(3) as cancel_scope: with trio.move_on_after(3) as cancel_scope:
quotes = await get_quotes(tickers) quotes = await get_quotes(tickers)
@ -118,25 +118,25 @@ async def stream_quotes(
log.info( log.info(
f"New quote {quote['symbol']}:\n{new}") f"New quote {quote['symbol']}:\n{new}")
_cache[symbol] = quote _cache[symbol] = quote
for queue in tickers2qs[symbol]: for chan in tickers2chans[symbol]:
q_payloads.setdefault(queue, {})[symbol] = quote q_payloads.setdefault(chan, {})[symbol] = quote
else: else:
for queue in tickers2qs[symbol]: for chan in tickers2chans[symbol]:
q_payloads.setdefault(queue, {})[symbol] = quote q_payloads.setdefault(chan, {})[symbol] = quote
# deliver to each subscriber # deliver to each subscriber
if q_payloads: if q_payloads:
for queue, payload in q_payloads.items(): for chan, payload in q_payloads.items():
try: try:
await queue.put(payload) await chan.send(payload)
except ( except (
# That's right, anything you can think of... # That's right, anything you can think of...
trio.ClosedStreamError, ConnectionResetError, trio.ClosedStreamError, ConnectionResetError,
ConnectionRefusedError, ConnectionRefusedError,
): ):
log.warn(f"{queue.peer} went down?") log.warn(f"{chan.addr} went down?")
for qset in tickers2qs.values(): for qset in tickers2chans.values():
qset.discard(queue) qset.discard(chan)
req_time = round(postquote_start - prequote_start, 3) req_time = round(postquote_start - prequote_start, 3)
proc_time = round(time.time() - postquote_start, 3) proc_time = round(time.time() - postquote_start, 3)
@ -156,7 +156,7 @@ async def start_quoter(
broker2tickersubs: dict, broker2tickersubs: dict,
clients: dict, clients: dict,
dtasks: set, # daemon task registry dtasks: set, # daemon task registry
nursery: "Nusery", nursery: 'Nursery',
stream: trio.SocketStream, stream: trio.SocketStream,
) -> None: ) -> None:
"""Handle per-broker quote stream subscriptions. """Handle per-broker quote stream subscriptions.
@ -165,12 +165,13 @@ async def start_quoter(
Since most brokers seems to support batch quote requests we Since most brokers seems to support batch quote requests we
limit to one task per process for now. limit to one task per process for now.
""" """
queue = StreamQueue(stream) # wrap in a shabby queue-like api chan = Channel(stream=stream)
log.info(f"Accepted new connection from {queue.peer}") log.info(f"Accepted new connection from {chan.raddr}")
async with queue.stream: async with chan.squeue.stream:
async for broker, tickers in queue: async for broker, tickers in chan:
tickers = list(tickers)
log.info( log.info(
f"{queue.peer} subscribed to {broker} for tickers {tickers}") f"{chan.raddr} subscribed to {broker} for tickers {tickers}")
if broker not in broker2tickersubs: if broker not in broker2tickersubs:
brokermod = get_brokermod(broker) brokermod = get_brokermod(broker)
@ -181,11 +182,11 @@ async def start_quoter(
get_quotes = await brokermod.quoter(client, tickers) get_quotes = await brokermod.quoter(client, tickers)
clients[broker] = ( clients[broker] = (
brokermod, client, client_cntxmng, get_quotes) brokermod, client, client_cntxmng, get_quotes)
tickers2qs = broker2tickersubs.setdefault(broker, {}) tickers2chans = broker2tickersubs.setdefault(broker, {})
else: else:
log.info(f"Subscribing with existing `{broker}` daemon") log.info(f"Subscribing with existing `{broker}` daemon")
brokermod, client, _, get_quotes = clients[broker] brokermod, client, _, get_quotes = clients[broker]
tickers2qs = broker2tickersubs[broker] tickers2chans = broker2tickersubs[broker]
# beginning of section to be trimmed out with #37 # beginning of section to be trimmed out with #37
################################################# #################################################
@ -193,7 +194,7 @@ async def start_quoter(
# NOTE: this code is always run for every new client # NOTE: this code is always run for every new client
# subscription even when a broker quoter task is already running # subscription even when a broker quoter task is already running
# since the new client needs to know what symbols are accepted # since the new client needs to know what symbols are accepted
log.warn(f"Retrieving smoke quote for {queue.peer}") log.warn(f"Retrieving smoke quote for {chan.raddr}")
quotes = await get_quotes(tickers) quotes = await get_quotes(tickers)
# report any tickers that aren't returned in the first quote # report any tickers that aren't returned in the first quote
invalid_tickers = set(tickers) - set(quotes) invalid_tickers = set(tickers) - set(quotes)
@ -221,32 +222,32 @@ async def start_quoter(
# clients to receive broker specific setup info) # clients to receive broker specific setup info)
sd = await client.symbol_data(tickers) sd = await client.symbol_data(tickers)
assert sd, "No symbol data could be found?" assert sd, "No symbol data could be found?"
await queue.put(sd) await chan.send(sd)
# update map from each symbol to requesting client's queue # update map from each symbol to requesting client's chan
for ticker in tickers: for ticker in tickers:
tickers2qs.setdefault(ticker, set()).add(queue) tickers2chans.setdefault(ticker, set()).add(chan)
# push initial quotes response for client initialization # push initial quotes response for client initialization
await queue.put(payload) await chan.send(payload)
if broker not in dtasks: # no quoter task yet if broker not in dtasks: # no quoter task yet
# task should begin on the next checkpoint/iteration # task should begin on the next checkpoint/iteration
log.info(f"Spawning quoter task for {brokermod.name}") log.info(f"Spawning quoter task for {brokermod.name}")
nursery.start_soon( nursery.start_soon(
stream_quotes, brokermod, get_quotes, tickers2qs) stream_quotes, brokermod, get_quotes, tickers2chans)
dtasks.add(broker) dtasks.add(broker)
log.debug("Waiting on subscription request") log.debug("Waiting on subscription request")
else: else:
log.info(f"client @ {queue.peer} disconnected") log.info(f"client @ {chan.raddr} disconnected")
# drop any lingering subscriptions # drop any lingering subscriptions
for ticker, qset in tickers2qs.items(): for ticker, qset in tickers2chans.items():
qset.discard(queue) qset.discard(chan)
# if there are no more subscriptions with this broker # if there are no more subscriptions with this broker
# drop from broker subs dict # drop from broker subs dict
if not any(tickers2qs.values()): if not any(tickers2chans.values()):
log.info(f"No more subscriptions for {broker}") log.info(f"No more subscriptions for {broker}")
broker2tickersubs.pop(broker, None) broker2tickersubs.pop(broker, None)
dtasks.discard(broker) dtasks.discard(broker)
@ -278,3 +279,27 @@ async def _brokerd_main(host) -> None:
) )
) )
log.debug(f"Spawned {listeners}") log.debug(f"Spawned {listeners}")
async def _test_price_stream(broker, symbols, *, chan=None):
"""Test function for initial tractor draft.
"""
brokermod = get_brokermod(broker)
client_cntxmng = brokermod.get_client()
client = await client_cntxmng.__aenter__()
get_quotes = await brokermod.quoter(client, symbols)
log.info(f"Spawning quoter task for {brokermod.name}")
assert chan
tickers2chans = {}.fromkeys(symbols, {chan, })
async def terminate_on_None(chan, nursery):
val = await chan.recv()
if val is None:
log.info("Got terminate sentinel!")
nursery.cancel_scope.cancel()
async with trio.open_nursery() as nursery:
nursery.start_soon(
stream_quotes, brokermod, get_quotes, tickers2chans)
# nursery.start_soon(
# terminate_on_None, chan, nursery)