Factor out ws msg hearbeat and error handling

Move the core ws message handling into `stream_messages()` and call that
from 2 new stream processors: `process_data_feed_msgs()` and
`process_order_msgs()`. Add comments for hints on how to implement the
order msg parsing as well as `pprint` received msgs to console for now.
kraken_orders
Tyler Goodlet 2022-02-11 11:20:13 -05:00 committed by Konstantine Tsafatinos
parent 0c905920e2
commit 0122669dd4
1 changed files with 81 additions and 91 deletions

View File

@ -40,17 +40,17 @@ from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound, BrokerError from ._util import resproc, SymbolNotFound, BrokerError
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from ..data import ShmArray from ..data import ShmArray
from ..data._web_bs import open_autorecon_ws from ..data._web_bs import open_autorecon_ws, NoBsWs
from ..clearing._messages import ( from ..clearing._messages import (
BrokerdPosition, BrokerdOrder, BrokerdStatus, BrokerdPosition, BrokerdOrder, BrokerdStatus,
BrokerdOrderAck, BrokerdError, BrokerdCancel, BrokerdFill BrokerdOrderAck, BrokerdError, BrokerdCancel,
BrokerdFill,
) )
import urllib.parse import urllib.parse
import hashlib import hashlib
import hmac import hmac
import base64 import base64
import pandas as pd
log = get_logger(__name__) log = get_logger(__name__)
@ -686,76 +686,27 @@ async def trades_dialogue(
): ):
## TODO: maybe add multiple accounts ## TODO: maybe add multiple accounts
n.start_soon(handle_order_requests, client, ems_stream) n.start_soon(handle_order_requests, client, ems_stream)
async with open_autorecon_ws( async with open_autorecon_ws(
'wss://ws-auth.kraken.com/', 'wss://ws-auth.kraken.com/',
fixture=subscribe, fixture=subscribe,
token=token, token=token,
) as ws: ) as ws:
from pprint import pprint
while True: async for msg in process_order_msgs(ws):
with trio.move_on_after(5) as cs: pprint(msg)
msg = await ws.recv_msg()
print(msg)
## pull a first quote and deliver
#msg_gen = stream_messages(ws)
## TODO: use ``anext()`` when it lands in 3.10!
#typ, ohlc_last = await msg_gen.__anext__()
#topic, quote = normalize(ohlc_last)
#first_quote = {topic: quote}
#task_status.started((init_msgs, first_quote))
## lol, only "closes" when they're margin squeezing clients ;P
#feed_is_live.set()
## keep start of last interval for volume tracking
#last_interval_start = ohlc_last.etime
## start streaming
#async for typ, ohlc in msg_gen:
# if typ == 'ohlc':
# # TODO: can get rid of all this by using
# # ``trades`` subscription...
# # generate tick values to match time & sales pane:
# # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m
# volume = ohlc.volume
# # new OHLC sample interval
# if ohlc.etime > last_interval_start:
# last_interval_start = ohlc.etime
# tick_volume = volume
# else:
# # this is the tick volume *within the interval*
# tick_volume = volume - ohlc_last.volume
# ohlc_last = ohlc
# last = ohlc.close
# if tick_volume:
# ohlc.ticks.append({
# 'type': 'trade',
# 'price': last,
# 'size': tick_volume,
# })
# topic, quote = normalize(ohlc)
# elif typ == 'l1':
# quote = ohlc
# topic = quote['symbol'].lower()
# await send_chan.send({topic: quote})
async def stream_messages(ws): async def stream_messages(
ws: NoBsWs,
):
'''
Message stream parser and heartbeat handler.
Deliver ws subscription messages as well as handle heartbeat logic
though a single async generator.
'''
too_slow_count = last_hb = 0 too_slow_count = last_hb = 0
while True: while True:
@ -793,39 +744,76 @@ async def stream_messages(ws):
if err: if err:
raise BrokerError(err) raise BrokerError(err)
else: else:
chan_id, *payload_array, chan_name, pair = msg yield msg
if 'ohlc' in chan_name:
yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0]) async def process_data_feed_msgs(
ws: NoBsWs,
):
'''
Parse and pack data feed messages.
elif 'spread' in chan_name: '''
async for msg in stream_messages(ws):
bid, ask, ts, bsize, asize = map(float, payload_array[0]) chan_id, *payload_array, chan_name, pair = msg
# TODO: really makes you think IB has a horrible API... if 'ohlc' in chan_name:
quote = {
'symbol': pair.replace('/', ''),
'ticks': [
{'type': 'bid', 'price': bid, 'size': bsize},
{'type': 'bsize', 'price': bid, 'size': bsize},
{'type': 'ask', 'price': ask, 'size': asize}, yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0])
{'type': 'asize', 'price': ask, 'size': asize},
],
}
yield 'l1', quote
# elif 'book' in msg[-2]: elif 'spread' in chan_name:
# chan_id, *payload_array, chan_name, pair = msg
# print(msg)
else: bid, ask, ts, bsize, asize = map(float, payload_array[0])
print(f'UNHANDLED MSG: {msg}')
# TODO: really makes you think IB has a horrible API...
quote = {
'symbol': pair.replace('/', ''),
'ticks': [
{'type': 'bid', 'price': bid, 'size': bsize},
{'type': 'bsize', 'price': bid, 'size': bsize},
{'type': 'ask', 'price': ask, 'size': asize},
{'type': 'asize', 'price': ask, 'size': asize},
],
}
yield 'l1', quote
# elif 'book' in msg[-2]:
# chan_id, *payload_array, chan_name, pair = msg
# print(msg)
else:
print(f'UNHANDLED MSG: {msg}')
yield msg
async def process_order_msgs(
ws: NoBsWs,
):
'''
Parse and pack data feed messages.
'''
async for msg in stream_messages(ws):
# TODO: write your order event parser here!
# HINT: create a ``pydantic.BaseModel`` to parse and validate
# and then in the caller recast to our native ``BrokerdX`` msg types.
# form of order msgs:
# [{'OIZACU-HB2JZ-YA2QEF': {'lastupdated': '1644595511.768544',
# 'status': 'canceled', 'vol_exec': '0.00000000', 'cost':
# '0.00000000', 'fee': '0.00000000', 'avg_price':
# '0.00000000', 'userref': 1, 'cancel_reason': 'User
# requested'}}], 'openOrders', {'sequence': 4}]
yield msg
def normalize( def normalize(
ohlc: OHLC, ohlc: OHLC,
) -> dict: ) -> dict:
quote = asdict(ohlc) quote = asdict(ohlc)
quote['broker_ts'] = quote['time'] quote['broker_ts'] = quote['time']
@ -966,13 +954,14 @@ async def stream_quotes(
# see the tips on reconnection logic: # see the tips on reconnection logic:
# https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds # https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds
ws: NoBsWs
async with open_autorecon_ws( async with open_autorecon_ws(
'wss://ws.kraken.com/', 'wss://ws.kraken.com/',
fixture=subscribe, fixture=subscribe,
) as ws: ) as ws:
# pull a first quote and deliver # pull a first quote and deliver
msg_gen = stream_messages(ws) msg_gen = process_data_feed_msgs(ws)
# TODO: use ``anext()`` when it lands in 3.10! # TODO: use ``anext()`` when it lands in 3.10!
typ, ohlc_last = await msg_gen.__anext__() typ, ohlc_last = await msg_gen.__anext__()
@ -1031,6 +1020,7 @@ async def stream_quotes(
@tractor.context @tractor.context
async def open_symbol_search( async def open_symbol_search(
ctx: tractor.Context, ctx: tractor.Context,
) -> Client: ) -> Client:
async with open_cached_client('kraken') as client: async with open_cached_client('kraken') as client: