forked from goodboy/tractor
				
			Drop old (and deluded) "streaming" cruft
							parent
							
								
									7a65165279
								
							
						
					
					
						commit
						41eddffc2c
					
				| 
						 | 
				
			
			@ -24,33 +24,19 @@ log = get_logger(__name__)
 | 
			
		|||
__all__ = ['run_task', 'run_as_asyncio_guest']
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# async def consume_asyncgen(
 | 
			
		||||
#     to_trio: trio.MemorySendChannel,
 | 
			
		||||
#     coro: AsyncIterator,
 | 
			
		||||
# ) -> None:
 | 
			
		||||
#     """Stream async generator results back to ``trio``.
 | 
			
		||||
 | 
			
		||||
#     ``from_trio`` might eventually be used here for
 | 
			
		||||
#     bidirectional streaming.
 | 
			
		||||
#     """
 | 
			
		||||
#     async for item in coro:
 | 
			
		||||
#         to_trio.send_nowait(item)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _run_asyncio_task(
 | 
			
		||||
    func: Callable,
 | 
			
		||||
    *,
 | 
			
		||||
    qsize: int = 1,
 | 
			
		||||
    # _treat_as_stream: bool = False,
 | 
			
		||||
    provide_channels: bool = False,
 | 
			
		||||
    **kwargs,
 | 
			
		||||
 | 
			
		||||
) -> Any:
 | 
			
		||||
    """
 | 
			
		||||
    '''
 | 
			
		||||
    Run an ``asyncio`` async function or generator in a task, return
 | 
			
		||||
    or stream the result back to ``trio``.
 | 
			
		||||
 | 
			
		||||
    """
 | 
			
		||||
    '''
 | 
			
		||||
    if not current_actor().is_infected_aio():
 | 
			
		||||
        raise RuntimeError("`infect_asyncio` mode is not enabled!?")
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -66,7 +52,6 @@ def _run_asyncio_task(
 | 
			
		|||
        # the assumption is that the target async routine accepts the
 | 
			
		||||
        # send channel then it intends to yield more then one return
 | 
			
		||||
        # value otherwise it would just return ;P
 | 
			
		||||
        # _treat_as_stream = True
 | 
			
		||||
        assert qsize > 1
 | 
			
		||||
 | 
			
		||||
    if provide_channels:
 | 
			
		||||
| 
						 | 
				
			
			@ -91,10 +76,10 @@ def _run_asyncio_task(
 | 
			
		|||
        aio_task_complete: trio.Event,
 | 
			
		||||
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        """
 | 
			
		||||
        '''
 | 
			
		||||
        Await ``coro`` and relay result back to ``trio``.
 | 
			
		||||
 | 
			
		||||
        """
 | 
			
		||||
        '''
 | 
			
		||||
        nonlocal aio_err
 | 
			
		||||
        orig = result = id(coro)
 | 
			
		||||
        try:
 | 
			
		||||
| 
						 | 
				
			
			@ -114,9 +99,6 @@ def _run_asyncio_task(
 | 
			
		|||
            wait_on_coro_final_result(to_trio, coro, aio_task_complete)
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    # elif inspect.isasyncgen(coro):
 | 
			
		||||
    #     task = asyncio.create_task(consume_asyncgen(to_trio, coro))
 | 
			
		||||
 | 
			
		||||
    else:
 | 
			
		||||
        raise TypeError(f"No support for invoking {coro}")
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -147,7 +129,6 @@ async def run_task(
 | 
			
		|||
    *,
 | 
			
		||||
 | 
			
		||||
    qsize: int = 2**10,
 | 
			
		||||
    # _treat_as_stream: bool = False,
 | 
			
		||||
    **kwargs,
 | 
			
		||||
 | 
			
		||||
) -> Any:
 | 
			
		||||
| 
						 | 
				
			
			@ -184,28 +165,19 @@ async def run_task(
 | 
			
		|||
            raise err from aio_err
 | 
			
		||||
        else:
 | 
			
		||||
            raise
 | 
			
		||||
 | 
			
		||||
    # except trio.Cancelled:
 | 
			
		||||
        # raise
 | 
			
		||||
    finally:
 | 
			
		||||
        if not task.done():
 | 
			
		||||
            task.cancel()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# TODO: explicit api for the streaming case where
 | 
			
		||||
# TODO: explicitly api for the streaming case where
 | 
			
		||||
# we pull from the mem chan in an async generator?
 | 
			
		||||
# This ends up looking more like our ``Portal.open_stream_from()``
 | 
			
		||||
# NB: code below is untested.
 | 
			
		||||
 | 
			
		||||
# async def _start_and_sync_aio_task(
 | 
			
		||||
#     from_trio,
 | 
			
		||||
#     to_trio,
 | 
			
		||||
#     from_aio,
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@acm
 | 
			
		||||
async def open_channel_from(
 | 
			
		||||
 | 
			
		||||
    target: Callable[[Any, ...], Any],
 | 
			
		||||
    **kwargs,
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -250,7 +222,8 @@ async def open_channel_from(
 | 
			
		|||
def run_as_asyncio_guest(
 | 
			
		||||
    trio_main: Callable,
 | 
			
		||||
) -> None:
 | 
			
		||||
    """Entry for an "infected ``asyncio`` actor".
 | 
			
		||||
    '''
 | 
			
		||||
    Entry for an "infected ``asyncio`` actor".
 | 
			
		||||
 | 
			
		||||
    Uh, oh. :o
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -265,8 +238,8 @@ def run_as_asyncio_guest(
 | 
			
		|||
 | 
			
		||||
    :)
 | 
			
		||||
 | 
			
		||||
    """
 | 
			
		||||
    # Disable sigint handling in children?
 | 
			
		||||
    '''
 | 
			
		||||
    # Disable sigint handling in children? (nawp)
 | 
			
		||||
    # import signal
 | 
			
		||||
    # signal.signal(signal.SIGINT, signal.SIG_IGN)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue