Handle cancelation on EventFD.read
							parent
							
								
									22a84bcb74
								
							
						
					
					
						commit
						fc7a8a5441
					
				|  | @ -4,7 +4,6 @@ import trio | |||
| import pytest | ||||
| import tractor | ||||
| from tractor.ipc import ( | ||||
|     EFD_NONBLOCK, | ||||
|     open_eventfd, | ||||
|     RingBuffSender, | ||||
|     RingBuffReceiver | ||||
|  | @ -95,7 +94,7 @@ async def child_write_shm( | |||
|         'large_payloads_large_buffer', | ||||
|     ] | ||||
| ) | ||||
| def test_ring_buff( | ||||
| def test_ringbuf( | ||||
|     msg_amount: int, | ||||
|     rand_min: int, | ||||
|     rand_max: int, | ||||
|  | @ -171,8 +170,7 @@ async def child_blocked_receiver( | |||
| 
 | ||||
| 
 | ||||
| def test_ring_reader_cancel(): | ||||
|     flags = EFD_NONBLOCK | ||||
|     write_eventfd = open_eventfd(flags=flags) | ||||
|     write_eventfd = open_eventfd() | ||||
|     wrap_eventfd = open_eventfd() | ||||
| 
 | ||||
|     proc_kwargs = { | ||||
|  | @ -201,7 +199,6 @@ def test_ring_reader_cancel(): | |||
|                     write_eventfd=write_eventfd, | ||||
|                     wrap_eventfd=wrap_eventfd, | ||||
|                     shm_key=shm_key, | ||||
|                     flags=flags | ||||
|                 ) as (sctx, _sent), | ||||
|             ): | ||||
|                 await trio.sleep(1) | ||||
|  |  | |||
|  | @ -133,8 +133,10 @@ class EventFD: | |||
|         return write_eventfd(self._fd, value) | ||||
| 
 | ||||
|     async def read(self) -> int: | ||||
|         #TODO: how to handle signals? | ||||
|         return await trio.to_thread.run_sync(read_eventfd, self._fd) | ||||
|         return await trio.to_thread.run_sync( | ||||
|             read_eventfd, self._fd, | ||||
|             abandon_on_cancel=True | ||||
|         ) | ||||
| 
 | ||||
|     def open(self): | ||||
|         self._fobj = os.fdopen(self._fd, self._omode) | ||||
|  |  | |||
|  | @ -45,7 +45,7 @@ class RingBuffSender(trio.abc.SendStream): | |||
|         wrap_eventfd: int, | ||||
|         start_ptr: int = 0, | ||||
|         buf_size: int = 10 * 1024, | ||||
|         clean_shm_on_exit: bool = True | ||||
|         unlink_on_exit: bool = True | ||||
|     ): | ||||
|         self._shm = SharedMemory( | ||||
|             name=shm_key, | ||||
|  | @ -55,7 +55,7 @@ class RingBuffSender(trio.abc.SendStream): | |||
|         self._write_event = EventFD(write_eventfd, 'w') | ||||
|         self._wrap_event = EventFD(wrap_eventfd, 'r') | ||||
|         self._ptr = start_ptr | ||||
|         self.clean_shm_on_exit = clean_shm_on_exit | ||||
|         self.unlink_on_exit = unlink_on_exit | ||||
| 
 | ||||
|     @property | ||||
|     def key(self) -> str: | ||||
|  | @ -104,7 +104,7 @@ class RingBuffSender(trio.abc.SendStream): | |||
|     async def aclose(self): | ||||
|         self._write_event.close() | ||||
|         self._wrap_event.close() | ||||
|         if self.clean_shm_on_exit: | ||||
|         if self.unlink_on_exit: | ||||
|             self._shm.unlink() | ||||
| 
 | ||||
|         else: | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue