Compare commits
No commits in common. "ef3309adf95bc9a9f2ac78e60e8088ea015d45b8" and "9d01b5367b6a3580c64529ae2e3d5cca5e4e236e" have entirely different histories.
ef3309adf9
...
9d01b5367b
|
|
@ -729,7 +729,6 @@ class Router(Struct):
|
|||
except (
|
||||
trio.ClosedResourceError,
|
||||
trio.BrokenResourceError,
|
||||
tractor.TransportClosed,
|
||||
):
|
||||
to_remove.add(client_stream)
|
||||
log.warning(
|
||||
|
|
@ -1700,5 +1699,5 @@ async def _emsd_main(
|
|||
if not client_streams:
|
||||
log.warning(
|
||||
f'Order dialog is not being monitored:\n'
|
||||
f'{oid!r} <-> {client_stream.chan.aid.reprol()}\n'
|
||||
f'{oid} ->\n{client_stream._ctx.chan.uid}'
|
||||
)
|
||||
|
|
|
|||
|
|
@ -99,7 +99,6 @@ class Sampler:
|
|||
trio.BrokenResourceError,
|
||||
trio.ClosedResourceError,
|
||||
trio.EndOfChannel,
|
||||
tractor.TransportClosed,
|
||||
)
|
||||
|
||||
# holds all the ``tractor.Context`` remote subscriptions for
|
||||
|
|
@ -754,7 +753,7 @@ async def sample_and_broadcast(
|
|||
log.warning(
|
||||
f'Feed OVERRUN {sub_key}'
|
||||
f'@{bus.brokername} -> \n'
|
||||
f'feed @ {chan.aid.reprol()}\n'
|
||||
f'feed @ {chan.uid}\n'
|
||||
f'throttle = {throttle} Hz'
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -105,10 +105,7 @@ class NoBsWs:
|
|||
def connected(self) -> bool:
|
||||
return self._connected.is_set()
|
||||
|
||||
async def reset(
|
||||
self,
|
||||
timeout: float,
|
||||
) -> bool:
|
||||
async def reset(self) -> None:
|
||||
'''
|
||||
Reset the underlying ws connection by cancelling
|
||||
the bg relay task and waiting for it to signal
|
||||
|
|
@ -117,31 +114,18 @@ class NoBsWs:
|
|||
'''
|
||||
self._connected = trio.Event()
|
||||
self._cs.cancel()
|
||||
with trio.move_on_after(timeout) as cs:
|
||||
await self._connected.wait()
|
||||
return True
|
||||
|
||||
assert cs.cancelled_caught
|
||||
return False
|
||||
await self._connected.wait()
|
||||
|
||||
async def send_msg(
|
||||
self,
|
||||
data: Any,
|
||||
timeout: float = 3,
|
||||
) -> None:
|
||||
while True:
|
||||
try:
|
||||
msg: Any = self._dumps(data)
|
||||
return await self._ws.send_message(msg)
|
||||
except self.recon_errors:
|
||||
with trio.CancelScope(shield=True):
|
||||
reconnected: bool = await self.reset(
|
||||
timeout=timeout,
|
||||
)
|
||||
if not reconnected:
|
||||
log.warning(
|
||||
'Failed to reconnect after {timeout!r}s ??'
|
||||
)
|
||||
await self.reset()
|
||||
|
||||
async def recv_msg(self) -> Any:
|
||||
msg: Any = await self._rx.receive()
|
||||
|
|
@ -207,9 +191,7 @@ async def _reconnect_forever(
|
|||
f'{src_mod}\n'
|
||||
f'{url} connection bail with:'
|
||||
)
|
||||
with trio.CancelScope(shield=True):
|
||||
await trio.sleep(0.5)
|
||||
|
||||
await trio.sleep(0.5)
|
||||
rent_cs.cancel()
|
||||
|
||||
# go back to reonnect loop in parent task
|
||||
|
|
@ -309,8 +291,7 @@ async def _reconnect_forever(
|
|||
log.exception(
|
||||
'Reconnect-attempt failed ??\n'
|
||||
)
|
||||
with trio.CancelScope(shield=True):
|
||||
await trio.sleep(0.2) # throttle
|
||||
await trio.sleep(0.2) # throttle
|
||||
raise berr
|
||||
|
||||
#|_ws & nursery block ends
|
||||
|
|
|
|||
|
|
@ -62,6 +62,7 @@ from ._util import (
|
|||
log,
|
||||
get_console_log,
|
||||
)
|
||||
from .flows import Flume
|
||||
from .validate import (
|
||||
FeedInit,
|
||||
validate_backend,
|
||||
|
|
@ -76,7 +77,6 @@ from ._sampling import (
|
|||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .flows import Flume
|
||||
from tractor._addr import Address
|
||||
from tractor.msg.types import Aid
|
||||
|
||||
|
|
@ -362,8 +362,6 @@ async def allocate_persistent_feed(
|
|||
)
|
||||
await some_data_ready.wait()
|
||||
|
||||
# XXX, avoid cycle; it imports this mod.
|
||||
from .flows import Flume
|
||||
flume = Flume(
|
||||
|
||||
# TODO: we have to use this for now since currently the
|
||||
|
|
@ -502,6 +500,7 @@ async def open_feed_bus(
|
|||
sub_registered = trio.Event()
|
||||
|
||||
flumes: dict[str, Flume] = {}
|
||||
|
||||
for symbol in symbols:
|
||||
|
||||
# if no cached feed for this symbol has been created for this
|
||||
|
|
@ -685,7 +684,6 @@ class Feed(Struct):
|
|||
'''
|
||||
mods: dict[str, ModuleType] = {}
|
||||
portals: dict[ModuleType, tractor.Portal] = {}
|
||||
|
||||
flumes: dict[
|
||||
str, # FQME
|
||||
Flume,
|
||||
|
|
@ -953,8 +951,6 @@ async def open_feed(
|
|||
|
||||
assert len(feed.mods) == len(feed.portals)
|
||||
|
||||
# XXX, avoid cycle; it imports this mod.
|
||||
from .flows import Flume
|
||||
async with (
|
||||
trionics.gather_contexts(bus_ctxs) as ctxs,
|
||||
):
|
||||
|
|
|
|||
Loading…
Reference in New Issue