From 9a2b43495dc9c251c6467612935082ac074afcb8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 24 Apr 2025 10:37:33 -0400 Subject: [PATCH 01/13] Update legacy type to `tractor.MsgStream` --- piker/clearing/_ems.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 6547fb1e..ce22d6ba 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -161,7 +161,7 @@ async def clear_dark_triggers( router: Router, brokerd_orders_stream: tractor.MsgStream, - quote_stream: tractor.ReceiveMsgStream, # noqa + quote_stream: tractor.MsgStream, broker: str, fqme: str, @@ -177,6 +177,7 @@ async def clear_dark_triggers( ''' # XXX: optimize this for speed! # TODO: + # - port to the new ringbuf stuff in `tractor.ipc`! # - numba all this! # - this stream may eventually contain multiple symbols quote_stream._raise_on_lag = False From d36575cd0d9edf6d2f9991f0074ad2f778a167ef Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 9 Jun 2025 10:18:08 -0400 Subject: [PATCH 02/13] Port to newer `tractor.get_registry()` --- piker/cli/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index bf26d5ab..833834db 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -316,7 +316,7 @@ def services(config, tl, ports): name='service_query', loglevel=config['loglevel'] if tl else None, ), - tractor.get_arbiter( + tractor.get_registry( host=host, port=ports[0] ) as portal From 28db478da1b0d4f2926ddd611d7858d00f527da4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 26 Jun 2025 11:00:20 -0400 Subject: [PATCH 03/13] Adjust to `trio`'s strict eg nurseries throughout! Using `tractor.trionics.collapse_eg()` as needed to avoid, at the least, crash-worthy (in debug-mode REPL-ing terms) nested cancellation egs that exhibit on SIGINT/ctl-c of each "app" (chart & daemon). Also a bit of renaming of all `trio.Nursery`s to `tn`, the new "task nursery" shorthand-var-name being used in all our other `tractor` related projects. --- piker/brokers/_daemon.py | 5 ++++- piker/clearing/_client.py | 14 ++++++++++---- piker/fsp/_engine.py | 1 + piker/service/_actor_runtime.py | 7 ++++--- piker/ui/_app.py | 2 ++ piker/ui/_chart.py | 1 - piker/ui/_display.py | 5 ++++- piker/ui/_event.py | 11 +++++++++-- piker/ui/_fsp.py | 3 +++ piker/ui/order_mode.py | 1 + 10 files changed, 38 insertions(+), 12 deletions(-) diff --git a/piker/brokers/_daemon.py b/piker/brokers/_daemon.py index 5efb03dd..5414bfb9 100644 --- a/piker/brokers/_daemon.py +++ b/piker/brokers/_daemon.py @@ -96,7 +96,10 @@ async def _setup_persistent_brokerd( # - `open_symbol_search()` # NOTE: see ep invocation details inside `.data.feed`. try: - async with trio.open_nursery() as service_nursery: + async with ( + tractor.trionics.collapse_eg(), + trio.open_nursery() as service_nursery + ): bus: _FeedsBus = feed.get_feed_bus( brokername, service_nursery, diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 6d8f645e..21edcbb7 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -25,7 +25,10 @@ from typing import TYPE_CHECKING import trio import tractor -from tractor.trionics import broadcast_receiver +from tractor.trionics import ( + broadcast_receiver, + collapse_eg, +) from ._util import ( log, # sub-sys logger @@ -285,8 +288,11 @@ async def open_ems( client._ems_stream = trades_stream # start sync code order msg delivery task - async with trio.open_nursery() as n: - n.start_soon( + async with ( + collapse_eg(), + trio.open_nursery() as tn, + ): + tn.start_soon( relay_orders_from_sync_code, client, fqme, @@ -302,4 +308,4 @@ async def open_ems( ) # stop the sync-msg-relay task on exit. - n.cancel_scope.cancel() + tn.cancel_scope.cancel() diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index acc7309e..5d1fd45a 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -498,6 +498,7 @@ async def cascade( func_name: str = func.__name__ async with ( + tractor.trionics.collapse_eg(), # avoid multi-taskc tb in console trio.open_nursery() as tn, ): # TODO: might be better to just make a "restart" method where diff --git a/piker/service/_actor_runtime.py b/piker/service/_actor_runtime.py index a4e3ccf2..91157451 100644 --- a/piker/service/_actor_runtime.py +++ b/piker/service/_actor_runtime.py @@ -200,7 +200,8 @@ async def open_pikerd( reg_addrs, ), tractor.open_nursery() as actor_nursery, - trio.open_nursery() as service_nursery, + tractor.trionics.collapse_eg(), + trio.open_nursery() as service_tn, ): for addr in reg_addrs: if addr not in root_actor.accept_addrs: @@ -211,7 +212,7 @@ async def open_pikerd( # assign globally for future daemon/task creation Services.actor_n = actor_nursery - Services.service_n = service_nursery + Services.service_n = service_tn Services.debug_mode = debug_mode try: @@ -221,7 +222,7 @@ async def open_pikerd( # TODO: is this more clever/efficient? # if 'samplerd' in Services.service_tasks: # await Services.cancel_service('samplerd') - service_nursery.cancel_scope.cancel() + service_tn.cancel_scope.cancel() # TODO: do we even need this? diff --git a/piker/ui/_app.py b/piker/ui/_app.py index 5733e372..68ecb3dd 100644 --- a/piker/ui/_app.py +++ b/piker/ui/_app.py @@ -21,6 +21,7 @@ Main app startup and run. from functools import partial from types import ModuleType +import tractor import trio from piker.ui.qt import ( @@ -116,6 +117,7 @@ async def _async_main( needed_brokermods[brokername] = brokers[brokername] async with ( + tractor.trionics.collapse_eg(), trio.open_nursery() as root_n, ): # set root nursery and task stack for spawning other charts/feeds diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index afcd7dd0..98b25398 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -33,7 +33,6 @@ import trio from piker.ui.qt import ( QtCore, - QtWidgets, Qt, QLineF, QFrame, diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 46e1b922..690bfb18 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -1445,7 +1445,10 @@ async def display_symbol_data( # for pause/resume on mouse interaction rt_chart.feed = feed - async with trio.open_nursery() as ln: + async with ( + tractor.trionics.collapse_eg(), + trio.open_nursery() as ln, + ): # if available load volume related built-in display(s) vlm_charts: dict[ str, diff --git a/piker/ui/_event.py b/piker/ui/_event.py index 44797fa4..28d35de0 100644 --- a/piker/ui/_event.py +++ b/piker/ui/_event.py @@ -22,7 +22,10 @@ from contextlib import asynccontextmanager as acm from typing import Callable import trio -from tractor.trionics import gather_contexts +from tractor.trionics import ( + gather_contexts, + collapse_eg, +) from piker.ui.qt import ( QtCore, @@ -207,7 +210,10 @@ async def open_signal_handler( async for args in recv: await async_handler(*args) - async with trio.open_nursery() as tn: + async with ( + collapse_eg(), + trio.open_nursery() as tn + ): tn.start_soon(proxy_to_handler) async with send: yield @@ -242,6 +248,7 @@ async def open_handlers( widget: QWidget streams: list[trio.abc.ReceiveChannel] async with ( + collapse_eg(), trio.open_nursery() as tn, gather_contexts([ open_event_stream( diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 2e3e392e..ca43ed77 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -600,6 +600,7 @@ async def open_fsp_admin( kwargs=kwargs, ) as (cache_hit, cluster_map), + tractor.trionics.collapse_eg(), trio.open_nursery() as tn, ): if cache_hit: @@ -613,6 +614,8 @@ async def open_fsp_admin( ) try: yield admin + + # ??TODO, does this *need* to be inside a finally? finally: # terminate all tasks via signals for key, entry in admin._registry.items(): diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 47a3bb97..ea6d498a 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -792,6 +792,7 @@ async def open_order_mode( brokerd_accounts, ems_dialog_msgs, ), + tractor.trionics.collapse_eg(), trio.open_nursery() as tn, ): From 0bd8cd18823080b7ea7dcb55949690855e0704db Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 26 Jun 2025 11:13:26 -0400 Subject: [PATCH 04/13] Adjust feed status fields/display-pane to new actor-ID That is to use the new `tractor.msg.types.Aid` struct to pull the `brokerd` info from the `tractor.Channel.aid: Aid` attr as well as more generally handling the new `Channel.raddr.proto_key: str` and no longer assuming a TCP IPC transport; this per the recent `tractor.ipc` subsys which adds multi-IPC-transports! Downstream tweaks to match, - use an "opt-in" field set to display in the `brokerd` info pane in `.ui._feedstatus.mk_feed_label()`. |_ also add some todos and drop some seemingly unneeded form sizing calcs? - tweak `.ui._label` to allow not using markdown, though ended up not doing that since it looked too plain.. --- piker/data/feed.py | 23 ++++++++++------ piker/ui/_feedstatus.py | 61 +++++++++++++++++++++++++++-------------- piker/ui/_label.py | 20 +++++++++----- 3 files changed, 68 insertions(+), 36 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index df4106e0..9f2e5f41 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -39,6 +39,7 @@ from typing import ( AsyncContextManager, Awaitable, Sequence, + TYPE_CHECKING, ) import trio @@ -75,6 +76,10 @@ from ._sampling import ( uniform_rate_send, ) +if TYPE_CHECKING: + from tractor._addr import Address + from tractor.msg.types import Aid + class Sub(Struct, frozen=True): ''' @@ -899,19 +904,19 @@ async def open_feed( feed.portals[brokermod] = portal # fill out "status info" that the UI can show - host, port = portal.channel.raddr - if host == '127.0.0.1': - host = 'localhost' - + chan: tractor.Channel = portal.chan + raddr: Address = chan.raddr + aid: Aid = chan.aid + # TAG_feed_status_update feed.status.update({ - 'actor_name': portal.channel.uid[0], - 'host': host, - 'port': port, + 'actor_id': aid, + 'actor_short_id': f'{aid.name}@{aid.pid}', + 'ipc': chan.raddr.proto_key, + 'ipc_addr': raddr, 'hist_shm': 'NA', 'rt_shm': 'NA', - 'throttle_rate': tick_throttle, + 'throttle_hz': tick_throttle, }) - # feed.status.update(init_msg.pop('status', {})) # (allocate and) connect to any feed bus for this broker bus_ctxs.append( diff --git a/piker/ui/_feedstatus.py b/piker/ui/_feedstatus.py index 1c9eb772..ea262876 100644 --- a/piker/ui/_feedstatus.py +++ b/piker/ui/_feedstatus.py @@ -18,10 +18,11 @@ Feed status and controls widget(s) for embedding in a UI-pane. """ - from __future__ import annotations -from textwrap import dedent -from typing import TYPE_CHECKING +from typing import ( + Any, + TYPE_CHECKING, +) # from PyQt5.QtCore import Qt @@ -49,35 +50,55 @@ def mk_feed_label( a feed control protocol. ''' - status = feed.status + status: dict[str, Any] = feed.status assert status - msg = dedent(""" - actor: **{actor_name}**\n - |_ @**{host}:{port}**\n - """) + # SO tips on ws/nls, + # https://stackoverflow.com/a/15721400 + ws: str = ' ' + # nl: str = '
' # dun work? + actor_info_repr: str = ( + f')> **{status["actor_short_id"]}**\n' + '\n' # bc md? + ) - for key, val in status.items(): - if key in ('host', 'port', 'actor_name'): - continue - msg += f'\n|_ {key}: **{{{key}}}**\n' + # fields to select *IN* for display + # (see `.data.feed.open_feed()` status + # update -> TAG_feed_status_update) + for key in [ + 'ipc', + 'hist_shm', + 'rt_shm', + 'throttle_hz', + ]: + # NOTE, the 2nd key is filled via `.format()` updates. + actor_info_repr += ( + f'\n' # bc md? + f'{ws}|_{key}: **{{{key}}}**\n' + ) + # ^TODO? formatting and content.. + # -[ ] showing which fqme is "forward" on the + # chart/fsp/order-mode? + # '|_ flows: **{symbols}**\n' + # + # -[x] why isn't the indent working? + # => markdown, now solved.. feed_label = FormatLabel( - fmt_str=msg, - # |_ streams: **{symbols}**\n + fmt_str=actor_info_repr, font=_font.font, font_size=_font_small.px_size, font_color='default_lightest', ) + # ?TODO, remove this? # form.vbox.setAlignment(feed_label, Qt.AlignBottom) # form.vbox.setAlignment(Qt.AlignBottom) - _ = chart.height() - ( - form.height() + - form.fill_bar.height() - # feed_label.height() - ) + # _ = chart.height() - ( + # form.height() + + # form.fill_bar.height() + # # feed_label.height() + # ) feed_label.format(**feed.status) - return feed_label diff --git a/piker/ui/_label.py b/piker/ui/_label.py index 0e90b7fe..07956e4a 100644 --- a/piker/ui/_label.py +++ b/piker/ui/_label.py @@ -285,18 +285,20 @@ class FormatLabel(QLabel): font_size: int, font_color: str, + use_md: bool = True, + parent=None, ) -> None: super().__init__(parent) - # by default set the format string verbatim and expect user to - # call ``.format()`` later (presumably they'll notice the + # by default set the format string verbatim and expect user + # to call ``.format()`` later (presumably they'll notice the # unformatted content if ``fmt_str`` isn't meant to be # unformatted). self.fmt_str = fmt_str - self.setText(fmt_str) + # self.setText(fmt_str) # ?TODO, why here? self.setStyleSheet( f"""QLabel {{ @@ -306,9 +308,10 @@ class FormatLabel(QLabel): """ ) self.setFont(_font.font) - self.setTextFormat( - Qt.TextFormat.MarkdownText - ) + if use_md: + self.setTextFormat( + Qt.TextFormat.MarkdownText + ) self.setMargin(0) self.setSizePolicy( @@ -316,7 +319,10 @@ class FormatLabel(QLabel): size_policy.Expanding, ) self.setAlignment( - Qt.AlignVCenter | Qt.AlignLeft + Qt.AlignLeft + | + Qt.AlignBottom + # Qt.AlignVCenter ) self.setText(self.fmt_str) From 3f6853a437307383e10b72eb3cac56bf231a6885 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 26 Jun 2025 11:38:04 -0400 Subject: [PATCH 05/13] Try running daemons on UDS tpt The root daemon, pikerd, needs to be adjusted to use diff default registry addrs to also utilize non-TCP, but for now this gets us started testing; so far so good B) --- piker/cli/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index 833834db..4efe7d6a 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -184,6 +184,7 @@ def pikerd( registry_addrs=regaddrs, loglevel=loglevel, debug_mode=pdb, + enable_transports=['uds'], ) as service_mngr, # normally delivers a ``Services`` handle From ef748c75994b355d0d9803e70814866cef63615b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 1 Jan 2026 17:06:51 -0500 Subject: [PATCH 06/13] Use `.trionics.collapse_eg()` in `.deribit.api` Commit this change separate from the (original) broader set applied to the entire code base since the `.deribit.api` mod contained changes from upstream max-pain work (from our very own @nt) which caused a noticeable conflict and intros un-required changes from his work to re-enable `deribit` support. Note the original commit, "69eac7bb Spurious first-draft of EG collapsing", applied similar changes through the rest of the code base. AGAIN, this mod's change is only being broken out to minimize upstream change conflicts due to updates to the `deribit` backend done earlier in time-history. --- piker/brokers/deribit/api.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index 03cc301e..c0ab1ed4 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -31,7 +31,7 @@ from typing import ( Callable, ) -import pendulum +from pendulum import now import trio from trio_typing import TaskStatus from rapidfuzz import process as fuzzy @@ -39,6 +39,7 @@ import numpy as np from tractor.trionics import ( broadcast_receiver, maybe_open_context + collapse_eg, ) from tractor import to_asyncio # XXX WOOPS XD @@ -432,6 +433,7 @@ async def get_client( ) -> Client: async with ( + collapse_eg(), trio.open_nursery() as n, open_jsonrpc_session( _testnet_ws_url, dtype=JSONRPCResult) as json_rpc From ff81e57e73378d39f646fa4d4f4a9d638630dba0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 15 Sep 2025 19:10:20 -0400 Subject: [PATCH 07/13] Spurious first-draft of EG collapsing Topically, throughout various (seemingly) console-UX-affecting or benign spots in the code base; nothing that required more intervention beyond things superficial. A few spots also include `trio.Nursery` ref renames (always to something with a `tn` in it) and log-level reductions to quiet (benign) console noise oriented around issues meant to be solved long.. Note there's still a couple spots i left with the loose-ify flag because i haven't fully tested them without using the latest version of `tractor.trionics.collapse_eg()`, but more then likely they should flip over fine. --- examples/basic_order_bot.py | 1 + piker/accounting/_pos.py | 8 ++++++-- piker/brokers/binance/broker.py | 1 + piker/brokers/ib/api.py | 15 ++++++++++----- piker/brokers/ib/broker.py | 14 ++++++++------ piker/clearing/_ems.py | 19 +++++++++++-------- piker/data/feed.py | 5 ++++- piker/tsp/__init__.py | 19 ++++++++++++++++--- tests/test_questrade.py | 11 +++++++++-- 9 files changed, 66 insertions(+), 27 deletions(-) diff --git a/examples/basic_order_bot.py b/examples/basic_order_bot.py index 51575cc3..7aff6966 100644 --- a/examples/basic_order_bot.py +++ b/examples/basic_order_bot.py @@ -121,6 +121,7 @@ async def bot_main(): # tick_throttle=10, ) as feed, + tractor.trionics.collapse_eg(), trio.open_nursery() as tn, ): assert accounts diff --git a/piker/accounting/_pos.py b/piker/accounting/_pos.py index 5952418f..a172f74c 100644 --- a/piker/accounting/_pos.py +++ b/piker/accounting/_pos.py @@ -365,7 +365,11 @@ class Position(Struct): # added: bool = False tid: str = t.tid if tid in self._events: - log.warning(f'{t} is already added?!') + log.debug( + f'Txn is already added?\n' + f'\n' + f'{t}\n' + ) # return added # TODO: apparently this IS possible with a dict but not @@ -731,7 +735,7 @@ class Account(Struct): else: # TODO: we reallly need a diff set of # loglevels/colors per subsys. - log.warning( + log.debug( f'Recent position for {fqme} was closed!' ) diff --git a/piker/brokers/binance/broker.py b/piker/brokers/binance/broker.py index a13ce38f..919e8152 100644 --- a/piker/brokers/binance/broker.py +++ b/piker/brokers/binance/broker.py @@ -440,6 +440,7 @@ async def open_trade_dialog( # - ledger: TransactionLedger async with ( + tractor.trionics.collapse_eg(), trio.open_nursery() as tn, ctx.open_stream() as ems_stream, ): diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index 23222512..74d03075 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -48,6 +48,7 @@ from bidict import bidict import trio import tractor from tractor import to_asyncio +from tractor import trionics from pendulum import ( from_timestamp, DateTime, @@ -1369,8 +1370,8 @@ async def load_clients_for_trio( ''' Pure async mngr proxy to ``load_aio_clients()``. - This is a bootstrap entrypoing to call from - a ``tractor.to_asyncio.open_channel_from()``. + This is a bootstrap entrypoint to call from + a `tractor.to_asyncio.open_channel_from()`. ''' async with load_aio_clients( @@ -1391,7 +1392,10 @@ async def open_client_proxies() -> tuple[ async with ( tractor.trionics.maybe_open_context( acm_func=tractor.to_asyncio.open_channel_from, - kwargs={'target': load_clients_for_trio}, + kwargs={ + 'target': load_clients_for_trio, + # ^XXX, kwarg to `open_channel_from()` + }, # lock around current actor task access # TODO: maybe this should be the default in tractor? @@ -1584,7 +1588,8 @@ async def open_client_proxy( event_consumers=event_table, ) as (first, chan), - trio.open_nursery() as relay_n, + trionics.collapse_eg(), # loose-ify + trio.open_nursery() as relay_tn, ): assert isinstance(first, Client) @@ -1624,7 +1629,7 @@ async def open_client_proxy( continue - relay_n.start_soon(relay_events) + relay_tn.start_soon(relay_events) yield proxy diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index ddda9020..b78f2880 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -34,6 +34,7 @@ import trio from trio_typing import TaskStatus import tractor from tractor.to_asyncio import LinkedTaskChannel +from tractor import trionics from ib_insync.contract import ( Contract, ) @@ -407,7 +408,7 @@ async def update_and_audit_pos_msg( # TODO: make this a "propaganda" log level? if ibpos.avgCost != msg.avg_price: - log.warning( + log.debug( f'IB "FIFO" avg price for {msg.symbol} is DIFF:\n' f'ib: {ibfmtmsg}\n' '---------------------------\n' @@ -738,7 +739,7 @@ async def open_trade_dialog( f'UNEXPECTED POSITION says IB => {msg.symbol}\n' 'Maybe they LIQUIDATED YOU or your ledger is wrong?\n' ) - log.error(logmsg) + log.debug(logmsg) await ctx.started(( all_positions, @@ -747,21 +748,22 @@ async def open_trade_dialog( async with ( ctx.open_stream() as ems_stream, - trio.open_nursery() as n, + trionics.collapse_eg(), + trio.open_nursery() as tn, ): # relay existing open orders to ems for msg in order_msgs: await ems_stream.send(msg) for client in set(aioclients.values()): - trade_event_stream: LinkedTaskChannel = await n.start( + trade_event_stream: LinkedTaskChannel = await tn.start( open_trade_event_stream, client, ) # start order request handler **before** local trades # event loop - n.start_soon( + tn.start_soon( handle_order_requests, ems_stream, accounts_def, @@ -769,7 +771,7 @@ async def open_trade_dialog( ) # allocate event relay tasks for each client connection - n.start_soon( + tn.start_soon( deliver_trade_events, trade_event_stream, diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index ce22d6ba..3794313f 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -42,6 +42,7 @@ from bidict import bidict import trio from trio_typing import TaskStatus import tractor +from tractor import trionics from ._util import ( log, # sub-sys logger @@ -500,7 +501,7 @@ class Router(Struct): ''' # setup at actor spawn time - nursery: trio.Nursery + _tn: trio.Nursery # broker to book map books: dict[str, DarkBook] = {} @@ -670,7 +671,7 @@ class Router(Struct): # dark book clearing loop, also lives with parent # daemon to allow dark order clearing while no # client is connected. - self.nursery.start_soon( + self._tn.start_soon( clear_dark_triggers, self, relay.brokerd_stream, @@ -693,7 +694,7 @@ class Router(Struct): # spawn a ``brokerd`` order control dialog stream # that syncs lifetime with the parent `emsd` daemon. - self.nursery.start_soon( + self._tn.start_soon( translate_and_relay_brokerd_events, broker, relay.brokerd_stream, @@ -767,10 +768,12 @@ async def _setup_persistent_emsd( global _router - # open a root "service nursery" for the ``emsd`` actor - async with trio.open_nursery() as service_nursery: - - _router = Router(nursery=service_nursery) + # open a root "service task-nursery" for the `emsd`-actor + async with ( + trionics.collapse_eg(), + trio.open_nursery() as tn + ): + _router = Router(_tn=tn) # TODO: send back the full set of persistent # orders/execs? @@ -1519,7 +1522,7 @@ async def maybe_open_trade_relays( loglevel: str = 'info', ): - fqme, relay, feed, client_ready = await _router.nursery.start( + fqme, relay, feed, client_ready = await _router._tn.start( _router.open_trade_relays, fqme, exec_mode, diff --git a/piker/data/feed.py b/piker/data/feed.py index 9f2e5f41..9cc37cd7 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -730,7 +730,10 @@ class Feed(Struct): async for msg in stream: await tx.send(msg) - async with trio.open_nursery() as nurse: + async with ( + tractor.trionics.collapse_eg(), + trio.open_nursery() as nurse + ): # spawn a relay task for each stream so that they all # multiplex to a common channel. for brokername in mods: diff --git a/piker/tsp/__init__.py b/piker/tsp/__init__.py index f10f0f75..121fcbb7 100644 --- a/piker/tsp/__init__.py +++ b/piker/tsp/__init__.py @@ -963,7 +963,10 @@ async def tsdb_backfill( # concurrently load the provider's most-recent-frame AND any # pre-existing tsdb history already saved in `piker` storage. dt_eps: list[DateTime, DateTime] = [] - async with trio.open_nursery() as tn: + async with ( + tractor.trionics.collapse_eg(), + trio.open_nursery() as tn + ): tn.start_soon( push_latest_frame, dt_eps, @@ -1012,9 +1015,16 @@ async def tsdb_backfill( int, Duration, ]|None = config.get('frame_types', None) + if def_frame_durs: def_frame_size: Duration = def_frame_durs[timeframe] - assert def_frame_size == calced_frame_size + + if def_frame_size != calced_frame_size: + log.warning( + f'Expected frame size {def_frame_size}\n' + f'Rxed frame {calced_frame_size}\n' + ) + # await tractor.pause() else: # use what we calced from first frame above. def_frame_size = calced_frame_size @@ -1043,7 +1053,9 @@ async def tsdb_backfill( # if there is a gap to backfill from the first # history frame until the last datum loaded from the tsdb # continue that now in the background - async with trio.open_nursery() as tn: + async with trio.open_nursery( + strict_exception_groups=False, + ) as tn: bf_done = await tn.start( partial( @@ -1308,6 +1320,7 @@ async def manage_history( # sampling period) data set since normally differently # sampled timeseries can be loaded / process independently # ;) + tractor.trionics.collapse_eg(), trio.open_nursery() as tn, ): log.info( diff --git a/tests/test_questrade.py b/tests/test_questrade.py index 4614b4f9..79e51ef5 100644 --- a/tests/test_questrade.py +++ b/tests/test_questrade.py @@ -142,7 +142,12 @@ async def test_concurrent_tokens_refresh(us_symbols, loglevel): # async with tractor.open_nursery() as n: # await n.run_in_actor('other', intermittently_refresh_tokens) - async with trio.open_nursery() as n: + async with ( + tractor.trionics.collapse_eg(), + trio.open_nursery( + # strict_exception_groups=False, + ) as n + ): quoter = await qt.stock_quoter(client, us_symbols) @@ -383,7 +388,9 @@ async def test_quote_streaming(tmx_symbols, loglevel, stream_what): else: symbols = [tmx_symbols] - async with trio.open_nursery() as n: + async with trio.open_nursery( + strict_exception_groups=False, + ) as n: for syms, func in zip(symbols, stream_what): n.start_soon(func, feed, syms) From 5dc0ecc80237ff14f70c610e5f62ec6b65617e55 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 20 Sep 2025 22:32:05 -0400 Subject: [PATCH 08/13] binance; unmask around send-chan @acm usage --- piker/brokers/binance/feed.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/brokers/binance/feed.py b/piker/brokers/binance/feed.py index efe2f717..b7d68edb 100644 --- a/piker/brokers/binance/feed.py +++ b/piker/brokers/binance/feed.py @@ -448,7 +448,6 @@ async def subscribe( async def stream_quotes( - send_chan: trio.abc.SendChannel, symbols: list[str], feed_is_live: trio.Event, @@ -460,6 +459,7 @@ async def stream_quotes( ) -> None: async with ( + tractor.trionics.maybe_raise_from_masking_exc(), send_chan as send_chan, open_cached_client('binance') as client, ): From c065ff6b86a2fe40d07932289ef597ec63b16524 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 20 Sep 2025 22:38:47 -0400 Subject: [PATCH 09/13] Port `.cli` & `.service` to latest `tractor` registry APIs Namely changes for the `registry_addrs: list`, enable_transports: list` and related `tractor._addr` primitive requirements. Other updates include, - passing `maybe_enable_greenback=True`, - additional exc logging around `pikerd` syncing/booting, - changing to newer `Context.wait_for_result()`, - dropping (unnecessary?) `maybe_open_crash_handler()` around `pikerd` ep. --- piker/cli/__init__.py | 125 +++++++++++++++--------------- piker/service/_actor_runtime.py | 19 +++-- piker/service/_daemon.py | 130 ++++++++++++++++++-------------- piker/service/_mngr.py | 2 +- piker/service/_registry.py | 29 +++++-- 5 files changed, 176 insertions(+), 129 deletions(-) diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index 4efe7d6a..4fefaae6 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -134,68 +134,65 @@ def pikerd( Spawn the piker broker-daemon. ''' - from tractor.devx import maybe_open_crash_handler - with maybe_open_crash_handler(pdb=pdb): - log = get_console_log(loglevel, name='cli') + # from tractor.devx import maybe_open_crash_handler + # with maybe_open_crash_handler(pdb=False): + log = get_console_log(loglevel, name='cli') - if pdb: - log.warning(( - "\n" - "!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n" - "When a `piker` daemon crashes it will block the " - "task-thread until resumed from console!\n" - "\n" + if pdb: + log.warning(( + "\n" + "!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n" + "When a `piker` daemon crashes it will block the " + "task-thread until resumed from console!\n" + "\n" + )) + + # service-actor registry endpoint socket-address set + regaddrs: list[tuple[str, int]] = [] + + conf, _ = config.load( + conf_name='conf', + ) + network: dict = conf.get('network') + if ( + network is None + and not maddr + ): + regaddrs = [( + _default_registry_host, + _default_registry_port, + )] + + else: + eps: dict = load_trans_eps( + network, + maddr, + ) + for layers in eps['pikerd']: + regaddrs.append(( + layers['ipv4']['addr'], + layers['tcp']['port'], )) - # service-actor registry endpoint socket-address set - regaddrs: list[tuple[str, int]] = [] + from .. import service - conf, _ = config.load( - conf_name='conf', - ) - network: dict = conf.get('network') - if ( - network is None - and not maddr + async def main(): + service_mngr: service.Services + async with ( + service.open_pikerd( + registry_addrs=regaddrs, + loglevel=loglevel, + debug_mode=pdb, + enable_transports=['uds'], + # enable_transports=['tcp'], + ) as service_mngr, ): - regaddrs = [( - _default_registry_host, - _default_registry_port, - )] + assert service_mngr + # ?TODO? spawn all other sub-actor daemons according to + # multiaddress endpoint spec defined by user config + await trio.sleep_forever() - else: - eps: dict = load_trans_eps( - network, - maddr, - ) - for layers in eps['pikerd']: - regaddrs.append(( - layers['ipv4']['addr'], - layers['tcp']['port'], - )) - - from .. import service - - async def main(): - service_mngr: service.Services - - async with ( - service.open_pikerd( - registry_addrs=regaddrs, - loglevel=loglevel, - debug_mode=pdb, - enable_transports=['uds'], - - ) as service_mngr, # normally delivers a ``Services`` handle - - # AsyncExitStack() as stack, - ): - assert service_mngr - # ?TODO? spawn all other sub-actor daemons according to - # multiaddress endpoint spec defined by user config - await trio.sleep_forever() - - trio.run(main) + trio.run(main) @click.group(context_settings=config._context_defaults) @@ -310,6 +307,10 @@ def services(config, tl, ports): if not ports: ports = [_default_registry_port] + addr = tractor._addr.wrap_address( + addr=(host, ports[0]) + ) + async def list_services(): nonlocal host async with ( @@ -318,15 +319,17 @@ def services(config, tl, ports): loglevel=config['loglevel'] if tl else None, ), tractor.get_registry( - host=host, - port=ports[0] + addr=addr, ) as portal ): - registry = await portal.run_from_ns('self', 'get_registry') + registry = await portal.run_from_ns( + 'self', + 'get_registry', + ) json_d = {} for key, socket in registry.items(): - host, port = socket - json_d[key] = f'{host}:{port}' + json_d[key] = f'{socket}' + click.echo(f"{colorize_json(json_d)}") trio.run(list_services) diff --git a/piker/service/_actor_runtime.py b/piker/service/_actor_runtime.py index 91157451..33f23453 100644 --- a/piker/service/_actor_runtime.py +++ b/piker/service/_actor_runtime.py @@ -107,17 +107,22 @@ async def open_piker_runtime( async with ( tractor.open_root_actor( - # passed through to ``open_root_actor`` + # passed through to `open_root_actor` registry_addrs=registry_addrs, name=name, + start_method=start_method, loglevel=loglevel, debug_mode=debug_mode, - start_method=start_method, + + # XXX NOTE MEMBER DAT der's a perf hit yo!! + # https://greenback.readthedocs.io/en/latest/principle.html#performance + maybe_enable_greenback=True, # TODO: eventually we should be able to avoid # having the root have more then permissions to # spawn other specialized daemons I think? enable_modules=enable_modules, + hide_tb=False, **tractor_kwargs, ) as actor, @@ -257,7 +262,10 @@ async def maybe_open_pikerd( loglevel: str | None = None, **kwargs, -) -> tractor._portal.Portal | ClassVar[Services]: +) -> ( + tractor._portal.Portal + |ClassVar[Services] +): ''' If no ``pikerd`` daemon-root-actor can be found start it and yield up (we should probably figure out returning a portal to self @@ -282,10 +290,11 @@ async def maybe_open_pikerd( registry_addrs: list[tuple[str, int]] = ( registry_addrs - or [_default_reg_addr] + or + [_default_reg_addr] ) - pikerd_portal: tractor.Portal | None + pikerd_portal: tractor.Portal|None async with ( open_piker_runtime( name=query_name, diff --git a/piker/service/_daemon.py b/piker/service/_daemon.py index 1e7ff096..89d7f28d 100644 --- a/piker/service/_daemon.py +++ b/piker/service/_daemon.py @@ -28,6 +28,7 @@ from contextlib import ( ) import tractor +from trio.lowlevel import current_task from ._util import ( log, # sub-sys logger @@ -70,69 +71,84 @@ async def maybe_spawn_daemon( lock = Services.locks[service_name] await lock.acquire() - async with find_service( - service_name, - registry_addrs=[('127.0.0.1', 6116)], - ) as portal: - if portal is not None: - lock.release() - yield portal - return + try: + async with find_service( + service_name, + registry_addrs=[('127.0.0.1', 6116)], + ) as portal: + if portal is not None: + lock.release() + yield portal + return - log.warning( - f"Couldn't find any existing {service_name}\n" - 'Attempting to spawn new daemon-service..' - ) + log.warning( + f"Couldn't find any existing {service_name}\n" + 'Attempting to spawn new daemon-service..' + ) - # ask root ``pikerd`` daemon to spawn the daemon we need if - # pikerd is not live we now become the root of the - # process tree - async with maybe_open_pikerd( - loglevel=loglevel, - **pikerd_kwargs, + # ask root ``pikerd`` daemon to spawn the daemon we need if + # pikerd is not live we now become the root of the + # process tree + async with maybe_open_pikerd( + loglevel=loglevel, + **pikerd_kwargs, - ) as pikerd_portal: + ) as pikerd_portal: - # we are the root and thus are `pikerd` - # so spawn the target service directly by calling - # the provided target routine. - # XXX: this assumes that the target is well formed and will - # do the right things to setup both a sub-actor **and** call - # the ``_Services`` api from above to start the top level - # service task for that actor. - started: bool - if pikerd_portal is None: - started = await service_task_target( - loglevel=loglevel, - **spawn_args, + # we are the root and thus are `pikerd` + # so spawn the target service directly by calling + # the provided target routine. + # XXX: this assumes that the target is well formed and will + # do the right things to setup both a sub-actor **and** call + # the ``_Services`` api from above to start the top level + # service task for that actor. + started: bool + if pikerd_portal is None: + started = await service_task_target( + loglevel=loglevel, + **spawn_args, + ) + + else: + # request a remote `pikerd` (service manager) to start the + # target daemon-task, the target can't return + # a non-serializable value since it is expected that service + # starting is non-blocking and the target task will persist + # running "under" or "within" the `pikerd` actor tree after + # the questing client disconnects. in other words this + # spawns a persistent daemon actor that continues to live + # for the lifespan of whatever the service manager inside + # `pikerd` says it should. + started = await pikerd_portal.run( + service_task_target, + loglevel=loglevel, + **spawn_args, + ) + + if started: + log.info(f'Service {service_name} started!') + + # block until we can discover (by IPC connection) to the newly + # spawned daemon-actor and then deliver the portal to the + # caller. + async with tractor.wait_for_actor(service_name) as portal: + lock.release() + yield portal + await portal.cancel_actor() + + except BaseException as _err: + err = _err + if ( + lock.locked() + and + lock.statistics().owner is current_task() + ): + log.exception( + f'Releasing stale lock after crash..?' + f'{err!r}\n' ) - - else: - # request a remote `pikerd` (service manager) to start the - # target daemon-task, the target can't return - # a non-serializable value since it is expected that service - # starting is non-blocking and the target task will persist - # running "under" or "within" the `pikerd` actor tree after - # the questing client disconnects. in other words this - # spawns a persistent daemon actor that continues to live - # for the lifespan of whatever the service manager inside - # `pikerd` says it should. - started = await pikerd_portal.run( - service_task_target, - loglevel=loglevel, - **spawn_args, - ) - - if started: - log.info(f'Service {service_name} started!') - - # block until we can discover (by IPC connection) to the newly - # spawned daemon-actor and then deliver the portal to the - # caller. - async with tractor.wait_for_actor(service_name) as portal: lock.release() - yield portal - await portal.cancel_actor() + raise err async def spawn_emsd( diff --git a/piker/service/_mngr.py b/piker/service/_mngr.py index 89e98411..726a34c8 100644 --- a/piker/service/_mngr.py +++ b/piker/service/_mngr.py @@ -109,7 +109,7 @@ class Services: # wait on any context's return value # and any final portal result from the # sub-actor. - ctx_res: Any = await ctx.result() + ctx_res: Any = await ctx.wait_for_result() # NOTE: blocks indefinitely until cancelled # either by error from the target context diff --git a/piker/service/_registry.py b/piker/service/_registry.py index ed4569f7..94ccbc68 100644 --- a/piker/service/_registry.py +++ b/piker/service/_registry.py @@ -101,13 +101,15 @@ async def open_registry( if ( not tractor.is_root_process() - and not Registry.addrs + and + not Registry.addrs ): Registry.addrs.extend(actor.reg_addrs) if ( ensure_exists - and not Registry.addrs + and + not Registry.addrs ): raise RuntimeError( f"`{uid}` registry should already exist but doesn't?" @@ -146,7 +148,7 @@ async def find_service( | list[Portal] | None ): - + # try: reg_addrs: list[tuple[str, int]] async with open_registry( addrs=( @@ -157,22 +159,39 @@ async def find_service( or Registry.addrs ), ) as reg_addrs: - log.info(f'Scanning for service `{service_name}`') - maybe_portals: list[Portal] | Portal | None + log.info( + f'Scanning for service {service_name!r}' + ) # attach to existing daemon by name if possible + maybe_portals: list[Portal]|Portal|None async with tractor.find_actor( service_name, registry_addrs=reg_addrs, only_first=first_only, # if set only returns single ref ) as maybe_portals: if not maybe_portals: + # log.info( + print( + f'Could NOT find service {service_name!r} -> {maybe_portals!r}' + ) yield None return + # log.info( + print( + f'Found service {service_name!r} -> {maybe_portals}' + ) yield maybe_portals + # except BaseException as _berr: + # berr = _berr + # log.exception( + # 'tractor.find_actor() failed with,\n' + # ) + # raise berr + async def check_for_service( service_name: str, From f3767e42696989a14ce5f3875b330354dacf6bf2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 21 Sep 2025 11:08:14 -0400 Subject: [PATCH 10/13] Port `.data._web_bs` stuff to strict-EGs Using `tractor.trionics.collapse_eg()` as needed and doing some renames, in similar style as elsewhere: - `pcs` -> `rent_cs`, - `n` -> `tn` for nursery handles, Also, - tweak the `._reconnect_forever()` while loop to use the (also) `trio`-internal `mc_state: trio._channel.MemoryChannelState = snd._state` instead of `snd._close` to poll for open send/receive consumer task counts since, 1. it seems more reliable then using the `snd._closed`, 2. there's no other way to access the info.. afaik? - handle `ConnectionRejected` explicitly alongside handshake-errs as a retry case. - add a base-exc handler which `.exception()` reports the reconnect attempt failure explicitly. - drop some lingering `Optional` usage. --- piker/data/_web_bs.py | 59 +++++++++++++++++++++++++++++-------------- 1 file changed, 40 insertions(+), 19 deletions(-) diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index 4d886fbc..7e5eb810 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -27,7 +27,6 @@ from functools import partial from types import ModuleType from typing import ( Any, - Optional, Callable, AsyncContextManager, AsyncGenerator, @@ -35,6 +34,7 @@ from typing import ( ) import json +import tractor import trio from trio_typing import TaskStatus from trio_websocket import ( @@ -167,7 +167,7 @@ async def _reconnect_forever( async def proxy_msgs( ws: WebSocketConnection, - pcs: trio.CancelScope, # parent cancel scope + rent_cs: trio.CancelScope, # parent cancel scope ): ''' Receive (under `timeout` deadline) all msgs from from underlying @@ -192,7 +192,7 @@ async def _reconnect_forever( f'{url} connection bail with:' ) await trio.sleep(0.5) - pcs.cancel() + rent_cs.cancel() # go back to reonnect loop in parent task return @@ -204,7 +204,7 @@ async def _reconnect_forever( f'{src_mod}\n' 'WS feed seems down and slow af.. reconnecting\n' ) - pcs.cancel() + rent_cs.cancel() # go back to reonnect loop in parent task return @@ -228,7 +228,12 @@ async def _reconnect_forever( nobsws._connected = trio.Event() task_status.started() - while not snd._closed: + mc_state: trio._channel.MemoryChannelState = snd._state + while ( + mc_state.open_receive_channels > 0 + and + mc_state.open_send_channels > 0 + ): log.info( f'{src_mod}\n' f'{url} trying (RE)CONNECT' @@ -237,10 +242,11 @@ async def _reconnect_forever( ws: WebSocketConnection try: async with ( - trio.open_nursery() as n, open_websocket_url(url) as ws, + tractor.trionics.collapse_eg(), + trio.open_nursery() as tn, ): - cs = nobsws._cs = n.cancel_scope + cs = nobsws._cs = tn.cancel_scope nobsws._ws = ws log.info( f'{src_mod}\n' @@ -248,7 +254,7 @@ async def _reconnect_forever( ) # begin relay loop to forward msgs - n.start_soon( + tn.start_soon( proxy_msgs, ws, cs, @@ -262,7 +268,7 @@ async def _reconnect_forever( # TODO: should we return an explicit sub-cs # from this fixture task? - await n.start( + await tn.start( open_fixture, fixture, nobsws, @@ -272,11 +278,23 @@ async def _reconnect_forever( # to let tasks run **inside** the ws open block above. nobsws._connected.set() await trio.sleep_forever() - except HandshakeError: + + except ( + HandshakeError, + ConnectionRejected, + ): log.exception('Retrying connection') + await trio.sleep(0.5) # throttle - # ws & nursery block ends + except BaseException as _berr: + berr = _berr + log.exception( + 'Reconnect-attempt failed ??\n' + ) + await trio.sleep(0.2) # throttle + raise berr + #|_ws & nursery block ends nobsws._connected = trio.Event() if cs.cancelled_caught: log.cancel( @@ -324,21 +342,25 @@ async def open_autorecon_ws( connetivity errors, or some user defined recv timeout. You can provide a ``fixture`` async-context-manager which will be - entered/exitted around each connection reset; eg. for (re)requesting - subscriptions without requiring streaming setup code to rerun. + entered/exitted around each connection reset; eg. for + (re)requesting subscriptions without requiring streaming setup + code to rerun. ''' snd: trio.MemorySendChannel rcv: trio.MemoryReceiveChannel snd, rcv = trio.open_memory_channel(616) - async with trio.open_nursery() as n: + async with ( + tractor.trionics.collapse_eg(), + trio.open_nursery() as tn + ): nobsws = NoBsWs( url, rcv, msg_recv_timeout=msg_recv_timeout, ) - await n.start( + await tn.start( partial( _reconnect_forever, url, @@ -351,11 +373,10 @@ async def open_autorecon_ws( await nobsws._connected.wait() assert nobsws._cs assert nobsws.connected() - try: yield nobsws finally: - n.cancel_scope.cancel() + tn.cancel_scope.cancel() ''' @@ -368,8 +389,8 @@ of msgs over a `NoBsWs`. class JSONRPCResult(Struct): id: int jsonrpc: str = '2.0' - result: Optional[dict] = None - error: Optional[dict] = None + result: dict|None = None + error: dict|None = None @acm From 2b17b99964db93824bba72a3367b75ee30989bd1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 21 Sep 2025 12:02:04 -0400 Subject: [PATCH 11/13] `.ui._search`: collapse EGs as needed, use `tn` naming. --- piker/ui/_search.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/piker/ui/_search.py b/piker/ui/_search.py index 16b25a46..aa6f6623 100644 --- a/piker/ui/_search.py +++ b/piker/ui/_search.py @@ -15,7 +15,8 @@ # along with this program. If not, see . """ -qompleterz: embeddable search and complete using trio, Qt and rapidfuzz. +qompleterz: embeddable search and complete using trio, Qt and +rapidfuzz. """ @@ -46,6 +47,7 @@ import time from pprint import pformat from rapidfuzz import process as fuzzy +import tractor import trio from trio_typing import TaskStatus @@ -53,7 +55,7 @@ from piker.ui.qt import ( size_policy, align_flag, Qt, - QtCore, + # QtCore, QtWidgets, QModelIndex, QItemSelectionModel, @@ -920,7 +922,10 @@ async def fill_results( # issue multi-provider fan-out search request and place # "searching.." statuses on outstanding results providers - async with trio.open_nursery() as n: + async with ( + tractor.trionics.collapse_eg(), + trio.open_nursery() as tn + ): for provider, (search, pause) in ( _searcher_cache.copy().items() @@ -944,7 +949,7 @@ async def fill_results( status_field='-> searchin..', ) - await n.start( + await tn.start( pack_matches, view, has_results, @@ -1004,12 +1009,14 @@ async def handle_keyboard_input( view.set_font_size(searchbar.dpi_font.px_size) send, recv = trio.open_memory_channel(616) - async with trio.open_nursery() as n: - + async with ( + tractor.trionics.collapse_eg(), # needed? + trio.open_nursery() as tn + ): # start a background multi-searcher task which receives # patterns relayed from this keyboard input handler and # async updates the completer view's results. - n.start_soon( + tn.start_soon( partial( fill_results, searchw, From 3adbabcba69e6cad3c9a872ccf1cd4a3705eeb1a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 29 Sep 2025 14:36:55 -0400 Subject: [PATCH 12/13] Use `pytest` plugin now exposed by `tractor` --- tests/conftest.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/conftest.py b/tests/conftest.py index 366d5d95..22d1af3c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -15,6 +15,12 @@ from piker.service import ( from piker.log import get_console_log +# include `tractor`'s built-in fixtures! +pytest_plugins: tuple[str] = ( + "tractor._testing.pytest", +) + + def pytest_addoption(parser): parser.addoption("--ll", action="store", dest='loglevel', default=None, help="logging level to set when testing") From c77aca1f9014c59646026f962557c4cdbf10e484 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 6 Jan 2026 23:34:30 -0500 Subject: [PATCH 13/13] Flip (back) `pikerd` to use TCP by default It'll break all non-linux OS-platforms atm and bc it should only be set to a "non-std transport" through the config anyways. Yeah yeah, we're slowly appealing to the frickin masses.. --- piker/cli/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index 4fefaae6..fdecb818 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -183,8 +183,8 @@ def pikerd( registry_addrs=regaddrs, loglevel=loglevel, debug_mode=pdb, - enable_transports=['uds'], - # enable_transports=['tcp'], + # enable_transports=['uds'], + enable_transports=['tcp'], ) as service_mngr, ): assert service_mngr