From 5952e7f53809bde2170b6dc22115a2a83e773a31 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 31 Jan 2022 07:33:40 -0500 Subject: [PATCH] Add dark vlm deduplication support via flag --- piker/data/_normalize.py | 46 ++++++++++++++++++++++++++++++++++------ 1 file changed, 40 insertions(+), 6 deletions(-) diff --git a/piker/data/_normalize.py b/piker/data/_normalize.py index 56d64b75..45eed1be 100644 --- a/piker/data/_normalize.py +++ b/piker/data/_normalize.py @@ -14,27 +14,61 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -""" +''' Stream format enforcement. -""" -from typing import AsyncIterator, Optional, Tuple - -import numpy as np +''' +from itertools import chain +from typing import AsyncIterator def iterticks( quote: dict, - types: Tuple[str] = ('trade', 'dark_trade'), + types: tuple[str] = ('trade', 'dark_trade'), + deduplicate_darks: bool = False, ) -> AsyncIterator: ''' Iterate through ticks delivered per quote cycle. ''' + if deduplicate_darks: + assert 'dark_trade' in types + # print(f"{quote}\n\n") ticks = quote.get('ticks', ()) + trades = {} + darks = {} + if ticks: + + # do a first pass and attempt to remove duplicate dark + # trades with the same tick signature. + if deduplicate_darks: + for tick in ticks: + ttype = tick.get('type') + sig = ( + tick['time'], + tick['price'], + tick['size'] + ) + + if ttype == 'dark_trade': + darks[sig] = tick + + elif ttype == 'trade': + trades[sig] = tick + + # filter duplicates + for sig, tick in trades.items(): + tick = darks.pop(sig, None) + if tick: + ticks.remove(tick) + # print(f'DUPLICATE {tick}') + + # re-insert ticks + ticks.extend(list(chain(trades.values(), darks.values()))) + for tick in ticks: # print(f"{quote['symbol']}: {tick}") ttype = tick.get('type')