forked from goodboy/tractor
'Fix mypy, change interal type name to `ReceiveStream`, settle on `.shield()`'
parent
15ead6b561
commit
201771a521
|
@ -262,7 +262,7 @@ async def test_respawn_consumer_task(
|
||||||
spawn_backend,
|
spawn_backend,
|
||||||
loglevel,
|
loglevel,
|
||||||
):
|
):
|
||||||
"""Verify that ``._portal.StreamReceiveChannel.shield_channel()``
|
"""Verify that ``._portal.ReceiveStream.shield()``
|
||||||
sucessfully protects the underlying IPC channel from being closed
|
sucessfully protects the underlying IPC channel from being closed
|
||||||
when cancelling and respawning a consumer task.
|
when cancelling and respawning a consumer task.
|
||||||
|
|
||||||
|
@ -292,7 +292,7 @@ async def test_respawn_consumer_task(
|
||||||
task_status.started(cs)
|
task_status.started(cs)
|
||||||
|
|
||||||
# shield stream's underlying channel from cancellation
|
# shield stream's underlying channel from cancellation
|
||||||
with stream.shield_channel():
|
with stream.shield():
|
||||||
|
|
||||||
async for v in stream:
|
async for v in stream:
|
||||||
print(f'from stream: {v}')
|
print(f'from stream: {v}')
|
||||||
|
|
|
@ -4,7 +4,7 @@ Portal api
|
||||||
import importlib
|
import importlib
|
||||||
import inspect
|
import inspect
|
||||||
import typing
|
import typing
|
||||||
from typing import Tuple, Any, Dict, Optional, Set
|
from typing import Tuple, Any, Dict, Optional, Set, Iterator
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
|
@ -38,7 +38,7 @@ async def maybe_open_nursery(
|
||||||
yield nursery
|
yield nursery
|
||||||
|
|
||||||
|
|
||||||
class StreamReceiveChannel(trio.abc.ReceiveChannel):
|
class ReceiveStream(trio.abc.ReceiveChannel):
|
||||||
"""A wrapper around a ``trio._channel.MemoryReceiveChannel`` with
|
"""A wrapper around a ``trio._channel.MemoryReceiveChannel`` with
|
||||||
special behaviour for signalling stream termination across an
|
special behaviour for signalling stream termination across an
|
||||||
inter-actor ``Channel``. This is the type returned to a local task
|
inter-actor ``Channel``. This is the type returned to a local task
|
||||||
|
@ -86,9 +86,9 @@ class StreamReceiveChannel(trio.abc.ReceiveChannel):
|
||||||
raise unpack_error(msg, self._portal.channel)
|
raise unpack_error(msg, self._portal.channel)
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def shield_channel(
|
def shield(
|
||||||
self
|
self
|
||||||
) -> typing.AsyncGenerator['StreamReceiveChannel', None]:
|
) -> Iterator['ReceiveStream']: # noqa
|
||||||
"""Shield this stream's underlying channel such that a local consumer task
|
"""Shield this stream's underlying channel such that a local consumer task
|
||||||
can be cancelled (and possibly restarted) using ``trio.Cancelled``.
|
can be cancelled (and possibly restarted) using ``trio.Cancelled``.
|
||||||
|
|
||||||
|
@ -156,7 +156,7 @@ class Portal:
|
||||||
self._expect_result: Optional[
|
self._expect_result: Optional[
|
||||||
Tuple[str, Any, str, Dict[str, Any]]
|
Tuple[str, Any, str, Dict[str, Any]]
|
||||||
] = None
|
] = None
|
||||||
self._streams: Set[StreamReceiveChannel] = set()
|
self._streams: Set[ReceiveStream] = set()
|
||||||
self.actor = current_actor()
|
self.actor = current_actor()
|
||||||
|
|
||||||
async def _submit(
|
async def _submit(
|
||||||
|
@ -219,7 +219,7 @@ class Portal:
|
||||||
# to make async-generators the fundamental IPC API over channels!
|
# to make async-generators the fundamental IPC API over channels!
|
||||||
# (think `yield from`, `gen.send()`, and functional reactive stuff)
|
# (think `yield from`, `gen.send()`, and functional reactive stuff)
|
||||||
if resptype == 'yield': # stream response
|
if resptype == 'yield': # stream response
|
||||||
rchan = StreamReceiveChannel(cid, recv_chan, self)
|
rchan = ReceiveStream(cid, recv_chan, self)
|
||||||
self._streams.add(rchan)
|
self._streams.add(rchan)
|
||||||
return rchan
|
return rchan
|
||||||
|
|
||||||
|
@ -322,7 +322,7 @@ class LocalPortal:
|
||||||
A compatibility shim for normal portals but for invoking functions
|
A compatibility shim for normal portals but for invoking functions
|
||||||
using an in process actor instance.
|
using an in process actor instance.
|
||||||
"""
|
"""
|
||||||
actor: 'Actor' # type: ignore
|
actor: 'Actor' # type: ignore # noqa
|
||||||
channel: Channel
|
channel: Channel
|
||||||
|
|
||||||
async def run(self, ns: str, func_name: str, **kwargs) -> Any:
|
async def run(self, ns: str, func_name: str, **kwargs) -> Any:
|
||||||
|
|
Loading…
Reference in New Issue