Factor subscription broadcasting into a func
							parent
							
								
									4a383795bf
								
							
						
					
					
						commit
						16f2f6ff94
					
				| 
						 | 
				
			
			@ -22,7 +22,7 @@ financial data flows.
 | 
			
		|||
from __future__ import annotations
 | 
			
		||||
from collections import Counter
 | 
			
		||||
import time
 | 
			
		||||
from typing import TYPE_CHECKING
 | 
			
		||||
from typing import TYPE_CHECKING, Optional
 | 
			
		||||
 | 
			
		||||
import tractor
 | 
			
		||||
import trio
 | 
			
		||||
| 
						 | 
				
			
			@ -90,6 +90,7 @@ async def increment_ohlc_buffer(
 | 
			
		|||
 | 
			
		||||
    total_s = 0  # total seconds counted
 | 
			
		||||
    lowest = min(sampler.ohlcv_shms.keys())
 | 
			
		||||
    lowest_shm = sampler.ohlcv_shms[lowest][0]
 | 
			
		||||
    ad = lowest - 0.001
 | 
			
		||||
 | 
			
		||||
    with trio.CancelScope() as cs:
 | 
			
		||||
| 
						 | 
				
			
			@ -133,21 +134,33 @@ async def increment_ohlc_buffer(
 | 
			
		|||
                    # write to the buffer
 | 
			
		||||
                    shm.push(last)
 | 
			
		||||
 | 
			
		||||
            # broadcast the buffer index step to any subscribers for
 | 
			
		||||
            # a given sample period.
 | 
			
		||||
            subs = sampler.subscribers.get(delay_s, ())
 | 
			
		||||
            await broadcast(delay_s, shm=lowest_shm)
 | 
			
		||||
 | 
			
		||||
            for stream in subs:
 | 
			
		||||
                try:
 | 
			
		||||
                    await stream.send({'index': shm._last.value})
 | 
			
		||||
                except (
 | 
			
		||||
                    trio.BrokenResourceError,
 | 
			
		||||
                    trio.ClosedResourceError
 | 
			
		||||
                ):
 | 
			
		||||
                    log.error(
 | 
			
		||||
                        f'{stream._ctx.chan.uid} dropped connection'
 | 
			
		||||
                    )
 | 
			
		||||
                    subs.remove(stream)
 | 
			
		||||
 | 
			
		||||
async def broadcast(
 | 
			
		||||
    delay_s: int,
 | 
			
		||||
    shm: Optional[ShmArray] = None,
 | 
			
		||||
 | 
			
		||||
) -> None:
 | 
			
		||||
    # broadcast the buffer index step to any subscribers for
 | 
			
		||||
    # a given sample period.
 | 
			
		||||
    subs = sampler.subscribers.get(delay_s, ())
 | 
			
		||||
 | 
			
		||||
    if shm is None:
 | 
			
		||||
        lowest = min(sampler.ohlcv_shms.keys())
 | 
			
		||||
        shm = sampler.ohlcv_shms[lowest][0]
 | 
			
		||||
 | 
			
		||||
    for stream in subs:
 | 
			
		||||
        try:
 | 
			
		||||
            await stream.send({'index': shm._last.value})
 | 
			
		||||
        except (
 | 
			
		||||
            trio.BrokenResourceError,
 | 
			
		||||
            trio.ClosedResourceError
 | 
			
		||||
        ):
 | 
			
		||||
            log.error(
 | 
			
		||||
                f'{stream._ctx.chan.uid} dropped connection'
 | 
			
		||||
            )
 | 
			
		||||
            subs.remove(stream)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@tractor.context
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue