From d31c38ef5145957faba3885e41336c20bea10fd4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 7 Jul 2022 16:51:36 -0400 Subject: [PATCH] Mucking with custom `msgspec.Struct` codecs Syncs with https://github.com/goodboy/tractor/pull/311 which is nowhere near ready and this approach didn't end up being as straight forward as hoped. We're going to need a top level `Msg`-boxing type/protocol in `tractor` first... --- piker/_daemon.py | 77 ++++++++++++++++++++++++++---------------------- 1 file changed, 42 insertions(+), 35 deletions(-) diff --git a/piker/_daemon.py b/piker/_daemon.py index 836ce60c..31061b37 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -22,13 +22,13 @@ from typing import Optional, Union, Callable, Any from contextlib import asynccontextmanager as acm from collections import defaultdict -from msgspec import Struct import tractor import trio from trio_typing import TaskStatus from .log import get_logger, get_console_log from .brokers import get_brokermod +from .data.types import Struct log = get_logger(__name__) @@ -204,23 +204,26 @@ async def open_piker_runtime( assert _services is None # XXX: this may open a root actor as well - async with ( - tractor.open_root_actor( - - # passed through to ``open_root_actor`` - arbiter_addr=_registry_addr, - name=name, - loglevel=loglevel, - debug_mode=debug_mode, - start_method=start_method, - - # TODO: eventually we should be able to avoid - # having the root have more then permissions to - # spawn other specialized daemons I think? - enable_modules=_root_modules, - ) as _, + with tractor.msg.configure_native_msgs( + [Struct], ): - yield tractor.current_actor() + async with ( + tractor.open_root_actor( + + # passed through to ``open_root_actor`` + arbiter_addr=_registry_addr, + name=name, + loglevel=loglevel, + debug_mode=debug_mode, + start_method=start_method, + + # TODO: eventually we should be able to avoid + # having the root have more then permissions to + # spawn other specialized daemons I think? + enable_modules=_root_modules, + ) as _, + ): + yield tractor.current_actor() @acm @@ -260,27 +263,31 @@ async def maybe_open_pikerd( if loglevel: get_console_log(loglevel) - # subtle, we must have the runtime up here or portal lookup will fail - async with maybe_open_runtime(loglevel, **kwargs): + # XXX: this may open a root actor as well + with tractor.msg.configure_native_msgs( + [Struct], + ): + # subtle, we must have the runtime up here or portal lookup will fail + async with maybe_open_runtime(loglevel, **kwargs): - async with tractor.find_actor(_root_dname) as portal: - # assert portal is not None - if portal is not None: - yield portal - return + async with tractor.find_actor(_root_dname) as portal: + # assert portal is not None + if portal is not None: + yield portal + return - # presume pikerd role since no daemon could be found at - # configured address - async with open_pikerd( + # presume pikerd role since no daemon could be found at + # configured address + async with open_pikerd( - loglevel=loglevel, - debug_mode=kwargs.get('debug_mode', False), + loglevel=loglevel, + debug_mode=kwargs.get('debug_mode', False), - ) as _: - # in the case where we're starting up the - # tractor-piker runtime stack in **this** process - # we return no portal to self. - yield None + ) as _: + # in the case where we're starting up the + # tractor-piker runtime stack in **this** process + # we return no portal to self. + yield None # brokerd enabled modules @@ -442,7 +449,7 @@ async def spawn_brokerd( ) # non-blocking setup of brokerd service nursery - from .data import _setup_persistent_brokerd + from .data.feed import _setup_persistent_brokerd await _services.start_service_task( dname,