forked from goodboy/tractor
1
0
Fork 0

Type annot updates

sigint2
Tyler Goodlet 2022-02-14 10:37:31 -05:00
parent 81aa12b46d
commit 266b0053dc
2 changed files with 19 additions and 12 deletions

View File

@ -26,8 +26,11 @@ import importlib
import importlib.util import importlib.util
import inspect import inspect
import uuid import uuid
import typing from typing import (
from typing import Any, Optional, Union Any, Optional,
Union, TYPE_CHECKING,
Callable,
)
from types import ModuleType from types import ModuleType
import sys import sys
import os import os
@ -57,6 +60,10 @@ from . import _state
from . import _mp_fixup_main from . import _mp_fixup_main
if TYPE_CHECKING:
from ._supervise import ActorNursery
log = get_logger('tractor') log = get_logger('tractor')
@ -65,7 +72,7 @@ async def _invoke(
actor: 'Actor', actor: 'Actor',
cid: str, cid: str,
chan: Channel, chan: Channel,
func: typing.Callable, func: Callable,
kwargs: dict[str, Any], kwargs: dict[str, Any],
is_rpc: bool = True, is_rpc: bool = True,
@ -426,7 +433,7 @@ class Actor:
# (chan, cid) -> (cancel_scope, func) # (chan, cid) -> (cancel_scope, func)
self._rpc_tasks: dict[ self._rpc_tasks: dict[
tuple[Channel, str], tuple[Channel, str],
tuple[trio.CancelScope, typing.Callable, trio.Event] tuple[trio.CancelScope, Callable, trio.Event]
] = {} ] = {}
# map {actor uids -> Context} # map {actor uids -> Context}
@ -515,6 +522,7 @@ class Actor:
self._no_more_peers = trio.Event() # unset self._no_more_peers = trio.Event() # unset
chan = Channel.from_stream(stream) chan = Channel.from_stream(stream)
uid: Optional[tuple[str, str]] = chan.uid
log.runtime(f"New connection to us {chan}") log.runtime(f"New connection to us {chan}")
# send/receive initial handshake response # send/receive initial handshake response
@ -562,7 +570,7 @@ class Actor:
# append new channel # append new channel
self._peers[uid].append(chan) self._peers[uid].append(chan)
local_nursery: Optional['ActorNursery'] = None # noqa local_nursery: Optional[ActorNursery] = None # noqa
# Begin channel management - respond to remote requests and # Begin channel management - respond to remote requests and
# process received reponses. # process received reponses.
@ -591,10 +599,9 @@ class Actor:
entry = local_nursery._children.get(uid) entry = local_nursery._children.get(uid)
if entry: if entry:
_, proc, _ = entry _, proc, _ = entry
if proc.poll() is not None: log.error(f'Actor {uid}@{proc} IPC connection broke!?')
log.error('Actor {uid} proc died and IPC broke?') # if proc.poll() is not None:
else: # log.error('Actor {uid} proc died and IPC broke?')
log.error(f'Actor {uid} IPC connection broke!?')
log.cancel(f"Waiting on cancel request to peer {chan.uid}") log.cancel(f"Waiting on cancel request to peer {chan.uid}")
# XXX: this is a soft wait on the channel (and its # XXX: this is a soft wait on the channel (and its
@ -635,7 +642,6 @@ class Actor:
log.runtime(f"Releasing channel {chan} from {chan.uid}") log.runtime(f"Releasing channel {chan} from {chan.uid}")
chans = self._peers.get(chan.uid) chans = self._peers.get(chan.uid)
chans.remove(chan) chans.remove(chan)
uid = chan.uid
if not chans: if not chans:
log.runtime(f"No more channels for {chan.uid}") log.runtime(f"No more channels for {chan.uid}")

View File

@ -24,7 +24,8 @@ import importlib
import inspect import inspect
from typing import ( from typing import (
Any, Optional, Any, Optional,
Callable, AsyncGenerator Callable, AsyncGenerator,
Type,
) )
from functools import partial from functools import partial
from dataclasses import dataclass from dataclasses import dataclass
@ -444,7 +445,7 @@ class Portal:
uid = self.channel.uid uid = self.channel.uid
cid = ctx.cid cid = ctx.cid
etype: Optional[Exception] = None etype: Optional[Type[BaseException]] = None
# deliver context instance and .started() msg value in open tuple. # deliver context instance and .started() msg value in open tuple.
try: try: