Use a receive mem channel inside portals
For now stop `.aclose()`-ing all async gens on portal close since it can cause hangs and other weird behaviour if another task operates on the same instance. See https://bugs.python.org/issue32526.trio_memchans
parent
f44ac4528a
commit
61680b3729
|
@ -36,7 +36,7 @@ async def maybe_open_nursery(nursery: trio._core._run.Nursery = None):
|
||||||
async def _do_handshake(
|
async def _do_handshake(
|
||||||
actor: 'Actor', # type: ignore
|
actor: 'Actor', # type: ignore
|
||||||
chan: Channel
|
chan: Channel
|
||||||
)-> Any:
|
) -> Any:
|
||||||
await chan.send(actor.uid)
|
await chan.send(actor.uid)
|
||||||
uid: Tuple[str, str] = await chan.recv()
|
uid: Tuple[str, str] = await chan.recv()
|
||||||
|
|
||||||
|
@ -76,8 +76,11 @@ class Portal:
|
||||||
await self.channel.aclose()
|
await self.channel.aclose()
|
||||||
|
|
||||||
async def _submit(
|
async def _submit(
|
||||||
self, ns: str, func: str, **kwargs
|
self,
|
||||||
) -> Tuple[str, trio.Queue, str, Dict[str, Any]]:
|
ns: str,
|
||||||
|
func: str,
|
||||||
|
kwargs,
|
||||||
|
) -> Tuple[str, trio.abc.ReceiveChannel, str, Dict[str, Any]]:
|
||||||
"""Submit a function to be scheduled and run by actor, return the
|
"""Submit a function to be scheduled and run by actor, return the
|
||||||
associated caller id, response queue, response type str,
|
associated caller id, response queue, response type str,
|
||||||
first message packet as a tuple.
|
first message packet as a tuple.
|
||||||
|
@ -85,11 +88,13 @@ class Portal:
|
||||||
This is an async call.
|
This is an async call.
|
||||||
"""
|
"""
|
||||||
# ship a function call request to the remote actor
|
# ship a function call request to the remote actor
|
||||||
cid, q = await current_actor().send_cmd(self.channel, ns, func, kwargs)
|
cid, recv_chan = await current_actor().send_cmd(
|
||||||
|
self.channel, ns, func, kwargs)
|
||||||
|
|
||||||
# wait on first response msg and handle (this should be
|
# wait on first response msg and handle (this should be
|
||||||
# in an immediate response)
|
# in an immediate response)
|
||||||
first_msg = await q.get()
|
|
||||||
|
first_msg = await recv_chan.receive()
|
||||||
functype = first_msg.get('functype')
|
functype = first_msg.get('functype')
|
||||||
|
|
||||||
if functype == 'function' or functype == 'asyncfunction':
|
if functype == 'function' or functype == 'asyncfunction':
|
||||||
|
@ -101,12 +106,12 @@ class Portal:
|
||||||
else:
|
else:
|
||||||
raise ValueError(f"{first_msg} is an invalid response packet?")
|
raise ValueError(f"{first_msg} is an invalid response packet?")
|
||||||
|
|
||||||
return cid, q, resp_type, first_msg
|
return cid, recv_chan, resp_type, first_msg
|
||||||
|
|
||||||
async def _submit_for_result(self, ns: str, func: str, **kwargs) -> None:
|
async def _submit_for_result(self, ns: str, func: str, **kwargs) -> None:
|
||||||
assert self._expect_result is None, \
|
assert self._expect_result is None, \
|
||||||
"A pending main result has already been submitted"
|
"A pending main result has already been submitted"
|
||||||
self._expect_result = await self._submit(ns, func, **kwargs)
|
self._expect_result = await self._submit(ns, func, kwargs)
|
||||||
|
|
||||||
async def run(self, ns: str, func: str, **kwargs) -> Any:
|
async def run(self, ns: str, func: str, **kwargs) -> Any:
|
||||||
"""Submit a remote function to be scheduled and run by actor,
|
"""Submit a remote function to be scheduled and run by actor,
|
||||||
|
@ -116,62 +121,67 @@ class Portal:
|
||||||
remote rpc task or a local async generator instance.
|
remote rpc task or a local async generator instance.
|
||||||
"""
|
"""
|
||||||
return await self._return_from_resptype(
|
return await self._return_from_resptype(
|
||||||
*(await self._submit(ns, func, **kwargs))
|
*(await self._submit(ns, func, kwargs))
|
||||||
)
|
)
|
||||||
|
|
||||||
async def _return_from_resptype(
|
async def _return_from_resptype(
|
||||||
self, cid: str, q: trio.Queue, resptype: str, first_msg: dict
|
self,
|
||||||
|
cid: str,
|
||||||
|
recv_chan: trio.abc.ReceiveChannel,
|
||||||
|
resptype: str,
|
||||||
|
first_msg: dict
|
||||||
) -> Any:
|
) -> Any:
|
||||||
# TODO: not this needs some serious work and thinking about how
|
# TODO: not this needs some serious work and thinking about how
|
||||||
# to make async-generators the fundamental IPC API over channels!
|
# to make async-generators the fundamental IPC API over channels!
|
||||||
# (think `yield from`, `gen.send()`, and functional reactive stuff)
|
# (think `yield from`, `gen.send()`, and functional reactive stuff)
|
||||||
|
|
||||||
if resptype == 'yield': # stream response
|
if resptype == 'yield': # stream response
|
||||||
|
|
||||||
async def yield_from_q():
|
async def yield_from_recvchan():
|
||||||
try:
|
async with recv_chan:
|
||||||
async for msg in q:
|
try:
|
||||||
try:
|
async for msg in recv_chan:
|
||||||
yield msg['yield']
|
try:
|
||||||
except KeyError:
|
yield msg['yield']
|
||||||
if 'stop' in msg:
|
except KeyError:
|
||||||
break # far end async gen terminated
|
if 'stop' in msg:
|
||||||
else:
|
break # far end async gen terminated
|
||||||
# internal error should never get here
|
else:
|
||||||
assert msg.get('cid'), (
|
# internal error should never get here
|
||||||
"Received internal error at portal?")
|
assert msg.get('cid'), (
|
||||||
raise unpack_error(msg, self.channel)
|
"Received internal error at portal?")
|
||||||
|
raise unpack_error(msg, self.channel)
|
||||||
|
|
||||||
except (GeneratorExit, trio.Cancelled):
|
except (GeneratorExit, trio.Cancelled) as err:
|
||||||
log.warning(
|
log.warning(
|
||||||
f"Cancelling async gen call {cid} to "
|
f"Cancelling async gen call {cid} to "
|
||||||
f"{self.channel.uid}")
|
f"{self.channel.uid}")
|
||||||
with trio.move_on_after(0.5) as cs:
|
with trio.move_on_after(0.5) as cs:
|
||||||
cs.shield = True
|
cs.shield = True
|
||||||
# TODO: yeah.. it'd be nice if this was just an
|
# TODO: yeah.. it'd be nice if this was just an
|
||||||
# async func on the far end. Gotta figure out a
|
# async func on the far end. Gotta figure out a
|
||||||
# better way then implicitly feeding the ctx
|
# better way then implicitly feeding the ctx
|
||||||
# to declaring functions; likely a decorator
|
# to declaring functions; likely a decorator
|
||||||
# sytsem.
|
# system.
|
||||||
agen = await self.run('self', 'cancel_task', cid=cid)
|
agen = await self.run(
|
||||||
async with aclosing(agen) as agen:
|
'self', 'cancel_task', cid=cid)
|
||||||
async for _ in agen:
|
async with aclosing(agen) as agen:
|
||||||
pass
|
async for _ in agen:
|
||||||
if cs.cancelled_caught:
|
pass
|
||||||
if not self.channel.connected():
|
|
||||||
log.warning(
|
if cs.cancelled_caught:
|
||||||
"May have failed to cancel remote task "
|
if not self.channel.connected():
|
||||||
f"{cid} for {self.channel.uid}")
|
log.warning(
|
||||||
raise
|
"May have failed to cancel remote task "
|
||||||
|
f"{cid} for {self.channel.uid}")
|
||||||
|
|
||||||
# TODO: use AsyncExitStack to aclose() all agens
|
# TODO: use AsyncExitStack to aclose() all agens
|
||||||
# on teardown
|
# on teardown
|
||||||
agen = yield_from_q()
|
agen = yield_from_recvchan()
|
||||||
self._agens.add(agen)
|
self._agens.add(agen)
|
||||||
return agen
|
return agen
|
||||||
|
|
||||||
elif resptype == 'return': # single response
|
elif resptype == 'return': # single response
|
||||||
msg = await q.get()
|
msg = await recv_chan.receive()
|
||||||
try:
|
try:
|
||||||
return msg['return']
|
return msg['return']
|
||||||
except KeyError:
|
except KeyError:
|
||||||
|
@ -214,17 +224,20 @@ class Portal:
|
||||||
|
|
||||||
return self._result
|
return self._result
|
||||||
|
|
||||||
async def _cancel_streams(self):
|
# async def _cancel_streams(self):
|
||||||
# terminate all locally running async generator
|
# # terminate all locally running async generator
|
||||||
# IPC calls
|
# # IPC calls
|
||||||
if self._agens:
|
# if self._agens:
|
||||||
log.warning(
|
# log.warning(
|
||||||
f"Cancelling all streams with {self.channel.uid}")
|
# f"Cancelling all streams with {self.channel.uid}")
|
||||||
for agen in self._agens:
|
# for agen in self._agens:
|
||||||
await agen.aclose()
|
# await agen.aclose()
|
||||||
|
|
||||||
async def close(self) -> None:
|
async def aclose(self) -> None:
|
||||||
await self._cancel_streams()
|
# TODO: once we move to implementing our own `ReceiveChannel`
|
||||||
|
# (including remote task cancellation inside its `.aclose()`)
|
||||||
|
# we'll need to .aclose all those channels here
|
||||||
|
pass
|
||||||
|
|
||||||
async def cancel_actor(self) -> bool:
|
async def cancel_actor(self) -> bool:
|
||||||
"""Cancel the actor on the other end of this portal.
|
"""Cancel the actor on the other end of this portal.
|
||||||
|
@ -233,7 +246,7 @@ class Portal:
|
||||||
log.warning("This portal is already closed can't cancel")
|
log.warning("This portal is already closed can't cancel")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
await self._cancel_streams()
|
# await self._cancel_streams()
|
||||||
|
|
||||||
log.warning(
|
log.warning(
|
||||||
f"Sending actor cancel request to {self.channel.uid} on "
|
f"Sending actor cancel request to {self.channel.uid} on "
|
||||||
|
@ -308,7 +321,7 @@ async def open_portal(
|
||||||
try:
|
try:
|
||||||
yield portal
|
yield portal
|
||||||
finally:
|
finally:
|
||||||
await portal.close()
|
await portal.aclose()
|
||||||
|
|
||||||
if was_connected:
|
if was_connected:
|
||||||
# cancel remote channel-msg loop
|
# cancel remote channel-msg loop
|
||||||
|
|
Loading…
Reference in New Issue