Add portal getter, store throttle rate

fsp_feeds
Tyler Goodlet 2021-09-16 09:17:14 -04:00
parent bb5916d6a9
commit fd8be33f10
2 changed files with 15 additions and 7 deletions

View File

@ -373,7 +373,9 @@ async def open_brokerd_trades_dialogue(
broker = feed.mod.name broker = feed.mod.name
# TODO: make a `tractor` bug/test for this! # TODO: make a `tractor` bug/test for this!
# portal = feed._brokerd_portal # if only i could member what the problem was..
# probably some GC of the portal thing?
# portal = feed.portal
# XXX: we must have our own portal + channel otherwise # XXX: we must have our own portal + channel otherwise
# when the data feed closes it may result in a half-closed # when the data feed closes it may result in a half-closed

View File

@ -393,18 +393,23 @@ class Feed:
shm: ShmArray shm: ShmArray
mod: ModuleType mod: ModuleType
first_quotes: dict # symbol names to first quote dicts first_quotes: dict # symbol names to first quote dicts
stream: trio.abc.ReceiveChannel[dict[str, Any]]
_brokerd_portal: tractor._portal.Portal _portal: tractor.Portal
stream: trio.abc.ReceiveChannel[dict[str, Any]]
throttle_rate: Optional[int] = None
_trade_stream: Optional[AsyncIterator[dict[str, Any]]] = None _trade_stream: Optional[AsyncIterator[dict[str, Any]]] = None
_max_sample_rate: int = 0 _max_sample_rate: int = 0
search: Callable[..., Awaitable] = None
# cache of symbol info messages received as first message when # cache of symbol info messages received as first message when
# a stream startsc. # a stream startsc.
symbols: dict[str, Symbol] = field(default_factory=dict) symbols: dict[str, Symbol] = field(default_factory=dict)
@property
def portal(self) -> tractor.Portal:
return self._portal
async def receive(self) -> dict: async def receive(self) -> dict:
return await self.stream.receive() return await self.stream.receive()
@ -418,7 +423,7 @@ class Feed:
delay_s = delay_s or self._max_sample_rate delay_s = delay_s or self._max_sample_rate
async with open_sample_step_stream( async with open_sample_step_stream(
self._brokerd_portal, self.portal,
delay_s, delay_s,
) as istream: ) as istream:
yield istream yield istream
@ -526,7 +531,8 @@ async def open_feed(
mod=mod, mod=mod,
first_quotes=first_quotes, first_quotes=first_quotes,
stream=stream, stream=stream,
_brokerd_portal=portal, _portal=portal,
throttle_rate=tick_throttle,
) )
ohlc_sample_rates = [] ohlc_sample_rates = []