Drop throttled rate margin to 100us
							parent
							
								
									65bf5a386d
								
							
						
					
					
						commit
						59defa378c
					
				| 
						 | 
				
			
			@ -172,7 +172,6 @@ async def sample_and_broadcast(
 | 
			
		|||
 | 
			
		||||
    # iterate stream delivered by broker
 | 
			
		||||
    async for quotes in quote_stream:
 | 
			
		||||
 | 
			
		||||
        # TODO: ``numba`` this!
 | 
			
		||||
        for sym, quote in quotes.items():
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -185,8 +184,12 @@ async def sample_and_broadcast(
 | 
			
		|||
 | 
			
		||||
            # start writing the shm buffer with appropriate
 | 
			
		||||
            # trade data
 | 
			
		||||
            for tick in quote['ticks']:
 | 
			
		||||
 | 
			
		||||
            # TODO: we should probably not write every single
 | 
			
		||||
            # value to an OHLC sample stream XD
 | 
			
		||||
            # for a tick stream sure.. but this is excessive..
 | 
			
		||||
            ticks = quote['ticks']
 | 
			
		||||
            for tick in ticks:
 | 
			
		||||
                ticktype = tick['type']
 | 
			
		||||
 | 
			
		||||
                # write trade events to shm last OHLC sample
 | 
			
		||||
| 
						 | 
				
			
			@ -258,7 +261,8 @@ async def sample_and_broadcast(
 | 
			
		|||
 | 
			
		||||
                except (
 | 
			
		||||
                    trio.BrokenResourceError,
 | 
			
		||||
                    trio.ClosedResourceError
 | 
			
		||||
                    trio.ClosedResourceError,
 | 
			
		||||
                    trio.EndOfChannel,
 | 
			
		||||
                ):
 | 
			
		||||
                    # XXX: do we need to deregister here
 | 
			
		||||
                    # if it's done in the fee bus code?
 | 
			
		||||
| 
						 | 
				
			
			@ -268,6 +272,10 @@ async def sample_and_broadcast(
 | 
			
		|||
                        f'{stream._ctx.chan.uid} dropped  '
 | 
			
		||||
                        '`brokerd`-quotes-feed connection'
 | 
			
		||||
                    )
 | 
			
		||||
                    if tick_throttle:
 | 
			
		||||
                        assert stream.closed()
 | 
			
		||||
                        # await stream.aclose()
 | 
			
		||||
 | 
			
		||||
                    subs.remove((stream, tick_throttle))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -283,12 +291,8 @@ async def uniform_rate_send(
 | 
			
		|||
 | 
			
		||||
) -> None:
 | 
			
		||||
 | 
			
		||||
    sleep_period = 1/rate - 0.000616
 | 
			
		||||
    sleep_period = 1/rate - 0.0001  # 100us
 | 
			
		||||
    last_send = time.time()
 | 
			
		||||
    aname = stream._ctx.chan.uid[0]
 | 
			
		||||
    fsp = False
 | 
			
		||||
    if 'fsp' in aname:
 | 
			
		||||
        fsp = True
 | 
			
		||||
 | 
			
		||||
    while True:
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -308,20 +312,33 @@ async def uniform_rate_send(
 | 
			
		|||
                sym, next_quote = quote_stream.receive_nowait()
 | 
			
		||||
                ticks = next_quote.get('ticks')
 | 
			
		||||
 | 
			
		||||
                # XXX: idea for frame type data structure we could use on the
 | 
			
		||||
                # wire instead of a simple list?
 | 
			
		||||
                # frames = {
 | 
			
		||||
                #     'index': ['type_a', 'type_c', 'type_n', 'type_n'],
 | 
			
		||||
 | 
			
		||||
                #     'type_a': [tick0, tick1, tick2, .., tickn],
 | 
			
		||||
                #     'type_b': [tick0, tick1, tick2, .., tickn],
 | 
			
		||||
                #     'type_c': [tick0, tick1, tick2, .., tickn],
 | 
			
		||||
                #     ...
 | 
			
		||||
                #     'type_n': [tick0, tick1, tick2, .., tickn],
 | 
			
		||||
                # }
 | 
			
		||||
                if ticks:
 | 
			
		||||
                    first_quote['ticks'].extend(ticks)
 | 
			
		||||
 | 
			
		||||
            except trio.WouldBlock:
 | 
			
		||||
                now = time.time()
 | 
			
		||||
                rate = 1 / (now - last_send)
 | 
			
		||||
                last_send = now
 | 
			
		||||
 | 
			
		||||
                # log.info(f'{rate} Hz sending quotes')  # \n{first_quote}')
 | 
			
		||||
                log.debug(
 | 
			
		||||
                    f'`{sym}` throttled send hz: {round(rate, ndigits=1)}'
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
                # TODO: now if only we could sync this to the display
 | 
			
		||||
                # rate timing exactly lul
 | 
			
		||||
                try:
 | 
			
		||||
                    await stream.send({sym: first_quote})
 | 
			
		||||
                    last_send = now
 | 
			
		||||
                    break
 | 
			
		||||
                except trio.ClosedResourceError:
 | 
			
		||||
                    # if the feed consumer goes down then drop
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue