forked from goodboy/tractor
Compare commits
20 Commits
master
...
macos_in_c
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | a89799b682 | |
Tyler Goodlet | 45a9aaf6e9 | |
Tyler Goodlet | b624ebba21 | |
Tyler Goodlet | e12def51a8 | |
Tyler Goodlet | 64819b2acb | |
Tyler Goodlet | a5e3cf4ecf | |
Tyler Goodlet | 7c42d2510c | |
Tyler Goodlet | 9c336ec064 | |
Tyler Goodlet | 6453195e97 | |
Tyler Goodlet | 54322f2bae | |
Tyler Goodlet | 1be3f4115d | |
Tyler Goodlet | 3bdd04ef4a | |
Tyler Goodlet | 2ac19b2e96 | |
Tyler Goodlet | 60f8f110e8 | |
Tyler Goodlet | 6d4d428205 | |
Tyler Goodlet | ecea1e1658 | |
Tyler Goodlet | 971ac50756 | |
Tyler Goodlet | c01c22769b | |
Tyler Goodlet | 0e4b37d122 | |
Tyler Goodlet | 901353c213 |
|
@ -67,7 +67,6 @@ jobs:
|
||||||
]
|
]
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
|
|
||||||
- name: Checkout
|
- name: Checkout
|
||||||
uses: actions/checkout@v2
|
uses: actions/checkout@v2
|
||||||
|
|
||||||
|
@ -85,6 +84,40 @@ jobs:
|
||||||
- name: Run tests
|
- name: Run tests
|
||||||
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx
|
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx
|
||||||
|
|
||||||
|
|
||||||
|
testing-macos:
|
||||||
|
name: '${{ matrix.os }} Python ${{ matrix.python }} - ${{ matrix.spawn_backend }}'
|
||||||
|
timeout-minutes: 10
|
||||||
|
runs-on: ${{ matrix.os }}
|
||||||
|
|
||||||
|
strategy:
|
||||||
|
fail-fast: false
|
||||||
|
matrix:
|
||||||
|
os: [macos-latest]
|
||||||
|
python: ['3.10']
|
||||||
|
spawn_backend: [
|
||||||
|
'trio',
|
||||||
|
]
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- name: Checkout
|
||||||
|
uses: actions/checkout@v2
|
||||||
|
|
||||||
|
- name: Setup python
|
||||||
|
uses: actions/setup-python@v2
|
||||||
|
with:
|
||||||
|
python-version: '${{ matrix.python }}'
|
||||||
|
|
||||||
|
- name: Install dependencies
|
||||||
|
run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager
|
||||||
|
|
||||||
|
- name: List dependencies
|
||||||
|
run: pip list
|
||||||
|
|
||||||
|
- name: Run tests
|
||||||
|
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx
|
||||||
|
|
||||||
|
|
||||||
# We skip 3.10 on windows for now due to not having any collabs to
|
# We skip 3.10 on windows for now due to not having any collabs to
|
||||||
# debug the CI failures. Anyone wanting to hack and solve them is very
|
# debug the CI failures. Anyone wanting to hack and solve them is very
|
||||||
# welcome, but our primary user base is not using that OS.
|
# welcome, but our primary user base is not using that OS.
|
||||||
|
|
|
@ -12,9 +12,11 @@ async def stream_data(seed):
|
||||||
|
|
||||||
# this is the third actor; the aggregator
|
# this is the third actor; the aggregator
|
||||||
async def aggregate(seed):
|
async def aggregate(seed):
|
||||||
"""Ensure that the two streams we receive match but only stream
|
'''
|
||||||
|
Ensure that the two streams we receive match but only stream
|
||||||
a single set of values to the parent.
|
a single set of values to the parent.
|
||||||
"""
|
|
||||||
|
'''
|
||||||
async with tractor.open_nursery() as nursery:
|
async with tractor.open_nursery() as nursery:
|
||||||
portals = []
|
portals = []
|
||||||
for i in range(1, 3):
|
for i in range(1, 3):
|
||||||
|
@ -69,7 +71,8 @@ async def aggregate(seed):
|
||||||
async def main():
|
async def main():
|
||||||
# a nursery which spawns "actors"
|
# a nursery which spawns "actors"
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
arbiter_addr=('127.0.0.1', 1616)
|
arbiter_addr=('127.0.0.1', 1616),
|
||||||
|
loglevel='cancel',
|
||||||
) as nursery:
|
) as nursery:
|
||||||
|
|
||||||
seed = int(1e3)
|
seed = int(1e3)
|
||||||
|
@ -92,6 +95,9 @@ async def main():
|
||||||
async for value in stream:
|
async for value in stream:
|
||||||
result_stream.append(value)
|
result_stream.append(value)
|
||||||
|
|
||||||
|
print("ROOT STREAM CONSUMER COMPLETE")
|
||||||
|
|
||||||
|
print("ROOT CANCELLING AGGREGATOR CHILD")
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
|
||||||
print(f"STREAM TIME = {time.time() - start}")
|
print(f"STREAM TIME = {time.time() - start}")
|
||||||
|
|
|
@ -6,3 +6,4 @@ mypy
|
||||||
trio_typing
|
trio_typing
|
||||||
pexpect
|
pexpect
|
||||||
towncrier
|
towncrier
|
||||||
|
numpy
|
||||||
|
|
|
@ -36,9 +36,12 @@ from conftest import repodir, _ci_env
|
||||||
# - recurrent root errors
|
# - recurrent root errors
|
||||||
|
|
||||||
|
|
||||||
if platform.system() == 'Windows':
|
if osname := platform.system() in (
|
||||||
|
'Windows',
|
||||||
|
'Darwin',
|
||||||
|
):
|
||||||
pytest.skip(
|
pytest.skip(
|
||||||
'Debugger tests have no windows support (yet)',
|
'Debugger tests have no {osname} support (yet)',
|
||||||
allow_module_level=True,
|
allow_module_level=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -783,8 +786,6 @@ def test_multi_nested_subactors_error_through_nurseries(
|
||||||
|
|
||||||
child = spawn('multi_nested_subactors_error_up_through_nurseries')
|
child = spawn('multi_nested_subactors_error_up_through_nurseries')
|
||||||
|
|
||||||
timed_out_early: bool = False
|
|
||||||
|
|
||||||
for send_char in itertools.cycle(['c', 'q']):
|
for send_char in itertools.cycle(['c', 'q']):
|
||||||
try:
|
try:
|
||||||
child.expect(r"\(Pdb\+\+\)")
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
|
|
@ -93,14 +93,16 @@ def run_example_in_subproc(loglevel, testdir, arb_addr):
|
||||||
ids=lambda t: t[1],
|
ids=lambda t: t[1],
|
||||||
)
|
)
|
||||||
def test_example(run_example_in_subproc, example_script):
|
def test_example(run_example_in_subproc, example_script):
|
||||||
"""Load and run scripts from this repo's ``examples/`` dir as a user
|
'''
|
||||||
|
Load and run scripts from this repo's ``examples/`` dir as a user
|
||||||
would copy and pasing them into their editor.
|
would copy and pasing them into their editor.
|
||||||
|
|
||||||
On windows a little more "finessing" is done to make
|
On windows a little more "finessing" is done to make
|
||||||
``multiprocessing`` play nice: we copy the ``__main__.py`` into the
|
``multiprocessing`` play nice: we copy the ``__main__.py`` into the
|
||||||
test directory and invoke the script as a module with ``python -m
|
test directory and invoke the script as a module with ``python -m
|
||||||
test_example``.
|
test_example``.
|
||||||
"""
|
|
||||||
|
'''
|
||||||
ex_file = os.path.join(*example_script)
|
ex_file = os.path.join(*example_script)
|
||||||
|
|
||||||
if 'rpc_bidir_streaming' in ex_file and sys.version_info < (3, 9):
|
if 'rpc_bidir_streaming' in ex_file and sys.version_info < (3, 9):
|
||||||
|
@ -110,13 +112,20 @@ def test_example(run_example_in_subproc, example_script):
|
||||||
code = ex.read()
|
code = ex.read()
|
||||||
|
|
||||||
with run_example_in_subproc(code) as proc:
|
with run_example_in_subproc(code) as proc:
|
||||||
proc.wait()
|
try:
|
||||||
err, _ = proc.stderr.read(), proc.stdout.read()
|
proc.wait(timeout=5)
|
||||||
# print(f'STDERR: {err}')
|
finally:
|
||||||
# print(f'STDOUT: {out}')
|
err = proc.stderr.read()
|
||||||
|
errmsg = err.decode()
|
||||||
|
out = proc.stdout.read()
|
||||||
|
outmsg = out.decode()
|
||||||
|
|
||||||
|
if out:
|
||||||
|
print(f'STDOUT: {out.decode()}')
|
||||||
|
|
||||||
# if we get some gnarly output let's aggregate and raise
|
# if we get some gnarly output let's aggregate and raise
|
||||||
if err:
|
if err:
|
||||||
|
print(f'STDERR:\n{errmsg}')
|
||||||
errmsg = err.decode()
|
errmsg = err.decode()
|
||||||
errlines = errmsg.splitlines()
|
errlines = errmsg.splitlines()
|
||||||
last_error = errlines[-1]
|
last_error = errlines[-1]
|
||||||
|
|
|
@ -0,0 +1,167 @@
|
||||||
|
"""
|
||||||
|
Shared mem primitives and APIs.
|
||||||
|
|
||||||
|
"""
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
# import numpy
|
||||||
|
import pytest
|
||||||
|
import trio
|
||||||
|
import tractor
|
||||||
|
from tractor._shm import (
|
||||||
|
open_shm_list,
|
||||||
|
attach_shm_list,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@tractor.context
|
||||||
|
async def child_attach_shml_alot(
|
||||||
|
ctx: tractor.Context,
|
||||||
|
shm_key: str,
|
||||||
|
) -> None:
|
||||||
|
|
||||||
|
await ctx.started(shm_key)
|
||||||
|
|
||||||
|
# now try to attach a boatload of times in a loop..
|
||||||
|
for _ in range(1000):
|
||||||
|
shml = attach_shm_list(
|
||||||
|
key=shm_key,
|
||||||
|
readonly=False,
|
||||||
|
)
|
||||||
|
assert shml.shm.name == shm_key
|
||||||
|
await trio.sleep(0.001)
|
||||||
|
|
||||||
|
|
||||||
|
def test_child_attaches_alot():
|
||||||
|
async def main():
|
||||||
|
async with tractor.open_nursery() as an:
|
||||||
|
|
||||||
|
# allocate writeable list in parent
|
||||||
|
key = f'shml_{uuid.uuid4()}'
|
||||||
|
shml = open_shm_list(
|
||||||
|
key=key,
|
||||||
|
)
|
||||||
|
|
||||||
|
portal = await an.start_actor(
|
||||||
|
'shm_attacher',
|
||||||
|
enable_modules=[__name__],
|
||||||
|
)
|
||||||
|
|
||||||
|
async with (
|
||||||
|
portal.open_context(
|
||||||
|
child_attach_shml_alot,
|
||||||
|
shm_key=shml.key,
|
||||||
|
) as (ctx, start_val),
|
||||||
|
):
|
||||||
|
assert start_val == key
|
||||||
|
await ctx.result()
|
||||||
|
|
||||||
|
await portal.cancel_actor()
|
||||||
|
|
||||||
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
|
@tractor.context
|
||||||
|
async def child_read_shm_list(
|
||||||
|
ctx: tractor.Context,
|
||||||
|
shm_key: str,
|
||||||
|
use_str: bool,
|
||||||
|
frame_size: int,
|
||||||
|
) -> None:
|
||||||
|
|
||||||
|
# attach in child
|
||||||
|
shml = attach_shm_list(
|
||||||
|
key=shm_key,
|
||||||
|
# dtype=str if use_str else float,
|
||||||
|
)
|
||||||
|
await ctx.started(shml.key)
|
||||||
|
|
||||||
|
async with ctx.open_stream() as stream:
|
||||||
|
async for i in stream:
|
||||||
|
print(f'(child): reading shm list index: {i}')
|
||||||
|
|
||||||
|
if use_str:
|
||||||
|
expect = str(float(i))
|
||||||
|
else:
|
||||||
|
expect = float(i)
|
||||||
|
|
||||||
|
if frame_size == 1:
|
||||||
|
val = shml[i]
|
||||||
|
assert expect == val
|
||||||
|
print(f'(child): reading value: {val}')
|
||||||
|
else:
|
||||||
|
frame = shml[i - frame_size:i]
|
||||||
|
print(f'(child): reading frame: {frame}')
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'use_str',
|
||||||
|
[False, True],
|
||||||
|
ids=lambda i: f'use_str_values={i}',
|
||||||
|
)
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'frame_size',
|
||||||
|
[1, 2**6, 2**10],
|
||||||
|
ids=lambda i: f'frame_size={i}',
|
||||||
|
)
|
||||||
|
def test_parent_writer_child_reader(
|
||||||
|
use_str: bool,
|
||||||
|
frame_size: int,
|
||||||
|
):
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
async with tractor.open_nursery(
|
||||||
|
# debug_mode=True,
|
||||||
|
) as an:
|
||||||
|
|
||||||
|
portal = await an.start_actor(
|
||||||
|
'shm_reader',
|
||||||
|
enable_modules=[__name__],
|
||||||
|
debug_mode=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# allocate writeable list in parent
|
||||||
|
key = 'shm_list'
|
||||||
|
seq_size = int(2 * 2 ** 10)
|
||||||
|
shml = open_shm_list(
|
||||||
|
key=key,
|
||||||
|
size=seq_size,
|
||||||
|
dtype=str if use_str else float,
|
||||||
|
readonly=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
async with (
|
||||||
|
portal.open_context(
|
||||||
|
child_read_shm_list,
|
||||||
|
shm_key=key,
|
||||||
|
use_str=use_str,
|
||||||
|
frame_size=frame_size,
|
||||||
|
) as (ctx, sent),
|
||||||
|
|
||||||
|
ctx.open_stream() as stream,
|
||||||
|
):
|
||||||
|
|
||||||
|
assert sent == key
|
||||||
|
|
||||||
|
for i in range(seq_size):
|
||||||
|
|
||||||
|
val = float(i)
|
||||||
|
if use_str:
|
||||||
|
val = str(val)
|
||||||
|
|
||||||
|
# print(f'(parent): writing {val}')
|
||||||
|
shml[i] = val
|
||||||
|
|
||||||
|
# only on frame fills do we
|
||||||
|
# signal to the child that a frame's
|
||||||
|
# worth is ready.
|
||||||
|
if (i % frame_size) == 0:
|
||||||
|
print(f'(parent): signalling frame full on {val}')
|
||||||
|
await stream.send(i)
|
||||||
|
else:
|
||||||
|
print(f'(parent): signalling final frame on {val}')
|
||||||
|
await stream.send(i)
|
||||||
|
|
||||||
|
await portal.cancel_actor()
|
||||||
|
|
||||||
|
trio.run(main)
|
|
@ -488,7 +488,10 @@ class Actor:
|
||||||
self._parent_chan: Optional[Channel] = None
|
self._parent_chan: Optional[Channel] = None
|
||||||
self._forkserver_info: Optional[
|
self._forkserver_info: Optional[
|
||||||
tuple[Any, Any, Any, Any, Any]] = None
|
tuple[Any, Any, Any, Any, Any]] = None
|
||||||
self._actoruid2nursery: dict[Optional[tuple[str, str]], 'ActorNursery'] = {} # type: ignore # noqa
|
self._actoruid2nursery: dict[
|
||||||
|
tuple[str, str],
|
||||||
|
ActorNursery | None,
|
||||||
|
] = {} # type: ignore # noqa
|
||||||
|
|
||||||
async def wait_for_peer(
|
async def wait_for_peer(
|
||||||
self, uid: tuple[str, str]
|
self, uid: tuple[str, str]
|
||||||
|
@ -826,7 +829,12 @@ class Actor:
|
||||||
|
|
||||||
if ctx._backpressure:
|
if ctx._backpressure:
|
||||||
log.warning(text)
|
log.warning(text)
|
||||||
|
try:
|
||||||
await send_chan.send(msg)
|
await send_chan.send(msg)
|
||||||
|
except trio.BrokenResourceError:
|
||||||
|
# XXX: local consumer has closed their side
|
||||||
|
# so cancel the far end streaming task
|
||||||
|
log.warning(f"{chan} is already closed")
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
raise StreamOverrun(text) from None
|
raise StreamOverrun(text) from None
|
||||||
|
@ -1371,8 +1379,9 @@ async def async_main(
|
||||||
actor.lifetime_stack.close()
|
actor.lifetime_stack.close()
|
||||||
|
|
||||||
# Unregister actor from the arbiter
|
# Unregister actor from the arbiter
|
||||||
if registered_with_arbiter and (
|
if (
|
||||||
actor._arb_addr is not None
|
registered_with_arbiter
|
||||||
|
and not actor.is_arbiter
|
||||||
):
|
):
|
||||||
failed = False
|
failed = False
|
||||||
with trio.move_on_after(0.5) as cs:
|
with trio.move_on_after(0.5) as cs:
|
||||||
|
|
|
@ -0,0 +1,828 @@
|
||||||
|
# tractor: structured concurrent "actors".
|
||||||
|
# Copyright 2018-eternity Tyler Goodlet.
|
||||||
|
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
"""
|
||||||
|
SC friendly shared memory management geared at real-time
|
||||||
|
processing.
|
||||||
|
|
||||||
|
Support for ``numpy`` compatible array-buffers is provided but is
|
||||||
|
considered optional within the context of this runtime-library.
|
||||||
|
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
from sys import byteorder
|
||||||
|
import time
|
||||||
|
from typing import Optional
|
||||||
|
from multiprocessing import shared_memory as shm
|
||||||
|
from multiprocessing.shared_memory import (
|
||||||
|
SharedMemory,
|
||||||
|
ShareableList,
|
||||||
|
)
|
||||||
|
|
||||||
|
from msgspec import Struct
|
||||||
|
import tractor
|
||||||
|
|
||||||
|
from .log import get_logger
|
||||||
|
|
||||||
|
|
||||||
|
_USE_POSIX = getattr(shm, '_USE_POSIX', False)
|
||||||
|
if _USE_POSIX:
|
||||||
|
from _posixshmem import shm_unlink
|
||||||
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
import numpy as np
|
||||||
|
from numpy.lib import recfunctions as rfn
|
||||||
|
import nptyping
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def disable_mantracker():
|
||||||
|
'''
|
||||||
|
Disable all ``multiprocessing``` "resource tracking" machinery since
|
||||||
|
it's an absolute multi-threaded mess of non-SC madness.
|
||||||
|
|
||||||
|
'''
|
||||||
|
from multiprocessing import resource_tracker as mantracker
|
||||||
|
|
||||||
|
# Tell the "resource tracker" thing to fuck off.
|
||||||
|
class ManTracker(mantracker.ResourceTracker):
|
||||||
|
def register(self, name, rtype):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def unregister(self, name, rtype):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def ensure_running(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
# "know your land and know your prey"
|
||||||
|
# https://www.dailymotion.com/video/x6ozzco
|
||||||
|
mantracker._resource_tracker = ManTracker()
|
||||||
|
mantracker.register = mantracker._resource_tracker.register
|
||||||
|
mantracker.ensure_running = mantracker._resource_tracker.ensure_running
|
||||||
|
mantracker.unregister = mantracker._resource_tracker.unregister
|
||||||
|
mantracker.getfd = mantracker._resource_tracker.getfd
|
||||||
|
|
||||||
|
|
||||||
|
disable_mantracker()
|
||||||
|
|
||||||
|
|
||||||
|
class SharedInt:
|
||||||
|
'''
|
||||||
|
Wrapper around a single entry shared memory array which
|
||||||
|
holds an ``int`` value used as an index counter.
|
||||||
|
|
||||||
|
'''
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
shm: SharedMemory,
|
||||||
|
) -> None:
|
||||||
|
self._shm = shm
|
||||||
|
|
||||||
|
@property
|
||||||
|
def value(self) -> int:
|
||||||
|
return int.from_bytes(self._shm.buf, byteorder)
|
||||||
|
|
||||||
|
@value.setter
|
||||||
|
def value(self, value) -> None:
|
||||||
|
self._shm.buf[:] = value.to_bytes(self._shm.size, byteorder)
|
||||||
|
|
||||||
|
def destroy(self) -> None:
|
||||||
|
if _USE_POSIX:
|
||||||
|
# We manually unlink to bypass all the "resource tracker"
|
||||||
|
# nonsense meant for non-SC systems.
|
||||||
|
name = self._shm.name
|
||||||
|
try:
|
||||||
|
shm_unlink(name)
|
||||||
|
except FileNotFoundError:
|
||||||
|
# might be a teardown race here?
|
||||||
|
log.warning(f'Shm for {name} already unlinked?')
|
||||||
|
|
||||||
|
|
||||||
|
class NDToken(Struct, frozen=True):
|
||||||
|
'''
|
||||||
|
Internal represenation of a shared memory ``numpy`` array "token"
|
||||||
|
which can be used to key and load a system (OS) wide shm entry
|
||||||
|
and correctly read the array by type signature.
|
||||||
|
|
||||||
|
This type is msg safe.
|
||||||
|
|
||||||
|
'''
|
||||||
|
shm_name: str # this servers as a "key" value
|
||||||
|
shm_first_index_name: str
|
||||||
|
shm_last_index_name: str
|
||||||
|
dtype_descr: tuple
|
||||||
|
size: int # in struct-array index / row terms
|
||||||
|
|
||||||
|
# TODO: use nptyping here on dtypes
|
||||||
|
@property
|
||||||
|
def dtype(self) -> list[tuple[str, str, tuple[int, ...]]]:
|
||||||
|
return np.dtype(
|
||||||
|
list(
|
||||||
|
map(tuple, self.dtype_descr)
|
||||||
|
)
|
||||||
|
).descr
|
||||||
|
|
||||||
|
def as_msg(self):
|
||||||
|
return self.to_dict()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_msg(cls, msg: dict) -> NDToken:
|
||||||
|
if isinstance(msg, NDToken):
|
||||||
|
return msg
|
||||||
|
|
||||||
|
# TODO: native struct decoding
|
||||||
|
# return _token_dec.decode(msg)
|
||||||
|
|
||||||
|
msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr']))
|
||||||
|
return NDToken(**msg)
|
||||||
|
|
||||||
|
|
||||||
|
# _token_dec = msgspec.msgpack.Decoder(NDToken)
|
||||||
|
|
||||||
|
# TODO: this api?
|
||||||
|
# _known_tokens = tractor.ActorVar('_shm_tokens', {})
|
||||||
|
# _known_tokens = tractor.ContextStack('_known_tokens', )
|
||||||
|
# _known_tokens = trio.RunVar('shms', {})
|
||||||
|
|
||||||
|
# TODO: this should maybe be provided via
|
||||||
|
# a `.trionics.maybe_open_context()` wrapper factory?
|
||||||
|
# process-local store of keys to tokens
|
||||||
|
_known_tokens: dict[str, NDToken] = {}
|
||||||
|
|
||||||
|
|
||||||
|
def get_shm_token(key: str) -> NDToken | None:
|
||||||
|
'''
|
||||||
|
Convenience func to check if a token
|
||||||
|
for the provided key is known by this process.
|
||||||
|
|
||||||
|
Returns either the ``numpy`` token or a string for a shared list.
|
||||||
|
|
||||||
|
'''
|
||||||
|
return _known_tokens.get(key)
|
||||||
|
|
||||||
|
|
||||||
|
def _make_token(
|
||||||
|
key: str,
|
||||||
|
size: int,
|
||||||
|
dtype: np.dtype,
|
||||||
|
|
||||||
|
) -> NDToken:
|
||||||
|
'''
|
||||||
|
Create a serializable token that can be used
|
||||||
|
to access a shared array.
|
||||||
|
|
||||||
|
'''
|
||||||
|
return NDToken(
|
||||||
|
shm_name=key,
|
||||||
|
shm_first_index_name=key + "_first",
|
||||||
|
shm_last_index_name=key + "_last",
|
||||||
|
dtype_descr=tuple(np.dtype(dtype).descr),
|
||||||
|
size=size,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class ShmArray:
|
||||||
|
'''
|
||||||
|
A shared memory ``numpy.ndarray`` API.
|
||||||
|
|
||||||
|
An underlying shared memory buffer is allocated based on
|
||||||
|
a user specified ``numpy.ndarray``. This fixed size array
|
||||||
|
can be read and written to by pushing data both onto the "front"
|
||||||
|
or "back" of a set index range. The indexes for the "first" and
|
||||||
|
"last" index are themselves stored in shared memory (accessed via
|
||||||
|
``SharedInt`` interfaces) values such that multiple processes can
|
||||||
|
interact with the same array using a synchronized-index.
|
||||||
|
|
||||||
|
'''
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
shmarr: np.ndarray,
|
||||||
|
first: SharedInt,
|
||||||
|
last: SharedInt,
|
||||||
|
shm: SharedMemory,
|
||||||
|
# readonly: bool = True,
|
||||||
|
) -> None:
|
||||||
|
self._array = shmarr
|
||||||
|
|
||||||
|
# indexes for first and last indices corresponding
|
||||||
|
# to fille data
|
||||||
|
self._first = first
|
||||||
|
self._last = last
|
||||||
|
|
||||||
|
self._len = len(shmarr)
|
||||||
|
self._shm = shm
|
||||||
|
self._post_init: bool = False
|
||||||
|
|
||||||
|
# pushing data does not write the index (aka primary key)
|
||||||
|
self._write_fields: list[str] | None = None
|
||||||
|
dtype = shmarr.dtype
|
||||||
|
if dtype.fields:
|
||||||
|
self._write_fields = list(shmarr.dtype.fields.keys())[1:]
|
||||||
|
|
||||||
|
# TODO: ringbuf api?
|
||||||
|
|
||||||
|
@property
|
||||||
|
def _token(self) -> NDToken:
|
||||||
|
return NDToken(
|
||||||
|
shm_name=self._shm.name,
|
||||||
|
shm_first_index_name=self._first._shm.name,
|
||||||
|
shm_last_index_name=self._last._shm.name,
|
||||||
|
dtype_descr=tuple(self._array.dtype.descr),
|
||||||
|
size=self._len,
|
||||||
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def token(self) -> dict:
|
||||||
|
"""Shared memory token that can be serialized and used by
|
||||||
|
another process to attach to this array.
|
||||||
|
"""
|
||||||
|
return self._token.as_msg()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def index(self) -> int:
|
||||||
|
return self._last.value % self._len
|
||||||
|
|
||||||
|
@property
|
||||||
|
def array(self) -> np.ndarray:
|
||||||
|
'''
|
||||||
|
Return an up-to-date ``np.ndarray`` view of the
|
||||||
|
so-far-written data to the underlying shm buffer.
|
||||||
|
|
||||||
|
'''
|
||||||
|
a = self._array[self._first.value:self._last.value]
|
||||||
|
|
||||||
|
# first, last = self._first.value, self._last.value
|
||||||
|
# a = self._array[first:last]
|
||||||
|
|
||||||
|
# TODO: eventually comment this once we've not seen it in the
|
||||||
|
# wild in a long time..
|
||||||
|
# XXX: race where first/last indexes cause a reader
|
||||||
|
# to load an empty array..
|
||||||
|
if len(a) == 0 and self._post_init:
|
||||||
|
raise RuntimeError('Empty array race condition hit!?')
|
||||||
|
# breakpoint()
|
||||||
|
|
||||||
|
return a
|
||||||
|
|
||||||
|
def ustruct(
|
||||||
|
self,
|
||||||
|
fields: Optional[list[str]] = None,
|
||||||
|
|
||||||
|
# type that all field values will be cast to in the returned
|
||||||
|
# view.
|
||||||
|
common_dtype: np.dtype = np.float64, # type: ignore
|
||||||
|
|
||||||
|
) -> np.ndarray:
|
||||||
|
|
||||||
|
array = self._array
|
||||||
|
|
||||||
|
if fields:
|
||||||
|
selection = array[fields]
|
||||||
|
# fcount = len(fields)
|
||||||
|
else:
|
||||||
|
selection = array
|
||||||
|
# fcount = len(array.dtype.fields)
|
||||||
|
|
||||||
|
# XXX: manual ``.view()`` attempt that also doesn't work.
|
||||||
|
# uview = selection.view(
|
||||||
|
# dtype='<f16',
|
||||||
|
# ).reshape(-1, 4, order='A')
|
||||||
|
|
||||||
|
# assert len(selection) == len(uview)
|
||||||
|
|
||||||
|
u = rfn.structured_to_unstructured(
|
||||||
|
selection,
|
||||||
|
# dtype=float,
|
||||||
|
copy=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# unstruct = np.ndarray(u.shape, dtype=a.dtype, buffer=shm.buf)
|
||||||
|
# array[:] = a[:]
|
||||||
|
return u
|
||||||
|
# return ShmArray(
|
||||||
|
# shmarr=u,
|
||||||
|
# first=self._first,
|
||||||
|
# last=self._last,
|
||||||
|
# shm=self._shm
|
||||||
|
# )
|
||||||
|
|
||||||
|
def last(
|
||||||
|
self,
|
||||||
|
length: int = 1,
|
||||||
|
|
||||||
|
) -> np.ndarray:
|
||||||
|
'''
|
||||||
|
Return the last ``length``'s worth of ("row") entries from the
|
||||||
|
array.
|
||||||
|
|
||||||
|
'''
|
||||||
|
return self.array[-length:]
|
||||||
|
|
||||||
|
def push(
|
||||||
|
self,
|
||||||
|
data: np.ndarray,
|
||||||
|
|
||||||
|
field_map: Optional[dict[str, str]] = None,
|
||||||
|
prepend: bool = False,
|
||||||
|
update_first: bool = True,
|
||||||
|
start: Optional[int] = None,
|
||||||
|
|
||||||
|
) -> int:
|
||||||
|
'''
|
||||||
|
Ring buffer like "push" to append data
|
||||||
|
into the buffer and return updated "last" index.
|
||||||
|
|
||||||
|
NB: no actual ring logic yet to give a "loop around" on overflow
|
||||||
|
condition, lel.
|
||||||
|
|
||||||
|
'''
|
||||||
|
length = len(data)
|
||||||
|
|
||||||
|
if prepend:
|
||||||
|
index = (start or self._first.value) - length
|
||||||
|
|
||||||
|
if index < 0:
|
||||||
|
raise ValueError(
|
||||||
|
f'Array size of {self._len} was overrun during prepend.\n'
|
||||||
|
f'You have passed {abs(index)} too many datums.'
|
||||||
|
)
|
||||||
|
|
||||||
|
else:
|
||||||
|
index = start if start is not None else self._last.value
|
||||||
|
|
||||||
|
end = index + length
|
||||||
|
|
||||||
|
if field_map:
|
||||||
|
src_names, dst_names = zip(*field_map.items())
|
||||||
|
else:
|
||||||
|
dst_names = src_names = self._write_fields
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._array[
|
||||||
|
list(dst_names)
|
||||||
|
][index:end] = data[list(src_names)][:]
|
||||||
|
|
||||||
|
# NOTE: there was a race here between updating
|
||||||
|
# the first and last indices and when the next reader
|
||||||
|
# tries to access ``.array`` (which due to the index
|
||||||
|
# overlap will be empty). Pretty sure we've fixed it now
|
||||||
|
# but leaving this here as a reminder.
|
||||||
|
if prepend and update_first and length:
|
||||||
|
assert index < self._first.value
|
||||||
|
|
||||||
|
if (
|
||||||
|
index < self._first.value
|
||||||
|
and update_first
|
||||||
|
):
|
||||||
|
assert prepend, 'prepend=True not passed but index decreased?'
|
||||||
|
self._first.value = index
|
||||||
|
|
||||||
|
elif not prepend:
|
||||||
|
self._last.value = end
|
||||||
|
|
||||||
|
self._post_init = True
|
||||||
|
return end
|
||||||
|
|
||||||
|
except ValueError as err:
|
||||||
|
if field_map:
|
||||||
|
raise
|
||||||
|
|
||||||
|
# should raise if diff detected
|
||||||
|
self.diff_err_fields(data)
|
||||||
|
raise err
|
||||||
|
|
||||||
|
def diff_err_fields(
|
||||||
|
self,
|
||||||
|
data: np.ndarray,
|
||||||
|
) -> None:
|
||||||
|
# reraise with any field discrepancy
|
||||||
|
our_fields, their_fields = (
|
||||||
|
set(self._array.dtype.fields),
|
||||||
|
set(data.dtype.fields),
|
||||||
|
)
|
||||||
|
|
||||||
|
only_in_ours = our_fields - their_fields
|
||||||
|
only_in_theirs = their_fields - our_fields
|
||||||
|
|
||||||
|
if only_in_ours:
|
||||||
|
raise TypeError(
|
||||||
|
f"Input array is missing field(s): {only_in_ours}"
|
||||||
|
)
|
||||||
|
elif only_in_theirs:
|
||||||
|
raise TypeError(
|
||||||
|
f"Input array has unknown field(s): {only_in_theirs}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# TODO: support "silent" prepends that don't update ._first.value?
|
||||||
|
def prepend(
|
||||||
|
self,
|
||||||
|
data: np.ndarray,
|
||||||
|
) -> int:
|
||||||
|
end = self.push(data, prepend=True)
|
||||||
|
assert end
|
||||||
|
|
||||||
|
def close(self) -> None:
|
||||||
|
self._first._shm.close()
|
||||||
|
self._last._shm.close()
|
||||||
|
self._shm.close()
|
||||||
|
|
||||||
|
def destroy(self) -> None:
|
||||||
|
if _USE_POSIX:
|
||||||
|
# We manually unlink to bypass all the "resource tracker"
|
||||||
|
# nonsense meant for non-SC systems.
|
||||||
|
shm_unlink(self._shm.name)
|
||||||
|
|
||||||
|
self._first.destroy()
|
||||||
|
self._last.destroy()
|
||||||
|
|
||||||
|
def flush(self) -> None:
|
||||||
|
# TODO: flush to storage backend like markestore?
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
|
def open_shm_ndarray(
|
||||||
|
key: Optional[str] = None,
|
||||||
|
size: int = int(2 ** 10),
|
||||||
|
dtype: np.dtype | None = None,
|
||||||
|
append_start_index: int = 0,
|
||||||
|
readonly: bool = False,
|
||||||
|
|
||||||
|
) -> ShmArray:
|
||||||
|
'''
|
||||||
|
Open a memory shared ``numpy`` using the standard library.
|
||||||
|
|
||||||
|
This call unlinks (aka permanently destroys) the buffer on teardown
|
||||||
|
and thus should be used from the parent-most accessor (process).
|
||||||
|
|
||||||
|
'''
|
||||||
|
# create new shared mem segment for which we
|
||||||
|
# have write permission
|
||||||
|
a = np.zeros(size, dtype=dtype)
|
||||||
|
a['index'] = np.arange(len(a))
|
||||||
|
|
||||||
|
shm = SharedMemory(
|
||||||
|
name=key,
|
||||||
|
create=True,
|
||||||
|
size=a.nbytes
|
||||||
|
)
|
||||||
|
array = np.ndarray(
|
||||||
|
a.shape,
|
||||||
|
dtype=a.dtype,
|
||||||
|
buffer=shm.buf
|
||||||
|
)
|
||||||
|
array[:] = a[:]
|
||||||
|
array.setflags(write=int(not readonly))
|
||||||
|
|
||||||
|
token = _make_token(
|
||||||
|
key=key,
|
||||||
|
size=size,
|
||||||
|
dtype=dtype,
|
||||||
|
)
|
||||||
|
|
||||||
|
# create single entry arrays for storing an first and last indices
|
||||||
|
first = SharedInt(
|
||||||
|
shm=SharedMemory(
|
||||||
|
name=token.shm_first_index_name,
|
||||||
|
create=True,
|
||||||
|
size=4, # std int
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
last = SharedInt(
|
||||||
|
shm=SharedMemory(
|
||||||
|
name=token.shm_last_index_name,
|
||||||
|
create=True,
|
||||||
|
size=4, # std int
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Start the "real-time" append-updated (or "pushed-to") section
|
||||||
|
# after some start index: ``append_start_index``. This allows appending
|
||||||
|
# from a start point in the array which isn't the 0 index and looks
|
||||||
|
# something like,
|
||||||
|
# -------------------------
|
||||||
|
# | | i
|
||||||
|
# _________________________
|
||||||
|
# <-------------> <------->
|
||||||
|
# history real-time
|
||||||
|
#
|
||||||
|
# Once fully "prepended", the history section will leave the
|
||||||
|
# ``ShmArray._start.value: int = 0`` and the yet-to-be written
|
||||||
|
# real-time section will start at ``ShmArray.index: int``.
|
||||||
|
|
||||||
|
# this sets the index to 3/4 of the length of the buffer
|
||||||
|
# leaving a "days worth of second samples" for the real-time
|
||||||
|
# section.
|
||||||
|
last.value = first.value = append_start_index
|
||||||
|
|
||||||
|
shmarr = ShmArray(
|
||||||
|
array,
|
||||||
|
first,
|
||||||
|
last,
|
||||||
|
shm,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert shmarr._token == token
|
||||||
|
_known_tokens[key] = shmarr.token
|
||||||
|
|
||||||
|
# "unlink" created shm on process teardown by
|
||||||
|
# pushing teardown calls onto actor context stack
|
||||||
|
stack = tractor.current_actor().lifetime_stack
|
||||||
|
stack.callback(shmarr.close)
|
||||||
|
stack.callback(shmarr.destroy)
|
||||||
|
|
||||||
|
return shmarr
|
||||||
|
|
||||||
|
|
||||||
|
def attach_shm_ndarray(
|
||||||
|
token: tuple[str, str, tuple[str, str]],
|
||||||
|
readonly: bool = True,
|
||||||
|
|
||||||
|
) -> ShmArray:
|
||||||
|
'''
|
||||||
|
Attach to an existing shared memory array previously
|
||||||
|
created by another process using ``open_shared_array``.
|
||||||
|
|
||||||
|
No new shared mem is allocated but wrapper types for read/write
|
||||||
|
access are constructed.
|
||||||
|
|
||||||
|
'''
|
||||||
|
token = NDToken.from_msg(token)
|
||||||
|
key = token.shm_name
|
||||||
|
|
||||||
|
if key in _known_tokens:
|
||||||
|
assert NDToken.from_msg(_known_tokens[key]) == token, "WTF"
|
||||||
|
|
||||||
|
# XXX: ugh, looks like due to the ``shm_open()`` C api we can't
|
||||||
|
# actually place files in a subdir, see discussion here:
|
||||||
|
# https://stackoverflow.com/a/11103289
|
||||||
|
|
||||||
|
# attach to array buffer and view as per dtype
|
||||||
|
_err: Optional[Exception] = None
|
||||||
|
for _ in range(3):
|
||||||
|
try:
|
||||||
|
shm = SharedMemory(
|
||||||
|
name=key,
|
||||||
|
create=False,
|
||||||
|
)
|
||||||
|
break
|
||||||
|
except OSError as oserr:
|
||||||
|
_err = oserr
|
||||||
|
time.sleep(0.1)
|
||||||
|
else:
|
||||||
|
if _err:
|
||||||
|
raise _err
|
||||||
|
|
||||||
|
shmarr = np.ndarray(
|
||||||
|
(token.size,),
|
||||||
|
dtype=token.dtype,
|
||||||
|
buffer=shm.buf
|
||||||
|
)
|
||||||
|
shmarr.setflags(write=int(not readonly))
|
||||||
|
|
||||||
|
first = SharedInt(
|
||||||
|
shm=SharedMemory(
|
||||||
|
name=token.shm_first_index_name,
|
||||||
|
create=False,
|
||||||
|
size=4, # std int
|
||||||
|
),
|
||||||
|
)
|
||||||
|
last = SharedInt(
|
||||||
|
shm=SharedMemory(
|
||||||
|
name=token.shm_last_index_name,
|
||||||
|
create=False,
|
||||||
|
size=4, # std int
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
# make sure we can read
|
||||||
|
first.value
|
||||||
|
|
||||||
|
sha = ShmArray(
|
||||||
|
shmarr,
|
||||||
|
first,
|
||||||
|
last,
|
||||||
|
shm,
|
||||||
|
)
|
||||||
|
# read test
|
||||||
|
sha.array
|
||||||
|
|
||||||
|
# Stash key -> token knowledge for future queries
|
||||||
|
# via `maybe_opepn_shm_array()` but only after we know
|
||||||
|
# we can attach.
|
||||||
|
if key not in _known_tokens:
|
||||||
|
_known_tokens[key] = token
|
||||||
|
|
||||||
|
# "close" attached shm on actor teardown
|
||||||
|
tractor.current_actor().lifetime_stack.callback(sha.close)
|
||||||
|
|
||||||
|
return sha
|
||||||
|
|
||||||
|
|
||||||
|
def maybe_open_shm_ndarray(
|
||||||
|
key: str, # unique identifier for segment
|
||||||
|
|
||||||
|
# from ``open_shm_array()``
|
||||||
|
size: int = int(2 ** 10), # array length in index terms
|
||||||
|
dtype: np.dtype | None = None,
|
||||||
|
append_start_index: int = 0,
|
||||||
|
readonly: bool = True,
|
||||||
|
|
||||||
|
) -> tuple[ShmArray, bool]:
|
||||||
|
'''
|
||||||
|
Attempt to attach to a shared memory block using a "key" lookup
|
||||||
|
to registered blocks in the users overall "system" registry
|
||||||
|
(presumes you don't have the block's explicit token).
|
||||||
|
|
||||||
|
This function is meant to solve the problem of discovering whether
|
||||||
|
a shared array token has been allocated or discovered by the actor
|
||||||
|
running in **this** process. Systems where multiple actors may seek
|
||||||
|
to access a common block can use this function to attempt to acquire
|
||||||
|
a token as discovered by the actors who have previously stored
|
||||||
|
a "key" -> ``NDToken`` map in an actor local (aka python global)
|
||||||
|
variable.
|
||||||
|
|
||||||
|
If you know the explicit ``NDToken`` for your memory segment instead
|
||||||
|
use ``attach_shm_array``.
|
||||||
|
|
||||||
|
'''
|
||||||
|
try:
|
||||||
|
# see if we already know this key
|
||||||
|
token = _known_tokens[key]
|
||||||
|
return (
|
||||||
|
attach_shm_ndarray(
|
||||||
|
token=token,
|
||||||
|
readonly=readonly,
|
||||||
|
),
|
||||||
|
False, # not newly opened
|
||||||
|
)
|
||||||
|
except KeyError:
|
||||||
|
log.warning(f"Could not find {key} in shms cache")
|
||||||
|
if dtype:
|
||||||
|
token = _make_token(
|
||||||
|
key,
|
||||||
|
size=size,
|
||||||
|
dtype=dtype,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
|
||||||
|
try:
|
||||||
|
return (
|
||||||
|
attach_shm_ndarray(
|
||||||
|
token=token,
|
||||||
|
readonly=readonly,
|
||||||
|
),
|
||||||
|
False,
|
||||||
|
)
|
||||||
|
except FileNotFoundError:
|
||||||
|
log.warning(f"Could not attach to shm with token {token}")
|
||||||
|
|
||||||
|
# This actor does not know about memory
|
||||||
|
# associated with the provided "key".
|
||||||
|
# Attempt to open a block and expect
|
||||||
|
# to fail if a block has been allocated
|
||||||
|
# on the OS by someone else.
|
||||||
|
return (
|
||||||
|
open_shm_ndarray(
|
||||||
|
key=key,
|
||||||
|
size=size,
|
||||||
|
dtype=dtype,
|
||||||
|
append_start_index=append_start_index,
|
||||||
|
readonly=readonly,
|
||||||
|
),
|
||||||
|
True,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class ShmList(ShareableList):
|
||||||
|
'''
|
||||||
|
Carbon copy of ``.shared_memory.ShareableList`` with a few
|
||||||
|
enhancements:
|
||||||
|
|
||||||
|
- readonly mode via instance var flag `._readonly: bool`
|
||||||
|
- ``.__getitem__()`` accepts ``slice`` inputs
|
||||||
|
- exposes the underlying buffer "name" as a ``.key: str``
|
||||||
|
|
||||||
|
'''
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
sequence: list | None = None,
|
||||||
|
*,
|
||||||
|
name: str | None = None,
|
||||||
|
readonly: bool = True
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
self._readonly = readonly
|
||||||
|
self._key = name
|
||||||
|
return super().__init__(
|
||||||
|
sequence=sequence,
|
||||||
|
name=name,
|
||||||
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def key(self) -> str:
|
||||||
|
return self._key
|
||||||
|
|
||||||
|
@property
|
||||||
|
def readonly(self) -> bool:
|
||||||
|
return self._readonly
|
||||||
|
|
||||||
|
def __setitem__(
|
||||||
|
self,
|
||||||
|
position,
|
||||||
|
value,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
|
||||||
|
# mimick ``numpy`` error
|
||||||
|
if self._readonly:
|
||||||
|
raise ValueError('assignment destination is read-only')
|
||||||
|
|
||||||
|
return super().__setitem__(position, value)
|
||||||
|
|
||||||
|
def __getitem__(
|
||||||
|
self,
|
||||||
|
indexish,
|
||||||
|
) -> list:
|
||||||
|
|
||||||
|
# NOTE: this is a non-writeable view (copy?) of the buffer
|
||||||
|
# in a new list instance.
|
||||||
|
if isinstance(indexish, slice):
|
||||||
|
return list(self)[indexish]
|
||||||
|
|
||||||
|
return super().__getitem__(indexish)
|
||||||
|
|
||||||
|
# TODO: should we offer a `.array` and `.push()` equivalent
|
||||||
|
# to the `ShmArray`?
|
||||||
|
# currently we have the following limitations:
|
||||||
|
# - can't write slices of input using traditional slice-assign
|
||||||
|
# syntax due to the ``ShareableList.__setitem__()`` implementation.
|
||||||
|
# - ``list(shmlist)`` returns a non-mutable copy instead of
|
||||||
|
# a writeable view which would be handier numpy-style ops.
|
||||||
|
|
||||||
|
|
||||||
|
def open_shm_list(
|
||||||
|
key: str,
|
||||||
|
sequence: list | None = None,
|
||||||
|
size: int = int(2 ** 10),
|
||||||
|
dtype: float | int | bool | str | bytes | None = float,
|
||||||
|
readonly: bool = True,
|
||||||
|
|
||||||
|
) -> ShmList:
|
||||||
|
|
||||||
|
if sequence is None:
|
||||||
|
default = {
|
||||||
|
float: 0.,
|
||||||
|
int: 0,
|
||||||
|
bool: True,
|
||||||
|
str: 'doggy',
|
||||||
|
None: None,
|
||||||
|
}[dtype]
|
||||||
|
sequence = [default] * size
|
||||||
|
|
||||||
|
shml = ShmList(
|
||||||
|
sequence=sequence,
|
||||||
|
name=key,
|
||||||
|
readonly=readonly,
|
||||||
|
)
|
||||||
|
|
||||||
|
# "close" attached shm on actor teardown
|
||||||
|
try:
|
||||||
|
actor = tractor.current_actor()
|
||||||
|
actor.lifetime_stack.callback(shml.shm.close)
|
||||||
|
actor.lifetime_stack.callback(shml.shm.unlink)
|
||||||
|
except RuntimeError:
|
||||||
|
log.warning('tractor runtime not active, skipping teardown steps')
|
||||||
|
|
||||||
|
return shml
|
||||||
|
|
||||||
|
|
||||||
|
def attach_shm_list(
|
||||||
|
key: str,
|
||||||
|
readonly: bool = False,
|
||||||
|
|
||||||
|
) -> ShmList:
|
||||||
|
|
||||||
|
return ShmList(
|
||||||
|
name=key,
|
||||||
|
readonly=readonly,
|
||||||
|
)
|
|
@ -133,12 +133,12 @@ async def gather_contexts(
|
||||||
# deliver control once all managers have started up
|
# deliver control once all managers have started up
|
||||||
await all_entered.wait()
|
await all_entered.wait()
|
||||||
|
|
||||||
# NOTE: order *should* be preserved in the output values
|
try:
|
||||||
# since ``dict``s are now implicitly ordered.
|
|
||||||
yield tuple(unwrapped.values())
|
yield tuple(unwrapped.values())
|
||||||
|
finally:
|
||||||
# we don't need a try/finally since cancellation will be triggered
|
# NOTE: this is ABSOLUTELY REQUIRED to avoid
|
||||||
# by the surrounding nursery on error.
|
# the following wacky bug:
|
||||||
|
# <tractorbugurlhere>
|
||||||
parent_exit.set()
|
parent_exit.set()
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue