Add error suppress flag to `current_ipc_ctx()`

runtime_to_msgspec
Tyler Goodlet 2024-05-20 16:12:51 -04:00
parent 60fc43e530
commit 13bc3c308d
1 changed files with 8 additions and 2 deletions

View File

@ -124,9 +124,15 @@ _ctxvar_Context: ContextVar[Context] = ContextVar(
)
def current_ipc_ctx() -> Context:
def current_ipc_ctx(
error_on_not_set: bool = False,
) -> Context|None:
ctx: Context = _ctxvar_Context.get()
if not ctx:
if (
not ctx
and error_on_not_set
):
from ._exceptions import InternalError
raise InternalError(
'No IPC context has been allocated for this task yet?\n'