Port fsp daemon to tractor's context api
							parent
							
								
									154e1f7087
								
							
						
					
					
						commit
						429b6f6891
					
				| 
						 | 
				
			
			@ -36,7 +36,7 @@ from ..data._sharedmem import ShmArray
 | 
			
		|||
log = get_logger(__name__)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
_fsps = {
 | 
			
		||||
_fsp_builtins = {
 | 
			
		||||
    'rsi': _rsi,
 | 
			
		||||
    'wma': _wma,
 | 
			
		||||
    'vwap': _tina_vwap,
 | 
			
		||||
| 
						 | 
				
			
			@ -65,16 +65,39 @@ async def latency(
 | 
			
		|||
            yield value
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def filter_quotes_by_sym(
 | 
			
		||||
 | 
			
		||||
    sym: str,
 | 
			
		||||
    quote_stream,
 | 
			
		||||
 | 
			
		||||
) -> AsyncIterator[dict]:
 | 
			
		||||
    '''Filter quote stream by target symbol.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    # TODO: make this the actualy first quote from feed
 | 
			
		||||
    # XXX: this allows for a single iteration to run for history
 | 
			
		||||
    # processing without waiting on the real-time feed for a new quote
 | 
			
		||||
    yield {}
 | 
			
		||||
 | 
			
		||||
    # task cancellation won't kill the channel
 | 
			
		||||
    # since we shielded at the `open_feed()` call
 | 
			
		||||
    async for quotes in quote_stream:
 | 
			
		||||
        for symbol, quote in quotes.items():
 | 
			
		||||
            if symbol == sym:
 | 
			
		||||
                yield quote
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def fsp_compute(
 | 
			
		||||
    ctx: tractor.Context,
 | 
			
		||||
 | 
			
		||||
    stream: tractor.MsgStream,
 | 
			
		||||
    symbol: str,
 | 
			
		||||
    feed: Feed,
 | 
			
		||||
    stream: trio.abc.ReceiveChannel,
 | 
			
		||||
    quote_stream: trio.abc.ReceiveChannel,
 | 
			
		||||
 | 
			
		||||
    src: ShmArray,
 | 
			
		||||
    dst: ShmArray,
 | 
			
		||||
 | 
			
		||||
    fsp_func_name: str,
 | 
			
		||||
    func_name: str,
 | 
			
		||||
    func: Callable,
 | 
			
		||||
 | 
			
		||||
    task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
 | 
			
		||||
| 
						 | 
				
			
			@ -83,25 +106,13 @@ async def fsp_compute(
 | 
			
		|||
 | 
			
		||||
    # TODO: load appropriate fsp with input args
 | 
			
		||||
 | 
			
		||||
    async def filter_by_sym(
 | 
			
		||||
        sym: str,
 | 
			
		||||
        stream,
 | 
			
		||||
    ):
 | 
			
		||||
 | 
			
		||||
        # TODO: make this the actualy first quote from feed
 | 
			
		||||
        # XXX: this allows for a single iteration to run for history
 | 
			
		||||
        # processing without waiting on the real-time feed for a new quote
 | 
			
		||||
        yield {}
 | 
			
		||||
 | 
			
		||||
        # task cancellation won't kill the channel
 | 
			
		||||
        # since we shielded at the `open_feed()` call
 | 
			
		||||
        async for quotes in stream:
 | 
			
		||||
            for symbol, quotes in quotes.items():
 | 
			
		||||
                if symbol == sym:
 | 
			
		||||
                    yield quotes
 | 
			
		||||
 | 
			
		||||
    out_stream = func(
 | 
			
		||||
        filter_by_sym(symbol, stream),
 | 
			
		||||
 | 
			
		||||
        # TODO: do we even need this if we do the feed api right?
 | 
			
		||||
        # shouldn't a local stream do this before we get a handle
 | 
			
		||||
        # to the async iterable? it's that or we do some kinda
 | 
			
		||||
        # async itertools style?
 | 
			
		||||
        filter_quotes_by_sym(symbol, quote_stream),
 | 
			
		||||
        feed.shm,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -129,11 +140,11 @@ async def fsp_compute(
 | 
			
		|||
        np.arange(len(history_output)),
 | 
			
		||||
        dtype=dst.array.dtype
 | 
			
		||||
    )
 | 
			
		||||
    history[fsp_func_name] = history_output
 | 
			
		||||
    history[func_name] = history_output
 | 
			
		||||
 | 
			
		||||
    # check for data length mis-allignment and fill missing values
 | 
			
		||||
    diff = len(src.array) - len(history)
 | 
			
		||||
    if diff >= 0:
 | 
			
		||||
    if diff > 0:
 | 
			
		||||
        log.warning(f"WTF DIFF SIGNAL to HISTORY {diff}")
 | 
			
		||||
        for _ in range(diff):
 | 
			
		||||
            dst.push(history[:1])
 | 
			
		||||
| 
						 | 
				
			
			@ -141,36 +152,44 @@ async def fsp_compute(
 | 
			
		|||
    # compare with source signal and time align
 | 
			
		||||
    index = dst.push(history)
 | 
			
		||||
 | 
			
		||||
    await ctx.send_yield(index)
 | 
			
		||||
 | 
			
		||||
    # setup a respawn handle
 | 
			
		||||
    with trio.CancelScope() as cs:
 | 
			
		||||
        task_status.started(cs)
 | 
			
		||||
        task_status.started((cs, index))
 | 
			
		||||
 | 
			
		||||
        import time
 | 
			
		||||
        last = time.time()
 | 
			
		||||
 | 
			
		||||
        # rt stream
 | 
			
		||||
        async for processed in out_stream:
 | 
			
		||||
 | 
			
		||||
            # period = time.time() - last
 | 
			
		||||
            # hz = 1/period if period else float('nan')
 | 
			
		||||
            # if hz > 60:
 | 
			
		||||
            #     log.info(f'FSP quote too fast: {hz}')
 | 
			
		||||
            period = time.time() - last
 | 
			
		||||
            hz = 1/period if period else float('nan')
 | 
			
		||||
            if hz > 60:
 | 
			
		||||
                log.info(f'FSP quote too fast: {hz}')
 | 
			
		||||
 | 
			
		||||
            log.debug(f"{fsp_func_name}: {processed}")
 | 
			
		||||
            log.debug(f"{func_name}: {processed}")
 | 
			
		||||
            index = src.index
 | 
			
		||||
            dst.array[-1][fsp_func_name] = processed
 | 
			
		||||
            dst.array[-1][func_name] = processed
 | 
			
		||||
 | 
			
		||||
            # stream latest shm array index entry
 | 
			
		||||
            await ctx.send_yield(index)
 | 
			
		||||
            # stream latest array index entry which basically just acts
 | 
			
		||||
            # as trigger msg to tell the consumer to read from shm
 | 
			
		||||
            await stream.send(index)
 | 
			
		||||
 | 
			
		||||
            last = time.time()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@tractor.stream
 | 
			
		||||
@tractor.context
 | 
			
		||||
async def cascade(
 | 
			
		||||
 | 
			
		||||
    ctx: tractor.Context,
 | 
			
		||||
    brokername: str,
 | 
			
		||||
 | 
			
		||||
    src_shm_token: dict,
 | 
			
		||||
    dst_shm_token: Tuple[str, np.dtype],
 | 
			
		||||
 | 
			
		||||
    symbol: str,
 | 
			
		||||
    fsp_func_name: str,
 | 
			
		||||
    func_name: str,
 | 
			
		||||
 | 
			
		||||
    loglevel: Optional[str] = None,
 | 
			
		||||
 | 
			
		||||
) -> None:
 | 
			
		||||
| 
						 | 
				
			
			@ -184,39 +203,50 @@ async def cascade(
 | 
			
		|||
    src = attach_shm_array(token=src_shm_token)
 | 
			
		||||
    dst = attach_shm_array(readonly=False, token=dst_shm_token)
 | 
			
		||||
 | 
			
		||||
    func: Callable = _fsps[fsp_func_name]
 | 
			
		||||
    func: Callable = _fsp_builtins.get(func_name)
 | 
			
		||||
    if not func:
 | 
			
		||||
        # TODO: assume it's a func target path
 | 
			
		||||
        raise ValueError('Unknown fsp target: {func_name}')
 | 
			
		||||
 | 
			
		||||
    # open a data feed stream with requested broker
 | 
			
		||||
    async with data.feed.maybe_open_feed(
 | 
			
		||||
        brokername,
 | 
			
		||||
        [symbol],
 | 
			
		||||
 | 
			
		||||
        # TODO:
 | 
			
		||||
        # TODO throttle tick outputs from *this* daemon since
 | 
			
		||||
        # it'll emit tons of ticks due to the throttle only
 | 
			
		||||
        # limits quote arrival periods, so the consumer of *this*
 | 
			
		||||
        # needs to get throttled the ticks we generate.
 | 
			
		||||
        # tick_throttle=60,
 | 
			
		||||
 | 
			
		||||
    ) as (feed, stream):
 | 
			
		||||
    ) as (feed, quote_stream):
 | 
			
		||||
 | 
			
		||||
        assert src.token == feed.shm.token
 | 
			
		||||
 | 
			
		||||
        last_len = new_len = len(src.array)
 | 
			
		||||
 | 
			
		||||
        fsp_target = partial(
 | 
			
		||||
            fsp_compute,
 | 
			
		||||
            ctx=ctx,
 | 
			
		||||
            symbol=symbol,
 | 
			
		||||
            feed=feed,
 | 
			
		||||
            stream=stream,
 | 
			
		||||
        async with (
 | 
			
		||||
            ctx.open_stream() as stream,
 | 
			
		||||
            trio.open_nursery() as n,
 | 
			
		||||
        ):
 | 
			
		||||
 | 
			
		||||
            src=src,
 | 
			
		||||
            dst=dst,
 | 
			
		||||
            fsp_target = partial(
 | 
			
		||||
 | 
			
		||||
            fsp_func_name=fsp_func_name,
 | 
			
		||||
            func=func
 | 
			
		||||
        )
 | 
			
		||||
                fsp_compute,
 | 
			
		||||
                stream=stream,
 | 
			
		||||
                symbol=symbol,
 | 
			
		||||
                feed=feed,
 | 
			
		||||
                quote_stream=quote_stream,
 | 
			
		||||
 | 
			
		||||
        async with trio.open_nursery() as n:
 | 
			
		||||
                # shm
 | 
			
		||||
                src=src,
 | 
			
		||||
                dst=dst,
 | 
			
		||||
 | 
			
		||||
            cs = await n.start(fsp_target)
 | 
			
		||||
                func_name=func_name,
 | 
			
		||||
                func=func
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            cs, index = await n.start(fsp_target)
 | 
			
		||||
            await ctx.started(index)
 | 
			
		||||
 | 
			
		||||
            # Increment the underlying shared memory buffer on every
 | 
			
		||||
            # "increment" msg received from the underlying data feed.
 | 
			
		||||
| 
						 | 
				
			
			@ -229,17 +259,15 @@ async def cascade(
 | 
			
		|||
                    if new_len > last_len + 1:
 | 
			
		||||
                        # respawn the signal compute task if the source
 | 
			
		||||
                        # signal has been updated
 | 
			
		||||
                        log.warning(f'Re-spawning fsp {func_name}')
 | 
			
		||||
                        cs.cancel()
 | 
			
		||||
                        cs = await n.start(fsp_target)
 | 
			
		||||
                        cs, index = await n.start(fsp_target)
 | 
			
		||||
 | 
			
		||||
                        # TODO: adopt an incremental update engine/approach
 | 
			
		||||
                        # where possible here eventually!
 | 
			
		||||
 | 
			
		||||
                    # read out last shm row
 | 
			
		||||
                    # read out last shm row, copy and write new row
 | 
			
		||||
                    array = dst.array
 | 
			
		||||
                    last = array[-1:].copy()
 | 
			
		||||
 | 
			
		||||
                    # write new row to the shm buffer
 | 
			
		||||
                    dst.push(last)
 | 
			
		||||
 | 
			
		||||
                    last_len = new_len
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue