Spawn and cache an fsp cluster ahead of time
Use a fixed worker count and don't respawn for every chart, instead opting for a round-robin to tasks in a cluster and (for now) hoping for the best in terms of trio scheduling, though we should obviously route via symbol-locality next. This is currently a boon for chart spawning startup times since actor creation is done AOT. Additionally, - use `zero_on_step` for dollar volume - drop rsi on startup (again) - add dollar volume (via fsp) along side unit volume - litter more profiling to fsp chart startup sequence - pre-define tick type classes for update loopwin_fixes
							parent
							
								
									65a645bdde
								
							
						
					
					
						commit
						6bea1b1adf
					
				| 
						 | 
					@ -21,22 +21,23 @@ this module ties together quote and computational (fsp) streams with
 | 
				
			||||||
graphics update methods via our custom ``pyqtgraph`` charting api.
 | 
					graphics update methods via our custom ``pyqtgraph`` charting api.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
'''
 | 
					'''
 | 
				
			||||||
from contextlib import asynccontextmanager
 | 
					from contextlib import asynccontextmanager as acm
 | 
				
			||||||
from functools import partial
 | 
					from functools import partial
 | 
				
			||||||
 | 
					from itertools import cycle
 | 
				
			||||||
import time
 | 
					import time
 | 
				
			||||||
from types import ModuleType
 | 
					from types import ModuleType
 | 
				
			||||||
from typing import Optional
 | 
					from typing import Optional, AsyncGenerator
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import numpy as np
 | 
					import numpy as np
 | 
				
			||||||
from pydantic import create_model
 | 
					from pydantic import create_model
 | 
				
			||||||
 | 
					import pyqtgraph as pg
 | 
				
			||||||
import tractor
 | 
					import tractor
 | 
				
			||||||
import trio
 | 
					import trio
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from .. import brokers
 | 
					from .. import brokers
 | 
				
			||||||
from ..data.feed import (
 | 
					from .._cacheables import maybe_open_ctx
 | 
				
			||||||
    open_feed,
 | 
					from ..trionics import async_enter_all
 | 
				
			||||||
    # Feed,
 | 
					from ..data.feed import open_feed
 | 
				
			||||||
)
 | 
					 | 
				
			||||||
from ._chart import (
 | 
					from ._chart import (
 | 
				
			||||||
    ChartPlotWidget,
 | 
					    ChartPlotWidget,
 | 
				
			||||||
    LinkedSplits,
 | 
					    LinkedSplits,
 | 
				
			||||||
| 
						 | 
					@ -70,15 +71,6 @@ def update_fsp_chart(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    array = shm.array
 | 
					    array = shm.array
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # update graphics
 | 
					 | 
				
			||||||
    # NOTE: this does a length check internally which allows it
 | 
					 | 
				
			||||||
    # staying above the last row check below..
 | 
					 | 
				
			||||||
    chart.update_curve_from_array(
 | 
					 | 
				
			||||||
        graphics_name,
 | 
					 | 
				
			||||||
        array,
 | 
					 | 
				
			||||||
        array_key=array_key or graphics_name,
 | 
					 | 
				
			||||||
    )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    try:
 | 
					    try:
 | 
				
			||||||
        last_row = array[-1]
 | 
					        last_row = array[-1]
 | 
				
			||||||
    except IndexError:
 | 
					    except IndexError:
 | 
				
			||||||
| 
						 | 
					@ -95,22 +87,15 @@ def update_fsp_chart(
 | 
				
			||||||
        log.warning(f'Read-race on shm array: {graphics_name}@{shm.token}')
 | 
					        log.warning(f'Read-race on shm array: {graphics_name}@{shm.token}')
 | 
				
			||||||
        return
 | 
					        return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # TODO: provide a read sync mechanism to avoid this polling. the
 | 
					    # update graphics
 | 
				
			||||||
    # underlying issue is that a backfill (aka prepend) and subsequent
 | 
					    # NOTE: this does a length check internally which allows it
 | 
				
			||||||
    # shm array first/last index update could result in an empty array
 | 
					    # staying above the last row check below..
 | 
				
			||||||
    # read here since the stream is never torn down on the re-compute
 | 
					    chart.update_curve_from_array(
 | 
				
			||||||
    # steps.
 | 
					        graphics_name,
 | 
				
			||||||
    # read_tries = 2
 | 
					        array,
 | 
				
			||||||
    # while read_tries > 0:
 | 
					        array_key=array_key or graphics_name,
 | 
				
			||||||
    #     try:
 | 
					    )
 | 
				
			||||||
    #         # read last
 | 
					    chart._set_yrange()
 | 
				
			||||||
    #         array = shm.array
 | 
					 | 
				
			||||||
    #         value = array[-1][array_key]
 | 
					 | 
				
			||||||
    #         break
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    #     except IndexError:
 | 
					 | 
				
			||||||
    #         read_tries -= 1
 | 
					 | 
				
			||||||
    #         continue
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # XXX: re: ``array_key``: fsp func names must be unique meaning we
 | 
					    # XXX: re: ``array_key``: fsp func names must be unique meaning we
 | 
				
			||||||
    # can't have duplicates of the underlying data even if multiple
 | 
					    # can't have duplicates of the underlying data even if multiple
 | 
				
			||||||
| 
						 | 
					@ -126,24 +111,12 @@ def update_fsp_chart(
 | 
				
			||||||
        last_val_sticky.update_from_data(-1, last)
 | 
					        last_val_sticky.update_from_data(-1, last)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# _clses = {
 | 
					# a working tick-type-classes template
 | 
				
			||||||
#     'clears': {'trade', 'utrade', 'last'},
 | 
					_tick_groups = {
 | 
				
			||||||
#     'last': {'last'},
 | 
					    'clears': {'trade', 'utrade', 'last'},
 | 
				
			||||||
#     'bids': {'bid', 'bsize'},
 | 
					    'bids': {'bid', 'bsize'},
 | 
				
			||||||
#     'asks': {'ask', 'asize'},
 | 
					    'asks': {'ask', 'asize'},
 | 
				
			||||||
# }
 | 
					}
 | 
				
			||||||
 | 
					 | 
				
			||||||
# XXX: idea for frame type data structure we could use on the
 | 
					 | 
				
			||||||
# wire instead of doing it here?
 | 
					 | 
				
			||||||
# frames = {
 | 
					 | 
				
			||||||
#     'index': ['type_a', 'type_c', 'type_n', 'type_n', 'type_c'],
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
#     'type_a': [tick0, tick1, tick2, .., tickn],
 | 
					 | 
				
			||||||
#     'type_b': [tick0, tick1, tick2, .., tickn],
 | 
					 | 
				
			||||||
#     'type_c': [tick0, tick1, tick2, .., tickn],
 | 
					 | 
				
			||||||
#     ...
 | 
					 | 
				
			||||||
#     'type_n': [tick0, tick1, tick2, .., tickn],
 | 
					 | 
				
			||||||
# }
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def chart_maxmin(
 | 
					def chart_maxmin(
 | 
				
			||||||
| 
						 | 
					@ -263,8 +236,12 @@ async def update_chart_from_quotes(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        now = time.time()
 | 
					        now = time.time()
 | 
				
			||||||
        quote_period = now - last_quote
 | 
					        quote_period = now - last_quote
 | 
				
			||||||
        if quote_period <= 1/_quote_throttle_rate:
 | 
					        quote_rate = round(1/quote_period, 1) if quote_period else float('inf')
 | 
				
			||||||
            log.warning(f'TOO FAST: {1/quote_period}')
 | 
					        if (
 | 
				
			||||||
 | 
					            quote_period <= 1/_quote_throttle_rate
 | 
				
			||||||
 | 
					            and quote_rate > _quote_throttle_rate + 2
 | 
				
			||||||
 | 
					        ):
 | 
				
			||||||
 | 
					            log.warning(f'High quote rate {symbol.key}: {quote_rate}')
 | 
				
			||||||
        last_quote = now
 | 
					        last_quote = now
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # chart isn't active/shown so skip render cycle and pause feed(s)
 | 
					        # chart isn't active/shown so skip render cycle and pause feed(s)
 | 
				
			||||||
| 
						 | 
					@ -291,15 +268,7 @@ async def update_chart_from_quotes(
 | 
				
			||||||
            array = ohlcv.array
 | 
					            array = ohlcv.array
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            if vlm_chart:
 | 
					            if vlm_chart:
 | 
				
			||||||
                # print(f"volume: {end['volume']}")
 | 
					 | 
				
			||||||
                vlm_chart.update_curve_from_array('volume', array)
 | 
					                vlm_chart.update_curve_from_array('volume', array)
 | 
				
			||||||
 | 
					 | 
				
			||||||
                # built-in tina $vlm FSP using chl3 typical price for ohlc step
 | 
					 | 
				
			||||||
                # last = array[-1]
 | 
					 | 
				
			||||||
                # chl3 = (last['close'] + last['high'] + last['low']) / 3
 | 
					 | 
				
			||||||
                # v = last['volume']
 | 
					 | 
				
			||||||
                # dv = last['volume'] * chl3
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                vlm_sticky.update_from_data(*array[-1][['index', 'volume']])
 | 
					                vlm_sticky.update_from_data(*array[-1][['index', 'volume']])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                if (
 | 
					                if (
 | 
				
			||||||
| 
						 | 
					@ -346,7 +315,7 @@ async def update_chart_from_quotes(
 | 
				
			||||||
            # TODO: eventually we want to separate out the utrade (aka
 | 
					            # TODO: eventually we want to separate out the utrade (aka
 | 
				
			||||||
            # dark vlm prices) here and show them as an additional
 | 
					            # dark vlm prices) here and show them as an additional
 | 
				
			||||||
            # graphic.
 | 
					            # graphic.
 | 
				
			||||||
            clear_types = {'trade', 'utrade', 'last'}
 | 
					            clear_types = _tick_groups['clears']
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # XXX: if we wanted to iterate in "latest" (i.e. most
 | 
					            # XXX: if we wanted to iterate in "latest" (i.e. most
 | 
				
			||||||
            # current) tick first order as an optimization where we only
 | 
					            # current) tick first order as an optimization where we only
 | 
				
			||||||
| 
						 | 
					@ -415,11 +384,11 @@ async def update_chart_from_quotes(
 | 
				
			||||||
                        # label.size -= size
 | 
					                        # label.size -= size
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                # elif ticktype in ('ask', 'asize'):
 | 
					                # elif ticktype in ('ask', 'asize'):
 | 
				
			||||||
                elif typ in ('ask', 'asize'):
 | 
					                elif typ in _tick_groups['asks']:
 | 
				
			||||||
                    l1.ask_label.update_fields({'level': price, 'size': size})
 | 
					                    l1.ask_label.update_fields({'level': price, 'size': size})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                # elif ticktype in ('bid', 'bsize'):
 | 
					                # elif ticktype in ('bid', 'bsize'):
 | 
				
			||||||
                elif typ in ('bid', 'bsize'):
 | 
					                elif typ in _tick_groups['bids']:
 | 
				
			||||||
                    l1.bid_label.update_fields({'level': price, 'size': size})
 | 
					                    l1.bid_label.update_fields({'level': price, 'size': size})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # check for y-range re-size
 | 
					            # check for y-range re-size
 | 
				
			||||||
| 
						 | 
					@ -492,7 +461,7 @@ def maybe_mk_fsp_shm(
 | 
				
			||||||
    return shm, opened
 | 
					    return shm, opened
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@asynccontextmanager
 | 
					@acm
 | 
				
			||||||
async def open_fsp_sidepane(
 | 
					async def open_fsp_sidepane(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    linked: LinkedSplits,
 | 
					    linked: LinkedSplits,
 | 
				
			||||||
| 
						 | 
					@ -558,8 +527,66 @@ async def open_fsp_sidepane(
 | 
				
			||||||
        yield sidepane
 | 
					        yield sidepane
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def open_fspd_cluster(
 | 
					@acm
 | 
				
			||||||
 | 
					async def open_fsp_cluster(
 | 
				
			||||||
 | 
					    workers: int = 2
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					) -> AsyncGenerator[int, dict[str, tractor.Portal]]:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    profiler = pg.debug.Profiler(
 | 
				
			||||||
 | 
					        delayed=False,
 | 
				
			||||||
 | 
					        disabled=False
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					    portals: dict[str, tractor.Portal] = {}
 | 
				
			||||||
 | 
					    uid = tractor.current_actor().uid
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    async with tractor.open_nursery() as an:
 | 
				
			||||||
 | 
					        # XXX: fsp may have been opened by a duplicate chart.
 | 
				
			||||||
 | 
					        # Error for now until we figure out how to wrap fsps as
 | 
				
			||||||
 | 
					        # "feeds".  assert opened, f"A chart for {key} likely
 | 
				
			||||||
 | 
					        # already exists?"
 | 
				
			||||||
 | 
					        async with trio.open_nursery() as n:
 | 
				
			||||||
 | 
					            for index in range(workers):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                async def start(i) -> None:
 | 
				
			||||||
 | 
					                    key = f'fsp_{i}.' + '_'.join(uid)
 | 
				
			||||||
 | 
					                    portals[key] = await an.start_actor(
 | 
				
			||||||
 | 
					                        enable_modules=['piker.fsp._engine'],
 | 
				
			||||||
 | 
					                        name=key,
 | 
				
			||||||
 | 
					                    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                n.start_soon(start, index)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        assert len(portals) == workers
 | 
				
			||||||
 | 
					        profiler('started fsp cluster')
 | 
				
			||||||
 | 
					        yield portals
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@acm
 | 
				
			||||||
 | 
					async def maybe_open_fsp_cluster(
 | 
				
			||||||
 | 
					    workers: int = 2,
 | 
				
			||||||
 | 
					    **kwargs,
 | 
				
			||||||
 | 
					) -> AsyncGenerator[int, dict[str, tractor.Portal]]:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    uid = tractor.current_actor().uid
 | 
				
			||||||
 | 
					    async with maybe_open_ctx(
 | 
				
			||||||
 | 
					        key=uid,  # for now make a cluster per client?
 | 
				
			||||||
 | 
					        mngr=open_fsp_cluster(
 | 
				
			||||||
 | 
					            workers,
 | 
				
			||||||
 | 
					            # loglevel=loglevel,
 | 
				
			||||||
 | 
					            **kwargs,
 | 
				
			||||||
 | 
					        ),
 | 
				
			||||||
 | 
					    ) as (cache_hit, cluster_map):
 | 
				
			||||||
 | 
					        if cache_hit:
 | 
				
			||||||
 | 
					            log.info('re-using existing fsp cluster')
 | 
				
			||||||
 | 
					            yield cluster_map
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            yield cluster_map
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					async def start_fsp_displays(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    cluster_map: dict[str, tractor.Portal],
 | 
				
			||||||
    linkedsplits: LinkedSplits,
 | 
					    linkedsplits: LinkedSplits,
 | 
				
			||||||
    fsps: dict[str, str],
 | 
					    fsps: dict[str, str],
 | 
				
			||||||
    sym: str,
 | 
					    sym: str,
 | 
				
			||||||
| 
						 | 
					@ -580,19 +607,22 @@ async def open_fspd_cluster(
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    linkedsplits.focus()
 | 
					    linkedsplits.focus()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # spawns sub-processes which execute cpu bound fsp work
 | 
					    profiler = pg.debug.Profiler(
 | 
				
			||||||
    # which is streamed back to this parent.
 | 
					        delayed=False,
 | 
				
			||||||
    async with (
 | 
					        disabled=False
 | 
				
			||||||
        tractor.open_nursery() as n,
 | 
					    )
 | 
				
			||||||
        trio.open_nursery() as ln,
 | 
					
 | 
				
			||||||
    ):
 | 
					    async with trio.open_nursery() as n:
 | 
				
			||||||
        # Currently we spawn an actor per fsp chain but
 | 
					        # Currently we spawn an actor per fsp chain but
 | 
				
			||||||
        # likely we'll want to pool them eventually to
 | 
					        # likely we'll want to pool them eventually to
 | 
				
			||||||
        # scale horizonatlly once cores are used up.
 | 
					        # scale horizonatlly once cores are used up.
 | 
				
			||||||
        for display_name, conf in fsps.items():
 | 
					        for (display_name, conf), (name, portal) in zip(
 | 
				
			||||||
 | 
					            fsps.items(),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # rr to cluster for now..
 | 
				
			||||||
 | 
					            cycle(cluster_map.items()),
 | 
				
			||||||
 | 
					        ):
 | 
				
			||||||
            func_name = conf['func_name']
 | 
					            func_name = conf['func_name']
 | 
				
			||||||
 | 
					 | 
				
			||||||
            shm, opened = maybe_mk_fsp_shm(
 | 
					            shm, opened = maybe_mk_fsp_shm(
 | 
				
			||||||
                sym,
 | 
					                sym,
 | 
				
			||||||
                field_name=func_name,
 | 
					                field_name=func_name,
 | 
				
			||||||
| 
						 | 
					@ -600,18 +630,17 @@ async def open_fspd_cluster(
 | 
				
			||||||
                readonly=True,
 | 
					                readonly=True,
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            profiler(f'created shm for fsp actor: {display_name}')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # XXX: fsp may have been opened by a duplicate chart.
 | 
					            # XXX: fsp may have been opened by a duplicate chart.
 | 
				
			||||||
            # Error for now until we figure out how to wrap fsps as
 | 
					            # Error for now until we figure out how to wrap fsps as
 | 
				
			||||||
            # "feeds".  assert opened, f"A chart for {key} likely
 | 
					            # "feeds".  assert opened, f"A chart for {key} likely
 | 
				
			||||||
            # already exists?"
 | 
					            # already exists?"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            portal = await n.start_actor(
 | 
					            profiler(f'attached to fsp portal: {display_name}')
 | 
				
			||||||
                enable_modules=['piker.fsp._engine'],
 | 
					 | 
				
			||||||
                name='fsp.' + display_name,
 | 
					 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # init async
 | 
					            # init async
 | 
				
			||||||
            ln.start_soon(
 | 
					            n.start_soon(
 | 
				
			||||||
                partial(
 | 
					                partial(
 | 
				
			||||||
                    update_chart_from_fsp,
 | 
					                    update_chart_from_fsp,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -627,6 +656,7 @@ async def open_fspd_cluster(
 | 
				
			||||||
                    is_overlay=conf.get('overlay', False),
 | 
					                    is_overlay=conf.get('overlay', False),
 | 
				
			||||||
                    group_status_key=group_status_key,
 | 
					                    group_status_key=group_status_key,
 | 
				
			||||||
                    loglevel=loglevel,
 | 
					                    loglevel=loglevel,
 | 
				
			||||||
 | 
					                    profiler=profiler,
 | 
				
			||||||
                )
 | 
					                )
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -650,6 +680,7 @@ async def update_chart_from_fsp(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    group_status_key: str,
 | 
					    group_status_key: str,
 | 
				
			||||||
    loglevel: str,
 | 
					    loglevel: str,
 | 
				
			||||||
 | 
					    profiler: pg.debug.Profiler,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
) -> None:
 | 
					) -> None:
 | 
				
			||||||
    '''FSP stream chart update loop.
 | 
					    '''FSP stream chart update loop.
 | 
				
			||||||
| 
						 | 
					@ -658,6 +689,9 @@ async def update_chart_from_fsp(
 | 
				
			||||||
    config map.
 | 
					    config map.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    profiler(f'started chart task for fsp: {func_name}')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    done = linkedsplits.window().status_bar.open_status(
 | 
					    done = linkedsplits.window().status_bar.open_status(
 | 
				
			||||||
        f'loading fsp, {display_name}..',
 | 
					        f'loading fsp, {display_name}..',
 | 
				
			||||||
        group_key=group_status_key,
 | 
					        group_key=group_status_key,
 | 
				
			||||||
| 
						 | 
					@ -676,12 +710,15 @@ async def update_chart_from_fsp(
 | 
				
			||||||
            symbol=sym,
 | 
					            symbol=sym,
 | 
				
			||||||
            func_name=func_name,
 | 
					            func_name=func_name,
 | 
				
			||||||
            loglevel=loglevel,
 | 
					            loglevel=loglevel,
 | 
				
			||||||
 | 
					            zero_on_step=conf.get('zero_on_step', False),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        ) as (ctx, last_index),
 | 
					        ) as (ctx, last_index),
 | 
				
			||||||
        ctx.open_stream() as stream,
 | 
					        ctx.open_stream() as stream,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        open_fsp_sidepane(linkedsplits, {display_name: conf},) as sidepane,
 | 
					        open_fsp_sidepane(linkedsplits, {display_name: conf},) as sidepane,
 | 
				
			||||||
    ):
 | 
					    ):
 | 
				
			||||||
 | 
					        profiler(f'fsp:{func_name} attached to fsp ctx-stream')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if is_overlay:
 | 
					        if is_overlay:
 | 
				
			||||||
            chart = linkedsplits.chart
 | 
					            chart = linkedsplits.chart
 | 
				
			||||||
            chart.draw_curve(
 | 
					            chart.draw_curve(
 | 
				
			||||||
| 
						 | 
					@ -719,6 +756,8 @@ async def update_chart_from_fsp(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        array_key = func_name
 | 
					        array_key = func_name
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        profiler(f'fsp:{func_name} chart created')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # first UI update, usually from shm pushed history
 | 
					        # first UI update, usually from shm pushed history
 | 
				
			||||||
        update_fsp_chart(
 | 
					        update_fsp_chart(
 | 
				
			||||||
            chart,
 | 
					            chart,
 | 
				
			||||||
| 
						 | 
					@ -753,6 +792,9 @@ async def update_chart_from_fsp(
 | 
				
			||||||
        done()
 | 
					        done()
 | 
				
			||||||
        chart.linked.resize_sidepanes()
 | 
					        chart.linked.resize_sidepanes()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        profiler(f'fsp:{func_name} starting update loop')
 | 
				
			||||||
 | 
					        profiler.finish()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # update chart graphics
 | 
					        # update chart graphics
 | 
				
			||||||
        i = 0
 | 
					        i = 0
 | 
				
			||||||
        last = time.time()
 | 
					        last = time.time()
 | 
				
			||||||
| 
						 | 
					@ -853,7 +895,7 @@ def has_vlm(ohlcv: ShmArray) -> bool:
 | 
				
			||||||
    return not bool(np.all(np.isin(volm, -1)) or np.all(np.isnan(volm)))
 | 
					    return not bool(np.all(np.isin(volm, -1)) or np.all(np.isnan(volm)))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@asynccontextmanager
 | 
					@acm
 | 
				
			||||||
async def maybe_open_vlm_display(
 | 
					async def maybe_open_vlm_display(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    linked: LinkedSplits,
 | 
					    linked: LinkedSplits,
 | 
				
			||||||
| 
						 | 
					@ -875,12 +917,12 @@ async def maybe_open_vlm_display(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        async with open_fsp_sidepane(
 | 
					        async with open_fsp_sidepane(
 | 
				
			||||||
            linked, {
 | 
					            linked, {
 | 
				
			||||||
                'volume': {
 | 
					                '$_vlm': {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    'params': {
 | 
					                    'params': {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                        'price_func': {
 | 
					                        'price_func': {
 | 
				
			||||||
                            'default_value': 'ohl3',
 | 
					                            'default_value': 'chl3',
 | 
				
			||||||
                            # tell target ``Edit`` widget to not allow
 | 
					                            # tell target ``Edit`` widget to not allow
 | 
				
			||||||
                            # edits for now.
 | 
					                            # edits for now.
 | 
				
			||||||
                            'widget_kwargs': {'readonly': True},
 | 
					                            'widget_kwargs': {'readonly': True},
 | 
				
			||||||
| 
						 | 
					@ -979,17 +1021,19 @@ async def display_symbol_data(
 | 
				
			||||||
    #     group_key=loading_sym_key,
 | 
					    #     group_key=loading_sym_key,
 | 
				
			||||||
    # )
 | 
					    # )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async with(
 | 
					    async with async_enter_all(
 | 
				
			||||||
        open_feed(
 | 
					        open_feed(
 | 
				
			||||||
            provider,
 | 
					            provider,
 | 
				
			||||||
            [sym],
 | 
					            [sym],
 | 
				
			||||||
            loglevel=loglevel,
 | 
					            loglevel=loglevel,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # 60 FPS to limit context switches
 | 
					            # limit to at least display's FPS
 | 
				
			||||||
 | 
					            # avoiding needless Qt-in-guest-mode context switches
 | 
				
			||||||
            tick_throttle=_quote_throttle_rate,
 | 
					            tick_throttle=_quote_throttle_rate,
 | 
				
			||||||
 | 
					        ),
 | 
				
			||||||
 | 
					        maybe_open_fsp_cluster(),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        ) as feed,
 | 
					    ) as (feed, cluster_map):
 | 
				
			||||||
    ):
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        ohlcv: ShmArray = feed.shm
 | 
					        ohlcv: ShmArray = feed.shm
 | 
				
			||||||
        bars = ohlcv.array
 | 
					        bars = ohlcv.array
 | 
				
			||||||
| 
						 | 
					@ -1042,25 +1086,36 @@ async def display_symbol_data(
 | 
				
			||||||
        # TODO: eventually we'll support some kind of n-compose syntax
 | 
					        # TODO: eventually we'll support some kind of n-compose syntax
 | 
				
			||||||
        fsp_conf = {
 | 
					        fsp_conf = {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            'rsi': {
 | 
					            # 'rsi': {
 | 
				
			||||||
 | 
					            #     'func_name': 'rsi',  # literal python func ref lookup name
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                # literal python func ref lookup name
 | 
					            #     # map of parameters to place on the fsp sidepane widget
 | 
				
			||||||
                'func_name': 'rsi',
 | 
					            #     # which should map to dynamic inputs available to the
 | 
				
			||||||
 | 
					            #     # fsp function at runtime.
 | 
				
			||||||
 | 
					            #     'params': {
 | 
				
			||||||
 | 
					            #         'period': {
 | 
				
			||||||
 | 
					            #             'default_value': 14,
 | 
				
			||||||
 | 
					            #             'widget_kwargs': {'readonly': True},
 | 
				
			||||||
 | 
					            #         },
 | 
				
			||||||
 | 
					            #     },
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                # map of parameters to place on the fsp sidepane widget
 | 
					            #     # ``ChartPlotWidget`` options passthrough
 | 
				
			||||||
                # which should map to dynamic inputs available to the
 | 
					            #     'chart_kwargs': {
 | 
				
			||||||
                # fsp function at runtime.
 | 
					            #         'static_yrange': (0, 100),
 | 
				
			||||||
 | 
					            #     },
 | 
				
			||||||
 | 
					            # },
 | 
				
			||||||
 | 
					            'dolla_vlm': {
 | 
				
			||||||
 | 
					                'func_name': 'dolla_vlm',
 | 
				
			||||||
 | 
					                'zero_on_step': True,
 | 
				
			||||||
                'params': {
 | 
					                'params': {
 | 
				
			||||||
                    'period': {
 | 
					                    'price_func': {
 | 
				
			||||||
                        'default_value': 14,
 | 
					                        'default_value': 'chl3',
 | 
				
			||||||
 | 
					                        # tell target ``Edit`` widget to not allow
 | 
				
			||||||
 | 
					                        # edits for now.
 | 
				
			||||||
                        'widget_kwargs': {'readonly': True},
 | 
					                        'widget_kwargs': {'readonly': True},
 | 
				
			||||||
                    },
 | 
					                    },
 | 
				
			||||||
                },
 | 
					                },
 | 
				
			||||||
 | 
					                'chart_kwargs': {'style': 'step'}
 | 
				
			||||||
                # ``ChartPlotWidget`` options passthrough
 | 
					 | 
				
			||||||
                'chart_kwargs': {
 | 
					 | 
				
			||||||
                    'static_yrange': (0, 100),
 | 
					 | 
				
			||||||
                },
 | 
					 | 
				
			||||||
            },
 | 
					            },
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
| 
						 | 
					@ -1072,11 +1127,11 @@ async def display_symbol_data(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # add VWAP to fsp config for downstream loading
 | 
					            # add VWAP to fsp config for downstream loading
 | 
				
			||||||
            fsp_conf.update({
 | 
					            fsp_conf.update({
 | 
				
			||||||
                'vwap': {
 | 
					                # 'vwap': {
 | 
				
			||||||
                    'func_name': 'vwap',
 | 
					                #     'func_name': 'vwap',
 | 
				
			||||||
                    'overlay': True,
 | 
					                #     'overlay': True,
 | 
				
			||||||
                    'anchor': 'session',
 | 
					                #     'anchor': 'session',
 | 
				
			||||||
                },
 | 
					                # },
 | 
				
			||||||
            })
 | 
					            })
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # NOTE: we must immediately tell Qt to show the OHLC chart
 | 
					        # NOTE: we must immediately tell Qt to show the OHLC chart
 | 
				
			||||||
| 
						 | 
					@ -1086,13 +1141,15 @@ async def display_symbol_data(
 | 
				
			||||||
        linkedsplits.focus()
 | 
					        linkedsplits.focus()
 | 
				
			||||||
        await trio.sleep(0)
 | 
					        await trio.sleep(0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        vlm_chart = None
 | 
				
			||||||
        async with (
 | 
					        async with (
 | 
				
			||||||
            trio.open_nursery() as ln,
 | 
					            trio.open_nursery() as ln,
 | 
				
			||||||
            maybe_open_vlm_display(linkedsplits, ohlcv) as vlm_chart,
 | 
					            maybe_open_vlm_display(linkedsplits, ohlcv) as vlm_chart,
 | 
				
			||||||
        ):
 | 
					        ):
 | 
				
			||||||
            # load initial fsp chain (otherwise known as "indicators")
 | 
					            # load initial fsp chain (otherwise known as "indicators")
 | 
				
			||||||
            ln.start_soon(
 | 
					            ln.start_soon(
 | 
				
			||||||
                open_fspd_cluster,
 | 
					                start_fsp_displays,
 | 
				
			||||||
 | 
					                cluster_map,
 | 
				
			||||||
                linkedsplits,
 | 
					                linkedsplits,
 | 
				
			||||||
                fsp_conf,
 | 
					                fsp_conf,
 | 
				
			||||||
                sym,
 | 
					                sym,
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue