Add back in legacy write loop for reference
							parent
							
								
									f9b799b53d
								
							
						
					
					
						commit
						57b3d2f7e4
					
				| 
						 | 
				
			
			@ -59,15 +59,25 @@ _quote_dt = [
 | 
			
		|||
    ('Epoch', 'i8'),
 | 
			
		||||
    ('Nanoseconds', 'i4'),
 | 
			
		||||
 | 
			
		||||
    ('Tick', 'i4'),  # do we need this?
 | 
			
		||||
    ('Tick', 'i4'),  # (-1, 0, 1) = (on bid, same, on ask)
 | 
			
		||||
    # ('fill_time', 'f4'),
 | 
			
		||||
    ('Last', 'f4'),
 | 
			
		||||
    ('Bid', 'f4'),
 | 
			
		||||
    ('Bsize', 'f4'),
 | 
			
		||||
    ('Asize', 'f4'),
 | 
			
		||||
    ('Bsize', 'i8'),
 | 
			
		||||
    ('Asize', 'i8'),
 | 
			
		||||
    ('Ask', 'f4'),
 | 
			
		||||
    ('Size', 'i8'),
 | 
			
		||||
    ('Volume', 'f4'),
 | 
			
		||||
    ('Volume', 'i8'),
 | 
			
		||||
    # ('brokerd_ts', 'i64'),
 | 
			
		||||
    # ('VWAP', 'f4')
 | 
			
		||||
]
 | 
			
		||||
_quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan)
 | 
			
		||||
_tick_map = {
 | 
			
		||||
    'Up': 1,
 | 
			
		||||
    'Equal': 0,
 | 
			
		||||
    'Down': -1,
 | 
			
		||||
    None: np.nan,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def mk_tbk(keys: tuple[str, str, str]) -> str:
 | 
			
		||||
| 
						 | 
				
			
			@ -91,8 +101,7 @@ def quote_to_marketstore_structarray(
 | 
			
		|||
    '''
 | 
			
		||||
    if last_fill:
 | 
			
		||||
        # new fill bby
 | 
			
		||||
        now = timestamp(last_fill, unit='s')
 | 
			
		||||
 | 
			
		||||
        now = timestamp(last_fill)
 | 
			
		||||
    else:
 | 
			
		||||
        # this should get inserted upstream by the broker-client to
 | 
			
		||||
        # subtract from IPC latency
 | 
			
		||||
| 
						 | 
				
			
			@ -126,8 +135,8 @@ def timestamp(date, **kwargs) -> int:
 | 
			
		|||
    '''
 | 
			
		||||
    Return marketstore compatible 'Epoch' integer in nanoseconds
 | 
			
		||||
    from a date formatted str.
 | 
			
		||||
    '''
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    return int(pd.Timestamp(date, **kwargs).value)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -162,21 +171,6 @@ async def ingest_quote_stream(
 | 
			
		|||
                for tick in quote.get('ticks', ()):
 | 
			
		||||
                    ticktype = tick.get('type', 'n/a')
 | 
			
		||||
 | 
			
		||||
            # _quote_dt = [
 | 
			
		||||
            #     # these two are required for as a "primary key"
 | 
			
		||||
            #    ('Epoch', 'i8'),
 | 
			
		||||
            #    ('Nanoseconds', 'i4'),
 | 
			
		||||
            #    ('Tick', 'i4'),
 | 
			
		||||
            #
 | 
			
		||||
            #    ('Last', 'f4'),
 | 
			
		||||
            #    ('Bid', 'f4'),
 | 
			
		||||
            #    ('Bsize', 'f4'),
 | 
			
		||||
            #    ('Asize', 'f4'),
 | 
			
		||||
            #    ('Ask', 'f4'),
 | 
			
		||||
            #    ('Size', 'i8'),
 | 
			
		||||
            #    ('Volume', 'f4'),
 | 
			
		||||
            # ]
 | 
			
		||||
 | 
			
		||||
            # techtonic tick write
 | 
			
		||||
            array = quote_to_marketstore_structarray({
 | 
			
		||||
                'IsTrade': 1 if ticktype == 'trade' else 0,
 | 
			
		||||
| 
						 | 
				
			
			@ -187,46 +181,38 @@ async def ingest_quote_stream(
 | 
			
		|||
 | 
			
		||||
            await ms_client.write(array, _tick_tbk)
 | 
			
		||||
 | 
			
		||||
            quote_cache = {
 | 
			
		||||
                'size': 0,
 | 
			
		||||
                'tick': 0
 | 
			
		||||
            }
 | 
			
		||||
            # start ingest to marketstore
 | 
			
		||||
            async for quotes in feed.stream:
 | 
			
		||||
                log.info(quotes)
 | 
			
		||||
                for symbol, quote in quotes.items():
 | 
			
		||||
            # LEGACY WRITE LOOP (using old tick dt)
 | 
			
		||||
            # quote_cache = {
 | 
			
		||||
            #     'size': 0,
 | 
			
		||||
            #     'tick': 0
 | 
			
		||||
            # }
 | 
			
		||||
 | 
			
		||||
                    for tick in quote.get('ticks', ()):
 | 
			
		||||
                        ticktype = tick.get('type')
 | 
			
		||||
                        price = tick.get('price')
 | 
			
		||||
                        size = tick.get('size')
 | 
			
		||||
            # async for quotes in qstream:
 | 
			
		||||
            #     log.info(quotes)
 | 
			
		||||
            #     for symbol, quote in quotes.items():
 | 
			
		||||
 | 
			
		||||
                        if ticktype == 'n/a' or price == -1:
 | 
			
		||||
                            # okkk..
 | 
			
		||||
                            continue
 | 
			
		||||
            #         # remap tick strs to ints
 | 
			
		||||
            #         quote['tick'] = _tick_map[quote.get('tick', 'Equal')]
 | 
			
		||||
 | 
			
		||||
                        # clearing price event
 | 
			
		||||
                        if ticktype == 'trade':
 | 
			
		||||
                            quote_cache['volume'] = quote['volume']
 | 
			
		||||
                            quote_cache['last'] = price
 | 
			
		||||
                            # quote_cache['broker_ts'] = quote['broker_ts']
 | 
			
		||||
            #         # check for volume update (i.e. did trades happen
 | 
			
		||||
            #         # since last quote)
 | 
			
		||||
            #         new_vol = quote.get('volume', None)
 | 
			
		||||
            #         if new_vol is None:
 | 
			
		||||
            #             log.debug(f"No fills for {symbol}")
 | 
			
		||||
            #             if new_vol == quote_cache.get('volume'):
 | 
			
		||||
            #                 # should never happen due to field diffing
 | 
			
		||||
            #                 # on sender side
 | 
			
		||||
            #                 log.error(
 | 
			
		||||
            #                     f"{symbol}: got same volume as last quote?")
 | 
			
		||||
 | 
			
		||||
                        # l1 book events
 | 
			
		||||
                        elif ticktype in ('ask', 'asize'):
 | 
			
		||||
                            quote_cache['ask'] = price
 | 
			
		||||
                            quote_cache['asize'] = size
 | 
			
		||||
            #         quote_cache.update(quote)
 | 
			
		||||
 | 
			
		||||
                        elif ticktype in ('bid', 'bsize'):
 | 
			
		||||
                            quote_cache['bid'] = price
 | 
			
		||||
                            quote_cache['bsize'] = size
 | 
			
		||||
 | 
			
		||||
                    a = quote_to_marketstore_structarray(
 | 
			
		||||
                        quote_cache,
 | 
			
		||||
                        last_fill=quote.get('broker_ts', None)
 | 
			
		||||
                    )
 | 
			
		||||
                    log.info(a)
 | 
			
		||||
                    # breakpoint()
 | 
			
		||||
                    await ms_client.write(symbol, a)
 | 
			
		||||
            #         a = quote_to_marketstore_structarray(
 | 
			
		||||
            #             quote,
 | 
			
		||||
            #             # TODO: check this closer to the broker query api
 | 
			
		||||
            #             last_fill=quote.get('fill_time', '')
 | 
			
		||||
            #         )
 | 
			
		||||
            #         await ms_client.write(symbol, a)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# async def stream_quotes(
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue