Factor out ws msg hearbeat and error handling

Move the core ws message handling into `stream_messages()` and call that
from 2 new stream processors: `process_data_feed_msgs()` and
`process_order_msgs()`. Add comments for hints on how to implement the
order msg parsing as well as `pprint` received msgs to console for now.
kraken_gb
Tyler Goodlet 2022-02-11 11:20:13 -05:00
parent 37df05c260
commit 30fdf550d7
1 changed files with 81 additions and 91 deletions

View File

@ -40,17 +40,17 @@ from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound, BrokerError from ._util import resproc, SymbolNotFound, BrokerError
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from ..data import ShmArray from ..data import ShmArray
from ..data._web_bs import open_autorecon_ws from ..data._web_bs import open_autorecon_ws, NoBsWs
from ..clearing._messages import ( from ..clearing._messages import (
BrokerdPosition, BrokerdOrder, BrokerdStatus, BrokerdPosition, BrokerdOrder, BrokerdStatus,
BrokerdOrderAck, BrokerdError, BrokerdCancel, BrokerdFill BrokerdOrderAck, BrokerdError, BrokerdCancel,
BrokerdFill,
) )
import urllib.parse import urllib.parse
import hashlib import hashlib
import hmac import hmac
import base64 import base64
import pandas as pd
log = get_logger(__name__) log = get_logger(__name__)
@ -157,7 +157,7 @@ def get_config() -> dict[str, Any]:
def get_kraken_signature( def get_kraken_signature(
urlpath: str, urlpath: str,
data: Dict[str, Any], data: Dict[str, Any],
secret: str secret: str
) -> str: ) -> str:
postdata = urllib.parse.urlencode(data) postdata = urllib.parse.urlencode(data)
@ -171,9 +171,9 @@ def get_kraken_signature(
class InvalidKey(ValueError): class InvalidKey(ValueError):
"""EAPI:Invalid key """EAPI:Invalid key
This error is returned when the API key used for the call is This error is returned when the API key used for the call is
either expired or disabled, please review the API key in your either expired or disabled, please review the API key in your
Settings -> API tab of account management or generate a new one Settings -> API tab of account management or generate a new one
and update your application.""" and update your application."""
@ -686,76 +686,27 @@ async def trades_dialogue(
): ):
## TODO: maybe add multiple accounts ## TODO: maybe add multiple accounts
n.start_soon(handle_order_requests, client, ems_stream) n.start_soon(handle_order_requests, client, ems_stream)
async with open_autorecon_ws( async with open_autorecon_ws(
'wss://ws-auth.kraken.com/', 'wss://ws-auth.kraken.com/',
fixture=subscribe, fixture=subscribe,
token=token, token=token,
) as ws: ) as ws:
from pprint import pprint
while True: async for msg in process_order_msgs(ws):
with trio.move_on_after(5) as cs: pprint(msg)
msg = await ws.recv_msg()
print(msg)
## pull a first quote and deliver
#msg_gen = stream_messages(ws)
## TODO: use ``anext()`` when it lands in 3.10!
#typ, ohlc_last = await msg_gen.__anext__()
#topic, quote = normalize(ohlc_last)
#first_quote = {topic: quote}
#task_status.started((init_msgs, first_quote))
## lol, only "closes" when they're margin squeezing clients ;P
#feed_is_live.set()
## keep start of last interval for volume tracking
#last_interval_start = ohlc_last.etime
## start streaming
#async for typ, ohlc in msg_gen:
# if typ == 'ohlc':
# # TODO: can get rid of all this by using
# # ``trades`` subscription...
# # generate tick values to match time & sales pane:
# # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m
# volume = ohlc.volume
# # new OHLC sample interval
# if ohlc.etime > last_interval_start:
# last_interval_start = ohlc.etime
# tick_volume = volume
# else:
# # this is the tick volume *within the interval*
# tick_volume = volume - ohlc_last.volume
# ohlc_last = ohlc
# last = ohlc.close
# if tick_volume:
# ohlc.ticks.append({
# 'type': 'trade',
# 'price': last,
# 'size': tick_volume,
# })
# topic, quote = normalize(ohlc)
# elif typ == 'l1':
# quote = ohlc
# topic = quote['symbol'].lower()
# await send_chan.send({topic: quote})
async def stream_messages(ws): async def stream_messages(
ws: NoBsWs,
):
'''
Message stream parser and heartbeat handler.
Deliver ws subscription messages as well as handle heartbeat logic
though a single async generator.
'''
too_slow_count = last_hb = 0 too_slow_count = last_hb = 0
while True: while True:
@ -793,39 +744,76 @@ async def stream_messages(ws):
if err: if err:
raise BrokerError(err) raise BrokerError(err)
else: else:
chan_id, *payload_array, chan_name, pair = msg yield msg
if 'ohlc' in chan_name:
yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0]) async def process_data_feed_msgs(
ws: NoBsWs,
):
'''
Parse and pack data feed messages.
elif 'spread' in chan_name: '''
async for msg in stream_messages(ws):
bid, ask, ts, bsize, asize = map(float, payload_array[0]) chan_id, *payload_array, chan_name, pair = msg
# TODO: really makes you think IB has a horrible API... if 'ohlc' in chan_name:
quote = {
'symbol': pair.replace('/', ''),
'ticks': [
{'type': 'bid', 'price': bid, 'size': bsize},
{'type': 'bsize', 'price': bid, 'size': bsize},
{'type': 'ask', 'price': ask, 'size': asize}, yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0])
{'type': 'asize', 'price': ask, 'size': asize},
],
}
yield 'l1', quote
# elif 'book' in msg[-2]: elif 'spread' in chan_name:
# chan_id, *payload_array, chan_name, pair = msg
# print(msg)
else: bid, ask, ts, bsize, asize = map(float, payload_array[0])
print(f'UNHANDLED MSG: {msg}')
# TODO: really makes you think IB has a horrible API...
quote = {
'symbol': pair.replace('/', ''),
'ticks': [
{'type': 'bid', 'price': bid, 'size': bsize},
{'type': 'bsize', 'price': bid, 'size': bsize},
{'type': 'ask', 'price': ask, 'size': asize},
{'type': 'asize', 'price': ask, 'size': asize},
],
}
yield 'l1', quote
# elif 'book' in msg[-2]:
# chan_id, *payload_array, chan_name, pair = msg
# print(msg)
else:
print(f'UNHANDLED MSG: {msg}')
yield msg
async def process_order_msgs(
ws: NoBsWs,
):
'''
Parse and pack data feed messages.
'''
async for msg in stream_messages(ws):
# TODO: write your order event parser here!
# HINT: create a ``pydantic.BaseModel`` to parse and validate
# and then in the caller recast to our native ``BrokerdX`` msg types.
# form of order msgs:
# [{'OIZACU-HB2JZ-YA2QEF': {'lastupdated': '1644595511.768544',
# 'status': 'canceled', 'vol_exec': '0.00000000', 'cost':
# '0.00000000', 'fee': '0.00000000', 'avg_price':
# '0.00000000', 'userref': 1, 'cancel_reason': 'User
# requested'}}], 'openOrders', {'sequence': 4}]
yield msg
def normalize( def normalize(
ohlc: OHLC, ohlc: OHLC,
) -> dict: ) -> dict:
quote = asdict(ohlc) quote = asdict(ohlc)
quote['broker_ts'] = quote['time'] quote['broker_ts'] = quote['time']
@ -967,13 +955,14 @@ async def stream_quotes(
# see the tips on reconnection logic: # see the tips on reconnection logic:
# https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds # https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds
ws: NoBsWs
async with open_autorecon_ws( async with open_autorecon_ws(
'wss://ws.kraken.com/', 'wss://ws.kraken.com/',
fixture=subscribe, fixture=subscribe,
) as ws: ) as ws:
# pull a first quote and deliver # pull a first quote and deliver
msg_gen = stream_messages(ws) msg_gen = process_data_feed_msgs(ws)
# TODO: use ``anext()`` when it lands in 3.10! # TODO: use ``anext()`` when it lands in 3.10!
typ, ohlc_last = await msg_gen.__anext__() typ, ohlc_last = await msg_gen.__anext__()
@ -1032,6 +1021,7 @@ async def stream_quotes(
@tractor.context @tractor.context
async def open_symbol_search( async def open_symbol_search(
ctx: tractor.Context, ctx: tractor.Context,
) -> Client: ) -> Client:
async with open_cached_client('kraken') as client: async with open_cached_client('kraken') as client: