Use mem chan in actor core
parent
b42e118e89
commit
f44ac4528a
|
@ -199,7 +199,9 @@ class Actor:
|
|||
Tuple[trio._core._run.CancelScope, typing.Callable, trio.Event]
|
||||
] = {}
|
||||
# map {uids -> {callids -> waiter queues}}
|
||||
self._actors2calls: Dict[Tuple[str, str], Dict[str, trio.Queue]] = {}
|
||||
self._cids2qs: Dict[
|
||||
Tuple[Tuple[str, str], str],
|
||||
trio.abc.SendChannel[Any]] = {}
|
||||
self._listeners: List[trio.abc.Listener] = []
|
||||
self._parent_chan: Optional[Channel] = None
|
||||
self._forkserver_info: Optional[Tuple[Any, Any, Any, Any, Any]] = None
|
||||
|
@ -299,37 +301,58 @@ class Actor:
|
|||
log.exception(
|
||||
f"Channel for {chan.uid} was already zonked..")
|
||||
|
||||
async def _push_result(self, actorid, cid: str, msg: dict) -> None:
|
||||
async def _push_result(
|
||||
self,
|
||||
actorid: Tuple[str, str],
|
||||
msg: Dict[str, Any],
|
||||
) -> None:
|
||||
"""Push an RPC result to the local consumer's queue.
|
||||
"""
|
||||
assert actorid, f"`actorid` can't be {actorid}"
|
||||
q = self.get_waitq(actorid, cid)
|
||||
cid = msg['cid']
|
||||
send_chan = self._cids2qs[(actorid, cid)]
|
||||
assert send_chan.cid == cid
|
||||
log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
|
||||
# maintain backpressure
|
||||
await q.put(msg)
|
||||
try:
|
||||
# maintain backpressure
|
||||
await send_chan.send(msg)
|
||||
except trio.BrokenResourceError:
|
||||
# XXX: local consumer has closed their side
|
||||
# so cancel the far end streaming task
|
||||
log.warning(f"{send_chan} consumer is already closed")
|
||||
|
||||
def get_waitq(
|
||||
def get_memchans(
|
||||
self,
|
||||
actorid: Tuple[str, str],
|
||||
cid: str
|
||||
) -> trio.Queue:
|
||||
) -> trio.abc.ReceiveChannel:
|
||||
log.debug(f"Getting result queue for {actorid} cid {cid}")
|
||||
cids2qs = self._actors2calls.setdefault(actorid, {})
|
||||
return cids2qs.setdefault(cid, trio.Queue(1000))
|
||||
try:
|
||||
recv_chan = self._cids2qs[(actorid, cid)]
|
||||
except KeyError:
|
||||
send_chan, recv_chan = trio.open_memory_channel(1000)
|
||||
send_chan.cid = cid
|
||||
self._cids2qs[(actorid, cid)] = send_chan
|
||||
|
||||
return recv_chan
|
||||
|
||||
async def send_cmd(
|
||||
self, chan: Channel, ns: str, func: str, kwargs: dict
|
||||
) -> Tuple[str, trio.Queue]:
|
||||
self,
|
||||
chan: Channel,
|
||||
ns: str,
|
||||
func: str,
|
||||
kwargs: dict
|
||||
) -> Tuple[str, trio.abc.ReceiveChannel]:
|
||||
"""Send a ``'cmd'`` message to a remote actor and return a
|
||||
caller id and a ``trio.Queue`` that can be used to wait for
|
||||
responses delivered by the local message processing loop.
|
||||
"""
|
||||
cid = str(uuid.uuid1())
|
||||
assert chan.uid
|
||||
q = self.get_waitq(chan.uid, cid)
|
||||
recv_chan = self.get_memchans(chan.uid, cid)
|
||||
log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
|
||||
await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
|
||||
return cid, q
|
||||
return cid, recv_chan
|
||||
|
||||
async def _process_messages(
|
||||
self, chan: Channel,
|
||||
|
@ -364,11 +387,10 @@ class Actor:
|
|||
f" {chan} from {chan.uid}")
|
||||
break
|
||||
|
||||
log.debug(f"Received msg {msg} from {chan.uid}")
|
||||
cid = msg.get('cid')
|
||||
if cid:
|
||||
log.trace(f"Received msg {msg} from {chan.uid}")
|
||||
if msg.get('cid'):
|
||||
# deliver response to local caller/waiter
|
||||
await self._push_result(chan.uid, cid, msg)
|
||||
await self._push_result(chan.uid, msg)
|
||||
log.debug(
|
||||
f"Waiting on next msg for {chan} from {chan.uid}")
|
||||
continue
|
||||
|
|
Loading…
Reference in New Issue