Make the data feed layer "fqsn" aware
In order to support instruments with lifetimes (aka derivatives) we need generally need special symbol annotations which detail such meta data (such as `MNQ.GLOBEX.20220717` for daq futes). Further there is really no reason for the public api for this feed layer to care about getting a special "brokername" field since generally the data is coming directly from UIs (eg. search selection) so we might as well accept a fqsn (fully qualified symbol name) which includes the broker name; for now a suffix like `'.ib'`. We may change this schema (soon) but this at least gets us to a point where we expect the full name including broker/provider. An additional detail: for certain "generic" symbol names (like for futes) we will pull a so called "front contract" and map this to a specific fqsn underneath, so there is a double (cached) entry for that entry such that other consumers can use it the same way if desired. Some other machinery changes: - expect the `stream_quotes()` endpoint to deliver it's `.started()` msg almost immediately since we now need it deliver any fqsn asap (yes this means the ep should no longer wait on a "live" first quote and instead deliver what quote data it can right away. - expect the quotes ohlc sampler task to add in the broker name before broadcast to remote (actor) consumers since the backend isn't (yet) expected to do that add in itself. - obviously we start using all the new fqsn related `Symbol` apisfqsns
							parent
							
								
									e9d64ffee8
								
							
						
					
					
						commit
						8462ea8a28
					
				|  | @ -50,9 +50,8 @@ from ._sharedmem import ( | |||
| from .ingest import get_ingestormod | ||||
| from ._source import ( | ||||
|     base_iohlc_dtype, | ||||
|     mk_symbol, | ||||
|     Symbol, | ||||
|     mk_fqsn, | ||||
|     uncons_fqsn, | ||||
| ) | ||||
| from ..ui import _search | ||||
| from ._sampling import ( | ||||
|  | @ -192,7 +191,7 @@ async def _setup_persistent_brokerd( | |||
| async def manage_history( | ||||
|     mod: ModuleType, | ||||
|     bus: _FeedsBus, | ||||
|     symbol: str, | ||||
|     fqsn: str, | ||||
|     some_data_ready: trio.Event, | ||||
|     feed_is_live: trio.Event, | ||||
| 
 | ||||
|  | @ -206,8 +205,6 @@ async def manage_history( | |||
|     buffer. | ||||
| 
 | ||||
|     ''' | ||||
|     fqsn = mk_fqsn(mod.name, symbol) | ||||
| 
 | ||||
|     # (maybe) allocate shm array for this broker/symbol which will | ||||
|     # be used for fast near-term history capture and processing. | ||||
|     shm, opened = maybe_open_shm_array( | ||||
|  | @ -226,7 +223,7 @@ async def manage_history( | |||
|         # start history backfill task ``backfill_bars()`` is | ||||
|         # a required backend func this must block until shm is | ||||
|         # filled with first set of ohlc bars | ||||
|         _ = await bus.nursery.start(mod.backfill_bars, symbol, shm) | ||||
|         _ = await bus.nursery.start(mod.backfill_bars, fqsn, shm) | ||||
| 
 | ||||
|     # yield back after client connect with filled shm | ||||
|     task_status.started(shm) | ||||
|  | @ -285,8 +282,6 @@ async def allocate_persistent_feed( | |||
|     except ImportError: | ||||
|         mod = get_ingestormod(brokername) | ||||
| 
 | ||||
|     fqsn = mk_fqsn(brokername, symbol) | ||||
| 
 | ||||
|     # mem chan handed to broker backend so it can push real-time | ||||
|     # quotes to this task for sampling and history storage (see below). | ||||
|     send, quote_stream = trio.open_memory_channel(10) | ||||
|  | @ -295,28 +290,9 @@ async def allocate_persistent_feed( | |||
|     some_data_ready = trio.Event() | ||||
|     feed_is_live = trio.Event() | ||||
| 
 | ||||
|     # run 2 tasks: | ||||
|     # - a history loader / maintainer | ||||
|     # - a real-time streamer which consumers and sends new data to any | ||||
|     #   consumers as well as writes to storage backends (as configured). | ||||
| 
 | ||||
|     # XXX: neither of these will raise but will cause an inf hang due to: | ||||
|     # https://github.com/python-trio/trio/issues/2258 | ||||
|     # bus.nursery.start_soon( | ||||
|     # await bus.start_task( | ||||
| 
 | ||||
|     shm = await bus.nursery.start( | ||||
|         manage_history, | ||||
|         mod, | ||||
|         bus, | ||||
|         symbol, | ||||
|         some_data_ready, | ||||
|         feed_is_live, | ||||
|     ) | ||||
| 
 | ||||
|     # establish broker backend quote stream by calling | ||||
|     # ``stream_quotes()``, which is a required broker backend endpoint. | ||||
|     init_msg, first_quotes = await bus.nursery.start( | ||||
|     init_msg, first_quote = await bus.nursery.start( | ||||
|         partial( | ||||
|             mod.stream_quotes, | ||||
|             send_chan=send, | ||||
|  | @ -325,11 +301,38 @@ async def allocate_persistent_feed( | |||
|             loglevel=loglevel, | ||||
|         ) | ||||
|     ) | ||||
|     # the broker-specific fully qualified symbol name | ||||
|     bfqsn = init_msg[symbol]['fqsn'] | ||||
| 
 | ||||
|     # HISTORY, run 2 tasks: | ||||
|     # - a history loader / maintainer | ||||
|     # - a real-time streamer which consumers and sends new data to any | ||||
|     #   consumers as well as writes to storage backends (as configured). | ||||
| 
 | ||||
|     # XXX: neither of these will raise but will cause an inf hang due to: | ||||
|     # https://github.com/python-trio/trio/issues/2258 | ||||
|     # bus.nursery.start_soon( | ||||
|     # await bus.start_task( | ||||
|     shm = await bus.nursery.start( | ||||
|         manage_history, | ||||
|         mod, | ||||
|         bus, | ||||
|         bfqsn, | ||||
|         some_data_ready, | ||||
|         feed_is_live, | ||||
|     ) | ||||
| 
 | ||||
|     # we hand an IPC-msg compatible shm token to the caller so it | ||||
|     # can read directly from the memory which will be written by | ||||
|     # this task. | ||||
|     init_msg[symbol]['shm_token'] = shm.token | ||||
|     msg = init_msg[symbol] | ||||
|     msg['shm_token'] = shm.token | ||||
| 
 | ||||
|     # true fqsn | ||||
|     fqsn = '.'.join((bfqsn, brokername)) | ||||
| 
 | ||||
|     # add a fqsn entry that includes the ``.<broker>`` suffix | ||||
|     init_msg[fqsn] = msg | ||||
| 
 | ||||
|     # TODO: pretty sure we don't need this? why not just leave 1s as | ||||
|     # the fastest "sample period" since we'll probably always want that | ||||
|  | @ -342,8 +345,22 @@ async def allocate_persistent_feed( | |||
|     log.info(f'waiting on history to load: {fqsn}') | ||||
|     await some_data_ready.wait() | ||||
| 
 | ||||
|     bus.feeds[symbol.lower()] = (init_msg, first_quotes) | ||||
|     task_status.started((init_msg,  first_quotes)) | ||||
|     # append ``.<broker>`` suffix to each quote symbol | ||||
|     bsym = symbol + f'.{brokername}' | ||||
|     generic_first_quotes = { | ||||
|         bsym: first_quote, | ||||
|         fqsn: first_quote, | ||||
|     } | ||||
| 
 | ||||
|     bus.feeds[symbol] = bus.feeds[fqsn] = ( | ||||
|         init_msg, | ||||
|         generic_first_quotes, | ||||
|     ) | ||||
|     # for ambiguous names we simply apply the retreived | ||||
|     # feed to that name (for now). | ||||
| 
 | ||||
|     # task_status.started((init_msg,  generic_first_quotes)) | ||||
|     task_status.started() | ||||
| 
 | ||||
|     # backend will indicate when real-time quotes have begun. | ||||
|     await feed_is_live.wait() | ||||
|  | @ -358,10 +375,11 @@ async def allocate_persistent_feed( | |||
|             bus, | ||||
|             shm, | ||||
|             quote_stream, | ||||
|             brokername, | ||||
|             sum_tick_vlm | ||||
|         ) | ||||
|     finally: | ||||
|         log.warning(f'{symbol}@{brokername} feed task terminated') | ||||
|         log.warning(f'{fqsn} feed task terminated') | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
|  | @ -394,25 +412,16 @@ async def open_feed_bus( | |||
|     assert 'brokerd' in tractor.current_actor().name | ||||
| 
 | ||||
|     bus = get_feed_bus(brokername) | ||||
|     bus._subscribers.setdefault(symbol, []) | ||||
|     fqsn = mk_fqsn(brokername, symbol) | ||||
| 
 | ||||
|     entry = bus.feeds.get(symbol) | ||||
| 
 | ||||
|     # if no cached feed for this symbol has been created for this | ||||
|     # brokerd yet, start persistent stream and shm writer task in | ||||
|     # service nursery | ||||
|     entry = bus.feeds.get(symbol) | ||||
|     if entry is None: | ||||
|         if not start_stream: | ||||
|             raise RuntimeError( | ||||
|                 f'No stream feed exists for {fqsn}?\n' | ||||
|                 f'You may need a `brokerd` started first.' | ||||
|             ) | ||||
| 
 | ||||
|         # allocate a new actor-local stream bus which will persist for | ||||
|         # this `brokerd`. | ||||
|         # allocate a new actor-local stream bus which | ||||
|         # will persist for this `brokerd`. | ||||
|         async with bus.task_lock: | ||||
|             init_msg, first_quotes = await bus.nursery.start( | ||||
|             await bus.nursery.start( | ||||
|                 partial( | ||||
|                     allocate_persistent_feed, | ||||
| 
 | ||||
|  | @ -434,9 +443,30 @@ async def open_feed_bus( | |||
|     # subscriber | ||||
|     init_msg, first_quotes = bus.feeds[symbol] | ||||
| 
 | ||||
|     msg = init_msg[symbol] | ||||
|     bfqsn = msg['fqsn'] | ||||
| 
 | ||||
|     # true fqsn | ||||
|     fqsn = '.'.join([bfqsn, brokername]) | ||||
|     assert fqsn in first_quotes | ||||
|     assert bus.feeds[fqsn] | ||||
| 
 | ||||
|     # broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib) | ||||
|     bsym = symbol + f'.{brokername}' | ||||
|     assert bsym in first_quotes | ||||
| 
 | ||||
|     # we use the broker-specific fqsn (bfqsn) for | ||||
|     # the sampler subscription since the backend isn't (yet) | ||||
|     # expected to append it's own name to the fqsn, so we filter | ||||
|     # on keys which *do not* include that name (e.g .ib) . | ||||
|     bus._subscribers.setdefault(bfqsn, []) | ||||
| 
 | ||||
|     # send this even to subscribers to existing feed? | ||||
|     # deliver initial info message a first quote asap | ||||
|     await ctx.started((init_msg, first_quotes)) | ||||
|     await ctx.started(( | ||||
|         init_msg, | ||||
|         first_quotes, | ||||
|     )) | ||||
| 
 | ||||
|     if not start_stream: | ||||
|         log.warning(f'Not opening real-time stream for {fqsn}') | ||||
|  | @ -449,14 +479,12 @@ async def open_feed_bus( | |||
|         # re-send to trigger display loop cycle (necessary especially | ||||
|         # when the mkt is closed and no real-time messages are | ||||
|         # expected). | ||||
|         await stream.send(first_quotes) | ||||
|         await stream.send({fqsn: first_quotes}) | ||||
| 
 | ||||
|         # open a bg task which receives quotes over a mem chan | ||||
|         # and only pushes them to the target actor-consumer at | ||||
|         # a max ``tick_throttle`` instantaneous rate. | ||||
|         if tick_throttle: | ||||
| 
 | ||||
|             # open a bg task which receives quotes over a mem chan | ||||
|             # and only pushes them to the target actor-consumer at | ||||
|             # a max ``tick_throttle`` instantaneous rate. | ||||
| 
 | ||||
|             send, recv = trio.open_memory_channel(2**10) | ||||
|             cs = await bus.start_task( | ||||
|                 uniform_rate_send, | ||||
|  | @ -469,12 +497,15 @@ async def open_feed_bus( | |||
|         else: | ||||
|             sub = (stream, tick_throttle) | ||||
| 
 | ||||
|         subs = bus._subscribers[symbol] | ||||
|         subs = bus._subscribers[bfqsn] | ||||
|         subs.append(sub) | ||||
| 
 | ||||
|         try: | ||||
|             uid = ctx.chan.uid | ||||
| 
 | ||||
|             # ctrl protocol for start/stop of quote streams based on UI | ||||
|             # state (eg. don't need a stream when a symbol isn't being | ||||
|             # displayed). | ||||
|             async for msg in stream: | ||||
| 
 | ||||
|                 if msg == 'pause': | ||||
|  | @ -499,7 +530,7 @@ async def open_feed_bus( | |||
|                 # n.cancel_scope.cancel() | ||||
|                 cs.cancel() | ||||
|             try: | ||||
|                 bus._subscribers[symbol].remove(sub) | ||||
|                 bus._subscribers[bfqsn].remove(sub) | ||||
|             except ValueError: | ||||
|                 log.warning(f'{sub} for {symbol} was already removed?') | ||||
| 
 | ||||
|  | @ -625,10 +656,10 @@ async def install_brokerd_search( | |||
| 
 | ||||
| @asynccontextmanager | ||||
| async def open_feed( | ||||
|     brokername: str, | ||||
|     symbols: list[str], | ||||
|     loglevel: Optional[str] = None, | ||||
| 
 | ||||
|     fqsns: list[str], | ||||
| 
 | ||||
|     loglevel: Optional[str] = None, | ||||
|     backpressure: bool = True, | ||||
|     start_stream: bool = True, | ||||
|     tick_throttle: Optional[float] = None,  # Hz | ||||
|  | @ -638,7 +669,10 @@ async def open_feed( | |||
|     Open a "data feed" which provides streamed real-time quotes. | ||||
| 
 | ||||
|     ''' | ||||
|     sym = symbols[0].lower() | ||||
|     fqsn = fqsns[0].lower() | ||||
| 
 | ||||
|     brokername, key, suffix = uncons_fqsn(fqsn) | ||||
|     bfqsn = fqsn.replace('.' + brokername, '') | ||||
| 
 | ||||
|     try: | ||||
|         mod = get_brokermod(brokername) | ||||
|  | @ -659,7 +693,7 @@ async def open_feed( | |||
|         portal.open_context( | ||||
|             open_feed_bus, | ||||
|             brokername=brokername, | ||||
|             symbol=sym, | ||||
|             symbol=bfqsn, | ||||
|             loglevel=loglevel, | ||||
|             start_stream=start_stream, | ||||
|             tick_throttle=tick_throttle, | ||||
|  | @ -676,9 +710,10 @@ async def open_feed( | |||
|     ): | ||||
|         # we can only read from shm | ||||
|         shm = attach_shm_array( | ||||
|             token=init_msg[sym]['shm_token'], | ||||
|             token=init_msg[bfqsn]['shm_token'], | ||||
|             readonly=True, | ||||
|         ) | ||||
|         assert fqsn in first_quotes | ||||
| 
 | ||||
|         feed = Feed( | ||||
|             name=brokername, | ||||
|  | @ -691,17 +726,15 @@ async def open_feed( | |||
|         ) | ||||
| 
 | ||||
|         for sym, data in init_msg.items(): | ||||
| 
 | ||||
|             si = data['symbol_info'] | ||||
| 
 | ||||
|             symbol = mk_symbol( | ||||
|                 key=sym, | ||||
|                 type_key=si.get('asset_type', 'forex'), | ||||
|                 tick_size=si.get('price_tick_size', 0.01), | ||||
|                 lot_tick_size=si.get('lot_tick_size', 0.0), | ||||
|             fqsn = data['fqsn'] + f'.{brokername}' | ||||
|             symbol = Symbol.from_fqsn( | ||||
|                 fqsn, | ||||
|                 info=si, | ||||
|             ) | ||||
|             symbol.broker_info[brokername] = si | ||||
| 
 | ||||
|             # symbol.broker_info[brokername] = si | ||||
|             feed.symbols[fqsn] = symbol | ||||
|             feed.symbols[sym] = symbol | ||||
| 
 | ||||
|             # cast shm dtype to list... can't member why we need this | ||||
|  | @ -725,8 +758,7 @@ async def open_feed( | |||
| @asynccontextmanager | ||||
| async def maybe_open_feed( | ||||
| 
 | ||||
|     brokername: str, | ||||
|     symbols: list[str], | ||||
|     fqsns: list[str], | ||||
|     loglevel: Optional[str] = None, | ||||
| 
 | ||||
|     **kwargs, | ||||
|  | @ -738,13 +770,12 @@ async def maybe_open_feed( | |||
|     in a tractor broadcast receiver. | ||||
| 
 | ||||
|     ''' | ||||
|     sym = symbols[0].lower() | ||||
|     fqsn = fqsns[0] | ||||
| 
 | ||||
|     async with maybe_open_context( | ||||
|         acm_func=open_feed, | ||||
|         kwargs={ | ||||
|             'brokername': brokername, | ||||
|             'symbols': [sym], | ||||
|             'fqsns': fqsns, | ||||
|             'loglevel': loglevel, | ||||
|             'tick_throttle': kwargs.get('tick_throttle'), | ||||
| 
 | ||||
|  | @ -752,11 +783,11 @@ async def maybe_open_feed( | |||
|             'backpressure': kwargs.get('backpressure', True), | ||||
|             'start_stream': kwargs.get('start_stream', True), | ||||
|         }, | ||||
|         key=sym, | ||||
|         key=fqsn, | ||||
|     ) as (cache_hit, feed): | ||||
| 
 | ||||
|         if cache_hit: | ||||
|             log.info(f'Using cached feed for {brokername}.{sym}') | ||||
|             log.info(f'Using cached feed for {fqsn}') | ||||
|             # add a new broadcast subscription for the quote stream | ||||
|             # if this feed is likely already in use | ||||
|             async with feed.stream.subscribe() as bstream: | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue