.fsp._engine: enable console logging in `cascade()`

Add console log setup with module name + multiline style for
desync warning msg.

Also,
- fix import: `Flume` from `.data.flows` vs `.data.feed`
- move `Feed` to `TYPE_CHECKING` block
- add TODO comment about `tractor._state` dict issue

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
order_line_cancel_nowork_debugging
Gud Boi 2026-02-04 19:20:19 -05:00
parent 3db0cf9054
commit 9d01b5367b
1 changed files with 25 additions and 10 deletions

View File

@ -24,6 +24,7 @@ from functools import partial
from typing import ( from typing import (
AsyncIterator, AsyncIterator,
Callable, Callable,
TYPE_CHECKING,
) )
import numpy as np import numpy as np
@ -33,12 +34,12 @@ import tractor
from tractor.msg import NamespacePath from tractor.msg import NamespacePath
from piker.types import Struct from piker.types import Struct
from ..log import get_logger, get_console_log from ..log import (
from .. import data get_logger,
from ..data.feed import ( get_console_log,
Flume,
Feed,
) )
from .. import data
from ..data.flows import Flume
from ..data._sharedmem import ShmArray from ..data._sharedmem import ShmArray
from ..data._sampling import ( from ..data._sampling import (
_default_delay_s, _default_delay_s,
@ -52,6 +53,9 @@ from ._api import (
) )
from ..toolz import Profiler from ..toolz import Profiler
if TYPE_CHECKING:
from ..data.feed import Feed
log = get_logger(__name__) log = get_logger(__name__)
@ -169,8 +173,10 @@ class Cascade(Struct):
if not synced: if not synced:
fsp: Fsp = self.fsp fsp: Fsp = self.fsp
log.warning( log.warning(
'***DESYNCED FSP***\n' f'***DESYNCED fsp***\n'
f'{fsp.ns_path}@{src_shm.token}\n' f'------------------\n'
f'ns-path: {fsp.ns_path!r}\n'
f'shm-token: {src_shm.token}\n'
f'step_diff: {step_diff}\n' f'step_diff: {step_diff}\n'
f'len_diff: {len_diff}\n' f'len_diff: {len_diff}\n'
) )
@ -398,7 +404,6 @@ async def connect_streams(
@tractor.context @tractor.context
async def cascade( async def cascade(
ctx: tractor.Context, ctx: tractor.Context,
# data feed key # data feed key
@ -412,7 +417,7 @@ async def cascade(
shm_registry: dict[str, _Token], shm_registry: dict[str, _Token],
zero_on_step: bool = False, zero_on_step: bool = False,
loglevel: str | None = None, loglevel: str|None = None,
) -> None: ) -> None:
''' '''
@ -426,7 +431,17 @@ async def cascade(
) )
if loglevel: if loglevel:
get_console_log(loglevel) log = get_console_log(
loglevel,
name=__name__,
)
# XXX TODO!
# figure out why this writes a dict to,
# `tractor._state._runtime_vars['_root_mailbox']`
# XD .. wtf
# TODO, solve this as reported in,
# https://www.pikers.dev/pikers/piker/issues/70
# await tractor.pause()
src: Flume = Flume.from_msg(src_flume_addr) src: Flume = Flume.from_msg(src_flume_addr)
dst: Flume = Flume.from_msg( dst: Flume = Flume.from_msg(