Add signal backfilling via trio task respawn
parent 47856666b5
commit 7cf3c4a86c
@@ -20,6 +20,7 @@ Financial signal processing for the peeps.
 from typing import AsyncIterator, Callable, Tuple
 
 import trio
+from trio_typing import TaskStatus
 import tractor
 import numpy as np
 
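Aside on the new import: `trio_typing.TaskStatus` types the status object trio passes to tasks launched via `nursery.start()`. A minimal illustration of that handshake (standalone sketch, not piker code):

import trio
from trio_typing import TaskStatus

async def server(
    task_status: TaskStatus[int] = trio.TASK_STATUS_IGNORED,
) -> None:
    # signal readiness and hand a value (here an int) back to the parent
    task_status.started(42)
    await trio.sleep_forever()

async def main() -> None:
    async with trio.open_nursery() as n:
        port = await n.start(server)  # returns 42 once `started()` fires
        assert port == 42
        n.cancel_scope.cancel()

trio.run(main)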
@@ -75,6 +76,7 @@ async def increment_signals(
 
         # write new slot to the buffer
         dst_shm.push(last)
+        len(dst_shm.array)
 
 
 @tractor.stream
@@ -99,9 +101,19 @@ async def cascade(
     async with data.open_feed(brokername, [symbol]) as feed:
 
         assert src.token == feed.shm.token
 
+        async def fsp_compute(
+            task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
+        ) -> None:
+
             # TODO: load appropriate fsp with input args
 
-        async def filter_by_sym(sym, stream):
+            async def filter_by_sym(
+                sym: str,
+                stream,
+            ):
+                # task cancellation won't kill the channel
+                async with stream.shield():
                     async for quotes in stream:
                         for symbol, quotes in quotes.items():
                             if symbol == sym:
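Note: `filter_by_sym()` just plucks one symbol's quotes out of a stream whose messages batch quotes for many symbols, while tractor's `stream.shield()` keeps the underlying channel alive if the task is cancelled. A standalone sketch of the filtering half (quote format assumed from the loop above; the inner variable is renamed `quote` to avoid the name shadowing in the diff):

from typing import Any, AsyncIterator, Dict

async def filter_by_sym(
    sym: str,
    stream: AsyncIterator[Dict[str, Any]],
) -> AsyncIterator[Any]:
    # each message batches quotes keyed by symbol name
    async for quotes in stream:
        for symbol, quote in quotes.items():
            if symbol == sym:
                yield quote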
@@ -122,7 +134,9 @@ async def cascade(
             # - developing some system on top of the shared mem array that
             #   is `index` aware such that historical data can be indexed
             #   relative to the true first datum? Not sure if this is sane
-        #   for derivatives.
+            #   for incremental computations.
+            dst._first.value = src._first.value
+            dst._last.value = src._first.value
 
             # Conduct a single iteration of fsp with historical bars input
             # and get historical output
@@ -136,6 +150,7 @@ async def cascade(
             )
             history[fsp_func_name] = history_output
 
+
             # check for data length misalignment and fill missing values
             diff = len(src.array) - len(history)
             if diff >= 0:
@@ -146,13 +161,47 @@ async def cascade(
             # compare with source signal and time align
             index = dst.push(history)
 
-        yield index
+            await ctx.send_yield(index)
 
-        async with trio.open_nursery() as n:
-            n.start_soon(increment_signals, feed, dst)
+            # setup a respawn handle
+            with trio.CancelScope() as cs:
+                task_status.started(cs)
+
+                # rt stream
                 async for processed in out_stream:
                     log.debug(f"{fsp_func_name}: {processed}")
                     index = src.index
                     dst.array[-1][fsp_func_name] = processed
 
+                    # stream latest shm array index entry
                     await ctx.send_yield(index)
 
+        last_len = new_len = len(src.array)
+
+        async with trio.open_nursery() as n:
+
+            cs = await n.start(fsp_compute)
+
+            # Increment the underlying shared memory buffer on every "increment"
+            # msg received from the underlying data feed.
+
+            async for msg in await feed.index_stream():
+
+                new_len = len(src.array)
+
+                if new_len > last_len + 1:
+                    # respawn the signal compute task if the source
+                    # signal has been updated
+                    cs.cancel()
+                    cs = await n.start(fsp_compute)
+
+                    # TODO: adopt an incremental update engine/approach
+                    # where possible here eventually!
+
+                array = dst.array
+                last = array[-1:].copy()
+
+                # write new slot to the buffer
+                dst.push(last)
+
+                last_len = new_len
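This hunk is the commit's core pattern: `fsp_compute()` hands its own cancel scope back through `nursery.start()`, and the supervisor loop cancels and restarts it whenever the source array grows by more than one slot (i.e. a backfill landed). A minimal standalone sketch of the respawn mechanics (names illustrative, not piker's):

import trio

async def compute(
    task_status=trio.TASK_STATUS_IGNORED,
) -> None:
    # the one-shot (re)compute pass would run here, then the rt loop
    with trio.CancelScope() as cs:
        task_status.started(cs)     # hand our cancel scope to the parent
        await trio.sleep_forever()  # stand-in for the real-time stream loop

async def supervisor() -> None:
    async with trio.open_nursery() as n:
        cs = await n.start(compute)
        for _ in range(3):          # stand-in for "source was backfilled" events
            await trio.sleep(0.1)
            cs.cancel()             # kill only the compute task...
            cs = await n.start(compute)  # ...and respawn it fresh
        cs.cancel()                 # let the nursery exit

trio.run(supervisor)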
@@ -151,8 +151,8 @@ def wma(
     return np.convolve(signal, weights, 'valid')
 
 
-# @piker.fsp(
-    # aggregates=[60, 60*5, 60*60, '4H', '1D'],
+# @piker.fsp.signal(
+#     timeframes=['1s', '5s', '15s', '1m', '5m', '1H'],
 # )
 async def _rsi(
     source: 'QuoteStream[Dict[str, Any]]',  # noqa
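For reference on the surrounding `wma()` context line: `np.convolve(signal, weights, 'valid')` computes the weighted sum over each fully-overlapping window. A tiny worked example with uniform weights:

import numpy as np

signal = np.array([1., 2., 3., 4., 5.])
length = 3
weights = np.full(length, 1.0) / length  # uniform weights -> simple MA
# 'valid' keeps only full windows: len(signal) - length + 1 = 3 outputs
print(np.convolve(signal, weights, 'valid'))  # [2. 3. 4.]
# NB: np.convolve flips the kernel, so non-uniform weights hit the window
# in reverse order; pass them reversed to weight recent samples more.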
@@ -558,7 +558,9 @@ class ChartPlotWidget(pg.PlotWidget):
 
             # TODO: see how this handles with custom ohlcv bars graphics
             # and/or if we can implement something similar for OHLC graphics
-            clipToView=True,
+            # clipToView=True,
+            autoDownsample=True,
+            downsampleMethod='subsample',
 
             **pdi_kwargs,
         )
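`autoDownsample` and `downsampleMethod` are stock `pyqtgraph.PlotDataItem` options (also settable after construction), trading draw fidelity for speed on long arrays. A small sketch of the equivalent imperative calls:

import numpy as np
import pyqtgraph as pg

curve = pg.PlotDataItem(np.random.randn(100_000))
# equivalent to passing autoDownsample/downsampleMethod at construction
curve.setDownsampling(auto=True, method='subsample')  # or 'mean' / 'peak'
curve.setClipToView(True)  # the kwarg the diff comments out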
@@ -1221,9 +1223,23 @@ async def update_signals(
     # update chart graphics
     async for value in stream:
 
-        # read last
-        array = shm.array
-        value = array[-1][fsp_func_name]
+        # TODO: provide a read sync mechanism to avoid this polling.
+        # the underlying issue is that a backfill and subsequent shm
+        # array first/last index update could result in an empty array
+        # read here since the stream is never torn down on the
+        # re-compute steps.
+        read_tries = 2
+        while read_tries > 0:
+
+            try:
+                # read last
+                array = shm.array
+                value = array[-1][fsp_func_name]
+                break
+
+            except IndexError:
+                read_tries -= 1
+                continue
 
         if last_val_sticky:
             last_val_sticky.update_from_data(-1, value)
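The bounded retry above papers over a transient empty-array read while a backfill swaps the shm first/last indices. The same idiom as a reusable helper (hypothetical, not part of this diff):

def read_last(shm, field: str, tries: int = 2):
    # bounded retry across the transient empty-array window that a
    # backfill's first/last index swap can open up
    for _ in range(tries):
        try:
            return shm.array[-1][field]
        except IndexError:
            continue
    raise IndexError(f'no readable row for {field!r} after {tries} tries')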