From 9d01b5367b6a3580c64529ae2e3d5cca5e4e236e Mon Sep 17 00:00:00 2001 From: goodboy Date: Wed, 4 Feb 2026 19:20:19 -0500 Subject: [PATCH] .fsp._engine: enable console logging in `cascade()` Add console log setup with module name + multiline style for desync warning msg. Also, - fix import: `Flume` from `.data.flows` vs `.data.feed` - move `Feed` to `TYPE_CHECKING` block - add TODO comment about `tractor._state` dict issue (this commit msg was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- piker/fsp/_engine.py | 35 +++++++++++++++++++++++++---------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 5d1fd45a..7d47e05f 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -24,6 +24,7 @@ from functools import partial from typing import ( AsyncIterator, Callable, + TYPE_CHECKING, ) import numpy as np @@ -33,12 +34,12 @@ import tractor from tractor.msg import NamespacePath from piker.types import Struct -from ..log import get_logger, get_console_log -from .. import data -from ..data.feed import ( - Flume, - Feed, +from ..log import ( + get_logger, + get_console_log, ) +from .. import data +from ..data.flows import Flume from ..data._sharedmem import ShmArray from ..data._sampling import ( _default_delay_s, @@ -52,6 +53,9 @@ from ._api import ( ) from ..toolz import Profiler +if TYPE_CHECKING: + from ..data.feed import Feed + log = get_logger(__name__) @@ -169,8 +173,10 @@ class Cascade(Struct): if not synced: fsp: Fsp = self.fsp log.warning( - '***DESYNCED FSP***\n' - f'{fsp.ns_path}@{src_shm.token}\n' + f'***DESYNCED fsp***\n' + f'------------------\n' + f'ns-path: {fsp.ns_path!r}\n' + f'shm-token: {src_shm.token}\n' f'step_diff: {step_diff}\n' f'len_diff: {len_diff}\n' ) @@ -398,7 +404,6 @@ async def connect_streams( @tractor.context async def cascade( - ctx: tractor.Context, # data feed key @@ -412,7 +417,7 @@ async def cascade( shm_registry: dict[str, _Token], zero_on_step: bool = False, - loglevel: str | None = None, + loglevel: str|None = None, ) -> None: ''' @@ -426,7 +431,17 @@ async def cascade( ) if loglevel: - get_console_log(loglevel) + log = get_console_log( + loglevel, + name=__name__, + ) + # XXX TODO! + # figure out why this writes a dict to, + # `tractor._state._runtime_vars['_root_mailbox']` + # XD .. wtf + # TODO, solve this as reported in, + # https://www.pikers.dev/pikers/piker/issues/70 + # await tractor.pause() src: Flume = Flume.from_msg(src_flume_addr) dst: Flume = Flume.from_msg(