connect to krakens openOrders websocket

kraken_gb
Konstantine Tsafatinos 2022-02-09 21:30:39 -05:00 committed by Tyler Goodlet
parent d141981cca
commit 37df05c260
2 changed files with 168 additions and 26 deletions

View File

@ -475,6 +475,21 @@ def normalize_symbol(
return ticker.lower()
def make_auth_sub(data: Dict[str, Any]) -> Dict[str, str]:
"""Create a request subscription packet dict.
## TODO: point to the auth urls
https://docs.kraken.com/websockets/#message-subscribe
"""
# eg. specific logic for this in kraken's sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
return {
'event': 'subscribe',
'subscription': data,
}
async def handle_order_requests(
client: Client,
@ -556,30 +571,35 @@ async def handle_order_requests(
elif action == 'cancel':
msg = BrokerdCancel(**request_msg)
# Send order cancellation to kraken
resp = await client.submit_cancel(
reqid=msg.reqid
)
# Check to make sure there was no error returned by
# the kraken endpoint. Assert one order was cancelled
assert resp['error'] == []
assert resp['result']['count'] == 1
try:
pending = resp['result']['pending']
# Check to make sure the cancellation is NOT pending,
# then send the confirmation to the ems order stream
except KeyError:
await ems_order_stream.send(
BrokerdStatus(
reqid=msg.reqid,
account=msg.account,
time_ns=time.time_ns(),
status='cancelled',
reason='Order cancelled',
broker_details={'name': 'kraken'}
).dict()
)
# Check to make sure there was no error returned by
# the kraken endpoint. Assert one order was cancelled
assert resp['error'] == []
assert resp['result']['count'] == 1
## TODO: Change this code using .get
try:
pending = resp['result']['pending']
# Check to make sure the cancellation is NOT pending,
# then send the confirmation to the ems order stream
except KeyError:
await ems_order_stream.send(
BrokerdStatus(
reqid=msg.reqid,
account=msg.account,
time_ns=time.time_ns(),
status='cancelled',
reason='Order cancelled',
broker_details={'name': 'kraken'}
).dict()
)
except AssertionError:
log.error(f'Order cancel was not successful')
else:
log.error(f'Unknown order command: {request_msg}')
@ -594,6 +614,46 @@ async def trades_dialogue(
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
# Generate
@asynccontextmanager
async def subscribe(ws: wsproto.WSConnection, token: str):
## TODO: Fix docs and points to right urls
# XXX: setup subs
# https://docs.kraken.com/websockets/#message-subscribe
# specific logic for this in kraken's shitty sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
trades_sub = make_auth_sub(
{'name': 'openOrders', 'token': token}
)
# TODO: we want to eventually allow unsubs which should
# be completely fine to request from a separate task
# since internally the ws methods appear to be FIFO
# locked.
await ws.send_msg(trades_sub)
## trade data (aka L1)
#l1_sub = make_sub(
# list(ws_pairs.values()),
# {'name': 'spread'} # 'depth': 10}
#)
## pull a first quote and deliver
#await ws.send_msg(l1_sub)
yield
# unsub from all pairs on teardown
await ws.send_msg({
'event': 'unsubscribe',
'subscription': ['openOrders'],
})
# XXX: do we need to ack the unsub?
# await ws.recv_msg()
# Authenticated block
async with get_client() as client:
acc_name = 'kraken.' + client._name
@ -606,7 +666,7 @@ async def trades_dialogue(
if float(vols[ticker]) != 0:
msg = pack_position(acc_name, norm_sym, pos, vols[ticker])
all_positions.append(msg.dict())
open_orders = await client.kraken_endpoint('OpenOrders', {})
#await tractor.breakpoint()
@ -614,13 +674,85 @@ async def trades_dialogue(
#await trio.sleep_forever()
# Get websocket token for authenticated data stream
# Assert that a token was actually received
resp = await client.kraken_endpoint('GetWebSocketsToken', {})
assert resp['error'] == []
token = resp['result']['token']
async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
):
## TODO: maybe add multiple accounts
n.start_soon(handle_order_requests, client, ems_stream)
async with open_autorecon_ws(
'wss://ws-auth.kraken.com/',
fixture=subscribe,
token=token,
) as ws:
while True:
with trio.move_on_after(5) as cs:
msg = await ws.recv_msg()
print(msg)
## pull a first quote and deliver
#msg_gen = stream_messages(ws)
## TODO: use ``anext()`` when it lands in 3.10!
#typ, ohlc_last = await msg_gen.__anext__()
#topic, quote = normalize(ohlc_last)
#first_quote = {topic: quote}
#task_status.started((init_msgs, first_quote))
## lol, only "closes" when they're margin squeezing clients ;P
#feed_is_live.set()
## keep start of last interval for volume tracking
#last_interval_start = ohlc_last.etime
## start streaming
#async for typ, ohlc in msg_gen:
# if typ == 'ohlc':
# # TODO: can get rid of all this by using
# # ``trades`` subscription...
# # generate tick values to match time & sales pane:
# # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m
# volume = ohlc.volume
# # new OHLC sample interval
# if ohlc.etime > last_interval_start:
# last_interval_start = ohlc.etime
# tick_volume = volume
# else:
# # this is the tick volume *within the interval*
# tick_volume = volume - ohlc_last.volume
# ohlc_last = ohlc
# last = ohlc.close
# if tick_volume:
# ohlc.ticks.append({
# 'type': 'trade',
# 'price': last,
# 'size': tick_volume,
# })
# topic, quote = normalize(ohlc)
# elif typ == 'l1':
# quote = ohlc
# topic = quote['symbol'].lower()
# await send_chan.send({topic: quote})
async def stream_messages(ws):
@ -833,7 +965,7 @@ async def stream_quotes(
# XXX: do we need to ack the unsub?
# await ws.recv_msg()
# see the tips on reonnection logic:
# see the tips on reconnection logic:
# https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds
async with open_autorecon_ws(
'wss://ws.kraken.com/',

View File

@ -53,11 +53,13 @@ class NoBsWs:
def __init__(
self,
url: str,
token: str,
stack: AsyncExitStack,
fixture: Callable,
serializer: ModuleType = json,
):
self.url = url
self.token = token
self.fixture = fixture
self._stack = stack
self._ws: 'WebSocketConnection' = None # noqa
@ -81,9 +83,15 @@ class NoBsWs:
trio_websocket.open_websocket_url(self.url)
)
# rerun user code fixture
ret = await self._stack.enter_async_context(
self.fixture(self)
)
if self.token == '':
ret = await self._stack.enter_async_context(
self.fixture(self)
)
else:
ret = await self._stack.enter_async_context(
self.fixture(self, self.token)
)
assert ret is None
log.info(f'Connection success: {self.url}')
@ -127,12 +135,14 @@ async def open_autorecon_ws(
# TODO: proper type annot smh
fixture: Callable,
# used for authenticated websockets
token: str = '',
) -> AsyncGenerator[tuple[...], NoBsWs]:
"""Apparently we can QoS for all sorts of reasons..so catch em.
"""
async with AsyncExitStack() as stack:
ws = NoBsWs(url, stack, fixture=fixture)
ws = NoBsWs(url, token, stack, fixture=fixture)
await ws._connect()
try: