Support `chan.started_nowait()` in `.open_channel_from()` target
That is the `target` can declare a `chan: LinkedTaskChannel` instead of `to_trio`/`from_aio`. To support it, - change `.started()` -> the more appropriate `.started_nowait()` which can be called sync from the aio child task. - adjust the `provide_channels` assert to accept either fn sig declaration (for now). Still needs test(s) obvi..to_asyncio_eoc_signal
							parent
							
								
									466dce8aed
								
							
						
					
					
						commit
						ab11ee4fbe
					
				| 
						 | 
				
			
			@ -209,10 +209,15 @@ class LinkedTaskChannel(
 | 
			
		|||
    async def aclose(self) -> None:
 | 
			
		||||
        await self._from_aio.aclose()
 | 
			
		||||
 | 
			
		||||
    def started(
 | 
			
		||||
    # ?TODO? async version of this?
 | 
			
		||||
    def started_nowait(
 | 
			
		||||
        self,
 | 
			
		||||
        val: Any = None,
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        '''
 | 
			
		||||
        Synchronize aio-sde with its trio-parent.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        self._aio_started_val = val
 | 
			
		||||
        return self._to_trio.send_nowait(val)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -349,18 +354,6 @@ def _run_asyncio_task(
 | 
			
		|||
        # value otherwise it would just return ;P
 | 
			
		||||
        assert qsize > 1
 | 
			
		||||
 | 
			
		||||
    if provide_channels:
 | 
			
		||||
        assert 'to_trio' in args
 | 
			
		||||
 | 
			
		||||
    # allow target func to accept/stream results manually by name
 | 
			
		||||
    if 'to_trio' in args:
 | 
			
		||||
        kwargs['to_trio'] = to_trio
 | 
			
		||||
 | 
			
		||||
    if 'from_trio' in args:
 | 
			
		||||
        kwargs['from_trio'] = from_trio
 | 
			
		||||
 | 
			
		||||
    coro = func(**kwargs)
 | 
			
		||||
 | 
			
		||||
    trio_task: trio.Task = trio.lowlevel.current_task()
 | 
			
		||||
    trio_cs = trio.CancelScope()
 | 
			
		||||
    aio_task_complete = trio.Event()
 | 
			
		||||
| 
						 | 
				
			
			@ -375,6 +368,25 @@ def _run_asyncio_task(
 | 
			
		|||
        _suppress_graceful_exits=suppress_graceful_exits,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    # allow target func to accept/stream results manually by name
 | 
			
		||||
    if 'to_trio' in args:
 | 
			
		||||
        kwargs['to_trio'] = to_trio
 | 
			
		||||
 | 
			
		||||
    if 'from_trio' in args:
 | 
			
		||||
        kwargs['from_trio'] = from_trio
 | 
			
		||||
 | 
			
		||||
    if 'chan' in args:
 | 
			
		||||
        kwargs['chan'] = chan
 | 
			
		||||
 | 
			
		||||
    if provide_channels:
 | 
			
		||||
        assert (
 | 
			
		||||
            'to_trio' in args
 | 
			
		||||
            or
 | 
			
		||||
            'chan' in args
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    coro = func(**kwargs)
 | 
			
		||||
 | 
			
		||||
    async def wait_on_coro_final_result(
 | 
			
		||||
        to_trio: trio.MemorySendChannel,
 | 
			
		||||
        coro: Awaitable,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue