Merge pull request #241 from pikers/fsp_hotfixes

Fsp hotfixes
simpler_quote_throttle_logic
goodboy 2021-11-05 15:44:08 -04:00 committed by GitHub
commit 837c34e24b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 23 additions and 27 deletions

View File

@@ -79,7 +79,7 @@ async def filter_quotes_by_sym(
async def fsp_compute( async def fsp_compute(
stream: tractor.MsgStream, ctx: tractor.Context,
symbol: str, symbol: str,
feed: Feed, feed: Feed,
quote_stream: trio.abc.ReceiveChannel, quote_stream: trio.abc.ReceiveChannel,
@@ -90,7 +90,7 @@ async def fsp_compute(
func_name: str, func_name: str,
func: Callable, func: Callable,
attach_stream: bool = False, attach_stream: bool = True,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
@@ -147,6 +147,7 @@ async def fsp_compute(
# setup a respawn handle # setup a respawn handle
with trio.CancelScope() as cs: with trio.CancelScope() as cs:
tracker = TaskTracker(trio.Event(), cs) tracker = TaskTracker(trio.Event(), cs)
await ctx.started(index)
task_status.started((tracker, index)) task_status.started((tracker, index))
profiler(f'{func_name} yield last index') profiler(f'{func_name} yield last index')
@@ -155,6 +156,7 @@ async def fsp_compute(
try: try:
# rt stream # rt stream
async with ctx.open_stream() as stream:
async for processed in out_stream: async for processed in out_stream:
log.debug(f"{func_name}: {processed}") log.debug(f"{func_name}: {processed}")
@@ -229,14 +231,13 @@ async def cascade(
# last_len = new_len = len(src.array) # last_len = new_len = len(src.array)
async with ( async with (
ctx.open_stream() as stream,
trio.open_nursery() as n, trio.open_nursery() as n,
): ):
fsp_target = partial( fsp_target = partial(
fsp_compute, fsp_compute,
stream=stream, ctx=ctx,
symbol=symbol, symbol=symbol,
feed=feed, feed=feed,
quote_stream=quote_stream, quote_stream=quote_stream,
@@ -255,7 +256,6 @@ async def cascade(
last = dst.array[-1:] last = dst.array[-1:]
zeroed = np.zeros(last.shape, dtype=last.dtype) zeroed = np.zeros(last.shape, dtype=last.dtype)
await ctx.started(index)
profiler(f'{func_name}: fsp up') profiler(f'{func_name}: fsp up')
async def resync(tracker: TaskTracker) -> tuple[TaskTracker, int]: async def resync(tracker: TaskTracker) -> tuple[TaskTracker, int]:

View File

@@ -427,7 +427,7 @@ async def run_fsp(
) )
async with ( async with (
portal.open_stream_from( portal.open_context(
# chaining entrypoint # chaining entrypoint
fsp.cascade, fsp.cascade,
@@ -437,21 +437,17 @@ async def run_fsp(
src_shm_token=src_shm.token, src_shm_token=src_shm.token,
dst_shm_token=conf['shm'].token, dst_shm_token=conf['shm'].token,
symbol=sym, symbol=sym,
fsp_func_name=fsp_func_name, func_name=fsp_func_name,
loglevel=loglevel, loglevel=loglevel,
) as stream, ) as (ctx, last_index),
ctx.open_stream() as stream,
open_sidepane( open_sidepane(
linkedsplits, linkedsplits,
display_name, display_name,
) as sidepane, ) as sidepane,
): ):
# receive last index for processed historical
# data-array as first msg
_ = await stream.receive()
shm = conf['shm'] shm = conf['shm']
if conf.get('overlay'): if conf.get('overlay'):