From bd180a3482e58a871854b56f86cfca0e37a49a50 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 9 Jan 2021 10:54:45 -0500 Subject: [PATCH] Add trades data stream routine to Feed --- piker/data/__init__.py | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/piker/data/__init__.py b/piker/data/__init__.py index 579e596f..cbdd7e9f 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -144,9 +144,11 @@ class Feed: name: str stream: AsyncIterator[Dict[str, Any]] shm: ShmArray + mod: ModuleType # ticks: ShmArray _brokerd_portal: tractor._portal.Portal - _index_stream: Optional[AsyncIterator[Dict[str, Any]]] = None + _index_stream: Optional[AsyncIterator[int]] = None + _trade_stream: Optional[AsyncIterator[Dict[str, Any]]] = None async def receive(self) -> dict: return await self.stream.__anext__() @@ -164,6 +166,26 @@ class Feed: return self._index_stream + async def recv_trades_data(self) -> AsyncIterator[dict]: + + if not getattr(self.mod, 'stream_trades', False): + log.warning(f"{self.mod.name} doesn't have trade data support yet :(") + + # yah this is bullshitty but it worx + async def nuttin(): + yield + return + + return nuttin() + + if not self._trade_stream: + self._trade_stream = await self._brokerd_portal.run( + self.mod.stream_trades, + topics=['all'], # do we need this? + ) + + return self._trade_stream + def sym_to_shm_key( broker: str, @@ -228,5 +250,6 @@ async def open_feed( name=name, stream=stream, shm=shm, + mod=mod, _brokerd_portal=portal, )