Mucking with custom `msgspec.Struct` codecs

Syncs with https://github.com/goodboy/tractor/pull/311
which is nowhere near ready and this approach didn't end up being
as straight forward as hoped. We're going to need a top level
`Msg`-boxing type/protocol in `tractor` first...
tractor_typed_msg_hackin
Tyler Goodlet 2022-07-07 16:51:36 -04:00
parent de91c2196d
commit d31c38ef51
1 changed files with 42 additions and 35 deletions

View File

@ -22,13 +22,13 @@ from typing import Optional, Union, Callable, Any
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from collections import defaultdict from collections import defaultdict
from msgspec import Struct
import tractor import tractor
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
from .log import get_logger, get_console_log from .log import get_logger, get_console_log
from .brokers import get_brokermod from .brokers import get_brokermod
from .data.types import Struct
log = get_logger(__name__) log = get_logger(__name__)
@ -204,23 +204,26 @@ async def open_piker_runtime(
assert _services is None assert _services is None
# XXX: this may open a root actor as well # XXX: this may open a root actor as well
async with ( with tractor.msg.configure_native_msgs(
tractor.open_root_actor( [Struct],
# passed through to ``open_root_actor``
arbiter_addr=_registry_addr,
name=name,
loglevel=loglevel,
debug_mode=debug_mode,
start_method=start_method,
# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
enable_modules=_root_modules,
) as _,
): ):
yield tractor.current_actor() async with (
tractor.open_root_actor(
# passed through to ``open_root_actor``
arbiter_addr=_registry_addr,
name=name,
loglevel=loglevel,
debug_mode=debug_mode,
start_method=start_method,
# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
enable_modules=_root_modules,
) as _,
):
yield tractor.current_actor()
@acm @acm
@ -260,27 +263,31 @@ async def maybe_open_pikerd(
if loglevel: if loglevel:
get_console_log(loglevel) get_console_log(loglevel)
# subtle, we must have the runtime up here or portal lookup will fail # XXX: this may open a root actor as well
async with maybe_open_runtime(loglevel, **kwargs): with tractor.msg.configure_native_msgs(
[Struct],
):
# subtle, we must have the runtime up here or portal lookup will fail
async with maybe_open_runtime(loglevel, **kwargs):
async with tractor.find_actor(_root_dname) as portal: async with tractor.find_actor(_root_dname) as portal:
# assert portal is not None # assert portal is not None
if portal is not None: if portal is not None:
yield portal yield portal
return return
# presume pikerd role since no daemon could be found at # presume pikerd role since no daemon could be found at
# configured address # configured address
async with open_pikerd( async with open_pikerd(
loglevel=loglevel, loglevel=loglevel,
debug_mode=kwargs.get('debug_mode', False), debug_mode=kwargs.get('debug_mode', False),
) as _: ) as _:
# in the case where we're starting up the # in the case where we're starting up the
# tractor-piker runtime stack in **this** process # tractor-piker runtime stack in **this** process
# we return no portal to self. # we return no portal to self.
yield None yield None
# brokerd enabled modules # brokerd enabled modules
@ -442,7 +449,7 @@ async def spawn_brokerd(
) )
# non-blocking setup of brokerd service nursery # non-blocking setup of brokerd service nursery
from .data import _setup_persistent_brokerd from .data.feed import _setup_persistent_brokerd
await _services.start_service_task( await _services.start_service_task(
dname, dname,