forked from goodboy/tractor
1
0
Fork 0

Be more pedantic with error handling

wip_fix_asyncio_gen_streaming
Tyler Goodlet 2021-05-10 07:23:39 -04:00
parent 899404932a
commit 01bef653c2
2 changed files with 27 additions and 21 deletions

View File

@ -358,33 +358,39 @@ class Portal:
fn_mod_path, fn_name = func_deats(func) fn_mod_path, fn_name = func_deats(func)
cid, recv_chan, functype, first_msg = await self._submit(
fn_mod_path, fn_name, kwargs)
assert functype == 'context'
msg = await recv_chan.receive()
try: try:
# the "first" value here is delivered by the callee's cid, recv_chan, functype, first_msg = await self._submit(
# ``Context.started()`` call. fn_mod_path, fn_name, kwargs)
first = msg['started']
except KeyError: assert functype == 'context'
assert msg.get('cid'), ("Received internal error at context?") msg = await recv_chan.receive()
if msg.get('error'): try:
# raise the error message # the "first" value here is delivered by the callee's
raise unpack_error(msg, self.channel) # ``Context.started()`` call.
else: first = msg['started']
raise
try: except KeyError:
assert msg.get('cid'), ("Received internal error at context?")
if msg.get('error'):
# raise the error message
raise unpack_error(msg, self.channel)
else:
raise
# deliver context instance and .started() msg value in open
# tuple.
ctx = Context(self.channel, cid, _portal=self) ctx = Context(self.channel, cid, _portal=self)
yield ctx, first try:
yield ctx, first
finally:
await ctx.cancel()
finally: finally:
await recv_chan.aclose() await recv_chan.aclose()
await ctx.cancel()
@dataclass @dataclass
class LocalPortal: class LocalPortal:

View File

@ -357,7 +357,8 @@ async def open_nursery(
try: try:
if actor is None and is_main_process(): if actor is None and is_main_process():
# if we are the parent process start the actor runtime implicitly # if we are the parent process start the
# actor runtime implicitly
log.info("Starting actor runtime!") log.info("Starting actor runtime!")
# mark us for teardown on exit # mark us for teardown on exit
@ -376,7 +377,6 @@ async def open_nursery(
async with _open_and_supervise_one_cancels_all_nursery( async with _open_and_supervise_one_cancels_all_nursery(
actor actor
) as anursery: ) as anursery:
yield anursery yield anursery
finally: finally: