Begin to use `@tractor.msg.pub` throughout streaming API

Since the new FSP system will require time aligned data amongst actors,
it makes sense to share broker data feeds as much as possible on a local
system. There doesn't seem to be downside to this approach either since
if not fanning-out in our code, the broker (server) has to do it anyway
(and who knows how junk their implementation is) though with more
clients, sockets etc. in memory on our end. It also preps the code for
introducing a more "serious" pub-sub systems like zeromq/nanomessage.
unleash_the_kraken
Tyler Goodlet 2020-08-19 07:42:49 -04:00
parent 44010abf4d
commit ea8205968c
1 changed files with 24 additions and 3 deletions

View File

@ -4,7 +4,7 @@ Kraken backend.
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dataclasses import dataclass, asdict, field from dataclasses import dataclass, asdict, field
from itertools import starmap from itertools import starmap
from typing import List, Dict, Any from typing import List, Dict, Any, Callable
import json import json
import time import time
@ -144,7 +144,9 @@ class OHLC:
setattr(self, f, val.type(getattr(self, f))) setattr(self, f, val.type(getattr(self, f)))
@tractor.msg.pub
async def stream_quotes( async def stream_quotes(
get_topics: Callable,
# These are the symbols not expected by the ws api # These are the symbols not expected by the ws api
# they are looked up inside this routine. # they are looked up inside this routine.
symbols: List[str] = ['XBTUSD', 'XMRUSD'], symbols: List[str] = ['XBTUSD', 'XMRUSD'],
@ -181,6 +183,10 @@ async def stream_quotes(
# 'depth': '25', # 'depth': '25',
}, },
} }
# TODO: we want to eventually allow unsubs which should
# be completely fine to request from a separate task
# since internally the ws methods appear to be FIFO
# locked.
await ws.send_message(json.dumps(subs)) await ws.send_message(json.dumps(subs))
async def recv(): async def recv():
@ -222,7 +228,14 @@ async def stream_quotes(
# pull a first quote and deliver # pull a first quote and deliver
ohlc_gen = recv_ohlc() ohlc_gen = recv_ohlc()
ohlc_last = await ohlc_gen.__anext__() ohlc_last = await ohlc_gen.__anext__()
yield asdict(ohlc_last)
# seriously eh? what's with this non-symmetry everywhere
# in subscription systems...
quote = asdict(ohlc_last)
topic = quote['pair'].replace('/', '')
# packetize as {topic: quote}
yield {topic: quote}
# keep start of last interval for volume tracking # keep start of last interval for volume tracking
last_interval_start = ohlc_last.etime last_interval_start = ohlc_last.etime
@ -246,8 +259,16 @@ async def stream_quotes(
'price': ohlc.close, 'price': ohlc.close,
'size': tick_volume, 'size': tick_volume,
}) })
yield asdict(ohlc)
# XXX: format required by ``tractor.msg.pub``
# requires a ``Dict[topic: str, quote: dict]``
quote = asdict(ohlc)
print(quote)
topic = quote['pair'].replace('/', '')
yield {topic: quote}
ohlc_last = ohlc ohlc_last = ohlc
except (ConnectionClosed, DisconnectionTimeout): except (ConnectionClosed, DisconnectionTimeout):
log.exception("Good job kraken...reconnecting") log.exception("Good job kraken...reconnecting")