Convert `iter_ohlc_periods()` to a `@tractor.context`
							parent
							
								
									3dc87e0426
								
							
						
					
					
						commit
						00d7bb089f
					
				| 
						 | 
					@ -133,18 +133,20 @@ async def increment_ohlc_buffer(
 | 
				
			||||||
            # a given sample period.
 | 
					            # a given sample period.
 | 
				
			||||||
            subs = sampler.subscribers.get(delay_s, ())
 | 
					            subs = sampler.subscribers.get(delay_s, ())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            for ctx in subs:
 | 
					            for stream in subs:
 | 
				
			||||||
                try:
 | 
					                try:
 | 
				
			||||||
                    await ctx.send_yield({'index': shm._last.value})
 | 
					                    await stream.send({'index': shm._last.value})
 | 
				
			||||||
                except (
 | 
					                except (
 | 
				
			||||||
                    trio.BrokenResourceError,
 | 
					                    trio.BrokenResourceError,
 | 
				
			||||||
                    trio.ClosedResourceError
 | 
					                    trio.ClosedResourceError
 | 
				
			||||||
                ):
 | 
					                ):
 | 
				
			||||||
                    log.error(f'{ctx.chan.uid} dropped connection')
 | 
					                    log.error(
 | 
				
			||||||
                    subs.remove(ctx)
 | 
					                        f'{stream._ctx.chan.uid} dropped connection'
 | 
				
			||||||
 | 
					                    )
 | 
				
			||||||
 | 
					                    subs.remove(stream)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@tractor.stream
 | 
					@tractor.context
 | 
				
			||||||
async def iter_ohlc_periods(
 | 
					async def iter_ohlc_periods(
 | 
				
			||||||
    ctx: tractor.Context,
 | 
					    ctx: tractor.Context,
 | 
				
			||||||
    delay_s: int,
 | 
					    delay_s: int,
 | 
				
			||||||
| 
						 | 
					@ -158,18 +160,20 @@ async def iter_ohlc_periods(
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    # add our subscription
 | 
					    # add our subscription
 | 
				
			||||||
    subs = sampler.subscribers.setdefault(delay_s, [])
 | 
					    subs = sampler.subscribers.setdefault(delay_s, [])
 | 
				
			||||||
    subs.append(ctx)
 | 
					    await ctx.started()
 | 
				
			||||||
 | 
					    async with ctx.open_stream() as stream:
 | 
				
			||||||
 | 
					        subs.append(stream)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    try:
 | 
					 | 
				
			||||||
        # stream and block until cancelled
 | 
					 | 
				
			||||||
        await trio.sleep_forever()
 | 
					 | 
				
			||||||
    finally:
 | 
					 | 
				
			||||||
        try:
 | 
					        try:
 | 
				
			||||||
            subs.remove(ctx)
 | 
					            # stream and block until cancelled
 | 
				
			||||||
        except ValueError:
 | 
					            await trio.sleep_forever()
 | 
				
			||||||
            log.error(
 | 
					        finally:
 | 
				
			||||||
                f'iOHLC step stream was already dropped for {ctx.chan.uid}?'
 | 
					            try:
 | 
				
			||||||
            )
 | 
					                subs.remove(stream)
 | 
				
			||||||
 | 
					            except ValueError:
 | 
				
			||||||
 | 
					                log.error(
 | 
				
			||||||
 | 
					                    f'iOHLC step stream was already dropped {ctx.chan.uid}?'
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def sample_and_broadcast(
 | 
					async def sample_and_broadcast(
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue