Convert to stream, parse into dataclass

unleash_the_kraken
Tyler Goodlet 2020-07-05 11:43:58 -04:00
parent 2738b54851
commit 7bccfc7b10
1 changed files with 60 additions and 31 deletions

View File

@ -1,32 +1,33 @@
""" """
Kraken backend. Kraken backend.
""" """
from dataclasses import dataclass, asdict
from typing import List from typing import List
import json import json
import trio
import tractor import tractor
from trio_websocket import open_websocket_url from trio_websocket import open_websocket_url
if __name__ == '__main__': async def stream_quotes(
pairs: List[str] = ['BTC/USD', 'XRP/USD'],
sub_type: str = 'ohlc',
) -> None:
"""Subscribe for ohlc stream of quotes for ``pairs``.
async def stream_quotes( ``pairs`` must be formatted <crypto_symbol>/<fiat_symbol>.
pairs: List[str] = ['BTC/USD'],
) -> None:
"""Subscribe ohlc quotes for ``pairs``.
``pairs`` must be formatted like `crypto/fiat`.
""" """
async with open_websocket_url( async with open_websocket_url(
'wss://ws.kraken.com', 'wss://ws.kraken.com',
) as ws: ) as ws:
# setup subs # setup subs
# see: https://docs.kraken.com/websockets/#message-subscribe
subs = { subs = {
'event': 'subscribe',
'pair': pairs, 'pair': pairs,
'event': 'subscribe',
'subscription': { 'subscription': {
'name': 'ohlc', 'name': sub_type,
'interval': 1, # 1 min
# 'name': 'ticker', # 'name': 'ticker',
# 'name': 'openOrders', # 'name': 'openOrders',
# 'depth': '25', # 'depth': '25',
@ -34,11 +35,39 @@ if __name__ == '__main__':
} }
await ws.send_message(json.dumps(subs)) await ws.send_message(json.dumps(subs))
async def recv():
return json.loads(await ws.get_message())
@dataclass
class OHLC:
chan_id: int # internal kraken id
chan_name: str # eg. ohlc-1 (name-interval)
pair: str # fx pair
time: float # Begin time of interval, in seconds since epoch
etime: float # End time of interval, in seconds since epoch
open: float # Open price of interval
high: float # High price within interval
low: float # Low price within interval
close: float # Close price of interval
vwap: float # Volume weighted average price within interval
volume: int # Accumulated volume within interval
count: int # Number of trades within interval
while True: while True:
msg = json.loads(await ws.get_message()) msg = await recv()
if isinstance(msg, dict) and msg.get('event') == 'heartbeat': if isinstance(msg, dict):
if msg.get('event') == 'heartbeat':
continue continue
else:
chan_id, ohlc_array, chan_name, pair = msg
ohlc = OHLC(chan_id, chan_name, pair, *ohlc_array)
yield ohlc
print(msg)
trio.run(stream_quotes) if __name__ == '__main__':
async def stream_ohlc():
async for msg in stream_quotes():
print(asdict(msg))
tractor.run(stream_ohlc)