forked from goodboy/tractor
Type annot updates
parent
4e06b10502
commit
e2169f227d
|
@ -26,8 +26,11 @@ import importlib
|
|||
import importlib.util
|
||||
import inspect
|
||||
import uuid
|
||||
import typing
|
||||
from typing import Any, Optional, Union
|
||||
from typing import (
|
||||
Any, Optional,
|
||||
Union, TYPE_CHECKING,
|
||||
Callable,
|
||||
)
|
||||
from types import ModuleType
|
||||
import sys
|
||||
import os
|
||||
|
@ -57,6 +60,10 @@ from . import _state
|
|||
from . import _mp_fixup_main
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ._supervise import ActorNursery
|
||||
|
||||
|
||||
log = get_logger('tractor')
|
||||
|
||||
|
||||
|
@ -65,7 +72,7 @@ async def _invoke(
|
|||
actor: 'Actor',
|
||||
cid: str,
|
||||
chan: Channel,
|
||||
func: typing.Callable,
|
||||
func: Callable,
|
||||
kwargs: dict[str, Any],
|
||||
|
||||
is_rpc: bool = True,
|
||||
|
@ -426,7 +433,7 @@ class Actor:
|
|||
# (chan, cid) -> (cancel_scope, func)
|
||||
self._rpc_tasks: dict[
|
||||
tuple[Channel, str],
|
||||
tuple[trio.CancelScope, typing.Callable, trio.Event]
|
||||
tuple[trio.CancelScope, Callable, trio.Event]
|
||||
] = {}
|
||||
|
||||
# map {actor uids -> Context}
|
||||
|
@ -515,6 +522,7 @@ class Actor:
|
|||
self._no_more_peers = trio.Event() # unset
|
||||
|
||||
chan = Channel.from_stream(stream)
|
||||
uid: Optional[tuple[str, str]] = chan.uid
|
||||
log.runtime(f"New connection to us {chan}")
|
||||
|
||||
# send/receive initial handshake response
|
||||
|
@ -562,7 +570,7 @@ class Actor:
|
|||
# append new channel
|
||||
self._peers[uid].append(chan)
|
||||
|
||||
local_nursery: Optional['ActorNursery'] = None # noqa
|
||||
local_nursery: Optional[ActorNursery] = None # noqa
|
||||
|
||||
# Begin channel management - respond to remote requests and
|
||||
# process received reponses.
|
||||
|
@ -591,10 +599,9 @@ class Actor:
|
|||
entry = local_nursery._children.get(uid)
|
||||
if entry:
|
||||
_, proc, _ = entry
|
||||
if proc.poll() is not None:
|
||||
log.error('Actor {uid} proc died and IPC broke?')
|
||||
else:
|
||||
log.error(f'Actor {uid} IPC connection broke!?')
|
||||
log.error(f'Actor {uid}@{proc} IPC connection broke!?')
|
||||
# if proc.poll() is not None:
|
||||
# log.error('Actor {uid} proc died and IPC broke?')
|
||||
|
||||
log.cancel(f"Waiting on cancel request to peer {chan.uid}")
|
||||
# XXX: this is a soft wait on the channel (and its
|
||||
|
@ -635,7 +642,6 @@ class Actor:
|
|||
log.runtime(f"Releasing channel {chan} from {chan.uid}")
|
||||
chans = self._peers.get(chan.uid)
|
||||
chans.remove(chan)
|
||||
uid = chan.uid
|
||||
|
||||
if not chans:
|
||||
log.runtime(f"No more channels for {chan.uid}")
|
||||
|
|
|
@ -24,7 +24,8 @@ import importlib
|
|||
import inspect
|
||||
from typing import (
|
||||
Any, Optional,
|
||||
Callable, AsyncGenerator
|
||||
Callable, AsyncGenerator,
|
||||
Type,
|
||||
)
|
||||
from functools import partial
|
||||
from dataclasses import dataclass
|
||||
|
@ -444,7 +445,7 @@ class Portal:
|
|||
|
||||
uid = self.channel.uid
|
||||
cid = ctx.cid
|
||||
etype: Optional[Exception] = None
|
||||
etype: Optional[Type[BaseException]] = None
|
||||
|
||||
# deliver context instance and .started() msg value in open tuple.
|
||||
try:
|
||||
|
|
Loading…
Reference in New Issue