Use the actor's service nursery instead
To ensure the feed can in fact be kept open until the last consumer task has completed, we need to maintain a lifetime which is hierarchically greater than that of all consumer tasks. This solution is somewhat hacky but seems to work well: we just use the `tractor` actor's "service nursery" (the one normally used to invoke RPC tasks) to launch the task which starts, and keeps open, the target cached async context manager. To make this more "proper" we may want to offer a "root nursery" in all piker actors that is exposed through some singleton API, or even introduce a public API for it into `tractor` directly.
parent 1184a4d88e
commit c3682348fe
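For illustration, here is a minimal, self-contained sketch of the pattern described above. This is *not* the piker code: `_Cache`, `_run_ctx`, the one-shot teardown event, and the plain `trio` nursery standing in for the actor's service nursery are all simplified stand-ins for the `cache.*` machinery shown in the diff below.

```python
from contextlib import asynccontextmanager

import trio


class _Cache:
    # stand-ins for the ``cache.*`` attributes used in the diff below
    lock = trio.Lock()
    values: dict = {}
    users: int = 0
    no_more_users = trio.Event()  # one-shot for brevity; real code would re-arm


async def _run_ctx(mngr, key, task_status=trio.TASK_STATUS_IGNORED):
    # runs inside the long-lived *service* nursery, so ``mngr``'s
    # lifetime is tied to that nursery rather than to whichever
    # consumer task happened to allocate it first
    async with mngr as value:
        _Cache.values[key] = value
        task_status.started(value)
        # hold the target context open until the last consumer leaves
        await _Cache.no_more_users.wait()
    del _Cache.values[key]


@asynccontextmanager
async def maybe_open_ctx(service_n: trio.Nursery, mngr, key):
    await _Cache.lock.acquire()
    try:
        value = _Cache.values[key]
        _Cache.users += 1
        _Cache.lock.release()
        yield True, value  # cache hit: reuse the already-open value
    except KeyError:
        # **critical section**: the lock stays held across this
        # checkpoint so racing tasks can't double-allocate
        value = await service_n.start(_run_ctx, mngr, key)
        _Cache.users += 1
        _Cache.lock.release()
        yield False, value
    finally:
        _Cache.users -= 1
        if _Cache.lock.locked():
            _Cache.lock.release()
        if _Cache.users <= 0:
            # last consumer out: tear down the cached context manager
            _Cache.no_more_users.set()


async def main():
    @asynccontextmanager
    async def feed():
        print('feed opened')
        yield 42
        print('feed closed')

    # plays the role of ``tractor.current_actor()._service_n`` in the diff
    async with trio.open_nursery() as service_n:

        async def consumer(i):
            async with maybe_open_ctx(service_n, feed(), 'quotes') as (hit, val):
                print(f'task {i}: hit={hit} value={val}')
                await trio.sleep(0.1)

        async with trio.open_nursery() as n:
            for i in range(3):
                n.start_soon(consumer, i)


trio.run(main)
```

Since the three consumers race, only the first allocates (`hit=False`); the others reuse the live value, and the feed is closed exactly once, after the last consumer exits.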
@@ -36,6 +36,7 @@ from contextlib import (
 import trio
 from trio_typing import TaskStatus
 
+import tractor
 from tractor._portal import maybe_open_nursery
 
 from .brokers import get_brokermod
@@ -180,43 +181,49 @@ async def maybe_open_ctx(
 
     ctx_key = id(mngr)
 
-    # TODO: does this need to be a tractor "root nursery"?
-    async with maybe_open_nursery(cache.nurseries.get(ctx_key)) as n:
-        cache.nurseries[ctx_key] = n
-
-        value = None
-        try:
-            # lock feed acquisition around task racing / ``trio``'s
-            # scheduler protocol
-            value = cache.values[key]
-            log.info(f'Reusing cached feed for {key}')
-            cache.users += 1
-            cache.lock.release()
-            yield True, value
-
-        except KeyError:
-            log.info(f'Allocating new feed for {key}')
-
-            # **critical section** that should prevent other tasks from
-            # checking the cache until complete otherwise the scheduler
-            # may switch and by accident we create more than one feed.
-
-            value = await n.start(cache.run_ctx, mngr, key)
-            cache.users += 1
-            cache.lock.release()
-
-            yield False, value
-
-        finally:
-            cache.users -= 1
-
-            if cache.lock.locked():
-                cache.lock.release()
-
-            if value is not None:
-                # if no more consumers, teardown the client
-                if cache.users <= 0:
-                    log.warning(f'De-allocating feed for {key}')
-
-                    # terminate mngr nursery
-                    cache.no_more_users.set()
+    value = None
+    try:
+        # lock feed acquisition around task racing / ``trio``'s
+        # scheduler protocol
+        value = cache.values[key]
+        log.info(f'Reusing cached feed for {key}')
+        cache.users += 1
+        cache.lock.release()
+        yield True, value
+
+    except KeyError:
+        log.info(f'Allocating new feed for {key}')
+
+        # **critical section** that should prevent other tasks from
+        # checking the cache until complete otherwise the scheduler
+        # may switch and by accident we create more than one feed.
+
+        # TODO: avoid pulling from ``tractor`` internals and
+        # instead offer a "root nursery" in piker actors
+        service_n = tractor.current_actor()._service_n
+
+        # TODO: does this need to be a tractor "root nursery"?
+        ln = cache.nurseries.get(ctx_key)
+        assert not ln
+        ln = cache.nurseries[ctx_key] = service_n
+
+        value = await ln.start(cache.run_ctx, mngr, key)
+        cache.users += 1
+        cache.lock.release()
+
+        yield False, value
+
+    finally:
+        cache.users -= 1
+
+        if cache.lock.locked():
+            cache.lock.release()
+
+        if value is not None:
+            # if no more consumers, teardown the client
+            if cache.users <= 0:
+                log.warning(f'De-allocating feed for {key}')
+
+                # terminate mngr nursery
+                cache.no_more_users.set()
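The message above suggests eventually exposing this through a public API instead of reaching into `tractor` internals (note the private `_service_n` attribute in the diff). Purely as a hypothetical sketch of that proposal, with an invented name and no claim that `tractor` provides it:

```python
import trio
import tractor


def current_service_nursery() -> trio.Nursery:
    '''
    Hypothetical public accessor for the current actor's service
    nursery: must be called from inside a running actor, and simply
    concentrates the private-attribute access in one place until
    ``tractor`` grows a real API for it.

    '''
    return tractor.current_actor()._service_n
```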