Make daemon registry cross-task
parent
4f387ea2be
commit
063dfad5b4
|
@ -215,6 +215,10 @@ async def stream_quotes(
|
||||||
while True: # use an event here to trigger exit?
|
while True: # use an event here to trigger exit?
|
||||||
prequote_start = time.time()
|
prequote_start = time.time()
|
||||||
|
|
||||||
|
if not any(tickers2qs.values()):
|
||||||
|
log.warn(f"No subs left for broker {brokermod.name}, exiting task")
|
||||||
|
break
|
||||||
|
|
||||||
tickers = list(tickers2qs.keys())
|
tickers = list(tickers2qs.keys())
|
||||||
with trio.move_on_after(3) as cancel_scope:
|
with trio.move_on_after(3) as cancel_scope:
|
||||||
quotes = await get_quotes(tickers)
|
quotes = await get_quotes(tickers)
|
||||||
|
@ -274,15 +278,11 @@ async def stream_quotes(
|
||||||
log.debug(f"Sleeping for {delay}")
|
log.debug(f"Sleeping for {delay}")
|
||||||
await trio.sleep(delay)
|
await trio.sleep(delay)
|
||||||
|
|
||||||
if not any(tickers2qs.values()):
|
|
||||||
log.warn(
|
|
||||||
f"No subscriptions left, tearing down {brokermod.name} daemon")
|
|
||||||
break
|
|
||||||
|
|
||||||
|
|
||||||
async def start_quoter(
|
async def start_quoter(
|
||||||
broker2tickersubs: dict,
|
broker2tickersubs: dict,
|
||||||
clients: dict,
|
clients: dict,
|
||||||
|
dtasks: set, # daemon task registry
|
||||||
nursery: "Nusery",
|
nursery: "Nusery",
|
||||||
stream: trio.SocketStream,
|
stream: trio.SocketStream,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -292,7 +292,6 @@ async def start_quoter(
|
||||||
Since most brokers seems to support batch quote requests we
|
Since most brokers seems to support batch quote requests we
|
||||||
limit to one task per process for now.
|
limit to one task per process for now.
|
||||||
"""
|
"""
|
||||||
daemons = set()
|
|
||||||
queue = StreamQueue(stream) # wrap in a shabby queue-like api
|
queue = StreamQueue(stream) # wrap in a shabby queue-like api
|
||||||
log.info(f"Accepted new connection from {queue.peer}")
|
log.info(f"Accepted new connection from {queue.peer}")
|
||||||
async with queue.stream:
|
async with queue.stream:
|
||||||
|
@ -352,12 +351,12 @@ async def start_quoter(
|
||||||
# end of section to be trimmed out with #37
|
# end of section to be trimmed out with #37
|
||||||
###########################################
|
###########################################
|
||||||
|
|
||||||
if broker not in daemons: # no quoter task yet
|
if broker not in dtasks: # no quoter task yet
|
||||||
# task should begin on the next checkpoint/iteration
|
# task should begin on the next checkpoint/iteration
|
||||||
log.info(f"Spawning quoter task for {brokermod.name}")
|
log.info(f"Spawning quoter task for {brokermod.name}")
|
||||||
nursery.start_soon(
|
nursery.start_soon(
|
||||||
stream_quotes, brokermod, get_quotes, tickers2qs)
|
stream_quotes, brokermod, get_quotes, tickers2qs)
|
||||||
daemons.add(broker)
|
dtasks.add(broker)
|
||||||
|
|
||||||
log.debug("Waiting on subscription request")
|
log.debug("Waiting on subscription request")
|
||||||
else:
|
else:
|
||||||
|
@ -370,7 +369,8 @@ async def start_quoter(
|
||||||
# drop from broker subs dict
|
# drop from broker subs dict
|
||||||
if not any(tickers2qs.values()):
|
if not any(tickers2qs.values()):
|
||||||
log.info(f"No more subscriptions for {broker}")
|
log.info(f"No more subscriptions for {broker}")
|
||||||
broker2tickersubs.pop(broker)
|
broker2tickersubs.pop(broker, None)
|
||||||
|
dtasks.discard(broker)
|
||||||
|
|
||||||
# TODO: move to AsyncExitStack in 3.7
|
# TODO: move to AsyncExitStack in 3.7
|
||||||
for _, _, cntxmng, _ in clients.values():
|
for _, _, cntxmng, _ in clients.values():
|
||||||
|
@ -385,12 +385,16 @@ async def _daemon_main() -> None:
|
||||||
# global space for broker-daemon subscriptions
|
# global space for broker-daemon subscriptions
|
||||||
broker2tickersubs = {}
|
broker2tickersubs = {}
|
||||||
clients = {}
|
clients = {}
|
||||||
|
dtasks = set()
|
||||||
|
|
||||||
async with trio.open_nursery() as nursery:
|
async with trio.open_nursery() as nursery:
|
||||||
listeners = await nursery.start(
|
listeners = await nursery.start(
|
||||||
partial(
|
partial(
|
||||||
trio.serve_tcp,
|
trio.serve_tcp,
|
||||||
partial(start_quoter, broker2tickersubs, clients, nursery),
|
partial(
|
||||||
|
start_quoter, broker2tickersubs, clients,
|
||||||
|
dtasks, nursery
|
||||||
|
),
|
||||||
1616, host='127.0.0.1'
|
1616, host='127.0.0.1'
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue