Update fsps and overlays inside main OHLC chart update loop
							parent
							
								
									3095d602e4
								
							
						
					
					
						commit
						f98733118b
					
				| 
						 | 
					@ -59,9 +59,58 @@ _clear_throttle_rate: int = 58  # Hz
 | 
				
			||||||
_book_throttle_rate: int = 16  # Hz
 | 
					_book_throttle_rate: int = 16  # Hz
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def update_fsp_chart(
 | 
				
			||||||
 | 
					    chart: ChartPlotWidget,
 | 
				
			||||||
 | 
					    shm: ShmArray,
 | 
				
			||||||
 | 
					    display_name: str,
 | 
				
			||||||
 | 
					    array_key: Optional[str],
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					) -> None:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    array = shm.array
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # XXX: is this a problem any more after porting to the
 | 
				
			||||||
 | 
					    # ``tractor.Context`` api or can we remove it?
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # TODO: provide a read sync mechanism to avoid this polling.  the
 | 
				
			||||||
 | 
					    # underlying issue is that a backfill (aka prepend) and subsequent
 | 
				
			||||||
 | 
					    # shm array first/last index update could result in an empty array
 | 
				
			||||||
 | 
					    # read here since the stream is never torn down on the re-compute
 | 
				
			||||||
 | 
					    # steps.
 | 
				
			||||||
 | 
					    # read_tries = 2
 | 
				
			||||||
 | 
					    # while read_tries > 0:
 | 
				
			||||||
 | 
					    #     try:
 | 
				
			||||||
 | 
					    #         # read last
 | 
				
			||||||
 | 
					    #         array = shm.array
 | 
				
			||||||
 | 
					    #         value = array[-1][array_key]
 | 
				
			||||||
 | 
					    #         break
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    #     except IndexError:
 | 
				
			||||||
 | 
					    #         read_tries -= 1
 | 
				
			||||||
 | 
					    #         continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # update graphics
 | 
				
			||||||
 | 
					    chart.update_curve_from_array(
 | 
				
			||||||
 | 
					        display_name,
 | 
				
			||||||
 | 
					        array,
 | 
				
			||||||
 | 
					        array_key=array_key or display_name,
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    last_val_sticky = chart._ysticks.get(display_name)
 | 
				
			||||||
 | 
					    if last_val_sticky:
 | 
				
			||||||
 | 
					        # read from last calculated value
 | 
				
			||||||
 | 
					        # XXX: fsp func names must be unique meaning we don't have
 | 
				
			||||||
 | 
					        # duplicates of the underlying data even if multiple
 | 
				
			||||||
 | 
					        # sub-charts reference it under different 'named charts'.
 | 
				
			||||||
 | 
					        array = shm.array[array_key]
 | 
				
			||||||
 | 
					        if len(array):
 | 
				
			||||||
 | 
					            value = array[-1]
 | 
				
			||||||
 | 
					            last_val_sticky.update_from_data(-1, value)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def update_chart_from_quotes(
 | 
					async def update_chart_from_quotes(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    chart: ChartPlotWidget,
 | 
					    linked: LinkedSplits,
 | 
				
			||||||
    stream: tractor.MsgStream,
 | 
					    stream: tractor.MsgStream,
 | 
				
			||||||
    ohlcv: np.ndarray,
 | 
					    ohlcv: np.ndarray,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -84,6 +133,8 @@ async def update_chart_from_quotes(
 | 
				
			||||||
    # - 1-5 sec bar lookback-autocorrection like tws does?
 | 
					    # - 1-5 sec bar lookback-autocorrection like tws does?
 | 
				
			||||||
    #   (would require a background history checker task)
 | 
					    #   (would require a background history checker task)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    chart = linked.chart
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # update last price sticky
 | 
					    # update last price sticky
 | 
				
			||||||
    last_price_sticky = chart._ysticks[chart.name]
 | 
					    last_price_sticky = chart._ysticks[chart.name]
 | 
				
			||||||
    last_price_sticky.update_from_data(
 | 
					    last_price_sticky.update_from_data(
 | 
				
			||||||
| 
						 | 
					@ -310,6 +361,33 @@ async def update_chart_from_quotes(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                last_mx, last_mn = mx, mn
 | 
					                last_mx, last_mn = mx, mn
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # run synchronous update on all derived fsp subplots
 | 
				
			||||||
 | 
					            # print(f'subplots: {linked.subplots.keys()}')
 | 
				
			||||||
 | 
					            for name, subchart in linked.subplots.items():
 | 
				
			||||||
 | 
					                update_fsp_chart(
 | 
				
			||||||
 | 
					                    subchart,
 | 
				
			||||||
 | 
					                    subchart._shm,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    # XXX: do we really needs seperate names here?
 | 
				
			||||||
 | 
					                    name,
 | 
				
			||||||
 | 
					                    array_key=name,
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                # TODO: all overlays on all subplots..
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # run synchronous update on all derived overlays
 | 
				
			||||||
 | 
					            # print(f'overlays: {chart._overlays}')
 | 
				
			||||||
 | 
					            for name, shm in chart._overlays.items():
 | 
				
			||||||
 | 
					                update_fsp_chart(
 | 
				
			||||||
 | 
					                    chart,
 | 
				
			||||||
 | 
					                    shm,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    # XXX: do we really needs seperate names here?
 | 
				
			||||||
 | 
					                    name,
 | 
				
			||||||
 | 
					                    array_key=name,
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def maybe_mk_fsp_shm(
 | 
					def maybe_mk_fsp_shm(
 | 
				
			||||||
    sym: str,
 | 
					    sym: str,
 | 
				
			||||||
| 
						 | 
					@ -318,7 +396,8 @@ def maybe_mk_fsp_shm(
 | 
				
			||||||
    readonly: bool = True,
 | 
					    readonly: bool = True,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
) -> (ShmArray, bool):
 | 
					) -> (ShmArray, bool):
 | 
				
			||||||
    '''Allocate a single row shm array for an symbol-fsp pair.
 | 
					    '''Allocate a single row shm array for an symbol-fsp pair if none
 | 
				
			||||||
 | 
					    exists, otherwise load the shm already existing for that token.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    uid = tractor.current_actor().uid
 | 
					    uid = tractor.current_actor().uid
 | 
				
			||||||
| 
						 | 
					@ -436,7 +515,6 @@ async def open_fspd_cluster(
 | 
				
			||||||
        tractor.open_nursery() as n,
 | 
					        tractor.open_nursery() as n,
 | 
				
			||||||
        trio.open_nursery() as ln,
 | 
					        trio.open_nursery() as ln,
 | 
				
			||||||
    ):
 | 
					    ):
 | 
				
			||||||
 | 
					 | 
				
			||||||
        # Currently we spawn an actor per fsp chain but
 | 
					        # Currently we spawn an actor per fsp chain but
 | 
				
			||||||
        # likely we'll want to pool them eventually to
 | 
					        # likely we'll want to pool them eventually to
 | 
				
			||||||
        # scale horizonatlly once cores are used up.
 | 
					        # scale horizonatlly once cores are used up.
 | 
				
			||||||
| 
						 | 
					@ -456,8 +534,6 @@ async def open_fspd_cluster(
 | 
				
			||||||
            # "feeds".  assert opened, f"A chart for {key} likely
 | 
					            # "feeds".  assert opened, f"A chart for {key} likely
 | 
				
			||||||
            # already exists?"
 | 
					            # already exists?"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # conf['shm'] = shm
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            portal = await n.start_actor(
 | 
					            portal = await n.start_actor(
 | 
				
			||||||
                enable_modules=['piker.fsp._engine'],
 | 
					                enable_modules=['piker.fsp._engine'],
 | 
				
			||||||
                name='fsp.' + display_name,
 | 
					                name='fsp.' + display_name,
 | 
				
			||||||
| 
						 | 
					@ -543,6 +619,8 @@ async def update_chart_from_fsp(
 | 
				
			||||||
                overlay=True,
 | 
					                overlay=True,
 | 
				
			||||||
                color='default_light',
 | 
					                color='default_light',
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					            # specially store ref to shm for lookup in display loop
 | 
				
			||||||
 | 
					            chart._overlays[display_name] = shm
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        else:
 | 
					        else:
 | 
				
			||||||
            chart = linkedsplits.add_plot(
 | 
					            chart = linkedsplits.add_plot(
 | 
				
			||||||
| 
						 | 
					@ -636,52 +714,6 @@ async def update_chart_from_fsp(
 | 
				
			||||||
                last = time.time()
 | 
					                last = time.time()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def update_fsp_chart(
 | 
					 | 
				
			||||||
    chart: ChartPlotWidget,
 | 
					 | 
				
			||||||
    shm: ShmArray,
 | 
					 | 
				
			||||||
    display_name: str,
 | 
					 | 
				
			||||||
    array_key: str,
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
) -> None:
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    array = shm.array
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    # XXX: is this a problem any more after porting to the
 | 
					 | 
				
			||||||
    # ``tractor.Context`` api?
 | 
					 | 
				
			||||||
    # TODO: provide a read sync mechanism to avoid this polling.  the
 | 
					 | 
				
			||||||
    # underlying issue is that a backfill (aka prepend) and subsequent
 | 
					 | 
				
			||||||
    # shm array first/last index update could result in an empty array
 | 
					 | 
				
			||||||
    # read here since the stream is never torn down on the re-compute
 | 
					 | 
				
			||||||
    # steps.
 | 
					 | 
				
			||||||
    # read_tries = 2
 | 
					 | 
				
			||||||
    # while read_tries > 0:
 | 
					 | 
				
			||||||
    #     try:
 | 
					 | 
				
			||||||
    #         # read last
 | 
					 | 
				
			||||||
    #         array = shm.array
 | 
					 | 
				
			||||||
    #         value = array[-1][array_key]
 | 
					 | 
				
			||||||
    #         break
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    #     except IndexError:
 | 
					 | 
				
			||||||
    #         read_tries -= 1
 | 
					 | 
				
			||||||
    #         continue
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    # update graphics
 | 
					 | 
				
			||||||
    chart.update_curve_from_array(
 | 
					 | 
				
			||||||
        display_name,
 | 
					 | 
				
			||||||
        array,
 | 
					 | 
				
			||||||
        array_key=array_key,
 | 
					 | 
				
			||||||
    )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    last_val_sticky = chart._ysticks.get(display_name)
 | 
					 | 
				
			||||||
    if last_val_sticky:
 | 
					 | 
				
			||||||
        # read from last calculated value
 | 
					 | 
				
			||||||
        # XXX: fsp func names must be unique meaning we don't have
 | 
					 | 
				
			||||||
        # duplicates of the underlying data even if multiple
 | 
					 | 
				
			||||||
        # sub-charts reference it under different 'named charts'.
 | 
					 | 
				
			||||||
        value = shm.array[array_key][-1]
 | 
					 | 
				
			||||||
        last_val_sticky.update_from_data(-1, value)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
async def check_for_new_bars(feed, ohlcv, linkedsplits):
 | 
					async def check_for_new_bars(feed, ohlcv, linkedsplits):
 | 
				
			||||||
    """Task which updates from new bars in the shared ohlcv buffer every
 | 
					    """Task which updates from new bars in the shared ohlcv buffer every
 | 
				
			||||||
    ``delay_s`` seconds.
 | 
					    ``delay_s`` seconds.
 | 
				
			||||||
| 
						 | 
					@ -1002,7 +1034,7 @@ async def display_symbol_data(
 | 
				
			||||||
            # start graphics update loop(s)after receiving first live quote
 | 
					            # start graphics update loop(s)after receiving first live quote
 | 
				
			||||||
            ln.start_soon(
 | 
					            ln.start_soon(
 | 
				
			||||||
                update_chart_from_quotes,
 | 
					                update_chart_from_quotes,
 | 
				
			||||||
                chart,
 | 
					                linkedsplits,
 | 
				
			||||||
                feed.stream,
 | 
					                feed.stream,
 | 
				
			||||||
                ohlcv,
 | 
					                ohlcv,
 | 
				
			||||||
                wap_in_history,
 | 
					                wap_in_history,
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue