diff --git a/tractor/ipc/_shm.py b/tractor/ipc/_shm.py index 2a5fbb82..08543eca 100644 --- a/tractor/ipc/_shm.py +++ b/tractor/ipc/_shm.py @@ -23,6 +23,7 @@ considered optional within the context of this runtime-library. """ from __future__ import annotations +import hashlib from multiprocessing import shared_memory as shm from multiprocessing.shared_memory import ( # SharedMemory, @@ -106,11 +107,12 @@ class NDToken(Struct, frozen=True): This type is msg safe. ''' - shm_name: str # this servers as a "key" value + shm_name: str # actual OS-level name (may be shortened on macOS) shm_first_index_name: str shm_last_index_name: str dtype_descr: tuple size: int # in struct-array index / row terms + key: str|None = None # original descriptive key (for lookup) # TODO: use nptyping here on dtypes @property @@ -124,6 +126,41 @@ class NDToken(Struct, frozen=True): def as_msg(self): return to_builtins(self) + def __eq__(self, other) -> bool: + ''' + Compare tokens based on shm names and dtype, + ignoring the `key` field. + + The `key` field is only used for lookups, + not for token identity. + + ''' + if not isinstance(other, NDToken): + return False + return ( + self.shm_name == other.shm_name + and self.shm_first_index_name + == other.shm_first_index_name + and self.shm_last_index_name + == other.shm_last_index_name + and self.dtype_descr == other.dtype_descr + and self.size == other.size + ) + + def __hash__(self) -> int: + ''' + Hash based on the same fields used + in `.__eq__()`. + + ''' + return hash(( + self.shm_name, + self.shm_first_index_name, + self.shm_last_index_name, + self.dtype_descr, + self.size, + )) + @classmethod def from_msg(cls, msg: dict) -> NDToken: if isinstance(msg, NDToken): @@ -160,6 +197,32 @@ def get_shm_token(key: str) -> NDToken | None: return _known_tokens.get(key) +def _shorten_key_for_macos(key: str) -> str: + ''' + macOS has a 31 character limit for POSIX shared + memory names. 
Hash long keys to fit within this + limit while maintaining uniqueness. + + ''' + # macOS shm_open() has a 31 char limit (PSHMNAMLEN) + # format: /t_<hash16> = 19 chars, well under limit + if len(key) <= 31: + return key + + key_hash: str = hashlib.sha256( + key.encode() + ).hexdigest()[:16] + short_key = f't_{key_hash}' + + log.debug( + f'Shortened shm key for macOS:\n' + f' original: {key} ({len(key)} chars)\n' + f' shortened: {short_key}' + f' ({len(short_key)} chars)' + ) + return short_key + + def _make_token( key: str, size: int, @@ -171,12 +234,28 @@ def _make_token( to access a shared array. ''' + # On macOS, shorten keys that exceed the + # 31 character limit + if platform.system() == 'Darwin': + shm_name = _shorten_key_for_macos(key) + shm_first = _shorten_key_for_macos( + key + '_first' + ) + shm_last = _shorten_key_for_macos( + key + '_last' + ) + else: + shm_name = key + shm_first = key + '_first' + shm_last = key + '_last' + return NDToken( - shm_name=key, - shm_first_index_name=key + "_first", - shm_last_index_name=key + "_last", + shm_name=shm_name, + shm_first_index_name=shm_first, + shm_last_index_name=shm_last, dtype_descr=tuple(np.dtype(dtype).descr), size=size, + key=key, # store original key for lookup ) @@ -431,9 +510,17 @@ class ShmArray: def destroy(self) -> None: if _USE_POSIX: - # We manually unlink to bypass all the "resource tracker" - # nonsense meant for non-SC systems. - shm_unlink(self._shm.name) + # We manually unlink to bypass all the + # "resource tracker" nonsense meant for + # non-SC systems. + name = self._shm.name + try: + shm_unlink(name) + except FileNotFoundError: + # might be a teardown race here? + log.warning( + f'Shm for {name} already unlinked?' 
+ ) self._first.destroy() self._last.destroy() @@ -463,8 +550,16 @@ def open_shm_ndarray( a = np.zeros(size, dtype=dtype) a['index'] = np.arange(len(a)) + # Create token first to get the (possibly + # shortened) shm name + token = _make_token( + key=key, + size=size, + dtype=dtype, + ) + shm = SharedMemory( - name=key, + name=token.shm_name, create=True, size=a.nbytes ) @@ -476,12 +571,6 @@ def open_shm_ndarray( array[:] = a[:] array.setflags(write=int(not readonly)) - token = _make_token( - key=key, - size=size, - dtype=dtype, - ) - # create single entry arrays for storing an first and last indices first = SharedInt( shm=SharedMemory( @@ -554,13 +643,23 @@ def attach_shm_ndarray( ''' token = NDToken.from_msg(token) - key = token.shm_name + # Use original key for _known_tokens lookup, + # shm_name for OS calls + lookup_key = ( + token.key if token.key + else token.shm_name + ) - if key in _known_tokens: - assert NDToken.from_msg(_known_tokens[key]) == token, "WTF" + if lookup_key in _known_tokens: + assert ( + NDToken.from_msg( + _known_tokens[lookup_key] + ) == token + ), 'WTF' - # XXX: ugh, looks like due to the ``shm_open()`` C api we can't - # actually place files in a subdir, see discussion here: + # XXX: ugh, looks like due to the ``shm_open()`` + # C api we can't actually place files in a subdir, + # see discussion here: # https://stackoverflow.com/a/11103289 # attach to array buffer and view as per dtype @@ -568,7 +667,7 @@ def attach_shm_ndarray( for _ in range(3): try: shm = SharedMemory( - name=key, + name=token.shm_name, create=False, ) break @@ -614,10 +713,10 @@ def attach_shm_ndarray( sha.array # Stash key -> token knowledge for future queries - # via `maybe_opepn_shm_array()` but only after we know - # we can attach. - if key not in _known_tokens: - _known_tokens[key] = token + # via `maybe_open_shm_ndarray()` but only after + # we know we can attach. 
+ if lookup_key not in _known_tokens: + _known_tokens[lookup_key] = token # "close" attached shm on actor teardown tractor.current_actor().lifetime_stack.callback(sha.close)