From 7b4472e37e7eda687f3b47321fe8dd2d20b65d4c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 26 Jun 2023 19:30:20 -0400 Subject: [PATCH] data._sampling.frame_ticks(): slight rework to generalize --- piker/data/_sampling.py | 73 ++++++++++++++++++++++++----------------- 1 file changed, 42 insertions(+), 31 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 641edf53..aeabea0b 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -27,6 +27,7 @@ from collections import ( from contextlib import asynccontextmanager as acm import time from typing import ( + Any, AsyncIterator, TYPE_CHECKING, ) @@ -587,9 +588,9 @@ async def sample_and_broadcast( # TODO: we should probably not write every single # value to an OHLC sample stream XD # for a tick stream sure.. but this is excessive.. - ticks = quote['ticks'] + ticks: list[dict] = quote['ticks'] for tick in ticks: - ticktype = tick['type'] + ticktype: str = tick['type'] # write trade events to shm last OHLC sample if ticktype in ('trade', 'utrade'): @@ -599,13 +600,14 @@ async def sample_and_broadcast( # more compact inline-way to do this assignment # to both buffers? for shm in [rt_shm, hist_shm]: + # update last entry # benchmarked in the 4-5 us range o, high, low, v = shm.array[-1][ ['open', 'high', 'low', 'volume'] ] - new_v = tick.get('size', 0) + new_v: float = tick.get('size', 0) if v == 0 and new_v: # no trades for this bar yet so the open @@ -654,7 +656,7 @@ async def sample_and_broadcast( # it's own "name" into the fqme schema (but maybe it # should?) so we have to manually generate the correct # key here. - fqme = f'{broker_symbol}.{brokername}' + fqme: str = f'{broker_symbol}.{brokername}' lags: int = 0 # TODO: speed up this loop in an AOT compiled lang (like @@ -757,28 +759,21 @@ _auction_ticks: set[str] = set.union(*_tick_groups.values()) def frame_ticks( - first_quote: dict, - last_quote: dict, - ticks_by_type: dict, -) -> None: + quote: dict[str, Any], + + ticks_by_type: dict[str, list[dict[str, Any]]] = {}, + ticks_in_order: list[dict[str, Any]] | None = None + +) -> dict: + # append quotes since last iteration into the last quote's # tick array/buffer. - ticks = last_quote.get('ticks') - # TODO: once we decide to get fancy really we should # have a shared mem tick buffer that is just # continually filled and the UI just ready from it # at it's display rate. - if ticks: - # TODO: do we need this any more or can we just - # expect the receiver to unwind the below - # `ticks_by_type: dict`? - # => undwinding would potentially require a - # `dict[str, set | list]` instead with an - # included `'types' field which is an (ordered) - # set of tick type fields in the order which - # types arrived? - first_quote['ticks'].extend(ticks) + + if ticks := quote.get('ticks'): # XXX: build a tick-by-type table of lists # of tick messages. This allows for less @@ -797,9 +792,25 @@ def frame_ticks( # append in reverse FIFO order for in-order iteration on # receiver side. + tick: dict[str, Any] for tick in ticks: - ttype = tick['type'] - ticks_by_type[ttype].append(tick) + ticks_by_type.setdefault( + tick['type'], + [], + ).append(tick) + + # TODO: do we need this any more or can we just + # expect the receiver to unwind the below + # `ticks_by_type: dict`? + # => undwinding would potentially require a + # `dict[str, set | list]` instead with an + # included `'types' field which is an (ordered) + # set of tick type fields in the order which + # types arrived? + if ticks_in_order: + ticks_in_order.extend(ticks) + + return ticks_by_type async def uniform_rate_send( @@ -835,10 +846,10 @@ async def uniform_rate_send( diff = 0 task_status.started() - ticks_by_type: defaultdict[ + ticks_by_type: dict[ str, - list[dict], - ] = defaultdict(list) + list[dict[str, Any]], + ] = {} clear_types = _tick_groups['clears'] @@ -866,9 +877,9 @@ async def uniform_rate_send( # expired we aren't supposed to send yet so append # to the tick frame. frame_ticks( - first_quote, last_quote, - ticks_by_type, + ticks_in_order=first_quote['ticks'], + ticks_by_type=ticks_by_type, ) # send cycle isn't due yet so continue waiting @@ -888,8 +899,8 @@ async def uniform_rate_send( frame_ticks( first_quote, - first_quote, - ticks_by_type, + ticks_in_order=first_quote['ticks'], + ticks_by_type=ticks_by_type, ) # we have a quote already so send it now. @@ -905,9 +916,9 @@ async def uniform_rate_send( break frame_ticks( - first_quote, last_quote, - ticks_by_type, + ticks_in_order=first_quote['ticks'], + ticks_by_type=ticks_by_type, ) # measured_rate = 1 / (time.time() - last_send)