Port macOS shm 31-char name limit hack from `piker`
Adapt the `PSHMNAMLEN` fix from `piker.data._sharedmem` (orig commit 96fb79ec thx @dnks!) to `tractor.ipc._shm` accounting for the module-local differences: - Add `hashlib` import for sha256 key hashing - Add `key: str|None` field to `NDToken` for storing the original descriptive key separate from the (possibly shortened) OS-level `shm_name` - Add `__eq__()`/`__hash__()` to `NDToken` excluding the `key` field from identity comparison - Add `_shorten_key_for_macos()` using `t_` prefix (vs piker's `p_`) with 16 hex chars of sha256 - Use `platform.system() == 'Darwin'` in `_make_token()` (tractor already imports the `platform` module vs piker's `sys.platform`) - Wrap `shm_unlink()` in `ShmArray.destroy()` with `try/except FileNotFoundError` for teardown races (was already done in `SharedInt.destroy()`) - Move token creation before `SharedMemory()` alloc in `open_shm_ndarray()` so `token.shm_name` is used as the OS-level name - Use `lookup_key` pattern in `attach_shm_ndarray()` to decouple `_known_tokens` dict key from OS name (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-codens_aware
parent
7bcd7aca2b
commit
01c0db651a
|
|
@ -23,6 +23,7 @@ considered optional within the context of this runtime-library.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
import hashlib
|
||||||
from multiprocessing import shared_memory as shm
|
from multiprocessing import shared_memory as shm
|
||||||
from multiprocessing.shared_memory import (
|
from multiprocessing.shared_memory import (
|
||||||
# SharedMemory,
|
# SharedMemory,
|
||||||
|
|
@ -106,11 +107,12 @@ class NDToken(Struct, frozen=True):
|
||||||
This type is msg safe.
|
This type is msg safe.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
shm_name: str # this servers as a "key" value
|
shm_name: str # actual OS-level name (may be shortened on macOS)
|
||||||
shm_first_index_name: str
|
shm_first_index_name: str
|
||||||
shm_last_index_name: str
|
shm_last_index_name: str
|
||||||
dtype_descr: tuple
|
dtype_descr: tuple
|
||||||
size: int # in struct-array index / row terms
|
size: int # in struct-array index / row terms
|
||||||
|
key: str|None = None # original descriptive key (for lookup)
|
||||||
|
|
||||||
# TODO: use nptyping here on dtypes
|
# TODO: use nptyping here on dtypes
|
||||||
@property
|
@property
|
||||||
|
|
@ -124,6 +126,41 @@ class NDToken(Struct, frozen=True):
|
||||||
def as_msg(self):
|
def as_msg(self):
|
||||||
return to_builtins(self)
|
return to_builtins(self)
|
||||||
|
|
||||||
|
def __eq__(self, other) -> bool:
|
||||||
|
'''
|
||||||
|
Compare tokens based on shm names and dtype,
|
||||||
|
ignoring the `key` field.
|
||||||
|
|
||||||
|
The `key` field is only used for lookups,
|
||||||
|
not for token identity.
|
||||||
|
|
||||||
|
'''
|
||||||
|
if not isinstance(other, NDToken):
|
||||||
|
return False
|
||||||
|
return (
|
||||||
|
self.shm_name == other.shm_name
|
||||||
|
and self.shm_first_index_name
|
||||||
|
== other.shm_first_index_name
|
||||||
|
and self.shm_last_index_name
|
||||||
|
== other.shm_last_index_name
|
||||||
|
and self.dtype_descr == other.dtype_descr
|
||||||
|
and self.size == other.size
|
||||||
|
)
|
||||||
|
|
||||||
|
def __hash__(self) -> int:
|
||||||
|
'''
|
||||||
|
Hash based on the same fields used
|
||||||
|
in `.__eq__()`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
return hash((
|
||||||
|
self.shm_name,
|
||||||
|
self.shm_first_index_name,
|
||||||
|
self.shm_last_index_name,
|
||||||
|
self.dtype_descr,
|
||||||
|
self.size,
|
||||||
|
))
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_msg(cls, msg: dict) -> NDToken:
|
def from_msg(cls, msg: dict) -> NDToken:
|
||||||
if isinstance(msg, NDToken):
|
if isinstance(msg, NDToken):
|
||||||
|
|
@ -160,6 +197,32 @@ def get_shm_token(key: str) -> NDToken | None:
|
||||||
return _known_tokens.get(key)
|
return _known_tokens.get(key)
|
||||||
|
|
||||||
|
|
||||||
|
def _shorten_key_for_macos(key: str) -> str:
|
||||||
|
'''
|
||||||
|
macOS has a 31 character limit for POSIX shared
|
||||||
|
memory names. Hash long keys to fit within this
|
||||||
|
limit while maintaining uniqueness.
|
||||||
|
|
||||||
|
'''
|
||||||
|
# macOS shm_open() has a 31 char limit (PSHMNAMLEN)
|
||||||
|
# format: /t_<hash16> = 19 chars, well under limit
|
||||||
|
if len(key) <= 31:
|
||||||
|
return key
|
||||||
|
|
||||||
|
key_hash: str = hashlib.sha256(
|
||||||
|
key.encode()
|
||||||
|
).hexdigest()[:16]
|
||||||
|
short_key = f't_{key_hash}'
|
||||||
|
|
||||||
|
log.debug(
|
||||||
|
f'Shortened shm key for macOS:\n'
|
||||||
|
f' original: {key} ({len(key)} chars)\n'
|
||||||
|
f' shortened: {short_key}'
|
||||||
|
f' ({len(short_key)} chars)'
|
||||||
|
)
|
||||||
|
return short_key
|
||||||
|
|
||||||
|
|
||||||
def _make_token(
|
def _make_token(
|
||||||
key: str,
|
key: str,
|
||||||
size: int,
|
size: int,
|
||||||
|
|
@ -171,12 +234,28 @@ def _make_token(
|
||||||
to access a shared array.
|
to access a shared array.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
# On macOS, shorten keys that exceed the
|
||||||
|
# 31 character limit
|
||||||
|
if platform.system() == 'Darwin':
|
||||||
|
shm_name = _shorten_key_for_macos(key)
|
||||||
|
shm_first = _shorten_key_for_macos(
|
||||||
|
key + '_first'
|
||||||
|
)
|
||||||
|
shm_last = _shorten_key_for_macos(
|
||||||
|
key + '_last'
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
shm_name = key
|
||||||
|
shm_first = key + '_first'
|
||||||
|
shm_last = key + '_last'
|
||||||
|
|
||||||
return NDToken(
|
return NDToken(
|
||||||
shm_name=key,
|
shm_name=shm_name,
|
||||||
shm_first_index_name=key + "_first",
|
shm_first_index_name=shm_first,
|
||||||
shm_last_index_name=key + "_last",
|
shm_last_index_name=shm_last,
|
||||||
dtype_descr=tuple(np.dtype(dtype).descr),
|
dtype_descr=tuple(np.dtype(dtype).descr),
|
||||||
size=size,
|
size=size,
|
||||||
|
key=key, # store original key for lookup
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -431,9 +510,17 @@ class ShmArray:
|
||||||
|
|
||||||
def destroy(self) -> None:
|
def destroy(self) -> None:
|
||||||
if _USE_POSIX:
|
if _USE_POSIX:
|
||||||
# We manually unlink to bypass all the "resource tracker"
|
# We manually unlink to bypass all the
|
||||||
# nonsense meant for non-SC systems.
|
# "resource tracker" nonsense meant for
|
||||||
shm_unlink(self._shm.name)
|
# non-SC systems.
|
||||||
|
name = self._shm.name
|
||||||
|
try:
|
||||||
|
shm_unlink(name)
|
||||||
|
except FileNotFoundError:
|
||||||
|
# might be a teardown race here?
|
||||||
|
log.warning(
|
||||||
|
f'Shm for {name} already unlinked?'
|
||||||
|
)
|
||||||
|
|
||||||
self._first.destroy()
|
self._first.destroy()
|
||||||
self._last.destroy()
|
self._last.destroy()
|
||||||
|
|
@ -463,8 +550,16 @@ def open_shm_ndarray(
|
||||||
a = np.zeros(size, dtype=dtype)
|
a = np.zeros(size, dtype=dtype)
|
||||||
a['index'] = np.arange(len(a))
|
a['index'] = np.arange(len(a))
|
||||||
|
|
||||||
|
# Create token first to get the (possibly
|
||||||
|
# shortened) shm name
|
||||||
|
token = _make_token(
|
||||||
|
key=key,
|
||||||
|
size=size,
|
||||||
|
dtype=dtype,
|
||||||
|
)
|
||||||
|
|
||||||
shm = SharedMemory(
|
shm = SharedMemory(
|
||||||
name=key,
|
name=token.shm_name,
|
||||||
create=True,
|
create=True,
|
||||||
size=a.nbytes
|
size=a.nbytes
|
||||||
)
|
)
|
||||||
|
|
@ -476,12 +571,6 @@ def open_shm_ndarray(
|
||||||
array[:] = a[:]
|
array[:] = a[:]
|
||||||
array.setflags(write=int(not readonly))
|
array.setflags(write=int(not readonly))
|
||||||
|
|
||||||
token = _make_token(
|
|
||||||
key=key,
|
|
||||||
size=size,
|
|
||||||
dtype=dtype,
|
|
||||||
)
|
|
||||||
|
|
||||||
# create single entry arrays for storing an first and last indices
|
# create single entry arrays for storing an first and last indices
|
||||||
first = SharedInt(
|
first = SharedInt(
|
||||||
shm=SharedMemory(
|
shm=SharedMemory(
|
||||||
|
|
@ -554,13 +643,23 @@ def attach_shm_ndarray(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
token = NDToken.from_msg(token)
|
token = NDToken.from_msg(token)
|
||||||
key = token.shm_name
|
# Use original key for _known_tokens lookup,
|
||||||
|
# shm_name for OS calls
|
||||||
|
lookup_key = (
|
||||||
|
token.key if token.key
|
||||||
|
else token.shm_name
|
||||||
|
)
|
||||||
|
|
||||||
if key in _known_tokens:
|
if lookup_key in _known_tokens:
|
||||||
assert NDToken.from_msg(_known_tokens[key]) == token, "WTF"
|
assert (
|
||||||
|
NDToken.from_msg(
|
||||||
|
_known_tokens[lookup_key]
|
||||||
|
) == token
|
||||||
|
), 'WTF'
|
||||||
|
|
||||||
# XXX: ugh, looks like due to the ``shm_open()`` C api we can't
|
# XXX: ugh, looks like due to the ``shm_open()``
|
||||||
# actually place files in a subdir, see discussion here:
|
# C api we can't actually place files in a subdir,
|
||||||
|
# see discussion here:
|
||||||
# https://stackoverflow.com/a/11103289
|
# https://stackoverflow.com/a/11103289
|
||||||
|
|
||||||
# attach to array buffer and view as per dtype
|
# attach to array buffer and view as per dtype
|
||||||
|
|
@ -568,7 +667,7 @@ def attach_shm_ndarray(
|
||||||
for _ in range(3):
|
for _ in range(3):
|
||||||
try:
|
try:
|
||||||
shm = SharedMemory(
|
shm = SharedMemory(
|
||||||
name=key,
|
name=token.shm_name,
|
||||||
create=False,
|
create=False,
|
||||||
)
|
)
|
||||||
break
|
break
|
||||||
|
|
@ -614,10 +713,10 @@ def attach_shm_ndarray(
|
||||||
sha.array
|
sha.array
|
||||||
|
|
||||||
# Stash key -> token knowledge for future queries
|
# Stash key -> token knowledge for future queries
|
||||||
# via `maybe_opepn_shm_array()` but only after we know
|
# via `maybe_open_shm_ndarray()` but only after
|
||||||
# we can attach.
|
# we know we can attach.
|
||||||
if key not in _known_tokens:
|
if lookup_key not in _known_tokens:
|
||||||
_known_tokens[key] = token
|
_known_tokens[lookup_key] = token
|
||||||
|
|
||||||
# "close" attached shm on actor teardown
|
# "close" attached shm on actor teardown
|
||||||
tractor.current_actor().lifetime_stack.callback(sha.close)
|
tractor.current_actor().lifetime_stack.callback(sha.close)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue