forked from goodboy/tractor
				
			Drop all .statespace refs; it was a silly idea
							parent
							
								
									0eba5f4708
								
							
						
					
					
						commit
						4bf9b27f57
					
				| 
						 | 
					@ -28,14 +28,18 @@ def is_even(i):
 | 
				
			||||||
    return i % 2 == 0
 | 
					    return i % 2 == 0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# placeholder for topics getter
 | 
				
			||||||
 | 
					_get_topics = None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@tractor.msg.pub
 | 
					@tractor.msg.pub
 | 
				
			||||||
async def pubber(get_topics, seed=10):
 | 
					async def pubber(get_topics, seed=10):
 | 
				
			||||||
    ss = tractor.current_actor().statespace
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    for i in cycle(range(seed)):
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # ensure topic subscriptions are as expected
 | 
					    # ensure topic subscriptions are as expected
 | 
				
			||||||
        ss['get_topics'] = get_topics
 | 
					    global _get_topics
 | 
				
			||||||
 | 
					    _get_topics = get_topics
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    for i in cycle(range(seed)):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        yield {'even' if is_even(i) else 'odd': i}
 | 
					        yield {'even' if is_even(i) else 'odd': i}
 | 
				
			||||||
        await trio.sleep(0.1)
 | 
					        await trio.sleep(0.1)
 | 
				
			||||||
| 
						 | 
					@ -151,8 +155,9 @@ def test_multi_actor_subs_arbiter_pub(
 | 
				
			||||||
):
 | 
					):
 | 
				
			||||||
    """Try out the neato @pub decorator system.
 | 
					    """Try out the neato @pub decorator system.
 | 
				
			||||||
    """
 | 
					    """
 | 
				
			||||||
 | 
					    global _get_topics
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async def main():
 | 
					    async def main():
 | 
				
			||||||
        ss = tractor.current_actor().statespace
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        async with tractor.open_nursery() as n:
 | 
					        async with tractor.open_nursery() as n:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -183,11 +188,12 @@ def test_multi_actor_subs_arbiter_pub(
 | 
				
			||||||
                pass
 | 
					                pass
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            if pub_actor == 'arbiter':
 | 
					            if pub_actor == 'arbiter':
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                # wait for publisher task to be spawned in a local RPC task
 | 
					                # wait for publisher task to be spawned in a local RPC task
 | 
				
			||||||
                while not ss.get('get_topics'):
 | 
					                while _get_topics is None:
 | 
				
			||||||
                    await trio.sleep(0.1)
 | 
					                    await trio.sleep(0.1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                get_topics = ss.get('get_topics')
 | 
					                get_topics = _get_topics
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                assert 'even' in get_topics()
 | 
					                assert 'even' in get_topics()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -194,7 +194,6 @@ class Actor:
 | 
				
			||||||
        name: str,
 | 
					        name: str,
 | 
				
			||||||
        *,
 | 
					        *,
 | 
				
			||||||
        rpc_module_paths: List[str] = [],
 | 
					        rpc_module_paths: List[str] = [],
 | 
				
			||||||
        statespace: Optional[Dict[str, Any]] = None,
 | 
					 | 
				
			||||||
        uid: str = None,
 | 
					        uid: str = None,
 | 
				
			||||||
        loglevel: str = None,
 | 
					        loglevel: str = None,
 | 
				
			||||||
        arbiter_addr: Optional[Tuple[str, int]] = None,
 | 
					        arbiter_addr: Optional[Tuple[str, int]] = None,
 | 
				
			||||||
| 
						 | 
					@ -226,7 +225,6 @@ class Actor:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # TODO: consider making this a dynamically defined
 | 
					        # TODO: consider making this a dynamically defined
 | 
				
			||||||
        # @dataclass once we get py3.7
 | 
					        # @dataclass once we get py3.7
 | 
				
			||||||
        self.statespace = statespace or {}
 | 
					 | 
				
			||||||
        self.loglevel = loglevel
 | 
					        self.loglevel = loglevel
 | 
				
			||||||
        self._arb_addr = arbiter_addr
 | 
					        self._arb_addr = arbiter_addr
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -248,7 +248,6 @@ async def new_proc(
 | 
				
			||||||
                await chan.send({
 | 
					                await chan.send({
 | 
				
			||||||
                    "_parent_main_data": subactor._parent_main_data,
 | 
					                    "_parent_main_data": subactor._parent_main_data,
 | 
				
			||||||
                    "rpc_module_paths": subactor.rpc_module_paths,
 | 
					                    "rpc_module_paths": subactor.rpc_module_paths,
 | 
				
			||||||
                    "statespace": subactor.statespace,
 | 
					 | 
				
			||||||
                    "_arb_addr": subactor._arb_addr,
 | 
					                    "_arb_addr": subactor._arb_addr,
 | 
				
			||||||
                    "bind_host": bind_addr[0],
 | 
					                    "bind_host": bind_addr[0],
 | 
				
			||||||
                    "bind_port": bind_addr[1],
 | 
					                    "bind_port": bind_addr[1],
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -52,7 +52,6 @@ class ActorNursery:
 | 
				
			||||||
        name: str,
 | 
					        name: str,
 | 
				
			||||||
        *,
 | 
					        *,
 | 
				
			||||||
        bind_addr: Tuple[str, int] = _default_bind_addr,
 | 
					        bind_addr: Tuple[str, int] = _default_bind_addr,
 | 
				
			||||||
        # statespace: Optional[Dict[str, Any]] = None,
 | 
					 | 
				
			||||||
        rpc_module_paths: List[str] = None,
 | 
					        rpc_module_paths: List[str] = None,
 | 
				
			||||||
        loglevel: str = None,  # set log level per subactor
 | 
					        loglevel: str = None,  # set log level per subactor
 | 
				
			||||||
        nursery: trio.Nursery = None,
 | 
					        nursery: trio.Nursery = None,
 | 
				
			||||||
| 
						 | 
					@ -67,7 +66,6 @@ class ActorNursery:
 | 
				
			||||||
            name,
 | 
					            name,
 | 
				
			||||||
            # modules allowed to invoked funcs from
 | 
					            # modules allowed to invoked funcs from
 | 
				
			||||||
            rpc_module_paths=rpc_module_paths or [],
 | 
					            rpc_module_paths=rpc_module_paths or [],
 | 
				
			||||||
            # statespace=statespace,  # global proc state vars
 | 
					 | 
				
			||||||
            loglevel=loglevel,
 | 
					            loglevel=loglevel,
 | 
				
			||||||
            arbiter_addr=current_actor()._arb_addr,
 | 
					            arbiter_addr=current_actor()._arb_addr,
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
| 
						 | 
					@ -99,7 +97,6 @@ class ActorNursery:
 | 
				
			||||||
        name: Optional[str] = None,
 | 
					        name: Optional[str] = None,
 | 
				
			||||||
        bind_addr: Tuple[str, int] = _default_bind_addr,
 | 
					        bind_addr: Tuple[str, int] = _default_bind_addr,
 | 
				
			||||||
        rpc_module_paths: Optional[List[str]] = None,
 | 
					        rpc_module_paths: Optional[List[str]] = None,
 | 
				
			||||||
        # statespace: Dict[str, Any] = None,
 | 
					 | 
				
			||||||
        loglevel: str = None,  # set log level per subactor
 | 
					        loglevel: str = None,  # set log level per subactor
 | 
				
			||||||
        **kwargs,  # explicit args to ``fn``
 | 
					        **kwargs,  # explicit args to ``fn``
 | 
				
			||||||
    ) -> Portal:
 | 
					    ) -> Portal:
 | 
				
			||||||
| 
						 | 
					@ -120,7 +117,6 @@ class ActorNursery:
 | 
				
			||||||
            name,
 | 
					            name,
 | 
				
			||||||
            rpc_module_paths=[mod_path] + (rpc_module_paths or []),
 | 
					            rpc_module_paths=[mod_path] + (rpc_module_paths or []),
 | 
				
			||||||
            bind_addr=bind_addr,
 | 
					            bind_addr=bind_addr,
 | 
				
			||||||
            # statespace=statespace,
 | 
					 | 
				
			||||||
            loglevel=loglevel,
 | 
					            loglevel=loglevel,
 | 
				
			||||||
            # use the run_in_actor nursery
 | 
					            # use the run_in_actor nursery
 | 
				
			||||||
            nursery=self._ria_nursery,
 | 
					            nursery=self._ria_nursery,
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -90,6 +90,9 @@ def modify_subs(topics2ctxs, topics, ctx):
 | 
				
			||||||
            topics2ctxs.pop(topic)
 | 
					            topics2ctxs.pop(topic)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					_pub_state: Dict[str, dict] = {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def pub(
 | 
					def pub(
 | 
				
			||||||
    wrapped: typing.Callable = None,
 | 
					    wrapped: typing.Callable = None,
 | 
				
			||||||
    *,
 | 
					    *,
 | 
				
			||||||
| 
						 | 
					@ -175,12 +178,15 @@ def pub(
 | 
				
			||||||
    subscribers. If you are ok to have a new task running for every call
 | 
					    subscribers. If you are ok to have a new task running for every call
 | 
				
			||||||
    to ``pub_service()`` then probably don't need this.
 | 
					    to ``pub_service()`` then probably don't need this.
 | 
				
			||||||
    """
 | 
					    """
 | 
				
			||||||
 | 
					    global _pub_state
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # handle the decorator not called with () case
 | 
					    # handle the decorator not called with () case
 | 
				
			||||||
    if wrapped is None:
 | 
					    if wrapped is None:
 | 
				
			||||||
        return partial(pub, tasks=tasks)
 | 
					        return partial(pub, tasks=tasks)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    task2lock: Dict[Union[str, None], trio.StrictFIFOLock] = {
 | 
					    task2lock: Dict[Union[str, None], trio.StrictFIFOLock] = {
 | 
				
			||||||
        None: trio.StrictFIFOLock()}
 | 
					        None: trio.StrictFIFOLock()}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    for name in tasks:
 | 
					    for name in tasks:
 | 
				
			||||||
        task2lock[name] = trio.StrictFIFOLock()
 | 
					        task2lock[name] = trio.StrictFIFOLock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -203,11 +209,10 @@ def pub(
 | 
				
			||||||
                    f"argument with a falue from {tasks}")
 | 
					                    f"argument with a falue from {tasks}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            topics = set(topics)
 | 
					            topics = set(topics)
 | 
				
			||||||
            ss = current_actor().statespace
 | 
					            lockmap = _pub_state.setdefault('_pubtask2lock', task2lock)
 | 
				
			||||||
            lockmap = ss.setdefault('_pubtask2lock', task2lock)
 | 
					 | 
				
			||||||
            lock = lockmap[task_name]
 | 
					            lock = lockmap[task_name]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            all_subs = ss.setdefault('_subs', {})
 | 
					            all_subs = _pub_state.setdefault('_subs', {})
 | 
				
			||||||
            topics2ctxs = all_subs.setdefault(task_name, {})
 | 
					            topics2ctxs = all_subs.setdefault(task_name, {})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            try:
 | 
					            try:
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue