Don't recurse in Client.aiter_recv()
parent
6a6f773477
commit
a2c4f0c80b
|
@ -119,13 +119,13 @@ class Client:
|
||||||
startup_seq: Coroutine,
|
startup_seq: Coroutine,
|
||||||
auto_reconnect: bool = True,
|
auto_reconnect: bool = True,
|
||||||
):
|
):
|
||||||
self._sockaddr = sockaddr
|
self.sockaddr = sockaddr
|
||||||
self._startup_seq = startup_seq
|
self._startup_seq = startup_seq
|
||||||
self._autorecon = auto_reconnect
|
self._autorecon = auto_reconnect
|
||||||
self.squeue = None
|
self.squeue = None
|
||||||
|
|
||||||
async def connect(self, sockaddr: tuple = None, **kwargs):
|
async def connect(self, sockaddr: tuple = None, **kwargs):
|
||||||
sockaddr = sockaddr or self._sockaddr
|
sockaddr = sockaddr or self.sockaddr
|
||||||
stream = await trio.open_tcp_stream(*sockaddr, **kwargs)
|
stream = await trio.open_tcp_stream(*sockaddr, **kwargs)
|
||||||
self.squeue = StreamQueue(stream)
|
self.squeue = StreamQueue(stream)
|
||||||
await self._startup_seq(self)
|
await self._startup_seq(self)
|
||||||
|
@ -146,7 +146,7 @@ class Client:
|
||||||
await self.squeue.stream.aclose()
|
await self.squeue.stream.aclose()
|
||||||
|
|
||||||
async def __aenter__(self):
|
async def __aenter__(self):
|
||||||
await self.connect(self._sockaddr)
|
await self.connect(self.sockaddr)
|
||||||
return self
|
return self
|
||||||
|
|
||||||
async def __aexit__(self, *args):
|
async def __aexit__(self, *args):
|
||||||
|
@ -163,7 +163,8 @@ class Client:
|
||||||
await self.connect()
|
await self.connect()
|
||||||
cancelled = cancel_scope.cancelled_caught
|
cancelled = cancel_scope.cancelled_caught
|
||||||
if cancelled:
|
if cancelled:
|
||||||
log.warn("Reconnect timed out after 3 seconds, retrying...")
|
log.warn(
|
||||||
|
"Reconnect timed out after 3 seconds, retrying...")
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
log.warn("Stream connection re-established!")
|
log.warn("Stream connection re-established!")
|
||||||
|
@ -172,23 +173,25 @@ class Client:
|
||||||
if not down:
|
if not down:
|
||||||
down = True
|
down = True
|
||||||
log.warn(
|
log.warn(
|
||||||
f"Connection to {self._sockaddr} went down, waiting"
|
f"Connection to {self.sockaddr} went down, waiting"
|
||||||
" for re-establishment")
|
" for re-establishment")
|
||||||
await trio.sleep(1)
|
await trio.sleep(1)
|
||||||
|
|
||||||
async def aiter_recv(self):
|
async def aiter_recv(self):
|
||||||
"""Async iterate items from underlying stream.
|
"""Async iterate items from underlying stream.
|
||||||
"""
|
"""
|
||||||
try:
|
while True:
|
||||||
async for item in self.squeue:
|
try:
|
||||||
yield item
|
async for item in self.squeue:
|
||||||
except trio.BrokenStreamError as err:
|
yield item
|
||||||
if not self._autorecon:
|
except trio.BrokenStreamError as err:
|
||||||
raise
|
if not self._autorecon:
|
||||||
if self._autorecon: # attempt reconnect
|
raise
|
||||||
await self._reconnect()
|
if self._autorecon: # attempt reconnect
|
||||||
async for item in self.aiter_recv():
|
await self._reconnect()
|
||||||
yield item
|
continue
|
||||||
|
else:
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
async def stream_quotes(
|
async def stream_quotes(
|
||||||
|
@ -232,11 +235,6 @@ async def stream_quotes(
|
||||||
postquote_start = time.time()
|
postquote_start = time.time()
|
||||||
q_payloads = {}
|
q_payloads = {}
|
||||||
for symbol, quote in quotes.items():
|
for symbol, quote in quotes.items():
|
||||||
# FIXME: None is returned if a symbol can't be found.
|
|
||||||
# Consider filtering out such symbols before starting poll loop
|
|
||||||
if quote is None:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if diff_cached:
|
if diff_cached:
|
||||||
# if cache is enabled then only deliver "new" changes
|
# if cache is enabled then only deliver "new" changes
|
||||||
last = _cache.setdefault(symbol, {})
|
last = _cache.setdefault(symbol, {})
|
||||||
|
@ -301,7 +299,6 @@ async def start_quoter(
|
||||||
|
|
||||||
if broker not in broker2tickersubs:
|
if broker not in broker2tickersubs:
|
||||||
brokermod = get_brokermod(broker)
|
brokermod = get_brokermod(broker)
|
||||||
log.info(f"Spawning quote streamer for broker {broker}")
|
|
||||||
|
|
||||||
# TODO: move to AsyncExitStack in 3.7
|
# TODO: move to AsyncExitStack in 3.7
|
||||||
client_cntxmng = brokermod.get_client()
|
client_cntxmng = brokermod.get_client()
|
||||||
|
@ -310,12 +307,12 @@ async def start_quoter(
|
||||||
clients[broker] = (
|
clients[broker] = (
|
||||||
brokermod, client, client_cntxmng, get_quotes)
|
brokermod, client, client_cntxmng, get_quotes)
|
||||||
tickers2qs = broker2tickersubs.setdefault(
|
tickers2qs = broker2tickersubs.setdefault(
|
||||||
broker, {}.fromkeys(tickers, {queue,}))
|
broker, {}.fromkeys(tickers, {queue, }))
|
||||||
else:
|
else:
|
||||||
log.info(f"Subscribing with existing `{broker}` daemon")
|
log.info(f"Subscribing with existing `{broker}` daemon")
|
||||||
brokermod, client, _, get_quotes = clients[broker]
|
brokermod, client, _, get_quotes = clients[broker]
|
||||||
tickers2qs = broker2tickersubs[broker]
|
tickers2qs = broker2tickersubs[broker]
|
||||||
# update map from each symbol to requesting new client's queue
|
# update map from each symbol to requesting client's queue
|
||||||
for ticker in tickers:
|
for ticker in tickers:
|
||||||
tickers2qs.setdefault(ticker, set()).add(queue)
|
tickers2qs.setdefault(ticker, set()).add(queue)
|
||||||
|
|
||||||
|
@ -331,7 +328,8 @@ async def start_quoter(
|
||||||
tickers = set(tickers) - set(quotes)
|
tickers = set(tickers) - set(quotes)
|
||||||
for ticker in tickers:
|
for ticker in tickers:
|
||||||
log.warn(
|
log.warn(
|
||||||
f"Symbol `{ticker}` not found by broker `{brokermod.name}`")
|
f"Symbol `{ticker}` not found by broker `{brokermod.name}`"
|
||||||
|
)
|
||||||
tickers2qs.pop(ticker, None)
|
tickers2qs.pop(ticker, None)
|
||||||
|
|
||||||
# pop any tickers that return "empty" quotes
|
# pop any tickers that return "empty" quotes
|
||||||
|
|
Loading…
Reference in New Issue