diff --git a/tractor/_context.py b/tractor/_context.py index 38b4431..6e55c3c 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -32,6 +32,7 @@ from dataclasses import ( ) from functools import partial import inspect +import msgspec from pprint import pformat from typing import ( Any, @@ -60,6 +61,9 @@ from .msg import ( Started, Stop, Yield, + current_codec, + MsgCodec, + pretty_struct, ) from ._ipc import Channel from ._streaming import MsgStream @@ -505,6 +509,8 @@ class Context: _in_overrun: bool = False _allow_overruns: bool = False + # TODO: figure out how we can enforce this without losing our minds.. + _strict_started: bool = False def __str__(self) -> str: ds: str = '=' @@ -727,7 +733,13 @@ class Context: DeprecationWarning, stacklevel=2, ) - await self.chan.send({'yield': data, 'cid': self.cid}) + # await self.chan.send({'yield': data, 'cid': self.cid}) + await self.chan.send( + Yield( + cid=self.cid, + pld=data, + ) + ) async def send_stop(self) -> None: # await pause() @@ -1640,18 +1652,61 @@ class Context: f'called `.started()` twice on context with {self.chan.uid}' ) - # await self.chan.send( - # { - # 'started': value, - # 'cid': self.cid, - # } - # ) - await self.chan.send( - Started( - cid=self.cid, - pld=value, - ) + started = Started( + cid=self.cid, + pld=value, ) + # XXX MEGA NOTE XXX: ONLY on the first msg sent with + # `Context.started()` do we STRINGENTLY roundtrip-check + # the first payload such that the child side can't send an + # incorrect value according to the currently applied + # msg-spec! + # + # HOWEVER, once a stream is opened via + # `Context.open_stream()` then this check is NEVER done on + # `MsgStream.send()` and instead both the parent and child + # sides are expected to relay back msg-type errors when + # decode failures exhibit on `MsgStream.receive()` calls thus + # enabling a so-called (by the holy 0mq lords) + # "cheap-or-nasty pattern" un-protocol design Bo + # + # https://zguide.zeromq.org/docs/chapter7/#The-Cheap-or-Nasty-Pattern + # + codec: MsgCodec = current_codec() + msg_bytes: bytes = codec.encode(started) + try: + # be a "cheap" dialog (see above!) + rt_started = codec.decode(msg_bytes) + if rt_started != started: + + # TODO: break these methods out from the struct subtype? + diff = pretty_struct.Struct.__sub__(rt_started, started) + + complaint: str = ( + 'Started value does not match after codec rountrip?\n\n' + f'{diff}' + ) + # TODO: rn this will pretty much always fail with + # any other sequence type embeded in the + # payload... + if self._strict_started: + raise ValueError(complaint) + else: + log.warning(complaint) + + await self.chan.send(rt_started) + + # raise any msg type error NO MATTER WHAT! + except msgspec.ValidationError as verr: + from tractor._ipc import _raise_msg_type_err + _raise_msg_type_err( + msg=msg_bytes, + codec=codec, + validation_err=verr, + verb_header='Trying to send payload' + # > 'invalid `Started IPC msgs\n' + ) + self._started_called = True async def _drain_overflows(