Make `Context.started()` a type checked IPC send

As detailed in the surrounding notes, it's pretty advantageous to always
have the child context task ensure the first msg it relays back is
msg-type checked against the current spec and thus `MsgCodec`. Implement
the check via a simple codec-roundtrip of the `Started` msg such that
the `.pld` payload is always validated before transit. This ensures the
child will fail early and notify the parent before any streaming takes
place (i.e. the "nasty" dialog protocol phase).

The main motivation here is to avoid inter-actor task syncing bugs that
are hard(er) to recover from and/or such as if an invalid typed msg is
sent to the parent, who then ignores it (depending on config), and then
the child thinks the parent is in some presumed state while the parent
is still thinking a first msg has yet to arrive. Doing the stringent
check on the sender side (i.e. the child is sending the "first"
application msg via `.started()`) avoids/sidesteps dealing with such
syncing/coordinated-state problems by keeping the entire IPC dialog in
a "cheap" or "control" style transaction up until a stream is opened.

Iow, the parent task's `.open_context()` block entry can't occur until
the child side is definitely (as much as is possible with IPC msg type
checking) in a correct state spec wise. During any streaming phase in
the dialog the msg-type-checking is NOT done for performance (the
"nasty" protocol phase) and instead any type errors are relayed back
from the receiving side. I'm still unsure whether to take the same
approach on the `Return` msg, since at that point erroring early doesn't
benefit the parent task if/when a msg-type error occurs? Definitely more
to ponder and tinker out here..

Impl notes:
- a gotcha with the roundtrip-codec-ed msg is that it often won't match
  the input `value` bc in the `msgpack` case many native python
  sequence/collection types will map to a common array type due to the
  surjection that `msgpack`'s type-sys imposes.
  - so we can't assert that `started == rt_started` but it may be useful
    to at least report the diff of the type-reduced payload so that the
    caller can at least be notified how the input `value` might be
    better type-casted prior to call, for ex. pre-casting to `list`s.
- added a `._strict_started: bool` that could provide the stringent
  checking if desired in the future.
- on any validation error raise our `MsgTypeError` from it.
- ALSO change over the lingering `.send_yield()` deprecated meth body
  to use a `Yield()`.
runtime_to_msgspec
Tyler Goodlet 2024-04-05 16:00:32 -04:00
parent 5c1401bf81
commit b1fd8b2ec3
1 changed files with 67 additions and 12 deletions

View File

@ -32,6 +32,7 @@ from dataclasses import (
) )
from functools import partial from functools import partial
import inspect import inspect
import msgspec
from pprint import pformat from pprint import pformat
from typing import ( from typing import (
Any, Any,
@ -60,6 +61,9 @@ from .msg import (
Started, Started,
Stop, Stop,
Yield, Yield,
current_codec,
MsgCodec,
pretty_struct,
) )
from ._ipc import Channel from ._ipc import Channel
from ._streaming import MsgStream from ._streaming import MsgStream
@ -505,6 +509,8 @@ class Context:
_in_overrun: bool = False _in_overrun: bool = False
_allow_overruns: bool = False _allow_overruns: bool = False
# TODO: figure out how we can enforce this without losing our minds..
_strict_started: bool = False
def __str__(self) -> str: def __str__(self) -> str:
ds: str = '=' ds: str = '='
@ -727,7 +733,13 @@ class Context:
DeprecationWarning, DeprecationWarning,
stacklevel=2, stacklevel=2,
) )
await self.chan.send({'yield': data, 'cid': self.cid}) # await self.chan.send({'yield': data, 'cid': self.cid})
await self.chan.send(
Yield(
cid=self.cid,
pld=data,
)
)
async def send_stop(self) -> None: async def send_stop(self) -> None:
# await pause() # await pause()
@ -1640,18 +1652,61 @@ class Context:
f'called `.started()` twice on context with {self.chan.uid}' f'called `.started()` twice on context with {self.chan.uid}'
) )
# await self.chan.send( started = Started(
# {
# 'started': value,
# 'cid': self.cid,
# }
# )
await self.chan.send(
Started(
cid=self.cid, cid=self.cid,
pld=value, pld=value,
) )
# XXX MEGA NOTE XXX: ONLY on the first msg sent with
# `Context.started()` do we STRINGENTLY roundtrip-check
# the first payload such that the child side can't send an
# incorrect value according to the currently applied
# msg-spec!
#
# HOWEVER, once a stream is opened via
# `Context.open_stream()` then this check is NEVER done on
# `MsgStream.send()` and instead both the parent and child
# sides are expected to relay back msg-type errors when
# decode failures exhibit on `MsgStream.receive()` calls thus
# enabling a so-called (by the holy 0mq lords)
# "cheap-or-nasty pattern" un-protocol design Bo
#
# https://zguide.zeromq.org/docs/chapter7/#The-Cheap-or-Nasty-Pattern
#
codec: MsgCodec = current_codec()
msg_bytes: bytes = codec.encode(started)
try:
# be a "cheap" dialog (see above!)
rt_started = codec.decode(msg_bytes)
if rt_started != started:
# TODO: break these methods out from the struct subtype?
diff = pretty_struct.Struct.__sub__(rt_started, started)
complaint: str = (
'Started value does not match after codec rountrip?\n\n'
f'{diff}'
) )
# TODO: rn this will pretty much always fail with
# any other sequence type embeded in the
# payload...
if self._strict_started:
raise ValueError(complaint)
else:
log.warning(complaint)
await self.chan.send(rt_started)
# raise any msg type error NO MATTER WHAT!
except msgspec.ValidationError as verr:
from tractor._ipc import _raise_msg_type_err
_raise_msg_type_err(
msg=msg_bytes,
codec=codec,
validation_err=verr,
verb_header='Trying to send payload'
# > 'invalid `Started IPC msgs\n'
)
self._started_called = True self._started_called = True
async def _drain_overflows( async def _drain_overflows(