From ea8205968ce9e457e60a59ab34b8b18cb1437f7f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 19 Aug 2020 07:42:49 -0400 Subject: [PATCH] Begin to use `@tractor.msg.pub` throughout streaming API Since the new FSP system will require time aligned data amongst actors, it makes sense to share broker data feeds as much as possible on a local system. There doesn't seem to be downside to this approach either since if not fanning-out in our code, the broker (server) has to do it anyway (and who knows how junk their implementation is) though with more clients, sockets etc. in memory on our end. It also preps the code for introducing a more "serious" pub-sub systems like zeromq/nanomessage. --- piker/brokers/kraken.py | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 605c5177..ea3850fd 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -4,7 +4,7 @@ Kraken backend. from contextlib import asynccontextmanager from dataclasses import dataclass, asdict, field from itertools import starmap -from typing import List, Dict, Any +from typing import List, Dict, Any, Callable import json import time @@ -144,7 +144,9 @@ class OHLC: setattr(self, f, val.type(getattr(self, f))) +@tractor.msg.pub async def stream_quotes( + get_topics: Callable, # These are the symbols not expected by the ws api # they are looked up inside this routine. symbols: List[str] = ['XBTUSD', 'XMRUSD'], @@ -181,6 +183,10 @@ async def stream_quotes( # 'depth': '25', }, } + # TODO: we want to eventually allow unsubs which should + # be completely fine to request from a separate task + # since internally the ws methods appear to be FIFO + # locked. await ws.send_message(json.dumps(subs)) async def recv(): @@ -222,7 +228,14 @@ async def stream_quotes( # pull a first quote and deliver ohlc_gen = recv_ohlc() ohlc_last = await ohlc_gen.__anext__() - yield asdict(ohlc_last) + + # seriously eh? what's with this non-symmetry everywhere + # in subscription systems... + quote = asdict(ohlc_last) + topic = quote['pair'].replace('/', '') + + # packetize as {topic: quote} + yield {topic: quote} # keep start of last interval for volume tracking last_interval_start = ohlc_last.etime @@ -246,8 +259,16 @@ async def stream_quotes( 'price': ohlc.close, 'size': tick_volume, }) - yield asdict(ohlc) + + # XXX: format required by ``tractor.msg.pub`` + # requires a ``Dict[topic: str, quote: dict]`` + quote = asdict(ohlc) + print(quote) + topic = quote['pair'].replace('/', '') + yield {topic: quote} + ohlc_last = ohlc + except (ConnectionClosed, DisconnectionTimeout): log.exception("Good job kraken...reconnecting")