Compare commits

..

3 Commits

Author SHA1 Message Date
Tyler Goodlet f2ce4a3469 Add timeouts around some context test bodies
Since with my in-index runtime-port to our native msg-spec it seems
these ones are hanging B(

- `test_one_end_stream_not_opened()`
- `test_maybe_allow_overruns_stream()`

Tossing in some `trio.fail_after()`s seems to at least gnab them as
failures B)
2024-04-02 14:03:32 -04:00
Tyler Goodlet 3aa964315a Get `test_codec_hooks_mod` working with `Msg`s
Though the runtime hasn't been changed over in this patch (it was in the
local index at the time however), the test does now demonstrate that
using a `Started` the correctly typed `.pld` will codec correctly when
passed manually to `MsgCodec.encode/decode()`.

Despite not having the runtime ported to the new shuttle msg set
(meaning the mentioned test will fail without the runtime port patch),
I was able to get this first original test working that limits payload
packets as a `Msg.pld: NamespacePath`this as long as we spec
`enc/dec_hook()`s then the `Msg.pld` will be processed correctly as per:
https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
in both the `Any` and `NamespacePath|None` spec cases.
^- turns out in this case -^ that the codec hooks only get invoked on
the unknown-fields NOT the entire `Struct`-msg.

A further gotcha was merging a `|None` into the `pld_spec` since this
test spawns a subactor and opens a context via `send_back_nsp()` and
that func has no explicit `return` - so of course it delivers
a `Return(pld=None)` which will fail if we only spec `NamespacePath`.
2024-04-02 14:03:32 -04:00
Tyler Goodlet f3ca8608d5 Get msg spec type limiting working with a `RunVar`
Since `contextvars.ContextVar` seems to reset to the default in every
new task, switching to using `trio.lowlevel.RunVar` kinda gets close to
what we'd like where a child scope can override what's in the rent but
ideally without modifying the rent's. I tried `tricycle.TreeVar` as well
but it also seems to reset across (embedded) nurseries in our runtime;
need to try it again bc apparently that's not how it's suppose to work?

NOTE that for now i'm keeping the `.msg.types._ctxvar_MsgCodec` set to
the `msgspec` default (`Any` types) so that the test suite will still
pass until the runtime is ported to the new msg-spec + codec.

Surrounding and in support of all this the `Msg`-set impl deats changed
a bit as well as various stuff in `.msg` sub-mods:

- drop the `.pld` struct types for `Error`, `Start`, `StartAck` since we
  don't really need the `.pld` payload field in those cases since
  they're runtime control msgs for starting RPC tasks and handling
  remote errors; we can just put the fields directly on each msg since
  the user will never want/need to override the `.pld` field type.

- add a couple new runtime msgs and include them in `msg.__spec__`
  and make them NOT inherit from `Msg` since they are runtime-specific
  and thus have no need for `.pld` type constraints:
  - `Aid` the actor-id identity handshake msg.
  - `SpawnSpec`: the spawn data passed from a parent actor down to a
    a child in `Actor._from_parent()` for which we need a shuttle
    protocol msg, so might as well make it a pendatic one ;)

- fix some `Actor.uid` field types that were type-borked on `Error`

- add notes about how we need built-in `debug_mode` msgs in order to
  avoid msg-type errors when using the TTY lock machinery and
  a different `.pld` spec then the default `Any` is in use..
  -> since `devx._debug.lock_tty_for_child()` and it's client side
  `wait_for_parent_stdin_hijack()` use `Context.started('Locked')`
  and `MsgStream.send('pdb_unlock')` string values as their `.pld`
  contents we'd need to either always do a `ipc_pld_spec | str` or
  pre-define some dedicated `Msg` types which get `Union`-ed in
  for this?

- break out `msg.pretty_struct.Struct._sin_props()` into a helper func
  `iter_fields()` since the impl doesn't require a struct instance.

- as mentioned above since `ContextVar` didn't work as anticipated
  I next tried `tricycle.TreeVar` but that too didn't seem to keep
  the `apply_codec()` setting intact across
  `Portal.open_context()`/`Context.open_stream()` (it kept reverting to
  the default `.pld: Any` default setting) so I finalized on
  a trio.lowlevel.RunVar` for now despite it basically being
  a `global`..
  -> will probably come back to test this with `TreeVar` and some hot
  tips i picked up from @mikenerone in the `trio` gitter, which i put in
  comments surrounding proto-code.
2024-04-02 14:02:13 -04:00
2 changed files with 54 additions and 44 deletions

View File

@ -6,6 +6,7 @@ sync-opening a ``tractor.Context`` beforehand.
''' '''
from itertools import count from itertools import count
import math
import platform import platform
from pprint import pformat from pprint import pformat
from typing import ( from typing import (
@ -872,7 +873,7 @@ def test_one_end_stream_not_opened(
enable_modules=[__name__], enable_modules=[__name__],
) )
with trio.fail_after(1): with trio.fail_after(0.8):
async with portal.open_context( async with portal.open_context(
entrypoint, entrypoint,
) as (ctx, sent): ) as (ctx, sent):
@ -1059,54 +1060,63 @@ def test_maybe_allow_overruns_stream(
loglevel=loglevel, loglevel=loglevel,
debug_mode=debug_mode, debug_mode=debug_mode,
) )
seq = list(range(10))
async with portal.open_context(
echo_back_sequence,
seq=seq,
wait_for_cancel=cancel_ctx,
be_slow=(slow_side == 'child'),
allow_overruns_side=allow_overruns_side,
) as (ctx, sent): # stream-sequence batch info with send delay to determine
assert sent is None # approx timeout determining whether test has hung.
total_batches: int = 2
num_items: int = 10
seq = list(range(num_items))
parent_send_delay: float = 0.16
timeout: float = math.ceil(
total_batches * num_items * parent_send_delay
)
with trio.fail_after(timeout):
async with portal.open_context(
echo_back_sequence,
seq=seq,
wait_for_cancel=cancel_ctx,
be_slow=(slow_side == 'child'),
allow_overruns_side=allow_overruns_side,
async with ctx.open_stream( ) as (ctx, sent):
msg_buffer_size=1 if slow_side == 'parent' else None, assert sent is None
allow_overruns=(allow_overruns_side in {'parent', 'both'}),
) as stream:
total_batches: int = 2 async with ctx.open_stream(
for _ in range(total_batches): msg_buffer_size=1 if slow_side == 'parent' else None,
for msg in seq: allow_overruns=(allow_overruns_side in {'parent', 'both'}),
# print(f'root tx {msg}') ) as stream:
await stream.send(msg)
if slow_side == 'parent':
# NOTE: we make the parent slightly
# slower, when it is slow, to make sure
# that in the overruns everywhere case
await trio.sleep(0.16)
batch = [] for _ in range(total_batches):
async for msg in stream: for msg in seq:
print(f'root rx {msg}') # print(f'root tx {msg}')
batch.append(msg) await stream.send(msg)
if batch == seq: if slow_side == 'parent':
break # NOTE: we make the parent slightly
# slower, when it is slow, to make sure
# that in the overruns everywhere case
await trio.sleep(parent_send_delay)
batch = []
async for msg in stream:
print(f'root rx {msg}')
batch.append(msg)
if batch == seq:
break
if cancel_ctx:
# cancel the remote task
print('Requesting `ctx.cancel()` in parent!')
await ctx.cancel()
res: str|ContextCancelled = await ctx.result()
if cancel_ctx: if cancel_ctx:
# cancel the remote task assert isinstance(res, ContextCancelled)
print('Requesting `ctx.cancel()` in parent!') assert tuple(res.canceller) == current_actor().uid
await ctx.cancel()
res: str|ContextCancelled = await ctx.result() else:
print(f'RX ROOT SIDE RESULT {res}')
if cancel_ctx: assert res == 'yo'
assert isinstance(res, ContextCancelled)
assert tuple(res.canceller) == current_actor().uid
else:
print(f'RX ROOT SIDE RESULT {res}')
assert res == 'yo'
# cancel the daemon # cancel the daemon
await portal.cancel_actor() await portal.cancel_actor()

View File

@ -438,8 +438,8 @@ _ctxvar_MsgCodec: MsgCodec = RunVar(
'msgspec_codec', 'msgspec_codec',
# TODO: move this to our new `Msg`-spec! # TODO: move this to our new `Msg`-spec!
# default=_def_msgspec_codec, default=_def_msgspec_codec,
default=_def_tractor_codec, # default=_def_tractor_codec,
) )