Compare commits

...

3 Commits

Author SHA1 Message Date
Tyler Goodlet d17160519e `.ui._search`: collapse EGs as needed, use `tn` naming. 2025-09-21 12:02:04 -04:00
Tyler Goodlet 5bc7e4c9b6 Bump lock file with `tractor` piker pinned branch 2025-09-21 11:26:49 -04:00
Tyler Goodlet d35e1e5c67 Port `.data._web_bs` stuff to strict-EGs
Using `tractor.trionics.collapse_eg()` as needed and doing
some renames, in similar style as elsewhere:
- `pcs` -> `rent_cs`,
- `n` -> `tn` for nursery handles,

Also,
- tweak the `._reconnect_forever()` while loop to use the
  (also) `trio`-internal
  `mc_state: trio._channel.MemoryChannelState = snd._state` instead
  of `snd._close` to poll for open send/receive consumer task counts
  since,
    1. it seems more reliable then using the `snd._closed`,
    2. there's no other way to access the info.. afaik?

- handle `ConnectionRejected` explicitly alongside handshake-errs as
  a retry case.
- add a base-exc handler which `.exception()` reports the reconnect
  attempt failure explicitly.
- drop some lingering `Optional` usage.
2025-09-21 11:25:10 -04:00
3 changed files with 56 additions and 28 deletions

View File

@ -27,7 +27,6 @@ from functools import partial
from types import ModuleType
from typing import (
Any,
Optional,
Callable,
AsyncContextManager,
AsyncGenerator,
@ -35,6 +34,7 @@ from typing import (
)
import json
import tractor
import trio
from trio_typing import TaskStatus
from trio_websocket import (
@ -167,7 +167,7 @@ async def _reconnect_forever(
async def proxy_msgs(
ws: WebSocketConnection,
pcs: trio.CancelScope, # parent cancel scope
rent_cs: trio.CancelScope, # parent cancel scope
):
'''
Receive (under `timeout` deadline) all msgs from from underlying
@ -192,7 +192,7 @@ async def _reconnect_forever(
f'{url} connection bail with:'
)
await trio.sleep(0.5)
pcs.cancel()
rent_cs.cancel()
# go back to reonnect loop in parent task
return
@ -204,7 +204,7 @@ async def _reconnect_forever(
f'{src_mod}\n'
'WS feed seems down and slow af.. reconnecting\n'
)
pcs.cancel()
rent_cs.cancel()
# go back to reonnect loop in parent task
return
@ -228,7 +228,12 @@ async def _reconnect_forever(
nobsws._connected = trio.Event()
task_status.started()
while not snd._closed:
mc_state: trio._channel.MemoryChannelState = snd._state
while (
mc_state.open_receive_channels > 0
and
mc_state.open_send_channels > 0
):
log.info(
f'{src_mod}\n'
f'{url} trying (RE)CONNECT'
@ -237,10 +242,11 @@ async def _reconnect_forever(
ws: WebSocketConnection
try:
async with (
trio.open_nursery() as n,
open_websocket_url(url) as ws,
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
):
cs = nobsws._cs = n.cancel_scope
cs = nobsws._cs = tn.cancel_scope
nobsws._ws = ws
log.info(
f'{src_mod}\n'
@ -248,7 +254,7 @@ async def _reconnect_forever(
)
# begin relay loop to forward msgs
n.start_soon(
tn.start_soon(
proxy_msgs,
ws,
cs,
@ -262,7 +268,7 @@ async def _reconnect_forever(
# TODO: should we return an explicit sub-cs
# from this fixture task?
await n.start(
await tn.start(
open_fixture,
fixture,
nobsws,
@ -272,11 +278,23 @@ async def _reconnect_forever(
# to let tasks run **inside** the ws open block above.
nobsws._connected.set()
await trio.sleep_forever()
except HandshakeError:
except (
HandshakeError,
ConnectionRejected,
):
log.exception('Retrying connection')
await trio.sleep(0.5) # throttle
# ws & nursery block ends
except BaseException as _berr:
berr = _berr
log.exception(
'Reconnect-attempt failed ??\n'
)
await trio.sleep(0.2) # throttle
raise berr
#|_ws & nursery block ends
nobsws._connected = trio.Event()
if cs.cancelled_caught:
log.cancel(
@ -324,21 +342,25 @@ async def open_autorecon_ws(
connetivity errors, or some user defined recv timeout.
You can provide a ``fixture`` async-context-manager which will be
entered/exitted around each connection reset; eg. for (re)requesting
subscriptions without requiring streaming setup code to rerun.
entered/exitted around each connection reset; eg. for
(re)requesting subscriptions without requiring streaming setup
code to rerun.
'''
snd: trio.MemorySendChannel
rcv: trio.MemoryReceiveChannel
snd, rcv = trio.open_memory_channel(616)
async with trio.open_nursery() as n:
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn
):
nobsws = NoBsWs(
url,
rcv,
msg_recv_timeout=msg_recv_timeout,
)
await n.start(
await tn.start(
partial(
_reconnect_forever,
url,
@ -351,11 +373,10 @@ async def open_autorecon_ws(
await nobsws._connected.wait()
assert nobsws._cs
assert nobsws.connected()
try:
yield nobsws
finally:
n.cancel_scope.cancel()
tn.cancel_scope.cancel()
'''
@ -368,8 +389,8 @@ of msgs over a `NoBsWs`.
class JSONRPCResult(Struct):
id: int
jsonrpc: str = '2.0'
result: Optional[dict] = None
error: Optional[dict] = None
result: dict|None = None
error: dict|None = None
@acm

View File

@ -15,7 +15,8 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
qompleterz: embeddable search and complete using trio, Qt and rapidfuzz.
qompleterz: embeddable search and complete using trio, Qt and
rapidfuzz.
"""
@ -46,6 +47,7 @@ import time
from pprint import pformat
from rapidfuzz import process as fuzzy
import tractor
import trio
from trio_typing import TaskStatus
@ -53,7 +55,7 @@ from piker.ui.qt import (
size_policy,
align_flag,
Qt,
QtCore,
# QtCore,
QtWidgets,
QModelIndex,
QItemSelectionModel,
@ -920,7 +922,10 @@ async def fill_results(
# issue multi-provider fan-out search request and place
# "searching.." statuses on outstanding results providers
async with trio.open_nursery() as n:
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn
):
for provider, (search, pause) in (
_searcher_cache.copy().items()
@ -944,7 +949,7 @@ async def fill_results(
status_field='-> searchin..',
)
await n.start(
await tn.start(
pack_matches,
view,
has_results,
@ -1004,12 +1009,14 @@ async def handle_keyboard_input(
view.set_font_size(searchbar.dpi_font.px_size)
send, recv = trio.open_memory_channel(616)
async with trio.open_nursery() as n:
async with (
tractor.trionics.collapse_eg(), # needed?
trio.open_nursery() as tn
):
# start a background multi-searcher task which receives
# patterns relayed from this keyboard input handler and
# async updates the completer view's results.
n.start_soon(
tn.start_soon(
partial(
fill_results,
searchw,

View File

@ -888,7 +888,7 @@ requires-dist = [
{ name = "tomli", specifier = ">=2.0.1,<3.0.0" },
{ name = "tomli-w", specifier = ">=1.0.0,<2.0.0" },
{ name = "tomlkit", git = "https://github.com/pikers/tomlkit.git?branch=piker_pin" },
{ name = "tractor", git = "https://github.com/goodboy/tractor.git?branch=final_eg_refinements" },
{ name = "tractor", git = "https://github.com/goodboy/tractor.git?branch=piker_pin" },
{ name = "trio", specifier = ">=0.27" },
{ name = "trio-typing", specifier = ">=0.10.0" },
{ name = "trio-util", specifier = ">=0.7.0,<0.8.0" },
@ -1499,7 +1499,7 @@ source = { git = "https://github.com/pikers/tomlkit.git?branch=piker_pin#8e0239a
[[package]]
name = "tractor"
version = "0.1.0a6.dev0"
source = { git = "https://github.com/goodboy/tractor.git?branch=final_eg_refinements#5fc64107e566a5b59097cb1e9a6b3171f2125106" }
source = { git = "https://github.com/goodboy/tractor.git?branch=piker_pin#fee1ee315c15ff0299c31149283db018f18085b5" }
dependencies = [
{ name = "bidict" },
{ name = "cffi" },