Flip `.feed` and `._sampling` over to new stuff

In `.feed` and `._sampling` move to using the new
`tractor.Context.open_stream(allow_overruns: bool)` (cough, A BREAKING
CHANGE).

Also set `Flume.mkt` during construction in `.feed.open_feed()`.
rekt_pps
Tyler Goodlet 2023-04-14 01:50:36 -04:00
parent a301fabd6c
commit 0917b580c9
2 changed files with 12 additions and 27 deletions

View File

@ -782,9 +782,6 @@ async def uniform_rate_send(
https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9 https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9
''' '''
# try not to error-out on overruns of the subscribed client
stream._ctx._backpressure = True
# TODO: compute the approx overhead latency per cycle # TODO: compute the approx overhead latency per cycle
left_to_sleep = throttle_period = 1/rate - 0.000616 left_to_sleep = throttle_period = 1/rate - 0.000616

View File

@ -1116,7 +1116,7 @@ async def allocate_persistent_feed(
# TODO: we have to use this for now since currently the # TODO: we have to use this for now since currently the
# MktPair above doesn't render the correct output key it seems # MktPair above doesn't render the correct output key it seems
# when we provide the `MktInfo` here?..? # when we provide the `MktInfo` here?..?
symbol=symbol, mkt=symbol,
first_quote=first_quote, first_quote=first_quote,
_rt_shm_token=rt_shm.token, _rt_shm_token=rt_shm.token,
@ -1206,10 +1206,6 @@ async def open_feed_bus(
symbol. symbol.
''' '''
# ensure that a quote feed stream which is pushing too fast doesn't
# cause and overrun in the client.
ctx._backpressure = True
if loglevel is None: if loglevel is None:
loglevel = tractor.current_actor().loglevel loglevel = tractor.current_actor().loglevel
@ -1285,8 +1281,7 @@ async def open_feed_bus(
# sync feed subscribers with flume handles # sync feed subscribers with flume handles
await ctx.started( await ctx.started(
{fqsn: flume.to_msg() {fqsn: flume.to_msg()
for fqsn, flume in flumes.items() for fqsn, flume in flumes.items()}
}
) )
if not start_stream: if not start_stream:
@ -1295,7 +1290,12 @@ async def open_feed_bus(
# real-time stream loop # real-time stream loop
async with ( async with (
ctx.open_stream() as stream, ctx.open_stream(
# NOTE we allow this since it's common to have the live
# quote feed actor's sampling task push faster then the
# the local UI-graphics code during startup.
allow_overruns=True,
) as stream,
): ):
local_subs: dict[str, set[tuple]] = {} local_subs: dict[str, set[tuple]] = {}
@ -1323,7 +1323,6 @@ async def open_feed_bus(
# a max ``tick_throttle`` instantaneous rate. # a max ``tick_throttle`` instantaneous rate.
send, recv = trio.open_memory_channel(2**10) send, recv = trio.open_memory_channel(2**10)
ctx._backpressure = False
cs = await bus.start_task( cs = await bus.start_task(
uniform_rate_send, uniform_rate_send,
tick_throttle, tick_throttle,
@ -1455,14 +1454,6 @@ class Feed(Struct):
_max_sample_rate: int = 1 _max_sample_rate: int = 1
# @property
# def portal(self) -> tractor.Portal:
# return self._portal
# @property
# def name(self) -> str:
# return self.mod.name
async def pause(self) -> None: async def pause(self) -> None:
for stream in set(self.streams.values()): for stream in set(self.streams.values()):
await stream.send('pause') await stream.send('pause')
@ -1537,7 +1528,7 @@ async def maybe_open_feed(
'tick_throttle': kwargs.get('tick_throttle'), 'tick_throttle': kwargs.get('tick_throttle'),
# XXX: super critical to have bool defaults here XD # XXX: super critical to have bool defaults here XD
'backpressure': kwargs.get('backpressure', True), 'allow_overruns': kwargs.get('allow_overruns', True),
'start_stream': kwargs.get('start_stream', True), 'start_stream': kwargs.get('start_stream', True),
}, },
key=fqsn, key=fqsn,
@ -1569,7 +1560,7 @@ async def open_feed(
fqsns: list[str], fqsns: list[str],
loglevel: str | None = None, loglevel: str | None = None,
backpressure: bool = True, allow_overruns: bool = True,
start_stream: bool = True, start_stream: bool = True,
tick_throttle: float | None = None, # Hz tick_throttle: float | None = None, # Hz
@ -1659,9 +1650,6 @@ async def open_feed(
(brokermod, bfqsns), (brokermod, bfqsns),
) in zip(ctxs, providers.items()): ) in zip(ctxs, providers.items()):
# NOTE: do it asap to avoid overruns during multi-feed setup?
ctx._backpressure = backpressure
for fqsn, flume_msg in flumes_msg_dict.items(): for fqsn, flume_msg in flumes_msg_dict.items():
flume = Flume.from_msg(flume_msg) flume = Flume.from_msg(flume_msg)
assert flume.symbol.fqsn == fqsn assert flume.symbol.fqsn == fqsn
@ -1683,11 +1671,11 @@ async def open_feed(
stream_ctxs.append( stream_ctxs.append(
ctx.open_stream( ctx.open_stream(
# XXX: be explicit about stream backpressure # XXX: be explicit about stream overruns
# since we should **never** overrun on feeds # since we should **never** overrun on feeds
# being too fast, which will pretty much # being too fast, which will pretty much
# always happen with HFT XD # always happen with HFT XD
backpressure=backpressure, allow_overruns=allow_overruns,
) )
) )