diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 5d1fd45a..7d47e05f 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -24,6 +24,7 @@ from functools import partial from typing import ( AsyncIterator, Callable, + TYPE_CHECKING, ) import numpy as np @@ -33,12 +34,12 @@ import tractor from tractor.msg import NamespacePath from piker.types import Struct -from ..log import get_logger, get_console_log -from .. import data -from ..data.feed import ( - Flume, - Feed, +from ..log import ( + get_logger, + get_console_log, ) +from .. import data +from ..data.flows import Flume from ..data._sharedmem import ShmArray from ..data._sampling import ( _default_delay_s, @@ -52,6 +53,9 @@ from ._api import ( ) from ..toolz import Profiler +if TYPE_CHECKING: + from ..data.feed import Feed + log = get_logger(__name__) @@ -169,8 +173,10 @@ class Cascade(Struct): if not synced: fsp: Fsp = self.fsp log.warning( - '***DESYNCED FSP***\n' - f'{fsp.ns_path}@{src_shm.token}\n' + f'***DESYNCED fsp***\n' + f'------------------\n' + f'ns-path: {fsp.ns_path!r}\n' + f'shm-token: {src_shm.token}\n' f'step_diff: {step_diff}\n' f'len_diff: {len_diff}\n' ) @@ -398,7 +404,6 @@ async def connect_streams( @tractor.context async def cascade( - ctx: tractor.Context, # data feed key @@ -412,7 +417,7 @@ async def cascade( shm_registry: dict[str, _Token], zero_on_step: bool = False, - loglevel: str | None = None, + loglevel: str|None = None, ) -> None: ''' @@ -426,7 +431,17 @@ async def cascade( ) if loglevel: - get_console_log(loglevel) + log = get_console_log( + loglevel, + name=__name__, + ) + # XXX TODO! + # figure out why this writes a dict to, + # `tractor._state._runtime_vars['_root_mailbox']` + # XD .. wtf + # TODO, solve this as reported in, + # https://www.pikers.dev/pikers/piker/issues/70 + # await tractor.pause() src: Flume = Flume.from_msg(src_flume_addr) dst: Flume = Flume.from_msg(