Add trades data stream routine to Feed

basic_orders
Tyler Goodlet 2021-01-09 10:54:45 -05:00
parent 280739b51a
commit bd180a3482
1 changed files with 24 additions and 1 deletions

View File

@ -144,9 +144,11 @@ class Feed:
name: str name: str
stream: AsyncIterator[Dict[str, Any]] stream: AsyncIterator[Dict[str, Any]]
shm: ShmArray shm: ShmArray
mod: ModuleType
# ticks: ShmArray # ticks: ShmArray
_brokerd_portal: tractor._portal.Portal _brokerd_portal: tractor._portal.Portal
_index_stream: Optional[AsyncIterator[Dict[str, Any]]] = None _index_stream: Optional[AsyncIterator[int]] = None
_trade_stream: Optional[AsyncIterator[Dict[str, Any]]] = None
async def receive(self) -> dict: async def receive(self) -> dict:
return await self.stream.__anext__() return await self.stream.__anext__()
@ -164,6 +166,26 @@ class Feed:
return self._index_stream return self._index_stream
async def recv_trades_data(self) -> AsyncIterator[dict]:
if not getattr(self.mod, 'stream_trades', False):
log.warning(f"{self.mod.name} doesn't have trade data support yet :(")
# yah this is bullshitty but it worx
async def nuttin():
yield
return
return nuttin()
if not self._trade_stream:
self._trade_stream = await self._brokerd_portal.run(
self.mod.stream_trades,
topics=['all'], # do we need this?
)
return self._trade_stream
def sym_to_shm_key( def sym_to_shm_key(
broker: str, broker: str,
@ -228,5 +250,6 @@ async def open_feed(
name=name, name=name,
stream=stream, stream=stream,
shm=shm, shm=shm,
mod=mod,
_brokerd_portal=portal, _brokerd_portal=portal,
) )