Only drop throttle feeds if channel disconnects?

incremental_update_paths
Tyler Goodlet 2022-06-01 12:13:08 -04:00
parent fc24f5efd1
commit 363ba8f9ae
2 changed files with 44 additions and 14 deletions

View File

@ -22,7 +22,7 @@ financial data flows.
from __future__ import annotations from __future__ import annotations
from collections import Counter from collections import Counter
import time import time
from typing import TYPE_CHECKING, Optional from typing import TYPE_CHECKING, Optional, Union
import tractor import tractor
import trio import trio
@ -32,6 +32,7 @@ from ..log import get_logger
if TYPE_CHECKING: if TYPE_CHECKING:
from ._sharedmem import ShmArray from ._sharedmem import ShmArray
from .feed import _FeedsBus
log = get_logger(__name__) log = get_logger(__name__)
@ -219,7 +220,7 @@ async def iter_ohlc_periods(
async def sample_and_broadcast( async def sample_and_broadcast(
bus: '_FeedsBus', # noqa bus: _FeedsBus, # noqa
shm: ShmArray, shm: ShmArray,
quote_stream: trio.abc.ReceiveChannel, quote_stream: trio.abc.ReceiveChannel,
brokername: str, brokername: str,
@ -298,7 +299,13 @@ async def sample_and_broadcast(
# end up triggering backpressure which which will # end up triggering backpressure which which will
# eventually block this producer end of the feed and # eventually block this producer end of the feed and
# thus other consumers still attached. # thus other consumers still attached.
subs = bus._subscribers[broker_symbol.lower()] subs: list[
tuple[
Union[tractor.MsgStream, trio.MemorySendChannel],
tractor.Context,
Optional[float], # tick throttle in Hz
]
] = bus._subscribers[broker_symbol.lower()]
# NOTE: by default the broker backend doesn't append # NOTE: by default the broker backend doesn't append
# it's own "name" into the fqsn schema (but maybe it # it's own "name" into the fqsn schema (but maybe it
@ -307,7 +314,7 @@ async def sample_and_broadcast(
bsym = f'{broker_symbol}.{brokername}' bsym = f'{broker_symbol}.{brokername}'
lags: int = 0 lags: int = 0
for (stream, tick_throttle) in subs: for (stream, ctx, tick_throttle) in subs:
try: try:
with trio.move_on_after(0.2) as cs: with trio.move_on_after(0.2) as cs:
@ -319,11 +326,11 @@ async def sample_and_broadcast(
(bsym, quote) (bsym, quote)
) )
except trio.WouldBlock: except trio.WouldBlock:
ctx = getattr(stream, '_ctx', None) chan = ctx.chan
if ctx: if ctx:
log.warning( log.warning(
f'Feed overrun {bus.brokername} ->' f'Feed overrun {bus.brokername} ->'
f'{ctx.channel.uid} !!!' f'{chan.uid} !!!'
) )
else: else:
key = id(stream) key = id(stream)
@ -333,11 +340,26 @@ async def sample_and_broadcast(
f'feed @ {tick_throttle} Hz' f'feed @ {tick_throttle} Hz'
) )
if overruns[key] > 6: if overruns[key] > 6:
# TODO: should we check for the
# context being cancelled? this
# could happen but the
# channel-ipc-pipe is still up.
if not chan.connected():
log.warning( log.warning(
f'Dropping consumer {stream}' 'Dropping broken consumer:\n'
f'{broker_symbol}:'
f'{ctx.cid}@{chan.uid}'
) )
await stream.aclose() await stream.aclose()
raise trio.BrokenResourceError raise trio.BrokenResourceError
else:
log.warning(
'Feed getting overrun bro!\n'
f'{broker_symbol}:'
f'{ctx.cid}@{chan.uid}'
)
continue
else: else:
await stream.send( await stream.send(
{bsym: quote} {bsym: quote}
@ -482,6 +504,7 @@ async def uniform_rate_send(
# if the feed consumer goes down then drop # if the feed consumer goes down then drop
# out of this rate limiter # out of this rate limiter
log.warning(f'{stream} closed') log.warning(f'{stream} closed')
await stream.aclose()
return return
# reset send cycle state # reset send cycle state

View File

@ -33,6 +33,7 @@ from typing import (
Generator, Generator,
Awaitable, Awaitable,
TYPE_CHECKING, TYPE_CHECKING,
Union,
) )
import trio import trio
@ -117,7 +118,13 @@ class _FeedsBus(BaseModel):
# https://github.com/samuelcolvin/pydantic/issues/2816 # https://github.com/samuelcolvin/pydantic/issues/2816
_subscribers: dict[ _subscribers: dict[
str, str,
list[tuple[tractor.MsgStream, Optional[float]]] list[
tuple[
Union[tractor.MsgStream, trio.MemorySendChannel],
tractor.Context,
Optional[float], # tick throttle in Hz
]
]
] = {} ] = {}
async def start_task( async def start_task(
@ -1118,10 +1125,10 @@ async def open_feed_bus(
recv, recv,
stream, stream,
) )
sub = (send, tick_throttle) sub = (send, ctx, tick_throttle)
else: else:
sub = (stream, tick_throttle) sub = (stream, ctx, tick_throttle)
subs = bus._subscribers[bfqsn] subs = bus._subscribers[bfqsn]
subs.append(sub) subs.append(sub)