Compare commits
3 Commits
9d01b5367b
...
ef3309adf9
| Author | SHA1 | Date |
|---|---|---|
|
|
ef3309adf9 | |
|
|
ac6ab3791e | |
|
|
d0eb6b479d |
|
|
@ -729,6 +729,7 @@ class Router(Struct):
|
||||||
except (
|
except (
|
||||||
trio.ClosedResourceError,
|
trio.ClosedResourceError,
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
|
tractor.TransportClosed,
|
||||||
):
|
):
|
||||||
to_remove.add(client_stream)
|
to_remove.add(client_stream)
|
||||||
log.warning(
|
log.warning(
|
||||||
|
|
@ -1699,5 +1700,5 @@ async def _emsd_main(
|
||||||
if not client_streams:
|
if not client_streams:
|
||||||
log.warning(
|
log.warning(
|
||||||
f'Order dialog is not being monitored:\n'
|
f'Order dialog is not being monitored:\n'
|
||||||
f'{oid} ->\n{client_stream._ctx.chan.uid}'
|
f'{oid!r} <-> {client_stream.chan.aid.reprol()}\n'
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -99,6 +99,7 @@ class Sampler:
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
trio.ClosedResourceError,
|
trio.ClosedResourceError,
|
||||||
trio.EndOfChannel,
|
trio.EndOfChannel,
|
||||||
|
tractor.TransportClosed,
|
||||||
)
|
)
|
||||||
|
|
||||||
# holds all the ``tractor.Context`` remote subscriptions for
|
# holds all the ``tractor.Context`` remote subscriptions for
|
||||||
|
|
@ -753,7 +754,7 @@ async def sample_and_broadcast(
|
||||||
log.warning(
|
log.warning(
|
||||||
f'Feed OVERRUN {sub_key}'
|
f'Feed OVERRUN {sub_key}'
|
||||||
f'@{bus.brokername} -> \n'
|
f'@{bus.brokername} -> \n'
|
||||||
f'feed @ {chan.uid}\n'
|
f'feed @ {chan.aid.reprol()}\n'
|
||||||
f'throttle = {throttle} Hz'
|
f'throttle = {throttle} Hz'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -105,7 +105,10 @@ class NoBsWs:
|
||||||
def connected(self) -> bool:
|
def connected(self) -> bool:
|
||||||
return self._connected.is_set()
|
return self._connected.is_set()
|
||||||
|
|
||||||
async def reset(self) -> None:
|
async def reset(
|
||||||
|
self,
|
||||||
|
timeout: float,
|
||||||
|
) -> bool:
|
||||||
'''
|
'''
|
||||||
Reset the underlying ws connection by cancelling
|
Reset the underlying ws connection by cancelling
|
||||||
the bg relay task and waiting for it to signal
|
the bg relay task and waiting for it to signal
|
||||||
|
|
@ -114,18 +117,31 @@ class NoBsWs:
|
||||||
'''
|
'''
|
||||||
self._connected = trio.Event()
|
self._connected = trio.Event()
|
||||||
self._cs.cancel()
|
self._cs.cancel()
|
||||||
|
with trio.move_on_after(timeout) as cs:
|
||||||
await self._connected.wait()
|
await self._connected.wait()
|
||||||
|
return True
|
||||||
|
|
||||||
|
assert cs.cancelled_caught
|
||||||
|
return False
|
||||||
|
|
||||||
async def send_msg(
|
async def send_msg(
|
||||||
self,
|
self,
|
||||||
data: Any,
|
data: Any,
|
||||||
|
timeout: float = 3,
|
||||||
) -> None:
|
) -> None:
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
msg: Any = self._dumps(data)
|
msg: Any = self._dumps(data)
|
||||||
return await self._ws.send_message(msg)
|
return await self._ws.send_message(msg)
|
||||||
except self.recon_errors:
|
except self.recon_errors:
|
||||||
await self.reset()
|
with trio.CancelScope(shield=True):
|
||||||
|
reconnected: bool = await self.reset(
|
||||||
|
timeout=timeout,
|
||||||
|
)
|
||||||
|
if not reconnected:
|
||||||
|
log.warning(
|
||||||
|
'Failed to reconnect after {timeout!r}s ??'
|
||||||
|
)
|
||||||
|
|
||||||
async def recv_msg(self) -> Any:
|
async def recv_msg(self) -> Any:
|
||||||
msg: Any = await self._rx.receive()
|
msg: Any = await self._rx.receive()
|
||||||
|
|
@ -191,7 +207,9 @@ async def _reconnect_forever(
|
||||||
f'{src_mod}\n'
|
f'{src_mod}\n'
|
||||||
f'{url} connection bail with:'
|
f'{url} connection bail with:'
|
||||||
)
|
)
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
await trio.sleep(0.5)
|
await trio.sleep(0.5)
|
||||||
|
|
||||||
rent_cs.cancel()
|
rent_cs.cancel()
|
||||||
|
|
||||||
# go back to reonnect loop in parent task
|
# go back to reonnect loop in parent task
|
||||||
|
|
@ -291,6 +309,7 @@ async def _reconnect_forever(
|
||||||
log.exception(
|
log.exception(
|
||||||
'Reconnect-attempt failed ??\n'
|
'Reconnect-attempt failed ??\n'
|
||||||
)
|
)
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
await trio.sleep(0.2) # throttle
|
await trio.sleep(0.2) # throttle
|
||||||
raise berr
|
raise berr
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -62,7 +62,6 @@ from ._util import (
|
||||||
log,
|
log,
|
||||||
get_console_log,
|
get_console_log,
|
||||||
)
|
)
|
||||||
from .flows import Flume
|
|
||||||
from .validate import (
|
from .validate import (
|
||||||
FeedInit,
|
FeedInit,
|
||||||
validate_backend,
|
validate_backend,
|
||||||
|
|
@ -77,6 +76,7 @@ from ._sampling import (
|
||||||
)
|
)
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
from .flows import Flume
|
||||||
from tractor._addr import Address
|
from tractor._addr import Address
|
||||||
from tractor.msg.types import Aid
|
from tractor.msg.types import Aid
|
||||||
|
|
||||||
|
|
@ -362,6 +362,8 @@ async def allocate_persistent_feed(
|
||||||
)
|
)
|
||||||
await some_data_ready.wait()
|
await some_data_ready.wait()
|
||||||
|
|
||||||
|
# XXX, avoid cycle; it imports this mod.
|
||||||
|
from .flows import Flume
|
||||||
flume = Flume(
|
flume = Flume(
|
||||||
|
|
||||||
# TODO: we have to use this for now since currently the
|
# TODO: we have to use this for now since currently the
|
||||||
|
|
@ -500,7 +502,6 @@ async def open_feed_bus(
|
||||||
sub_registered = trio.Event()
|
sub_registered = trio.Event()
|
||||||
|
|
||||||
flumes: dict[str, Flume] = {}
|
flumes: dict[str, Flume] = {}
|
||||||
|
|
||||||
for symbol in symbols:
|
for symbol in symbols:
|
||||||
|
|
||||||
# if no cached feed for this symbol has been created for this
|
# if no cached feed for this symbol has been created for this
|
||||||
|
|
@ -684,6 +685,7 @@ class Feed(Struct):
|
||||||
'''
|
'''
|
||||||
mods: dict[str, ModuleType] = {}
|
mods: dict[str, ModuleType] = {}
|
||||||
portals: dict[ModuleType, tractor.Portal] = {}
|
portals: dict[ModuleType, tractor.Portal] = {}
|
||||||
|
|
||||||
flumes: dict[
|
flumes: dict[
|
||||||
str, # FQME
|
str, # FQME
|
||||||
Flume,
|
Flume,
|
||||||
|
|
@ -951,6 +953,8 @@ async def open_feed(
|
||||||
|
|
||||||
assert len(feed.mods) == len(feed.portals)
|
assert len(feed.mods) == len(feed.portals)
|
||||||
|
|
||||||
|
# XXX, avoid cycle; it imports this mod.
|
||||||
|
from .flows import Flume
|
||||||
async with (
|
async with (
|
||||||
trionics.gather_contexts(bus_ctxs) as ctxs,
|
trionics.gather_contexts(bus_ctxs) as ctxs,
|
||||||
):
|
):
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue