Linux specific IPC RingBuff using EventFD for async reader wakeup #10
Open
guille
wants to merge 334 commits from
one_ring_to_rule_them_all
into shm_apis
pull from: one_ring_to_rule_them_all
merge into: goodboy:shm_apis
goodboy:hilevel_serman
goodboy:oco_supervisor_prototype
goodboy:shm_apis
goodboy:ext_type_plds
goodboy:py313_support
goodboy:aio_abandons
goodboy:runtime_to_msgspec
goodboy:main
goodboy:pkg_tidying
goodboy:structural_dynamics_of_flow
goodboy:multihost_exs
goodboy:uv_migration_pre_msgspec_in_runtime
goodboy:remote_inceptions
goodboy:ext_type_plds_XPS_BACKUP
goodboy:modden_spawn_from_client_req
goodboy:multihomed
goodboy:devx_subpkg
goodboy:asyncio_debugger_support
goodboy:ctx_cancel_semantics_and_overruns
goodboy:pre_pretty_struct_dep_commit_b54cb66
goodboy:ctx_cancel_semantics_and_overruns_REVERSED_FACEPALM
goodboy:uv_migration
goodboy:to_asyncio_refinery
goodboy:runtime_to_msgspec_XPS_BACKUP
goodboy:rae_message_packing
goodboy:msg_codecs
goodboy:old_msg_types
goodboy:asyncio_debug_mode
goodboy:pause_from_sync_w_greenback
goodboy:mv_to_new_trio_py3.11
goodboy:modden_spawn_from_client_req_XPS_BACKUP
goodboy:shielded_ctx_cancel
goodboy:dereg_on_oserror
goodboy:ctx_cancel_semantics_and_overruns_XPS_GH_BACKUP
goodboy:msgtypes
goodboy:master
goodboy:switch_to_pdbp
goodboy:proper_breakpoint_hooking
goodboy:drop_proc_actxmngr
goodboy:ctx_result_consumption
goodboy:readme_touchups
goodboy:ipython_integration
goodboy:breceiver_internals
goodboy:piker_pin
goodboy:ipc_failure_while_streaming
goodboy:deprecate_arbiter_addr
goodboy:prompt_on_ctrlc
goodboy:dun_unset_current_actor
goodboy:ipc_failwhilestream_backup
goodboy:macos_in_ci
goodboy:harden_cluster_tests
goodboy:eg_backup
goodboy:exceptiongroups
goodboy:egs_with_ctx_res_consumption
goodboy:debug_lock_blocking
goodboy:callable_key_maybe_open_context
goodboy:spawn_backend_table
goodboy:pin_pre_trio_0.22
goodboy:pytest_report_workaround
goodboy:lifetime_stack_tests
goodboy:we_bein_all_matchy
goodboy:debug_event_guard
goodboy:disable_win_ci
goodboy:alpha5
goodboy:signint_saviour
goodboy:sigintsaviour_citesthackin
goodboy:sigintsaviour_ci_worked
goodboy:aio_error_propagation
goodboy:drop_msgpack
goodboy:310_windows
goodboy:ci_sdist_install
goodboy:include_readme
goodboy:310_plus
goodboy:name_query
goodboy:sort_subs_results_infected_aio
goodboy:aio_explicit_task_cancels
goodboy:fence_mp
goodboy:sigint_ignore_in_pdb_repl
goodboy:sigint2
goodboy:msgpack_lists_by_default
goodboy:nspaths
goodboy:experimental_subpkg
goodboy:maybe_cancel_the_cancel_
goodboy:moar_timeoutz
goodboy:drop_old_nooz_files
goodboy:raise_runinactor_error
goodboy:win_ci_timeout
goodboy:alpha4
goodboy:infect_asyncio
goodboy:expected_ctx_cancelled
goodboy:new_mypy
goodboy:context_caching
goodboy:end_of_channel_fixes
goodboy:agpl_commit_msg_fix
goodboy:agpl
goodboy:stricter_context_starting
goodboy:acked_backup
goodboy:faster_daemon_cancels
goodboy:early_deth_fixes
goodboy:clusters_and_hot_tips
goodboy:alpha3
goodboy:pubsub_startup_response_msg
goodboy:iaio_backup
goodboy:trionics
goodboy:graceful_gather
goodboy:246_facepalm_backup
goodboy:patch-async-enter-all
goodboy:immediate_remote_cancels
goodboy:less_logging
goodboy:zombie_lord_infinite
goodboy:optional_msgspec_support
goodboy:fix_kbi_in_ctx_block
goodboy:logo_tweaks
goodboy:use_trio_on_win
goodboy:alpha2
goodboy:msgspec_infect_asyncio
goodboy:live_on_air_from_tokio
goodboy:tokio_backup
goodboy:debugger_test_tweaks
goodboy:fix_news_links
goodboy:wats_da_nooz
goodboy:ctx_debugger
goodboy:bi_streaming_no_debugger_stuff
goodboy:round_2_ci_windows
goodboy:CI_increment_for_windows_bidirstreaming
goodboy:ctx_debugger_from_hardening
goodboy:infect_asyncio_backup
goodboy:debugger_hardening
goodboy:bi_streaming
goodboy:transport_cleaning
goodboy:context_finesse
goodboy:cf_backup
goodboy:db_backup
goodboy:pre_bad_close
goodboy:stdstream_clobber_fix
goodboy:bistream_backup
goodboy:transport_hardening
goodboy:msgspec_not_fucked
goodboy:try_msgspec
goodboy:prehardkill
goodboy:windows_bi_streaming
goodboy:docs_revamp
goodboy:new_docs_polish
goodboy:wip_fix_asyncio_gen_streaming
goodboy:drop_run
goodboy:mp_teardown_hardening
goodboy:stream_contexts
goodboy:drop_sync_funcs
goodboy:pub_connect_msg
goodboy:sync_cancel
goodboy:stream_clones
goodboy:first_pypi_release
goodboy:single_func_example
goodboy:readme_pump
goodboy:kinda_drop_run
goodboy:mp_hang_search
goodboy:eg_worker_poolz
goodboy:sync_breakpoint
goodboy:actor_state_via_messages
goodboy:we_aint_got_zombie_shields
goodboy:deprecate_rpcmodpaths
goodboy:implicit_runtime
goodboy:drop_tractor_run
goodboy:py3.9
goodboy:denoise_logging
goodboy:func_refs_always
goodboy:fix_debug_tests_in_ci_again
goodboy:stream_channel_shield
goodboy:pdb_madness
goodboy:advanced_debugger_testing
goodboy:clean_log_header
goodboy:debug_refine
goodboy:debug_refinements
goodboy:drop_warn
goodboy:multiproc_debug
goodboy:debugger_on_windows
goodboy:bug_in_debug
goodboy:debug_tests
goodboy:native_debugging
goodboy:matrix
goodboy:dereg_on_channel_aclose
goodboy:ensure_deregister
goodboy:start_up_sequence_trickery
goodboy:fix_win_ci_again
goodboy:stin_char_relay
goodboy:flaky_tests
goodboy:drop_cloudpickle
goodboy:reorg_entry_points
goodboy:drop-trip-update-trio
goodboy:init_sphinx_docs
goodboy:example_tests
goodboy:implicit_rpc
goodboy:fix_examples_in_docs
goodboy:try_trip
goodboy:log_task_context
goodboy:drop_event_clear
goodboy:more_thorough_super_tests
goodboy:pip_ci_fix
goodboy:windows_support
goodboy:rename_forkserver_mod
goodboy:user_update
goodboy:win_ci
goodboy:stream_functions
goodboy:propagate_loglevel
goodboy:ipc_iternals_renaming
goodboy:close_mem_chans
goodboy:docs_example_fixes
goodboy:spawn_method_support
goodboy:trio_memchans
goodboy:contexts
goodboy:remote_module_errors
goodboy:remote_task_cancelling
goodboy:fix_46
goodboy:loglevel_to_tractor_tests
goodboy:expose_tractor_test
goodboy:improved_errors
goodboy:self_register
goodboy:multi_program_tests
goodboy:tests_reorg
goodboy:type_annotations
goodboy:py3.7_tweaks
goodboy:reliable_cancel_tests
goodboy:attrs_it_up
goodboy:wait_for_actor
goodboy:draft_readme
goodboy:init_docs
goodboy:reg_with_uid
goodboy:forkserver_singleton
goodboy:drop_main_kwarg
goodboy:asyncgen_closing_fix
Labels
Clear labels
No items
No Label
Milestone
Clear milestone
No items
No Milestone
Projects
Clear projects
No project
Assignees
Clear assignees
No Assignees
2 Participants
Notifications
Due Date
The due date is invalid or out of range. Please use the format 'yyyy-mm-dd'.
No due date set.
Dependencies
No dependencies set.
Reference: goodboy/tractor#10
Reference in New Issue
There is no content yet.
Delete Branch "one_ring_to_rule_them_all"
Deleting a branch is permanent. Although the deleted branch may exist for a short time before cleaning up, in most cases it CANNOT be undone. Continue?
@guille’s re-org list
move
EventFdIO
stuff (and related util fns) into a separate module, maybe just._eventfd
not sure where we want the
RingBuffReceiver/Sender
interfaces put yet?tractor._buffer
or._buf
mod for now and we’ll eventually likely move them under thetractor.ipc
subpkg (see my list below).@goodboy’s surrounding re-org needed around this!
we should make a new
tractor.ipc
subpkg with an__init__.py
that only exports all the currently used lower level bits from the currenttractor._ipc
the set of cmds to do this should be trying to preserve as much history in module-files as possible,
mkdir tractor/ipc
touch tractor/ipc/__init__.py
import Channel as Channel
for all refs likeChannel
that are used in other parts of the code base and obvi change those imports to now src fromtractor.ipc
instead of._ipc
;)git mv tractor/_ipc.py tractor/ipc/_chan.py
._ipc
mod.leave all
Channel
related code in that newtractor.ipc._chan
move all the even lower-level transport code to a new
.ipc._tcp
and possibly a new.ipc._transport
for the interface/types that other tranports (“backends”) can implement.git mv tractor/_shm.py tractor/ipc/_shm.py
tractor.ipc
maybe.ipc.ringbuf
or similar; and/or we might want to stick all theEventFdIO
stuff to it’s own mod as well since it shouldn’t be directly tied to theShmList/Array
stuff right?RingBuffReceiver/Sender
interfaces put yet (see above)@ -0,0 +1,18 @@
{ pkgs ? import <nixpkgs> {} }:
Hmm should this be part of the repo though?
It’d be nice if we could also look into wtv they doin in
nix
land these days to integrate withuv
😉@ -11,6 +14,7 @@ import tractor
from tractor._shm import (
open_shm_list,
attach_shm_list,
EventFD, open_ringbuffer_sender, open_ringbuffer_receiver,
keep the surrounding multi-line import style porfa!
@ -33,3 +34,3 @@
)
from msgspec import Struct
from msgspec import Struct, to_builtins
again, please always use multi-line tuple style imports 🙏🙏
@ -833,1 +834,4 @@
)
if platform.system() == 'Linux':
prolly moving this into a separate mod would be good.
as mentioned that should be easier once we re-org to a
tractor.ipc
subpkg 😉@ -834,0 +1030,4 @@
return self
async def __aexit__(self, exc_type, exc_value, traceback):
await self.aclose()
not following this, isn’t the whole point of being a
trio.abc.SendStream
to not have to implement these dunder meths?Also, if
.aclose()
does nothing then seems even more strange that you’d call it ?On latest commit we do have to do eventfd cleanup calls on
aclose
but yeah removed
__aexit__
, super class impl callsaclose
underneath@ -141,6 +141,7 @@ class ActorNursery:
# a `._ria_nursery` since the dependent APIs have been
# removed!
nursery: trio.Nursery|None = None,
proc_kwargs: dict[str, any] = {}
luv this!
it might make even more sense if we eventually use a
msgspec.Struct
for this as well so we can explicitly decide what to allow passing through?@ -834,0 +1123,4 @@
create=False
)
async with RingBuffReceiver(
shm, EventFD(fd=write_event_fd), EventFD(fd=wrap_event_fd, omode='w'), start_ptr=start_ptr
generally speaking (as i’ve kinda implied above) try to keep a clean "multi-line inputs & outputs* style to code.
so here that would be,
could could go further and do it with the
EventFD
inputs as well ;)the reason(s) i prefer this,
@ -834,0 +914,4 @@
def close_eventfd(fd: int) -> int:
'''
Close the eventfd.
'''
Style i’ve been rolling with for doc-strings is to have a blank newline before the last
'''
🙏@ -834,0 +920,4 @@
raise OSError(errno.errorcode[ffi.errno], 'close failed')
class EventFD:
yeah, i almost want to say you can move this stuff already into a new module?
07dab8519e
to41e84cc701
3127db8502
todd17aa4205
3db500bd2b
to3c5420f4c9
@ -0,0 +216,4 @@
destaddr = destaddr or self._destaddr
assert isinstance(destaddr, tuple)
stream = await trio.open_tcp_stream(
Heh, i forgot this was hard coded at the
Channel
level..We should obvi move this down to
MsgpackTCPStream.connect()
(or something) and then make it easy to just swap to USD via theopen_unix_stream()
call?https://github.com/python-trio/trio/blob/main/src/trio/_tests/test_highlevel_open_unix_stream.py#L65
which if you look at the factory, also returns a
SocketStream
B)https://github.com/python-trio/trio/blob/main/src/trio/_highlevel_open_unix_stream.py#L38
If the typing is the same in
trio
it might even be worth changing to aMsgpackSocketStream
??and then we just let the UDS vs. TCP be a(n introspect-able) impl deat?
stream of thoughts here (punzone), but the only thing that’s obvi going to be incompat is the address typing,
for UDS it’s a file path and for TCP it’s obvi an ipv4 socket-addr..
so maybe we need to wrap addrs in an interface that can be called to deliver the appropriate type/format to the respective transport factories,
tuple[str, int]
=>open_tcp_stream()
str
=>open_unix_socket()
??
6e113d2150
to0208a4728f
Linux specific IPC RingBuff using EventFD for async reader wakeupto Linux specific IPC RingBuff using EventFD for async reader wakeupStep 1:
From your project repository, check out a new branch and test the changes.Step 2:
Merge the changes and update on Gitea.