forked from goodboy/tractor
				
			`Logger.warn()` is deprecated
							parent
							
								
									d12136d44d
								
							
						
					
					
						commit
						d808ffd8f3
					
				| 
						 | 
					@ -4,7 +4,7 @@ tractor: An actor model micro-framework built on
 | 
				
			||||||
"""
 | 
					"""
 | 
				
			||||||
import importlib
 | 
					import importlib
 | 
				
			||||||
from functools import partial
 | 
					from functools import partial
 | 
				
			||||||
from typing import Tuple, Any
 | 
					from typing import Tuple, Any, Optional
 | 
				
			||||||
import typing
 | 
					import typing
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import trio  # type: ignore
 | 
					import trio  # type: ignore
 | 
				
			||||||
| 
						 | 
					@ -56,7 +56,7 @@ async def _main(
 | 
				
			||||||
        async with _connect_chan(host, port):
 | 
					        async with _connect_chan(host, port):
 | 
				
			||||||
            arbiter_found = True
 | 
					            arbiter_found = True
 | 
				
			||||||
    except OSError:
 | 
					    except OSError:
 | 
				
			||||||
        log.warn(f"No actor could be found @ {host}:{port}")
 | 
					        log.warning(f"No actor could be found @ {host}:{port}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # create a local actor and start up its main routine/task
 | 
					    # create a local actor and start up its main routine/task
 | 
				
			||||||
    if arbiter_found:  # we were able to connect to an arbiter
 | 
					    if arbiter_found:  # we were able to connect to an arbiter
 | 
				
			||||||
| 
						 | 
					@ -97,14 +97,12 @@ def run(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def run_daemon(
 | 
					def run_daemon(
 | 
				
			||||||
    rpc_modules: Tuple[str] = (),
 | 
					    rpc_modules: Optional[Tuple[str]] = None,
 | 
				
			||||||
    **kwargs
 | 
					    **kwargs
 | 
				
			||||||
) -> None:
 | 
					) -> None:
 | 
				
			||||||
    for path in rpc_modules:
 | 
					    for path in rpc_modules or ():
 | 
				
			||||||
        importlib.import_module(path)
 | 
					        importlib.import_module(path)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    return run(
 | 
					    kwargs['rpc_module_paths'] = rpc_modules
 | 
				
			||||||
        partial(trio.sleep, float('inf')),
 | 
					
 | 
				
			||||||
        rpc_module_paths=rpc_modules,
 | 
					    return run(partial(trio.sleep, float('inf')), **kwargs)
 | 
				
			||||||
        **kwargs
 | 
					 | 
				
			||||||
    )
 | 
					 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -233,7 +233,7 @@ class Actor:
 | 
				
			||||||
        try:
 | 
					        try:
 | 
				
			||||||
            uid = await _do_handshake(self, chan)
 | 
					            uid = await _do_handshake(self, chan)
 | 
				
			||||||
        except StopAsyncIteration:
 | 
					        except StopAsyncIteration:
 | 
				
			||||||
            log.warn(f"Channel {chan} failed to handshake")
 | 
					            log.warning(f"Channel {chan} failed to handshake")
 | 
				
			||||||
            return
 | 
					            return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # channel tracking
 | 
					        # channel tracking
 | 
				
			||||||
| 
						 | 
					@ -248,7 +248,7 @@ class Actor:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        chans = self._peers[uid]
 | 
					        chans = self._peers[uid]
 | 
				
			||||||
        if chans:
 | 
					        if chans:
 | 
				
			||||||
            log.warn(
 | 
					            log.warning(
 | 
				
			||||||
                f"already have channel(s) for {uid}:{chans}?"
 | 
					                f"already have channel(s) for {uid}:{chans}?"
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
        log.debug(f"Registered {chan} for {uid}")
 | 
					        log.debug(f"Registered {chan} for {uid}")
 | 
				
			||||||
| 
						 | 
					@ -400,7 +400,7 @@ class Actor:
 | 
				
			||||||
        parent_addr: Tuple[str, int] = None
 | 
					        parent_addr: Tuple[str, int] = None
 | 
				
			||||||
    ) -> None:
 | 
					    ) -> None:
 | 
				
			||||||
        # after fork routine which invokes a fresh ``trio.run``
 | 
					        # after fork routine which invokes a fresh ``trio.run``
 | 
				
			||||||
        # log.warn("Log level after fork is {self.loglevel}")
 | 
					        # log.warning("Log level after fork is {self.loglevel}")
 | 
				
			||||||
        self._forkserver_info = forkserver_info
 | 
					        self._forkserver_info = forkserver_info
 | 
				
			||||||
        from ._trionics import ctx
 | 
					        from ._trionics import ctx
 | 
				
			||||||
        if self.loglevel is not None:
 | 
					        if self.loglevel is not None:
 | 
				
			||||||
| 
						 | 
					@ -456,7 +456,7 @@ class Actor:
 | 
				
			||||||
                        # initial handshake, report who we are, who they are
 | 
					                        # initial handshake, report who we are, who they are
 | 
				
			||||||
                        await _do_handshake(self, chan)
 | 
					                        await _do_handshake(self, chan)
 | 
				
			||||||
                    except OSError:  # failed to connect
 | 
					                    except OSError:  # failed to connect
 | 
				
			||||||
                        log.warn(
 | 
					                        log.warning(
 | 
				
			||||||
                            f"Failed to connect to parent @ {parent_addr},"
 | 
					                            f"Failed to connect to parent @ {parent_addr},"
 | 
				
			||||||
                            " closing server")
 | 
					                            " closing server")
 | 
				
			||||||
                        await self.cancel()
 | 
					                        await self.cancel()
 | 
				
			||||||
| 
						 | 
					@ -555,7 +555,7 @@ class Actor:
 | 
				
			||||||
                    await arb_portal.run(
 | 
					                    await arb_portal.run(
 | 
				
			||||||
                        'self', 'unregister_actor', uid=self.uid)
 | 
					                        'self', 'unregister_actor', uid=self.uid)
 | 
				
			||||||
        except OSError:
 | 
					        except OSError:
 | 
				
			||||||
            log.warn(f"Unable to unregister {self.name} from arbiter")
 | 
					            log.warning(f"Unable to unregister {self.name} from arbiter")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async def cancel(self) -> None:
 | 
					    async def cancel(self) -> None:
 | 
				
			||||||
        """Cancel this actor.
 | 
					        """Cancel this actor.
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -158,11 +158,11 @@ class Channel:
 | 
				
			||||||
                    await self.connect()
 | 
					                    await self.connect()
 | 
				
			||||||
                cancelled = cancel_scope.cancelled_caught
 | 
					                cancelled = cancel_scope.cancelled_caught
 | 
				
			||||||
                if cancelled:
 | 
					                if cancelled:
 | 
				
			||||||
                    log.warn(
 | 
					                    log.warning(
 | 
				
			||||||
                        "Reconnect timed out after 3 seconds, retrying...")
 | 
					                        "Reconnect timed out after 3 seconds, retrying...")
 | 
				
			||||||
                    continue
 | 
					                    continue
 | 
				
			||||||
                else:
 | 
					                else:
 | 
				
			||||||
                    log.warn("Stream connection re-established!")
 | 
					                    log.warning("Stream connection re-established!")
 | 
				
			||||||
                    # run any reconnection sequence
 | 
					                    # run any reconnection sequence
 | 
				
			||||||
                    on_recon = self._recon_seq
 | 
					                    on_recon = self._recon_seq
 | 
				
			||||||
                    if on_recon:
 | 
					                    if on_recon:
 | 
				
			||||||
| 
						 | 
					@ -171,7 +171,7 @@ class Channel:
 | 
				
			||||||
            except (OSError, ConnectionRefusedError):
 | 
					            except (OSError, ConnectionRefusedError):
 | 
				
			||||||
                if not down:
 | 
					                if not down:
 | 
				
			||||||
                    down = True
 | 
					                    down = True
 | 
				
			||||||
                    log.warn(
 | 
					                    log.warning(
 | 
				
			||||||
                        f"Connection to {self.raddr} went down, waiting"
 | 
					                        f"Connection to {self.raddr} went down, waiting"
 | 
				
			||||||
                        " for re-establishment")
 | 
					                        " for re-establishment")
 | 
				
			||||||
                await trio.sleep(1)
 | 
					                await trio.sleep(1)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -186,7 +186,7 @@ class Portal:
 | 
				
			||||||
    async def cancel_actor(self) -> bool:
 | 
					    async def cancel_actor(self) -> bool:
 | 
				
			||||||
        """Cancel the actor on the other end of this portal.
 | 
					        """Cancel the actor on the other end of this portal.
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
        log.warn(
 | 
					        log.warning(
 | 
				
			||||||
            f"Sending cancel request to {self.channel.uid} on "
 | 
					            f"Sending cancel request to {self.channel.uid} on "
 | 
				
			||||||
            f"{self.channel}")
 | 
					            f"{self.channel}")
 | 
				
			||||||
        try:
 | 
					        try:
 | 
				
			||||||
| 
						 | 
					@ -196,11 +196,11 @@ class Portal:
 | 
				
			||||||
                await self.run('self', 'cancel')
 | 
					                await self.run('self', 'cancel')
 | 
				
			||||||
                return True
 | 
					                return True
 | 
				
			||||||
        except trio.ClosedResourceError:
 | 
					        except trio.ClosedResourceError:
 | 
				
			||||||
            log.warn(
 | 
					            log.warning(
 | 
				
			||||||
                f"{self.channel} for {self.channel.uid} was already closed?")
 | 
					                f"{self.channel} for {self.channel.uid} was already closed?")
 | 
				
			||||||
            return False
 | 
					            return False
 | 
				
			||||||
        else:
 | 
					        else:
 | 
				
			||||||
            log.warn(f"May have failed to cancel {self.channel.uid}")
 | 
					            log.warning(f"May have failed to cancel {self.channel.uid}")
 | 
				
			||||||
            return False
 | 
					            return False
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -153,7 +153,7 @@ class ActorNursery:
 | 
				
			||||||
                # if it's an async-gen then we should alert the user
 | 
					                # if it's an async-gen then we should alert the user
 | 
				
			||||||
                # that we're cancelling it
 | 
					                # that we're cancelling it
 | 
				
			||||||
                if inspect.isasyncgen(res):
 | 
					                if inspect.isasyncgen(res):
 | 
				
			||||||
                    log.warn(
 | 
					                    log.warning(
 | 
				
			||||||
                        f"Blindly consuming asyncgen for {actor.uid}")
 | 
					                        f"Blindly consuming asyncgen for {actor.uid}")
 | 
				
			||||||
                    with trio.fail_after(1):
 | 
					                    with trio.fail_after(1):
 | 
				
			||||||
                        async with aclosing(res) as agen:
 | 
					                        async with aclosing(res) as agen:
 | 
				
			||||||
| 
						 | 
					@ -177,7 +177,7 @@ class ActorNursery:
 | 
				
			||||||
            self._children.pop(actor.uid)
 | 
					            self._children.pop(actor.uid)
 | 
				
			||||||
            # proc terminated, cancel result waiter
 | 
					            # proc terminated, cancel result waiter
 | 
				
			||||||
            if cancel_scope:
 | 
					            if cancel_scope:
 | 
				
			||||||
                log.warn(
 | 
					                log.warning(
 | 
				
			||||||
                    f"Cancelling existing result waiter task for {actor.uid}")
 | 
					                    f"Cancelling existing result waiter task for {actor.uid}")
 | 
				
			||||||
                cancel_scope.cancel()
 | 
					                cancel_scope.cancel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -194,7 +194,7 @@ class ActorNursery:
 | 
				
			||||||
                await portal.cancel_actor()
 | 
					                await portal.cancel_actor()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            if cs.cancelled_caught:
 | 
					            if cs.cancelled_caught:
 | 
				
			||||||
                log.warn("Result waiter was cancelled")
 | 
					                log.warning("Result waiter was cancelled")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # unblocks when all waiter tasks have completed
 | 
					        # unblocks when all waiter tasks have completed
 | 
				
			||||||
        children = self._children.copy()
 | 
					        children = self._children.copy()
 | 
				
			||||||
| 
						 | 
					@ -213,7 +213,7 @@ class ActorNursery:
 | 
				
			||||||
        directly without any far end graceful ``trio`` cancellation.
 | 
					        directly without any far end graceful ``trio`` cancellation.
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
        def do_hard_kill(proc):
 | 
					        def do_hard_kill(proc):
 | 
				
			||||||
            log.warn(f"Hard killing subactors {self._children}")
 | 
					            log.warning(f"Hard killing subactors {self._children}")
 | 
				
			||||||
            proc.terminate()
 | 
					            proc.terminate()
 | 
				
			||||||
            # XXX: below doesn't seem to work?
 | 
					            # XXX: below doesn't seem to work?
 | 
				
			||||||
            # send KeyBoardInterrupt (trio abort signal) to sub-actors
 | 
					            # send KeyBoardInterrupt (trio abort signal) to sub-actors
 | 
				
			||||||
| 
						 | 
					@ -228,7 +228,7 @@ class ActorNursery:
 | 
				
			||||||
                    else:
 | 
					                    else:
 | 
				
			||||||
                        if portal is None:  # actor hasn't fully spawned yet
 | 
					                        if portal is None:  # actor hasn't fully spawned yet
 | 
				
			||||||
                            event = self._actor._peer_connected[subactor.uid]
 | 
					                            event = self._actor._peer_connected[subactor.uid]
 | 
				
			||||||
                            log.warn(
 | 
					                            log.warning(
 | 
				
			||||||
                                f"{subactor.uid} wasn't finished spawning?")
 | 
					                                f"{subactor.uid} wasn't finished spawning?")
 | 
				
			||||||
                            await event.wait()
 | 
					                            await event.wait()
 | 
				
			||||||
                            # channel/portal should now be up
 | 
					                            # channel/portal should now be up
 | 
				
			||||||
| 
						 | 
					@ -260,7 +260,7 @@ class ActorNursery:
 | 
				
			||||||
                # else block here might not complete? Should both be shielded?
 | 
					                # else block here might not complete? Should both be shielded?
 | 
				
			||||||
                with trio.open_cancel_scope(shield=True):
 | 
					                with trio.open_cancel_scope(shield=True):
 | 
				
			||||||
                    if etype is trio.Cancelled:
 | 
					                    if etype is trio.Cancelled:
 | 
				
			||||||
                        log.warn(
 | 
					                        log.warning(
 | 
				
			||||||
                            f"{current_actor().uid} was cancelled with {etype}"
 | 
					                            f"{current_actor().uid} was cancelled with {etype}"
 | 
				
			||||||
                            ", cancelling actor nursery")
 | 
					                            ", cancelling actor nursery")
 | 
				
			||||||
                        await self.cancel()
 | 
					                        await self.cancel()
 | 
				
			||||||
| 
						 | 
					@ -276,7 +276,7 @@ class ActorNursery:
 | 
				
			||||||
                try:
 | 
					                try:
 | 
				
			||||||
                    await self.wait()
 | 
					                    await self.wait()
 | 
				
			||||||
                except Exception as err:
 | 
					                except Exception as err:
 | 
				
			||||||
                    log.warn(f"Nursery caught {err}, cancelling")
 | 
					                    log.warning(f"Nursery caught {err}, cancelling")
 | 
				
			||||||
                    await self.cancel()
 | 
					                    await self.cancel()
 | 
				
			||||||
                    raise
 | 
					                    raise
 | 
				
			||||||
                log.debug(f"Nursery teardown complete")
 | 
					                log.debug(f"Nursery teardown complete")
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue