commit
a105e32e34
|
@ -1,39 +1,41 @@
|
||||||
|
from typing import AsyncIterator
|
||||||
from itertools import repeat
|
from itertools import repeat
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
|
|
||||||
tractor.log.get_console_log("INFO")
|
|
||||||
|
|
||||||
|
async def stream_forever() -> AsyncIterator[int]:
|
||||||
|
|
||||||
async def stream_forever():
|
|
||||||
for i in repeat("I can see these little future bubble things"):
|
for i in repeat("I can see these little future bubble things"):
|
||||||
# each yielded value is sent over the ``Channel`` to the
|
# each yielded value is sent over the ``Channel`` to the parent actor
|
||||||
# parent actor
|
|
||||||
yield i
|
yield i
|
||||||
await trio.sleep(0.01)
|
await trio.sleep(0.01)
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
|
|
||||||
# stream for at most 1 seconds
|
async with tractor.open_nursery() as n:
|
||||||
with trio.move_on_after(1) as cancel_scope:
|
|
||||||
|
|
||||||
async with tractor.open_nursery() as n:
|
portal = await n.start_actor(
|
||||||
|
'donny',
|
||||||
|
enable_modules=[__name__],
|
||||||
|
)
|
||||||
|
|
||||||
portal = await n.start_actor(
|
# this async for loop streams values from the above
|
||||||
'donny',
|
# async generator running in a separate process
|
||||||
enable_modules=[__name__],
|
async with portal.open_stream_from(stream_forever) as stream:
|
||||||
)
|
count = 0
|
||||||
|
async for letter in stream:
|
||||||
|
print(letter)
|
||||||
|
count += 1
|
||||||
|
|
||||||
# this async for loop streams values from the above
|
if count > 50:
|
||||||
# async generator running in a separate process
|
break
|
||||||
async with portal.open_stream_from(stream_forever) as stream:
|
|
||||||
async for letter in stream:
|
|
||||||
print(letter)
|
|
||||||
|
|
||||||
# we support trio's cancellation system
|
print('stream terminated')
|
||||||
assert cancel_scope.cancelled_caught
|
|
||||||
assert n.cancelled
|
await portal.cancel_actor()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
|
|
@ -422,27 +422,22 @@ def test_multi_nested_subactors_error_through_nurseries(spawn):
|
||||||
|
|
||||||
child = spawn('multi_nested_subactors_error_up_through_nurseries')
|
child = spawn('multi_nested_subactors_error_up_through_nurseries')
|
||||||
|
|
||||||
|
timed_out_early: bool = False
|
||||||
|
|
||||||
for i in range(12):
|
for i in range(12):
|
||||||
try:
|
try:
|
||||||
child.expect(r"\(Pdb\+\+\)")
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
|
|
||||||
except (
|
except pexpect.exceptions.EOF:
|
||||||
pexpect.exceptions.EOF,
|
|
||||||
pexpect.exceptions.TIMEOUT,
|
|
||||||
):
|
|
||||||
# races all over..
|
|
||||||
|
|
||||||
print(f"Failed early on {i}?")
|
|
||||||
before = str(child.before.decode())
|
|
||||||
|
|
||||||
timed_out_early = True
|
|
||||||
|
|
||||||
# race conditions on how fast the continue is sent?
|
# race conditions on how fast the continue is sent?
|
||||||
|
print(f"Failed early on {i}?")
|
||||||
|
timed_out_early = True
|
||||||
break
|
break
|
||||||
|
else:
|
||||||
child.expect(pexpect.EOF)
|
child.expect(pexpect.EOF)
|
||||||
|
|
||||||
if not timed_out_early:
|
if not timed_out_early:
|
||||||
before = str(child.before.decode())
|
before = str(child.before.decode())
|
||||||
|
@ -499,6 +494,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
|
||||||
child.expect(pexpect.EOF)
|
child.expect(pexpect.EOF)
|
||||||
break
|
break
|
||||||
except pexpect.exceptions.TIMEOUT:
|
except pexpect.exceptions.TIMEOUT:
|
||||||
|
child.sendline('c')
|
||||||
print('child was able to grab tty lock again?')
|
print('child was able to grab tty lock again?')
|
||||||
|
|
||||||
if not timed_out_early:
|
if not timed_out_early:
|
||||||
|
|
|
@ -17,6 +17,7 @@ from . import _spawn
|
||||||
from . import _state
|
from . import _state
|
||||||
from . import log
|
from . import log
|
||||||
from ._ipc import _connect_chan
|
from ._ipc import _connect_chan
|
||||||
|
from ._exceptions import is_multi_cancelled
|
||||||
|
|
||||||
|
|
||||||
# set at startup and after forks
|
# set at startup and after forks
|
||||||
|
@ -177,7 +178,7 @@ async def open_root_actor(
|
||||||
|
|
||||||
entered = await _debug._maybe_enter_pm(err)
|
entered = await _debug._maybe_enter_pm(err)
|
||||||
|
|
||||||
if not entered:
|
if not entered and not is_multi_cancelled(err):
|
||||||
logger.exception("Root actor crashed:")
|
logger.exception("Root actor crashed:")
|
||||||
|
|
||||||
# always re-raise
|
# always re-raise
|
||||||
|
|
|
@ -281,7 +281,6 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
# Instead try to wait for pdb to be released before
|
# Instead try to wait for pdb to be released before
|
||||||
# tearing down.
|
# tearing down.
|
||||||
if is_root_process():
|
if is_root_process():
|
||||||
log.exception(f"we're root with {err}")
|
|
||||||
|
|
||||||
# TODO: could this make things more deterministic?
|
# TODO: could this make things more deterministic?
|
||||||
# wait to see if a sub-actor task will be
|
# wait to see if a sub-actor task will be
|
||||||
|
|
Loading…
Reference in New Issue