Handle non-fqsn for derivs and don't put brokername in
							parent
							
								
									99a37f504f
								
							
						
					
					
						commit
						8b8ffe78af
					
				| 
						 | 
					@ -375,8 +375,10 @@ async def manage_history(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def allocate_persistent_feed(
 | 
					async def allocate_persistent_feed(
 | 
				
			||||||
    bus: _FeedsBus,
 | 
					    bus: _FeedsBus,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    brokername: str,
 | 
					    brokername: str,
 | 
				
			||||||
    symbol: str,
 | 
					    symbol: str,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    loglevel: str,
 | 
					    loglevel: str,
 | 
				
			||||||
    start_stream: bool = True,
 | 
					    start_stream: bool = True,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -396,6 +398,7 @@ async def allocate_persistent_feed(
 | 
				
			||||||
    - a real-time streaming task which connec
 | 
					    - a real-time streaming task which connec
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
 | 
					    # load backend module
 | 
				
			||||||
    try:
 | 
					    try:
 | 
				
			||||||
        mod = get_brokermod(brokername)
 | 
					        mod = get_brokermod(brokername)
 | 
				
			||||||
    except ImportError:
 | 
					    except ImportError:
 | 
				
			||||||
| 
						 | 
					@ -452,7 +455,10 @@ async def allocate_persistent_feed(
 | 
				
			||||||
    # true fqsn
 | 
					    # true fqsn
 | 
				
			||||||
    fqsn = '.'.join((bfqsn, brokername))
 | 
					    fqsn = '.'.join((bfqsn, brokername))
 | 
				
			||||||
    # add a fqsn entry that includes the ``.<broker>`` suffix
 | 
					    # add a fqsn entry that includes the ``.<broker>`` suffix
 | 
				
			||||||
 | 
					    # and an entry that includes the broker-specific fqsn (including
 | 
				
			||||||
 | 
					    # any new suffixes or elements as injected by the backend).
 | 
				
			||||||
    init_msg[fqsn] = msg
 | 
					    init_msg[fqsn] = msg
 | 
				
			||||||
 | 
					    init_msg[bfqsn] = msg
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # TODO: pretty sure we don't need this? why not just leave 1s as
 | 
					    # TODO: pretty sure we don't need this? why not just leave 1s as
 | 
				
			||||||
    # the fastest "sample period" since we'll probably always want that
 | 
					    # the fastest "sample period" since we'll probably always want that
 | 
				
			||||||
| 
						 | 
					@ -466,13 +472,14 @@ async def allocate_persistent_feed(
 | 
				
			||||||
    await some_data_ready.wait()
 | 
					    await some_data_ready.wait()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # append ``.<broker>`` suffix to each quote symbol
 | 
					    # append ``.<broker>`` suffix to each quote symbol
 | 
				
			||||||
    bsym = symbol + f'.{brokername}'
 | 
					    acceptable_not_fqsn_with_broker_suffix = symbol + f'.{brokername}'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    generic_first_quotes = {
 | 
					    generic_first_quotes = {
 | 
				
			||||||
        bsym: first_quote,
 | 
					        acceptable_not_fqsn_with_broker_suffix: first_quote,
 | 
				
			||||||
        fqsn: first_quote,
 | 
					        fqsn: first_quote,
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    bus.feeds[symbol] = bus.feeds[fqsn] = (
 | 
					    bus.feeds[symbol] = bus.feeds[bfqsn] = (
 | 
				
			||||||
        init_msg,
 | 
					        init_msg,
 | 
				
			||||||
        generic_first_quotes,
 | 
					        generic_first_quotes,
 | 
				
			||||||
    )
 | 
					    )
 | 
				
			||||||
| 
						 | 
					@ -523,7 +530,7 @@ async def open_feed_bus(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    ctx: tractor.Context,
 | 
					    ctx: tractor.Context,
 | 
				
			||||||
    brokername: str,
 | 
					    brokername: str,
 | 
				
			||||||
    symbol: str,
 | 
					    symbol: str,  # normally expected to the broker-specific fqsn
 | 
				
			||||||
    loglevel: str,
 | 
					    loglevel: str,
 | 
				
			||||||
    tick_throttle:  Optional[float] = None,
 | 
					    tick_throttle:  Optional[float] = None,
 | 
				
			||||||
    start_stream: bool = True,
 | 
					    start_stream: bool = True,
 | 
				
			||||||
| 
						 | 
					@ -545,7 +552,9 @@ async def open_feed_bus(
 | 
				
			||||||
    # TODO: check for any stale shm entries for this symbol
 | 
					    # TODO: check for any stale shm entries for this symbol
 | 
				
			||||||
    # (after we also group them in a nice `/dev/shm/piker/` subdir).
 | 
					    # (after we also group them in a nice `/dev/shm/piker/` subdir).
 | 
				
			||||||
    # ensure we are who we think we are
 | 
					    # ensure we are who we think we are
 | 
				
			||||||
    assert 'brokerd' in tractor.current_actor().name
 | 
					    servicename = tractor.current_actor().name
 | 
				
			||||||
 | 
					    assert 'brokerd' in servicename
 | 
				
			||||||
 | 
					    assert brokername in servicename
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    bus = get_feed_bus(brokername)
 | 
					    bus = get_feed_bus(brokername)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -555,7 +564,7 @@ async def open_feed_bus(
 | 
				
			||||||
    entry = bus.feeds.get(symbol)
 | 
					    entry = bus.feeds.get(symbol)
 | 
				
			||||||
    if entry is None:
 | 
					    if entry is None:
 | 
				
			||||||
        # allocate a new actor-local stream bus which
 | 
					        # allocate a new actor-local stream bus which
 | 
				
			||||||
        # will persist for this `brokerd`.
 | 
					        # will persist for this `brokerd`'s service lifetime.
 | 
				
			||||||
        async with bus.task_lock:
 | 
					        async with bus.task_lock:
 | 
				
			||||||
            await bus.nursery.start(
 | 
					            await bus.nursery.start(
 | 
				
			||||||
                partial(
 | 
					                partial(
 | 
				
			||||||
| 
						 | 
					@ -584,7 +593,7 @@ async def open_feed_bus(
 | 
				
			||||||
    # true fqsn
 | 
					    # true fqsn
 | 
				
			||||||
    fqsn = '.'.join([bfqsn, brokername])
 | 
					    fqsn = '.'.join([bfqsn, brokername])
 | 
				
			||||||
    assert fqsn in first_quotes
 | 
					    assert fqsn in first_quotes
 | 
				
			||||||
    assert bus.feeds[fqsn]
 | 
					    assert bus.feeds[bfqsn]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib)
 | 
					    # broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib)
 | 
				
			||||||
    bsym = symbol + f'.{brokername}'
 | 
					    bsym = symbol + f'.{brokername}'
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue