Compare commits

..

No commits in common. "ef3309adf95bc9a9f2ac78e60e8088ea015d45b8" and "9d01b5367b6a3580c64529ae2e3d5cca5e4e236e" have entirely different histories.

4 changed files with 9 additions and 34 deletions

View File

@ -729,7 +729,6 @@ class Router(Struct):
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
tractor.TransportClosed,
):
to_remove.add(client_stream)
log.warning(
@ -1700,5 +1699,5 @@ async def _emsd_main(
if not client_streams:
log.warning(
f'Order dialog is not being monitored:\n'
f'{oid!r} <-> {client_stream.chan.aid.reprol()}\n'
f'{oid} ->\n{client_stream._ctx.chan.uid}'
)

View File

@ -99,7 +99,6 @@ class Sampler:
trio.BrokenResourceError,
trio.ClosedResourceError,
trio.EndOfChannel,
tractor.TransportClosed,
)
# holds all the ``tractor.Context`` remote subscriptions for
@ -754,7 +753,7 @@ async def sample_and_broadcast(
log.warning(
f'Feed OVERRUN {sub_key}'
f'@{bus.brokername} -> \n'
f'feed @ {chan.aid.reprol()}\n'
f'feed @ {chan.uid}\n'
f'throttle = {throttle} Hz'
)

View File

@ -105,10 +105,7 @@ class NoBsWs:
def connected(self) -> bool:
return self._connected.is_set()
async def reset(
self,
timeout: float,
) -> bool:
async def reset(self) -> None:
'''
Reset the underlying ws connection by cancelling
the bg relay task and waiting for it to signal
@ -117,31 +114,18 @@ class NoBsWs:
'''
self._connected = trio.Event()
self._cs.cancel()
with trio.move_on_after(timeout) as cs:
await self._connected.wait()
return True
assert cs.cancelled_caught
return False
await self._connected.wait()
async def send_msg(
self,
data: Any,
timeout: float = 3,
) -> None:
while True:
try:
msg: Any = self._dumps(data)
return await self._ws.send_message(msg)
except self.recon_errors:
with trio.CancelScope(shield=True):
reconnected: bool = await self.reset(
timeout=timeout,
)
if not reconnected:
log.warning(
'Failed to reconnect after {timeout!r}s ??'
)
await self.reset()
async def recv_msg(self) -> Any:
msg: Any = await self._rx.receive()
@ -207,9 +191,7 @@ async def _reconnect_forever(
f'{src_mod}\n'
f'{url} connection bail with:'
)
with trio.CancelScope(shield=True):
await trio.sleep(0.5)
await trio.sleep(0.5)
rent_cs.cancel()
# go back to reonnect loop in parent task
@ -309,8 +291,7 @@ async def _reconnect_forever(
log.exception(
'Reconnect-attempt failed ??\n'
)
with trio.CancelScope(shield=True):
await trio.sleep(0.2) # throttle
await trio.sleep(0.2) # throttle
raise berr
#|_ws & nursery block ends

View File

@ -62,6 +62,7 @@ from ._util import (
log,
get_console_log,
)
from .flows import Flume
from .validate import (
FeedInit,
validate_backend,
@ -76,7 +77,6 @@ from ._sampling import (
)
if TYPE_CHECKING:
from .flows import Flume
from tractor._addr import Address
from tractor.msg.types import Aid
@ -362,8 +362,6 @@ async def allocate_persistent_feed(
)
await some_data_ready.wait()
# XXX, avoid cycle; it imports this mod.
from .flows import Flume
flume = Flume(
# TODO: we have to use this for now since currently the
@ -502,6 +500,7 @@ async def open_feed_bus(
sub_registered = trio.Event()
flumes: dict[str, Flume] = {}
for symbol in symbols:
# if no cached feed for this symbol has been created for this
@ -685,7 +684,6 @@ class Feed(Struct):
'''
mods: dict[str, ModuleType] = {}
portals: dict[ModuleType, tractor.Portal] = {}
flumes: dict[
str, # FQME
Flume,
@ -953,8 +951,6 @@ async def open_feed(
assert len(feed.mods) == len(feed.portals)
# XXX, avoid cycle; it imports this mod.
from .flows import Flume
async with (
trionics.gather_contexts(bus_ctxs) as ctxs,
):