diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 447b4f3..b90e070 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -3,6 +3,7 @@ name: CI on: push jobs: + mypy: name: 'MyPy' runs-on: ubuntu-latest @@ -23,23 +24,59 @@ jobs: run: mypy tractor/ --ignore-missing-imports testing: + name: '${{ matrix.os }} Python ${{ matrix.python }} - ${{ matrix.spawn_backend }}' timeout-minutes: 9 runs-on: ${{ matrix.os }} + strategy: fail-fast: false matrix: os: [ubuntu-latest, windows-latest] python: ['3.8', '3.9'] spawn_backend: ['trio', 'mp'] + steps: + - name: Checkout uses: actions/checkout@v2 + - name: Setup python uses: actions/setup-python@v2 with: python-version: '${{ matrix.python }}' + - name: Install dependencies run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager + + - name: Run tests + run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs + + testing-msgspec: + # runs py3.9 jobs on all OS's but with optional `msgspec` dep installed + name: '${{ matrix.os }} Python ${{ matrix.python }} - ${{ matrix.spawn_backend }} - msgspec' + timeout-minutes: 10 + runs-on: ${{ matrix.os }} + + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, windows-latest] + python: ['3.9'] + spawn_backend: ['trio', 'mp'] + + steps: + + - name: Checkout + uses: actions/checkout@v2 + + - name: Setup python + uses: actions/setup-python@v2 + with: + python-version: '${{ matrix.python }}' + + - name: Install dependencies + run: pip install -U .[msgspec] -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager + - name: Run tests run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs diff --git a/docs/README.rst b/docs/README.rst index 1199d3d..15abc89 100644 --- a/docs/README.rst +++ b/docs/README.rst @@ -24,8 +24,9 @@ Features - Builtin IPC streaming APIs with task fan-out broadcasting - A (first ever?) "native" multi-core debugger UX for Python using `pdb++`_ - Support for a swappable, OS specific, process spawning layer -- A modular transport stack, allowing for custom serialization, - communications protocols, and environment specific IPC primitives +- A modular transport stack, allowing for custom serialization (eg. + `msgspec`_), communications protocols, and environment specific IPC + primitives - `structured concurrency`_ from the ground up @@ -322,6 +323,12 @@ From PyPi:: pip install tractor +To try out the (optionally) faster `msgspec`_ codec instead of the +default ``msgpack`` lib:: + + pip install tractor[msgspec] + + From git:: pip install git+git://github.com/goodboy/tractor.git @@ -394,7 +401,8 @@ Help us push toward the future. - (Soon to land) ``asyncio`` support allowing for "infected" actors where `trio` drives the `asyncio` scheduler via the astounding "`guest mode`_" -- Typed messaging protocols (ex. via ``msgspec``) +- Typed messaging protocols (ex. via ``msgspec``, see `#36 + `_) - Erlang-style supervisors via composed context managers @@ -427,6 +435,7 @@ channel`_! .. _unrequirements: https://en.wikipedia.org/wiki/Actor_model#Direct_communication_and_asynchrony .. _async generators: https://www.python.org/dev/peps/pep-0525/ .. _trio-parallel: https://github.com/richardsheridan/trio-parallel +.. _msgspec: https://jcristharif.com/msgspec/ .. |gh_actions| image:: https://img.shields.io/endpoint.svg?url=https%3A%2F%2Factions-badge.atrox.dev%2Fgoodboy%2Ftractor%2Fbadge&style=popout-square diff --git a/newsfragments/214.feature.rst b/newsfragments/214.feature.rst new file mode 100644 index 0000000..1247536 --- /dev/null +++ b/newsfragments/214.feature.rst @@ -0,0 +1,9 @@ +Add optional `msgspec `_ support over +TCP streams as an alernative, faster MessagePack codec. + +This get's us moving toward typed messaging/IPC protocols. Further, +``msgspec`` structs may be a valid tool to start for formalizing our "SC +dialog un-protocol" messages as described in `#36 +`_`. + + diff --git a/setup.py b/setup.py index 36294d3..d233ecd 100755 --- a/setup.py +++ b/setup.py @@ -44,6 +44,10 @@ setup( 'async_generator', 'trio_typing', + # tooling + 'tricycle', + 'trio_typing', + # tooling 'colorlog', 'wrapt', @@ -53,6 +57,12 @@ setup( 'msgpack', ], + extras_require={ + + # serialization + 'msgspec': ["msgspec >= 0.3.2'; python_version >= '3.9'"], + + }, tests_require=['pytest'], python_requires=">=3.8", keywords=[ diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 383608e..2b2996f 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -42,7 +42,7 @@ async def test_reg_then_unreg(arb_addr): await trio.sleep(0.1) assert uid not in aportal.actor._registry - sockaddrs = actor._registry[uid] + sockaddrs = actor._registry.get(uid) assert not sockaddrs @@ -136,7 +136,7 @@ async def spawn_and_check_registry( if actor.is_arbiter: async def get_reg(): - return actor._registry + return await actor.get_registry() extra = 1 # arbiter is local root actor else: @@ -187,13 +187,12 @@ async def spawn_and_check_registry( await cancel(use_signal) finally: - with trio.CancelScope(shield=True): - await trio.sleep(0.5) + await trio.sleep(0.5) - # all subactors should have de-registered - registry = await get_reg() - assert len(registry) == extra - assert actor.uid in registry + # all subactors should have de-registered + registry = await get_reg() + assert len(registry) == extra + assert actor.uid in registry @pytest.mark.parametrize('use_signal', [False, True]) @@ -277,7 +276,9 @@ async def close_chans_before_nursery( # TODO: compact this back as was in last commit once # 3.9+, see https://github.com/goodboy/tractor/issues/207 - async with portal1.open_stream_from(stream_forever) as agen1: + async with portal1.open_stream_from( + stream_forever + ) as agen1: async with portal2.open_stream_from( stream_forever ) as agen2: @@ -293,8 +294,9 @@ async def close_chans_before_nursery( # reliably triggered by an external SIGINT. # tractor.current_actor()._root_nursery.cancel_scope.cancel() - # XXX: THIS IS THE KEY THING that happens - # **before** exiting the actor nursery block + # XXX: THIS IS THE KEY THING that + # happens **before** exiting the + # actor nursery block # also kill off channels cuz why not await agen1.aclose() diff --git a/tests/test_spawning.py b/tests/test_spawning.py index f9eeb16..b1110fe 100644 --- a/tests/test_spawning.py +++ b/tests/test_spawning.py @@ -1,6 +1,7 @@ """ Spawning basics """ +from typing import Dict, Tuple import pytest import trio @@ -11,7 +12,11 @@ from conftest import tractor_test data_to_pass_down = {'doggy': 10, 'kitty': 4} -async def spawn(is_arbiter, data, arb_addr): +async def spawn( + is_arbiter: bool, + data: Dict, + arb_addr: Tuple[str, int], +): namespaces = [__name__] await trio.sleep(0.1) diff --git a/tractor/_actor.py b/tractor/_actor.py index f84a597..dcbe541 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -317,7 +317,8 @@ class Actor: # TODO: consider making this a dynamically defined # @dataclass once we get py3.7 self.loglevel = loglevel - self._arb_addr = arbiter_addr + + self._arb_addr = (str(arbiter_addr[0]), int(arbiter_addr[1])) if arbiter_addr else None # marked by the process spawning backend at startup # will be None for the parent most process started manually @@ -421,7 +422,7 @@ class Actor: """ self._no_more_peers = trio.Event() # unset - chan = Channel(stream=stream) + chan = Channel.from_stream(stream) log.runtime(f"New connection to us {chan}") # send/receive initial handshake response @@ -429,7 +430,10 @@ class Actor: uid = await self._do_handshake(chan) except ( + # we need this for ``msgspec`` for some reason? + # for now, it's been put in the stream backend. # trio.BrokenResourceError, + # trio.ClosedResourceError, TransportClosed, ): @@ -615,6 +619,7 @@ class Actor: # ``scope = Nursery.start()`` task_status.started(loop_cs) async for msg in chan: + if msg is None: # loop terminate sentinel log.debug( @@ -775,6 +780,7 @@ class Actor: if self._spawn_method == "trio": # Receive runtime state from our parent + parent_data: dict[str, Any] parent_data = await chan.recv() log.debug( "Received state from parent:\n" @@ -790,7 +796,16 @@ class Actor: _state._runtime_vars.update(rvs) for attr, value in parent_data.items(): - setattr(self, attr, value) + + if attr == '_arb_addr': + # XXX: ``msgspec`` doesn't support serializing tuples + # so just cash manually here since it's what our + # internals expect. + value = tuple(value) if value else None + self._arb_addr = value + + else: + setattr(self, attr, value) return chan, accept_addr @@ -804,21 +819,25 @@ class Actor: async def _async_main( self, accept_addr: Optional[Tuple[str, int]] = None, + # XXX: currently ``parent_addr`` is only needed for the # ``multiprocessing`` backend (which pickles state sent to # the child instead of relaying it over the connect-back # channel). Once that backend is removed we can likely just - # change this so a simple ``is_subactor: bool`` which will + # change this to a simple ``is_subactor: bool`` which will # be False when running as root actor and True when as # a subactor. parent_addr: Optional[Tuple[str, int]] = None, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, + ) -> None: - """Start the channel server, maybe connect back to the parent, and + """ + Start the channel server, maybe connect back to the parent, and start the main task. A "root-most" (or "top-level") nursery for this actor is opened here and when cancelled effectively cancels the actor. + """ registered_with_arbiter = False try: @@ -1162,6 +1181,7 @@ class Actor: async def _do_handshake( self, chan: Channel + ) -> Tuple[str, str]: """Exchange (name, UUIDs) identifiers as the first communication step. @@ -1169,12 +1189,13 @@ class Actor: parlance. """ await chan.send(self.uid) - uid: Tuple[str, str] = await chan.recv() + value = await chan.recv() + uid: Tuple[str, str] = (str(value[0]), str(value[1])) if not isinstance(uid, tuple): raise ValueError(f"{uid} is not a valid uid?!") - chan.uid = uid + chan.uid = str(uid[0]), str(uid[1]) log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete") return uid @@ -1191,8 +1212,13 @@ class Arbiter(Actor): is_arbiter = True def __init__(self, *args, **kwargs): - self._registry = defaultdict(list) + + self._registry: Dict[ + Tuple[str, str], + Tuple[str, int], + ] = {} self._waiters = {} + super().__init__(*args, **kwargs) async def find_actor(self, name: str) -> Optional[Tuple[str, int]]: @@ -1204,9 +1230,11 @@ class Arbiter(Actor): async def get_registry( self - ) -> Dict[str, Tuple[str, str]]: - """Return current name registry. - """ + ) -> Dict[Tuple[str, str], Tuple[str, int]]: + '''Return current name registry. + + This method is async to allow for cross-actor invocation. + ''' # NOTE: requires ``strict_map_key=False`` to the msgpack # unpacker since we have tuples as keys (not this makes the # arbiter suscetible to hashdos): @@ -1214,13 +1242,14 @@ class Arbiter(Actor): return self._registry async def wait_for_actor( - self, name: str + self, + name: str, ) -> List[Tuple[str, int]]: - """Wait for a particular actor to register. + '''Wait for a particular actor to register. This is a blocking call if no actor by the provided name is currently registered. - """ + ''' sockaddrs = [] for (aname, _), sockaddr in self._registry.items(): @@ -1237,10 +1266,13 @@ class Arbiter(Actor): return sockaddrs async def register_actor( - self, uid: Tuple[str, str], sockaddr: Tuple[str, int] + self, + uid: Tuple[str, str], + sockaddr: Tuple[str, int] + ) -> None: - name, uuid = uid - self._registry[uid] = sockaddr + uid = name, uuid = (str(uid[0]), str(uid[1])) + self._registry[uid] = (str(sockaddr[0]), int(sockaddr[1])) # pop and signal all waiter events events = self._waiters.pop(name, ()) @@ -1249,5 +1281,9 @@ class Arbiter(Actor): if isinstance(event, trio.Event): event.set() - async def unregister_actor(self, uid: Tuple[str, str]) -> None: + async def unregister_actor( + self, + uid: Tuple[str, str] + ) -> None: + uid = (str(uid[0]), str(uid[1])) self._registry.pop(uid) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 08057e9..28bef97 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -2,11 +2,16 @@ Inter-process comms abstractions """ +from __future__ import annotations import platform +import struct import typing -from typing import Any, Tuple, Optional -from functools import partial +from typing import ( + Any, Tuple, Optional, + Type, Protocol, TypeVar +) +from tricycle import BufferedReceiveStream import msgpack import trio from async_generator import asynccontextmanager @@ -17,14 +22,54 @@ log = get_logger(__name__) _is_windows = platform.system() == 'Windows' +log = get_logger(__name__) -# :eyeroll: -try: - import msgpack_numpy - Unpacker = msgpack_numpy.Unpacker -except ImportError: - # just plain ``msgpack`` requires tweaking key settings - Unpacker = partial(msgpack.Unpacker, strict_map_key=False) + +def get_stream_addrs(stream: trio.SocketStream) -> Tuple: + # should both be IP sockets + lsockname = stream.socket.getsockname() + rsockname = stream.socket.getpeername() + return ( + tuple(lsockname[:2]), + tuple(rsockname[:2]), + ) + + +MsgType = TypeVar("MsgType") + +# TODO: consider using a generic def and indexing with our eventual +# msg definition/types? +# - https://docs.python.org/3/library/typing.html#typing.Protocol +# - https://jcristharif.com/msgspec/usage.html#structs + + +class MsgTransport(Protocol[MsgType]): + + stream: trio.SocketStream + + def __init__(self, stream: trio.SocketStream) -> None: + ... + + # XXX: should this instead be called `.sendall()`? + async def send(self, msg: MsgType) -> None: + ... + + async def recv(self) -> MsgType: + ... + + def __aiter__(self) -> MsgType: + ... + + def connected(self) -> bool: + ... + + @property + def laddr(self) -> Tuple[str, int]: + ... + + @property + def raddr(self) -> Tuple[str, int]: + ... class MsgpackTCPStream: @@ -40,26 +85,21 @@ class MsgpackTCPStream: self.stream = stream assert self.stream.socket + # should both be IP sockets - lsockname = stream.socket.getsockname() - assert isinstance(lsockname, tuple) - self._laddr = lsockname[:2] - rsockname = stream.socket.getpeername() - assert isinstance(rsockname, tuple) - self._raddr = rsockname[:2] + self._laddr, self._raddr = get_stream_addrs(stream) - # start and seed first entry to read loop + # create read loop instance self._agen = self._iter_packets() - # self._agen.asend(None) is None - self._send_lock = trio.StrictFIFOLock() async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: """Yield packets from the underlying stream. """ - unpacker = Unpacker( + unpacker = msgpack.Unpacker( raw=False, use_list=False, + strict_map_key=False ) while True: try: @@ -107,11 +147,11 @@ class MsgpackTCPStream: def raddr(self) -> Tuple[Any, ...]: return self._raddr - # XXX: should this instead be called `.sendall()`? - async def send(self, data: Any) -> None: + async def send(self, msg: Any) -> None: async with self._send_lock: return await self.stream.send_all( - msgpack.dumps(data, use_bin_type=True)) + msgpack.dumps(msg, use_bin_type=True) + ) async def recv(self) -> Any: return await self._agen.asend(None) @@ -123,36 +163,173 @@ class MsgpackTCPStream: return self.stream.socket.fileno() != -1 -class Channel: - """An inter-process channel for communication between (remote) actors. +class MsgspecTCPStream(MsgpackTCPStream): + '''A ``trio.SocketStream`` delivering ``msgpack`` formatted data + using ``msgspec``. - Currently the only supported transport is a ``trio.SocketStream``. - """ + ''' def __init__( self, - destaddr: Optional[Tuple[str, int]] = None, - on_reconnect: typing.Callable[..., typing.Awaitable] = None, - auto_reconnect: bool = False, - stream: trio.SocketStream = None, # expected to be active + stream: trio.SocketStream, + prefix_size: int = 4, ) -> None: - self._recon_seq = on_reconnect - self._autorecon = auto_reconnect - self.msgstream: Optional[MsgpackTCPStream] = MsgpackTCPStream( - stream) if stream else None - if self.msgstream and destaddr: - raise ValueError( - f"A stream was provided with local addr {self.laddr}" - ) - self._destaddr = self.msgstream.raddr if self.msgstream else destaddr + import msgspec + + super().__init__(stream) + self.recv_stream = BufferedReceiveStream(transport_stream=stream) + self.prefix_size = prefix_size + + # TODO: struct aware messaging coders + self.encode = msgspec.Encoder().encode + self.decode = msgspec.Decoder().decode # dict[str, Any]) + + async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: + '''Yield packets from the underlying stream. + + ''' + import msgspec # noqa + last_decode_failed: bool = False + + while True: + try: + header = await self.recv_stream.receive_exactly(4) + + except ( + ValueError, + + # not sure entirely why we need this but without it we + # seem to be getting racy failures here on + # arbiter/registry name subs.. + trio.BrokenResourceError, + ): + raise TransportClosed( + f'transport {self} was already closed prior ro read' + ) + + if header == b'': + raise TransportClosed( + f'transport {self} was already closed prior ro read' + ) + + size, = struct.unpack(" None: + async with self._send_lock: + + bytes_data: bytes = self.encode(msg) + + # supposedly the fastest says, + # https://stackoverflow.com/a/54027962 + size: bytes = struct.pack(" Type[MsgTransport]: + + return { + ('msgpack', 'tcp'): MsgpackTCPStream, + ('msgspec', 'tcp'): MsgspecTCPStream, + }[key] + + +class Channel: + '''An inter-process channel for communication between (remote) actors. + + Currently the only supported transport is a ``trio.SocketStream``. + + ''' + def __init__( + + self, + destaddr: Optional[Tuple[str, int]], + + msg_transport_type_key: Tuple[str, str] = ('msgpack', 'tcp'), + + # TODO: optional reconnection support? + # auto_reconnect: bool = False, + # on_reconnect: typing.Callable[..., typing.Awaitable] = None, + + ) -> None: + + # self._recon_seq = on_reconnect + # self._autorecon = auto_reconnect + + # TODO: maybe expose this through the nursery api? + try: + # if installed load the msgspec transport since it's faster + import msgspec # noqa + msg_transport_type_key = ('msgspec', 'tcp') + except ImportError: + pass + + self._destaddr = destaddr + self._transport_key = msg_transport_type_key + + # Either created in ``.connect()`` or passed in by + # user in ``.from_stream()``. + self._stream: Optional[trio.SocketStream] = None + self.msgstream: Optional[MsgTransport] = None + # set after handshake - always uid of far end self.uid: Optional[Tuple[str, str]] = None + # set if far end actor errors internally self._exc: Optional[Exception] = None self._agen = self._aiter_recv() - self._closed: bool = False + @classmethod + def from_stream( + cls, + stream: trio.SocketStream, + **kwargs, + + ) -> Channel: + + src, dst = get_stream_addrs(stream) + chan = Channel(destaddr=dst, **kwargs) + + # set immediately here from provided instance + chan._stream = stream + chan.set_msg_transport(stream) + return chan + + def set_msg_transport( + self, + stream: trio.SocketStream, + type_key: Optional[Tuple[str, str]] = None, + + ) -> MsgTransport: + type_key = type_key or self._transport_key + self.msgstream = get_msg_transport(type_key)(stream) + return self.msgstream + def __repr__(self) -> str: if self.msgstream: return repr( @@ -161,20 +338,19 @@ class Channel: return object.__repr__(self) @property - def laddr(self) -> Optional[Tuple[Any, ...]]: + def laddr(self) -> Optional[Tuple[str, int]]: return self.msgstream.laddr if self.msgstream else None @property - def raddr(self) -> Optional[Tuple[Any, ...]]: + def raddr(self) -> Optional[Tuple[str, int]]: return self.msgstream.raddr if self.msgstream else None async def connect( - self, destaddr: Tuple[Any, ...] = None, **kwargs - ) -> trio.SocketStream: + ) -> MsgTransport: if self.connected(): raise RuntimeError("channel is already connected?") @@ -186,12 +362,12 @@ class Channel: *destaddr, **kwargs ) - self.msgstream = MsgpackTCPStream(stream) + msgstream = self.set_msg_transport(stream) log.transport( - f'Opened channel to peer {self.laddr} -> {self.raddr}' + f'Opened channel[{type(msgstream)}]: {self.laddr} -> {self.raddr}' ) - return stream + return msgstream async def send(self, item: Any) -> None: @@ -202,16 +378,15 @@ class Channel: async def recv(self) -> Any: assert self.msgstream + return await self.msgstream.recv() - try: - return await self.msgstream.recv() - - except trio.BrokenResourceError: - if self._autorecon: - await self._reconnect() - return await self.recv() - - raise + # try: + # return await self.msgstream.recv() + # except trio.BrokenResourceError: + # if self._autorecon: + # await self._reconnect() + # return await self.recv() + # raise async def aclose(self) -> None: @@ -233,34 +408,36 @@ class Channel: def __aiter__(self): return self._agen - async def _reconnect(self) -> None: - """Handle connection failures by polling until a reconnect can be - established. - """ - down = False - while True: - try: - with trio.move_on_after(3) as cancel_scope: - await self.connect() - cancelled = cancel_scope.cancelled_caught - if cancelled: - log.transport( - "Reconnect timed out after 3 seconds, retrying...") - continue - else: - log.transport("Stream connection re-established!") - # run any reconnection sequence - on_recon = self._recon_seq - if on_recon: - await on_recon(self) - break - except (OSError, ConnectionRefusedError): - if not down: - down = True - log.transport( - f"Connection to {self.raddr} went down, waiting" - " for re-establishment") - await trio.sleep(1) + # async def _reconnect(self) -> None: + # """Handle connection failures by polling until a reconnect can be + # established. + # """ + # down = False + # while True: + # try: + # with trio.move_on_after(3) as cancel_scope: + # await self.connect() + # cancelled = cancel_scope.cancelled_caught + # if cancelled: + # log.transport( + # "Reconnect timed out after 3 seconds, retrying...") + # continue + # else: + # log.transport("Stream connection re-established!") + + # # TODO: run any reconnection sequence + # # on_recon = self._recon_seq + # # if on_recon: + # # await on_recon(self) + + # break + # except (OSError, ConnectionRefusedError): + # if not down: + # down = True + # log.transport( + # f"Connection to {self.raddr} went down, waiting" + # " for re-establishment") + # await trio.sleep(1) async def _aiter_recv( self @@ -279,16 +456,14 @@ class Channel: # await self.msgstream.send(sent) except trio.BrokenResourceError: - if not self._autorecon: - raise + # if not self._autorecon: + raise await self.aclose() - if self._autorecon: # attempt reconnect - await self._reconnect() - continue - else: - return + # if self._autorecon: # attempt reconnect + # await self._reconnect() + # continue def connected(self) -> bool: return self.msgstream.connected() if self.msgstream else False diff --git a/tractor/_root.py b/tractor/_root.py index 9eba909..b153755 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -21,8 +21,8 @@ from ._exceptions import is_multi_cancelled # set at startup and after forks -_default_arbiter_host = '127.0.0.1' -_default_arbiter_port = 1616 +_default_arbiter_host: str = '127.0.0.1' +_default_arbiter_port: int = 1616 logger = log.get_logger('tractor') @@ -32,7 +32,7 @@ logger = log.get_logger('tractor') async def open_root_actor( # defaults are above - arbiter_addr: Tuple[str, int] = ( + arbiter_addr: Optional[Tuple[str, int]] = ( _default_arbiter_host, _default_arbiter_port, ), @@ -97,7 +97,7 @@ async def open_root_actor( arbiter_addr = (host, port) = arbiter_addr or ( _default_arbiter_host, - _default_arbiter_port + _default_arbiter_port, ) loglevel = loglevel or log.get_loglevel()