Compare commits

..

No commits in common. "d17160519e41e1056904ee864ff61171e5ec6ae3" and "d4c10b2b0fa0227b4d9d9ed35d57fb5c08a38a09" have entirely different histories.

3 changed files with 28 additions and 56 deletions

View File

@ -27,6 +27,7 @@ from functools import partial
from types import ModuleType from types import ModuleType
from typing import ( from typing import (
Any, Any,
Optional,
Callable, Callable,
AsyncContextManager, AsyncContextManager,
AsyncGenerator, AsyncGenerator,
@ -34,7 +35,6 @@ from typing import (
) )
import json import json
import tractor
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
from trio_websocket import ( from trio_websocket import (
@ -167,7 +167,7 @@ async def _reconnect_forever(
async def proxy_msgs( async def proxy_msgs(
ws: WebSocketConnection, ws: WebSocketConnection,
rent_cs: trio.CancelScope, # parent cancel scope pcs: trio.CancelScope, # parent cancel scope
): ):
''' '''
Receive (under `timeout` deadline) all msgs from from underlying Receive (under `timeout` deadline) all msgs from from underlying
@ -192,7 +192,7 @@ async def _reconnect_forever(
f'{url} connection bail with:' f'{url} connection bail with:'
) )
await trio.sleep(0.5) await trio.sleep(0.5)
rent_cs.cancel() pcs.cancel()
# go back to reonnect loop in parent task # go back to reonnect loop in parent task
return return
@ -204,7 +204,7 @@ async def _reconnect_forever(
f'{src_mod}\n' f'{src_mod}\n'
'WS feed seems down and slow af.. reconnecting\n' 'WS feed seems down and slow af.. reconnecting\n'
) )
rent_cs.cancel() pcs.cancel()
# go back to reonnect loop in parent task # go back to reonnect loop in parent task
return return
@ -228,12 +228,7 @@ async def _reconnect_forever(
nobsws._connected = trio.Event() nobsws._connected = trio.Event()
task_status.started() task_status.started()
mc_state: trio._channel.MemoryChannelState = snd._state while not snd._closed:
while (
mc_state.open_receive_channels > 0
and
mc_state.open_send_channels > 0
):
log.info( log.info(
f'{src_mod}\n' f'{src_mod}\n'
f'{url} trying (RE)CONNECT' f'{url} trying (RE)CONNECT'
@ -242,11 +237,10 @@ async def _reconnect_forever(
ws: WebSocketConnection ws: WebSocketConnection
try: try:
async with ( async with (
trio.open_nursery() as n,
open_websocket_url(url) as ws, open_websocket_url(url) as ws,
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
): ):
cs = nobsws._cs = tn.cancel_scope cs = nobsws._cs = n.cancel_scope
nobsws._ws = ws nobsws._ws = ws
log.info( log.info(
f'{src_mod}\n' f'{src_mod}\n'
@ -254,7 +248,7 @@ async def _reconnect_forever(
) )
# begin relay loop to forward msgs # begin relay loop to forward msgs
tn.start_soon( n.start_soon(
proxy_msgs, proxy_msgs,
ws, ws,
cs, cs,
@ -268,7 +262,7 @@ async def _reconnect_forever(
# TODO: should we return an explicit sub-cs # TODO: should we return an explicit sub-cs
# from this fixture task? # from this fixture task?
await tn.start( await n.start(
open_fixture, open_fixture,
fixture, fixture,
nobsws, nobsws,
@ -278,23 +272,11 @@ async def _reconnect_forever(
# to let tasks run **inside** the ws open block above. # to let tasks run **inside** the ws open block above.
nobsws._connected.set() nobsws._connected.set()
await trio.sleep_forever() await trio.sleep_forever()
except HandshakeError:
except (
HandshakeError,
ConnectionRejected,
):
log.exception('Retrying connection') log.exception('Retrying connection')
await trio.sleep(0.5) # throttle
except BaseException as _berr: # ws & nursery block ends
berr = _berr
log.exception(
'Reconnect-attempt failed ??\n'
)
await trio.sleep(0.2) # throttle
raise berr
#|_ws & nursery block ends
nobsws._connected = trio.Event() nobsws._connected = trio.Event()
if cs.cancelled_caught: if cs.cancelled_caught:
log.cancel( log.cancel(
@ -342,25 +324,21 @@ async def open_autorecon_ws(
connetivity errors, or some user defined recv timeout. connetivity errors, or some user defined recv timeout.
You can provide a ``fixture`` async-context-manager which will be You can provide a ``fixture`` async-context-manager which will be
entered/exitted around each connection reset; eg. for entered/exitted around each connection reset; eg. for (re)requesting
(re)requesting subscriptions without requiring streaming setup subscriptions without requiring streaming setup code to rerun.
code to rerun.
''' '''
snd: trio.MemorySendChannel snd: trio.MemorySendChannel
rcv: trio.MemoryReceiveChannel rcv: trio.MemoryReceiveChannel
snd, rcv = trio.open_memory_channel(616) snd, rcv = trio.open_memory_channel(616)
async with ( async with trio.open_nursery() as n:
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn
):
nobsws = NoBsWs( nobsws = NoBsWs(
url, url,
rcv, rcv,
msg_recv_timeout=msg_recv_timeout, msg_recv_timeout=msg_recv_timeout,
) )
await tn.start( await n.start(
partial( partial(
_reconnect_forever, _reconnect_forever,
url, url,
@ -373,10 +351,11 @@ async def open_autorecon_ws(
await nobsws._connected.wait() await nobsws._connected.wait()
assert nobsws._cs assert nobsws._cs
assert nobsws.connected() assert nobsws.connected()
try: try:
yield nobsws yield nobsws
finally: finally:
tn.cancel_scope.cancel() n.cancel_scope.cancel()
''' '''
@ -389,8 +368,8 @@ of msgs over a `NoBsWs`.
class JSONRPCResult(Struct): class JSONRPCResult(Struct):
id: int id: int
jsonrpc: str = '2.0' jsonrpc: str = '2.0'
result: dict|None = None result: Optional[dict] = None
error: dict|None = None error: Optional[dict] = None
@acm @acm

View File

@ -15,8 +15,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
""" """
qompleterz: embeddable search and complete using trio, Qt and qompleterz: embeddable search and complete using trio, Qt and rapidfuzz.
rapidfuzz.
""" """
@ -47,7 +46,6 @@ import time
from pprint import pformat from pprint import pformat
from rapidfuzz import process as fuzzy from rapidfuzz import process as fuzzy
import tractor
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
@ -55,7 +53,7 @@ from piker.ui.qt import (
size_policy, size_policy,
align_flag, align_flag,
Qt, Qt,
# QtCore, QtCore,
QtWidgets, QtWidgets,
QModelIndex, QModelIndex,
QItemSelectionModel, QItemSelectionModel,
@ -922,10 +920,7 @@ async def fill_results(
# issue multi-provider fan-out search request and place # issue multi-provider fan-out search request and place
# "searching.." statuses on outstanding results providers # "searching.." statuses on outstanding results providers
async with ( async with trio.open_nursery() as n:
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn
):
for provider, (search, pause) in ( for provider, (search, pause) in (
_searcher_cache.copy().items() _searcher_cache.copy().items()
@ -949,7 +944,7 @@ async def fill_results(
status_field='-> searchin..', status_field='-> searchin..',
) )
await tn.start( await n.start(
pack_matches, pack_matches,
view, view,
has_results, has_results,
@ -1009,14 +1004,12 @@ async def handle_keyboard_input(
view.set_font_size(searchbar.dpi_font.px_size) view.set_font_size(searchbar.dpi_font.px_size)
send, recv = trio.open_memory_channel(616) send, recv = trio.open_memory_channel(616)
async with ( async with trio.open_nursery() as n:
tractor.trionics.collapse_eg(), # needed?
trio.open_nursery() as tn
):
# start a background multi-searcher task which receives # start a background multi-searcher task which receives
# patterns relayed from this keyboard input handler and # patterns relayed from this keyboard input handler and
# async updates the completer view's results. # async updates the completer view's results.
tn.start_soon( n.start_soon(
partial( partial(
fill_results, fill_results,
searchw, searchw,

View File

@ -888,7 +888,7 @@ requires-dist = [
{ name = "tomli", specifier = ">=2.0.1,<3.0.0" }, { name = "tomli", specifier = ">=2.0.1,<3.0.0" },
{ name = "tomli-w", specifier = ">=1.0.0,<2.0.0" }, { name = "tomli-w", specifier = ">=1.0.0,<2.0.0" },
{ name = "tomlkit", git = "https://github.com/pikers/tomlkit.git?branch=piker_pin" }, { name = "tomlkit", git = "https://github.com/pikers/tomlkit.git?branch=piker_pin" },
{ name = "tractor", git = "https://github.com/goodboy/tractor.git?branch=piker_pin" }, { name = "tractor", git = "https://github.com/goodboy/tractor.git?branch=final_eg_refinements" },
{ name = "trio", specifier = ">=0.27" }, { name = "trio", specifier = ">=0.27" },
{ name = "trio-typing", specifier = ">=0.10.0" }, { name = "trio-typing", specifier = ">=0.10.0" },
{ name = "trio-util", specifier = ">=0.7.0,<0.8.0" }, { name = "trio-util", specifier = ">=0.7.0,<0.8.0" },
@ -1499,7 +1499,7 @@ source = { git = "https://github.com/pikers/tomlkit.git?branch=piker_pin#8e0239a
[[package]] [[package]]
name = "tractor" name = "tractor"
version = "0.1.0a6.dev0" version = "0.1.0a6.dev0"
source = { git = "https://github.com/goodboy/tractor.git?branch=piker_pin#fee1ee315c15ff0299c31149283db018f18085b5" } source = { git = "https://github.com/goodboy/tractor.git?branch=final_eg_refinements#5fc64107e566a5b59097cb1e9a6b3171f2125106" }
dependencies = [ dependencies = [
{ name = "bidict" }, { name = "bidict" },
{ name = "cffi" }, { name = "cffi" },