Add back in legacy write loop for reference
							parent
							
								
									8047714101
								
							
						
					
					
						commit
						445b82283d
					
				|  | @ -59,15 +59,25 @@ _quote_dt = [ | |||
|     ('Epoch', 'i8'), | ||||
|     ('Nanoseconds', 'i4'), | ||||
| 
 | ||||
|     ('Tick', 'i4'),  # do we need this? | ||||
|     ('Tick', 'i4'),  # (-1, 0, 1) = (on bid, same, on ask) | ||||
|     # ('fill_time', 'f4'), | ||||
|     ('Last', 'f4'), | ||||
|     ('Bid', 'f4'), | ||||
|     ('Bsize', 'f4'), | ||||
|     ('Asize', 'f4'), | ||||
|     ('Bsize', 'i8'), | ||||
|     ('Asize', 'i8'), | ||||
|     ('Ask', 'f4'), | ||||
|     ('Size', 'i8'), | ||||
|     ('Volume', 'f4'), | ||||
|     ('Volume', 'i8'), | ||||
|     # ('brokerd_ts', 'i64'), | ||||
|     # ('VWAP', 'f4') | ||||
| ] | ||||
| _quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan) | ||||
| _tick_map = { | ||||
|     'Up': 1, | ||||
|     'Equal': 0, | ||||
|     'Down': -1, | ||||
|     None: np.nan, | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| def mk_tbk(keys: tuple[str, str, str]) -> str: | ||||
|  | @ -91,8 +101,7 @@ def quote_to_marketstore_structarray( | |||
|     ''' | ||||
|     if last_fill: | ||||
|         # new fill bby | ||||
|         now = timestamp(last_fill, unit='s') | ||||
| 
 | ||||
|         now = timestamp(last_fill) | ||||
|     else: | ||||
|         # this should get inserted upstream by the broker-client to | ||||
|         # subtract from IPC latency | ||||
|  | @ -126,8 +135,8 @@ def timestamp(date, **kwargs) -> int: | |||
|     ''' | ||||
|     Return marketstore compatible 'Epoch' integer in nanoseconds | ||||
|     from a date formatted str. | ||||
|     ''' | ||||
| 
 | ||||
|     ''' | ||||
|     return int(pd.Timestamp(date, **kwargs).value) | ||||
| 
 | ||||
| 
 | ||||
|  | @ -162,21 +171,6 @@ async def ingest_quote_stream( | |||
|                 for tick in quote.get('ticks', ()): | ||||
|                     ticktype = tick.get('type', 'n/a') | ||||
| 
 | ||||
|             # _quote_dt = [ | ||||
|             #     # these two are required for as a "primary key" | ||||
|             #    ('Epoch', 'i8'), | ||||
|             #    ('Nanoseconds', 'i4'), | ||||
|             #    ('Tick', 'i4'), | ||||
|             # | ||||
|             #    ('Last', 'f4'), | ||||
|             #    ('Bid', 'f4'), | ||||
|             #    ('Bsize', 'f4'), | ||||
|             #    ('Asize', 'f4'), | ||||
|             #    ('Ask', 'f4'), | ||||
|             #    ('Size', 'i8'), | ||||
|             #    ('Volume', 'f4'), | ||||
|             # ] | ||||
| 
 | ||||
|             # techtonic tick write | ||||
|             array = quote_to_marketstore_structarray({ | ||||
|                 'IsTrade': 1 if ticktype == 'trade' else 0, | ||||
|  | @ -187,46 +181,38 @@ async def ingest_quote_stream( | |||
| 
 | ||||
|             await ms_client.write(array, _tick_tbk) | ||||
| 
 | ||||
|             quote_cache = { | ||||
|                 'size': 0, | ||||
|                 'tick': 0 | ||||
|             } | ||||
|             # start ingest to marketstore | ||||
|             async for quotes in feed.stream: | ||||
|                 log.info(quotes) | ||||
|                 for symbol, quote in quotes.items(): | ||||
|             # LEGACY WRITE LOOP (using old tick dt) | ||||
|             # quote_cache = { | ||||
|             #     'size': 0, | ||||
|             #     'tick': 0 | ||||
|             # } | ||||
| 
 | ||||
|                     for tick in quote.get('ticks', ()): | ||||
|                         ticktype = tick.get('type') | ||||
|                         price = tick.get('price') | ||||
|                         size = tick.get('size') | ||||
|             # async for quotes in qstream: | ||||
|             #     log.info(quotes) | ||||
|             #     for symbol, quote in quotes.items(): | ||||
| 
 | ||||
|                         if ticktype == 'n/a' or price == -1: | ||||
|                             # okkk.. | ||||
|                             continue | ||||
|             #         # remap tick strs to ints | ||||
|             #         quote['tick'] = _tick_map[quote.get('tick', 'Equal')] | ||||
| 
 | ||||
|                         # clearing price event | ||||
|                         if ticktype == 'trade': | ||||
|                             quote_cache['volume'] = quote['volume'] | ||||
|                             quote_cache['last'] = price | ||||
|                             # quote_cache['broker_ts'] = quote['broker_ts'] | ||||
|             #         # check for volume update (i.e. did trades happen | ||||
|             #         # since last quote) | ||||
|             #         new_vol = quote.get('volume', None) | ||||
|             #         if new_vol is None: | ||||
|             #             log.debug(f"No fills for {symbol}") | ||||
|             #             if new_vol == quote_cache.get('volume'): | ||||
|             #                 # should never happen due to field diffing | ||||
|             #                 # on sender side | ||||
|             #                 log.error( | ||||
|             #                     f"{symbol}: got same volume as last quote?") | ||||
| 
 | ||||
|                         # l1 book events | ||||
|                         elif ticktype in ('ask', 'asize'): | ||||
|                             quote_cache['ask'] = price | ||||
|                             quote_cache['asize'] = size | ||||
|             #         quote_cache.update(quote) | ||||
| 
 | ||||
|                         elif ticktype in ('bid', 'bsize'): | ||||
|                             quote_cache['bid'] = price | ||||
|                             quote_cache['bsize'] = size | ||||
| 
 | ||||
|                     a = quote_to_marketstore_structarray( | ||||
|                         quote_cache, | ||||
|                         last_fill=quote.get('broker_ts', None) | ||||
|                     ) | ||||
|                     log.info(a) | ||||
|                     # breakpoint() | ||||
|                     await ms_client.write(symbol, a) | ||||
|             #         a = quote_to_marketstore_structarray( | ||||
|             #             quote, | ||||
|             #             # TODO: check this closer to the broker query api | ||||
|             #             last_fill=quote.get('fill_time', '') | ||||
|             #         ) | ||||
|             #         await ms_client.write(symbol, a) | ||||
| 
 | ||||
| 
 | ||||
| # async def stream_quotes( | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue