piker/piker/data/_web_bs.py


# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
ToOlS fOr CoPInG wITh "tHE wEB" protocols.
"""
from __future__ import annotations
from contextlib import (
    asynccontextmanager as acm,
)
from itertools import count
from functools import partial
from types import ModuleType
from typing import (
    Any,
    Optional,
    Callable,
    AsyncContextManager,
    AsyncGenerator,
    Iterable,
)
import json

import trio
from trio_typing import TaskStatus
from trio_websocket import (
    WebSocketConnection,
    open_websocket_url,
)
from wsproto.utilities import LocalProtocolError
from trio_websocket._impl import (
    ConnectionClosed,
    DisconnectionTimeout,
    ConnectionRejected,
    HandshakeError,
    ConnectionTimeout,
)

from piker.types import Struct
from ._util import log


class NoBsWs:
    '''
    Make ``trio_websocket`` sockets stay up no matter the bs.
    A shim interface that allows client code to stream from some
    ``WebSocketConnection`` but where any connectivity bs is handled
    automatically and entirely in the background.

    NOTE: this type should never be created directly but instead is
    provided via the ``open_autorecon_ws()`` factory below.
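
    A minimal usage sketch (the url and msg payload here are
    hypothetical, not part of this module's API)::

        async with open_autorecon_ws(
            'wss://api.example.com/ws',  # hypothetical feed url
            msg_recv_timeout=16,
        ) as ws:
            await ws.send_msg({'sub': 'trades'})  # example payload
            async for msg in ws:
                print(msg)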
2023-01-05 22:28:10 +00:00
    '''
    # apparently we can QoS for all sorts of reasons..so catch em.
    recon_errors = (
        ConnectionClosed,
        DisconnectionTimeout,
        ConnectionRejected,
        HandshakeError,
        ConnectionTimeout,
        LocalProtocolError,
    )

    def __init__(
        self,
        url: str,
        rxchan: trio.MemoryReceiveChannel,
        msg_recv_timeout: float,

        serializer: ModuleType = json
    ):
        self.url = url
        self._rx = rxchan
        self._timeout = msg_recv_timeout

        # signaling between caller and relay task which determines when
        # socket is connected (and subscribed).
        self._connected: trio.Event = trio.Event()

        # dynamically reset by the bg relay task
        self._ws: WebSocketConnection | None = None
        self._cs: trio.CancelScope | None = None

        # interchange codec methods
        # TODO: obviously the method API here may be different
        # for another interchange format..
        self._dumps: Callable = serializer.dumps
        self._loads: Callable = serializer.loads

    def connected(self) -> bool:
        return self._connected.is_set()

    async def reset(self) -> None:
        '''
        Reset the underlying ws connection by cancelling
        the bg relay task and waiting for it to signal
        a new connection.

        '''
        self._connected = trio.Event()
        self._cs.cancel()
        await self._connected.wait()

    async def send_msg(
        self,
        data: Any,
    ) -> None:
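        # retry (forever): on any transport error trigger a full
        # connection reset, then attempt the send again.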
        while True:
            try:
                msg: Any = self._dumps(data)
                return await self._ws.send_message(msg)
            except self.recon_errors:
                await self.reset()

    async def recv_msg(self) -> Any:
        msg: Any = await self._rx.receive()
        data = self._loads(msg)
        return data

    def __aiter__(self):
        return self

    async def __anext__(self):
        return await self.recv_msg()

    def set_recv_timeout(
        self,
        timeout: float,
    ) -> None:
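        '''
        Dynamically adjust the msg receive latency (in seconds)
        tolerated before the bg relay task triggers a connection
        reset.

        '''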
        self._timeout = timeout


async def _reconnect_forever(
    url: str,
    snd: trio.MemorySendChannel,
    nobsws: NoBsWs,
    reset_after: int,  # msg recv timeouts allowed before reset attempt
    fixture: AsyncContextManager | None = None,
    task_status: TaskStatus = trio.TASK_STATUS_IGNORED,

) -> None:
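    '''
    Bg task which (re)spawns a ws connection and its msg relay loop
    on any reset-requiring event: user specified msg recv latency,
    network errors or connection roaming.

    '''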
    # TODO: can we just report "where" in the call stack
    # the client code is using the ws stream?
    # Maybe we can just drop this since it's already in the log msg
    # prefix?
2023-05-09 18:36:36 +00:00
    if fixture is not None:
        src_mod: str = fixture.__module__
    else:
        src_mod: str = 'unknown'

    async def proxy_msgs(
        ws: WebSocketConnection,
        pcs: trio.CancelScope,  # parent cancel scope
    ):
        '''
        Receive (under the dynamic `nobsws._timeout` deadline) all
        msgs from the underlying websocket and relay them to the
        (calling) parent task via a ``trio`` mem chan.

        '''
        # after so many msg recv timeouts, reset the connection
        timeouts: int = 0
        while True:
            with trio.move_on_after(
                # can be dynamically changed by user code
                nobsws._timeout,
            ) as cs:
                try:
                    msg: Any = await ws.get_message()
                    await snd.send(msg)
                except nobsws.recon_errors:
                    log.exception(
                        f'{src_mod}\n'
                        f'{url} connection bail with:'
                    )
                    await trio.sleep(0.5)
                    pcs.cancel()

                    # go back to the reconnect loop in the parent task
                    return

            if cs.cancelled_caught:
                timeouts += 1
                if timeouts > reset_after:
                    log.error(
                        f'{src_mod}\n'
                        'WS feed seems down and slow af.. reconnecting\n'
                    )
                    pcs.cancel()

                    # go back to the reconnect loop in the parent task
                    return

    async def open_fixture(
        fixture: AsyncContextManager,
        nobsws: NoBsWs,
        task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
    ):
        '''
        Open the user-provided `@acm` and sleep until any connection
        reset occurs.

        '''
        async with fixture(nobsws) as ret:
            assert ret is None
            task_status.started()
            await trio.sleep_forever()
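
    # NOTE: a user `fixture` is any `@acm` which takes this `NoBsWs`
    # and yields `None`, eg. to (re)subscribe on every (re)connect;
    # a hypothetical sketch (the name and payload are not from this
    # module):
    #
    # @acm
    # async def subscribe_on_connect(ws: NoBsWs):
    #     await ws.send_msg({'sub': 'trades'})  # example sub payload
    #     yield
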
    # last_err = None
    nobsws._connected = trio.Event()
    task_status.started()

    while not snd._closed:
        log.info(
            f'{src_mod}\n'
            f'{url} trying (RE)CONNECT'
        )

        ws: WebSocketConnection
        try:
            async with (
                trio.open_nursery() as n,
                open_websocket_url(url) as ws,
            ):
                cs = nobsws._cs = n.cancel_scope
                nobsws._ws = ws
                log.info(
                    f'{src_mod}\n'
                    f'Connection success: {url}'
                )

                # begin relay loop to forward msgs
                n.start_soon(
                    proxy_msgs,
                    ws,
                    cs,
                )

                if fixture is not None:
                    log.info(
                        f'{src_mod}\n'
                        f'Entering fixture: {fixture}'
                    )

                    # TODO: should we return an explicit sub-cs
                    # from this fixture task?
                    await n.start(
                        open_fixture,
                        fixture,
                        nobsws,
                    )

                # indicate to wrapper / opener that we are up and block
                # to let tasks run **inside** the ws open block above.
                nobsws._connected.set()
                await trio.sleep_forever()

        except HandshakeError:
            log.exception('Retrying connection')

        # ws & nursery block ends

        nobsws._connected = trio.Event()
        if cs.cancelled_caught:
            log.cancel(
                f'{url} connection cancelled!'
            )
            # if wrapper cancelled us, we expect it to also
            # have re-assigned a new event
            assert (
                nobsws._connected
                and not nobsws._connected.is_set()
            )

        # -> from here, move to the next reconnect attempt iteration
        # in the `while` loop above Bp

    else:
        log.exception(
            f'{src_mod}\n'
            'ws connection closed by client...'
        )
@acm
async def open_autorecon_ws(
url: str,

    fixture: AsyncContextManager | None = None,
# time in sec between msgs received before
# we presume connection might need a reset.
msg_recv_timeout: float = 16,

    # number of above timeouts to tolerate before triggering a full
    # connection reset
reset_after: int = 3,
) -> AsyncGenerator[NoBsWs, None]:
    '''
    An auto-reconnect websocket (wrapper API) around
    ``trio_websocket.open_websocket_url()`` providing automatic
    re-connection on network errors, msg latency and thus roaming.

    Here we implement a re-connect websocket interface where a bg
    nursery runs ``WebSocketConnection.receive_message()``s in a loop
    and restarts the full http(s) handshake on catches of certain
    connectivity errors, or some user defined recv timeout.

    You can provide a ``fixture`` async-context-manager which will be
    entered/exited around each connection reset; eg. for (re)requesting
    subscriptions without requiring streaming setup code to rerun.
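
    As a minimal sketch (the url and subscription payload below are
    made up, not from any real backend):

    .. code:: python

        @acm
        async def subs(ws: NoBsWs):
            # (re)subscribe on every (re)connect
            await ws.send_msg({'subscribe': ['quotes']})
            yield ws

        async with open_autorecon_ws(
            'wss://ws.example.com',
            fixture=subs,
        ) as ws:
            async for msg in ws:
                ...
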
'''
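    # NOTE: this mem-chan pair is the relay from the bg
    # `_reconnect_forever()` task (which pushes every received ws msg
    # into `snd`) to the consumer-facing `NoBsWs` (which reads from
    # `rcv`).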
snd: trio.MemorySendChannel
rcv: trio.MemoryReceiveChannel
snd, rcv = trio.open_memory_channel(616)
async with trio.open_nursery() as n:
nobsws = NoBsWs(
url,
rcv,
msg_recv_timeout=msg_recv_timeout,
)
await n.start(
partial(
_reconnect_forever,
url,
snd,
nobsws,
fixture=fixture,
reset_after=reset_after,
)
)
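
        # block until the bg reconnect task has established the first
        # connection before handing the (now live) ws to the caller.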
await nobsws._connected.wait()
assert nobsws._cs
assert nobsws.connected()
try:
yield nobsws
finally:
n.cancel_scope.cancel()


'''
JSONRPC response-request style machinery for transparent multiplexing
of msgs over a NoBsWs.

'''
class JSONRPCResult(Struct):
id: int
jsonrpc: str = '2.0'
result: Optional[dict] = None
error: Optional[dict] = None


@acm
async def open_jsonrpc_session(
url: str,
start_id: int = 0,
response_type: type = JSONRPCResult,
) -> Callable[[str, dict], dict]:
    '''
    Init a json-RPC-over-websocket connection to the provided `url`.

    A `json_rpc: Callable[[str, dict], dict]` is delivered to the
    caller for sending requests and a bg-`trio.Task` handles
    processing of response msgs including error reporting/raising in
    the parent/caller task.
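
    A rough usage sketch (the url and method are placeholders, not
    any real endpoint):

    .. code:: python

        async with open_jsonrpc_session(
            'wss://rpc.example.com/ws',
        ) as json_rpc:
            resp: JSONRPCResult = await json_rpc(
                'get_time',
                params={},
            )
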
'''
# NOTE, store all request msgs so we can raise errors on the
# caller side!
req_msgs: dict[int, dict] = {}
async with (
trio.open_nursery() as tn,
open_autorecon_ws(url) as ws
):
rpc_id: Iterable[int] = count(start_id)
rpc_results: dict[int, dict] = {}
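
        # per-request-id table of response-wait entries; `json_rpc()`
        # registers an entry which `recv_task()` fills in and signals.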
async def json_rpc(method: str, params: dict) -> dict:
            '''
            Perform a json-rpc call and wait for the result; raise an
            exception if an error field is present in the response.

            '''
nonlocal req_msgs
req_id: int = next(rpc_id)
msg = {
'jsonrpc': '2.0',
'id': req_id,
'method': method,
'params': params
}
_id = msg['id']
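
            # register a response-wait entry keyed by this request's
            # id so the bg `recv_task()` can deliver the matching
            # reply and wake us below.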
result = rpc_results[_id] = {
'result': None,
'error': None,
'event': trio.Event(), # signal caller resp arrived
}
req_msgs[_id] = msg
await ws.send_msg(msg)
            # wait for response before unblocking requester code
await rpc_results[_id]['event'].wait()
if (maybe_result := result['result']):
ret = maybe_result
del rpc_results[_id]
else:
err = result['error']
raise Exception(
f'JSONRPC request failed\n'
f'req: {msg}\n'
f'resp: {err}\n'
)
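
            # defensive: a reply that matched the `result` pattern may
            # still carry a non-null `error` field; raise it here too.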
if ret.error is not None:
raise Exception(json.dumps(ret.error, indent=4))
return ret
async def recv_task():
            '''
            Receive every ws msg and store it in its corresponding
            result entry, then set the event to wake up the original
            sender task; also handles request msgs originating from
            the server side.

            '''
            nonlocal req_msgs

async for msg in ws:
match msg:
case {
'result': _,
'id': mid,
} if res_entry := rpc_results.get(mid):
res_entry['result'] = response_type(**msg)
res_entry['event'].set()
case {
'result': _,
'id': mid,
} if not rpc_results.get(mid):
log.warning(
f'Unexpected ws msg: {json.dumps(msg, indent=4)}'
)
case {
'method': _,
'params': _,
}:
                        log.debug(f'Received\n{msg}')
case {
'error': error
}:
                        # retrieve orig request msg, set error
# response in original "result" msg,
# THEN FINALLY set the event to signal caller
# to raise the error in the parent task.
                        # NOTE: in JSON-RPC 2.0 the `id` is a sibling
                        # of the `error` field at the msg top level.
                        req_id: int = msg['id']
req_msg: dict = req_msgs[req_id]
result: dict = rpc_results[req_id]
result['error'] = error
result['event'].set()
log.error(
f'JSONRPC request failed\n'
f'req: {req_msg}\n'
f'resp: {error}\n'
)
case _:
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')
tn.start_soon(recv_task)
yield json_rpc
tn.cancel_scope.cancel()