Compare commits

..

No commits in common. "30fdf550d7fa41eee211569023e34989a7129eb9" and "20a5ffdc2b78bbea867a783c7f0aa140e65ab316" have entirely different histories.

5 changed files with 66 additions and 404 deletions

View File

@ -8,8 +8,8 @@ expires_at = 1616095326.355846
[kraken] [kraken]
key_descr = "api_0" key_descr = "api_0"
api_key = "" public_key = ""
secret = "" private_key = ""
[ib] [ib]
host = "127.0.0.1" host = "127.0.0.1"

View File

@ -33,24 +33,20 @@ import tractor
from pydantic.dataclasses import dataclass from pydantic.dataclasses import dataclass
from pydantic import BaseModel from pydantic import BaseModel
import wsproto import wsproto
from itertools import count
from .. import config from .. import config
from .._cacheables import open_cached_client from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound, BrokerError from ._util import resproc, SymbolNotFound, BrokerError
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from ..data import ShmArray from ..data import ShmArray
from ..data._web_bs import open_autorecon_ws, NoBsWs from ..data._web_bs import open_autorecon_ws
from ..clearing._messages import ( from ..clearing._messages import BrokerdPosition
BrokerdPosition, BrokerdOrder, BrokerdStatus,
BrokerdOrderAck, BrokerdError, BrokerdCancel,
BrokerdFill,
)
import urllib.parse import urllib.parse
import hashlib import hashlib
import hmac import hmac
import base64 import base64
import pandas as pd
log = get_logger(__name__) log = get_logger(__name__)
@ -142,19 +138,6 @@ class OHLC:
ticks: List[Any] = field(default_factory=list) ticks: List[Any] = field(default_factory=list)
def get_config() -> dict[str, Any]:
conf, path = config.load()
section = conf.get('kraken')
if section is None:
log.warning(f'No config section found for kraken in {path}')
return {}
return section
def get_kraken_signature( def get_kraken_signature(
urlpath: str, urlpath: str,
data: Dict[str, Any], data: Dict[str, Any],
@ -187,7 +170,6 @@ class Client:
'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)' 'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)'
}) })
self._pairs: list[str] = [] self._pairs: list[str] = []
self._name = ''
self._api_key = '' self._api_key = ''
self._secret = '' self._secret = ''
@ -235,7 +217,7 @@ class Client:
) )
return resproc(resp, log) return resproc(resp, log)
async def kraken_endpoint( async def get_user_data(
self, self,
method: str, method: str,
data: Dict[str, Any] data: Dict[str, Any]
@ -243,17 +225,18 @@ class Client:
uri_path = f'/0/private/{method}' uri_path = f'/0/private/{method}'
data['nonce'] = str(int(1000*time.time())) data['nonce'] = str(int(1000*time.time()))
resp = await self._private(method, data, uri_path) resp = await self._private(method, data, uri_path)
return resp err = resp['error']
if err:
print(err)
return resp['result']
async def get_positions( async def get_positions(
self, self,
data: Dict[str, Any] = {} data: Dict[str, Any] = {}
) -> Dict[str, Any]: ) -> Dict[str, Any]:
resp = await self.kraken_endpoint('Balance', data) balances = await self.get_user_data('Balance', data)
balances = resp['result']
## TODO: grab all entries, not just first 50 ## TODO: grab all entries, not just first 50
resp = await self.kraken_endpoint('TradesHistory', data) traders = await self.get_user_data('TradesHistory', data)
traders = resp['result']
positions = {} positions = {}
vols = {} vols = {}
@ -275,46 +258,7 @@ class Client:
else: else:
positions[pair] /= asset_balance positions[pair] /= asset_balance
return positions, vols return positions
async def submit_limit(
self,
oid: str,
symbol: str,
price: float,
action: str,
size: float,
# account: str,
reqid: int = None,
) -> int:
"""Place an order and return integer request id provided by client.
"""
# Build order data for kraken api
data = {
"userref": reqid,
"ordertype": "limit",
"type": action,
"volume": str(size),
"pair": symbol,
"price": str(price),
# set to True test AddOrder call without a real submission
"validate": False
}
resp = await self.kraken_endpoint('AddOrder', data)
return resp
async def submit_cancel(
self,
reqid: str,
) -> None:
"""Send cancel request for order id ``reqid``.
"""
# txid is a transaction id given by kraken
data = {"txid": reqid}
resp = await self.kraken_endpoint('CancelOrder', data)
return resp
async def symbol_info( async def symbol_info(
self, self,
@ -429,15 +373,16 @@ class Client:
async def get_client() -> Client: async def get_client() -> Client:
client = Client() client = Client()
## TODO: maybe add conditional based on section conf, path = config.load()
section = get_config() section = conf.get('kraken')
client._name = section['key_descr']
client._api_key = section['api_key'] client._api_key = section['api_key']
client._secret = section['secret'] client._secret = section['secret']
## TODO: Add a client attribute to hold this info data = {
#data = { # add non-nonce and non-ofs vars
# # add non-nonce and non-ofs vars }
#} # positions = await client.get_positions(data)
# await tractor.breakpoint()
# at startup, load all symbols locally for fast search # at startup, load all symbols locally for fast search
await client.cache_symbols() await client.cache_symbols()
@ -445,166 +390,6 @@ async def get_client() -> Client:
yield client yield client
def pack_position(
acc: str,
symkey: str,
pos: float,
vol: float
) -> dict[str, Any]:
return BrokerdPosition(
broker='kraken',
account=acc,
symbol=symkey,
currency=symkey[-3:],
size=float(vol),
avg_price=float(pos),
)
def normalize_symbol(
ticker: str
) -> str:
symlen = len(ticker)
if symlen == 6:
return ticker.lower()
else:
for sym in ['XXBT', 'XXMR', 'ZEUR']:
if sym in ticker:
ticker = ticker.replace(sym, sym[1:])
return ticker.lower()
def make_auth_sub(data: Dict[str, Any]) -> Dict[str, str]:
"""Create a request subscription packet dict.
## TODO: point to the auth urls
https://docs.kraken.com/websockets/#message-subscribe
"""
# eg. specific logic for this in kraken's sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
return {
'event': 'subscribe',
'subscription': data,
}
async def handle_order_requests(
client: Client,
ems_order_stream: tractor.MsgStream,
) -> None:
request_msg: dict
order: BrokerdOrder
userref_counter = count()
async for request_msg in ems_order_stream:
log.info(f'Received order request {request_msg}')
action = request_msg['action']
if action in {'buy', 'sell'}:
account = request_msg['account']
if account != 'kraken.spot':
log.error(
'This is a kraken account, \
only a `kraken.spot` selection is valid'
)
await ems_order_stream.send(BrokerdError(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'Kraken only, No account found: `{account}` ?',
).dict())
continue
# validate
temp_id = next(userref_counter)
order = BrokerdOrder(**request_msg)
# call our client api to submit the order
resp = await client.submit_limit(
oid=order.oid,
symbol=order.symbol,
price=order.price,
action=order.action,
size=order.size,
## XXX: how do I handle new orders
reqid=temp_id,
)
err = resp['error']
if err:
log.error(f'Failed to submit order')
await ems_order_stream.send(
BrokerdError(
oid=order.oid,
reqid=temp_id,
symbol=order.symbol,
reason="Failed order submission",
broker_details=resp
).dict()
)
else:
## TODO: handle multiple cancels
## txid is an array of strings
reqid = resp['result']['txid'][0]
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
BrokerdOrderAck(
# ems order request id
oid=order.oid,
# broker specific request id
reqid=reqid,
# account the made the order
account=order.account
).dict()
)
elif action == 'cancel':
msg = BrokerdCancel(**request_msg)
# Send order cancellation to kraken
resp = await client.submit_cancel(
reqid=msg.reqid
)
try:
# Check to make sure there was no error returned by
# the kraken endpoint. Assert one order was cancelled
assert resp['error'] == []
assert resp['result']['count'] == 1
## TODO: Change this code using .get
try:
pending = resp['result']['pending']
# Check to make sure the cancellation is NOT pending,
# then send the confirmation to the ems order stream
except KeyError:
await ems_order_stream.send(
BrokerdStatus(
reqid=msg.reqid,
account=msg.account,
time_ns=time.time_ns(),
status='cancelled',
reason='Order cancelled',
broker_details={'name': 'kraken'}
).dict()
)
except AssertionError:
log.error(f'Order cancel was not successful')
else:
log.error(f'Unknown order command: {request_msg}')
@tractor.context @tractor.context
async def trades_dialogue( async def trades_dialogue(
ctx: tractor.Context, ctx: tractor.Context,
@ -614,99 +399,27 @@ async def trades_dialogue(
# XXX: required to propagate ``tractor`` loglevel to piker logging # XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel) get_console_log(loglevel or tractor.current_actor().loglevel)
# Generate # deliver positions to subscriber before anything else
# positions = await _trio_run_client_method(method='positions')
@asynccontextmanager global _accounts2clients
async def subscribe(ws: wsproto.WSConnection, token: str):
## TODO: Fix docs and points to right urls
# XXX: setup subs
# https://docs.kraken.com/websockets/#message-subscribe
# specific logic for this in kraken's shitty sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
trades_sub = make_auth_sub(
{'name': 'openOrders', 'token': token}
)
# TODO: we want to eventually allow unsubs which should positions = await client.get_positions()
# be completely fine to request from a separate task
# since internally the ws methods appear to be FIFO
# locked.
await ws.send_msg(trades_sub)
## trade data (aka L1) await tractor.breakpoint()
#l1_sub = make_sub(
# list(ws_pairs.values()),
# {'name': 'spread'} # 'depth': 10}
#)
## pull a first quote and deliver
#await ws.send_msg(l1_sub)
yield
# unsub from all pairs on teardown
await ws.send_msg({
'event': 'unsubscribe',
'subscription': ['openOrders'],
})
# XXX: do we need to ack the unsub?
# await ws.recv_msg()
# Authenticated block all_positions = {}
async with get_client() as client:
acc_name = 'kraken.' + client._name
positions, vols = await client.get_positions()
all_positions = [] for pos in positions:
msg = pack_position(pos)
all_positions[msg.symbol] = msg.dict()
for ticker, pos in positions.items(): await ctx.started(all_positions)
norm_sym = normalize_symbol(ticker)
if float(vols[ticker]) != 0:
msg = pack_position(acc_name, norm_sym, pos, vols[ticker])
all_positions.append(msg.dict())
open_orders = await client.kraken_endpoint('OpenOrders', {})
#await tractor.breakpoint()
await ctx.started((all_positions, (acc_name,)))
#await trio.sleep_forever()
# Get websocket token for authenticated data stream
# Assert that a token was actually received
resp = await client.kraken_endpoint('GetWebSocketsToken', {})
assert resp['error'] == []
token = resp['result']['token']
async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
):
## TODO: maybe add multiple accounts
n.start_soon(handle_order_requests, client, ems_stream)
async with open_autorecon_ws(
'wss://ws-auth.kraken.com/',
fixture=subscribe,
token=token,
) as ws:
from pprint import pprint
async for msg in process_order_msgs(ws):
pprint(msg)
async def stream_messages( async def stream_messages(ws):
ws: NoBsWs,
):
'''
Message stream parser and heartbeat handler.
Deliver ws subscription messages as well as handle heartbeat logic
though a single async generator.
'''
too_slow_count = last_hb = 0 too_slow_count = last_hb = 0
while True: while True:
@ -744,18 +457,6 @@ async def stream_messages(
if err: if err:
raise BrokerError(err) raise BrokerError(err)
else: else:
yield msg
async def process_data_feed_msgs(
ws: NoBsWs,
):
'''
Parse and pack data feed messages.
'''
async for msg in stream_messages(ws):
chan_id, *payload_array, chan_name, pair = msg chan_id, *payload_array, chan_name, pair = msg
if 'ohlc' in chan_name: if 'ohlc' in chan_name:
@ -785,35 +486,10 @@ async def process_data_feed_msgs(
else: else:
print(f'UNHANDLED MSG: {msg}') print(f'UNHANDLED MSG: {msg}')
yield msg
async def process_order_msgs(
ws: NoBsWs,
):
'''
Parse and pack data feed messages.
'''
async for msg in stream_messages(ws):
# TODO: write your order event parser here!
# HINT: create a ``pydantic.BaseModel`` to parse and validate
# and then in the caller recast to our native ``BrokerdX`` msg types.
# form of order msgs:
# [{'OIZACU-HB2JZ-YA2QEF': {'lastupdated': '1644595511.768544',
# 'status': 'canceled', 'vol_exec': '0.00000000', 'cost':
# '0.00000000', 'fee': '0.00000000', 'avg_price':
# '0.00000000', 'userref': 1, 'cancel_reason': 'User
# requested'}}], 'openOrders', {'sequence': 4}]
yield msg
def normalize( def normalize(
ohlc: OHLC, ohlc: OHLC,
) -> dict: ) -> dict:
quote = asdict(ohlc) quote = asdict(ohlc)
quote['broker_ts'] = quote['time'] quote['broker_ts'] = quote['time']
@ -953,16 +629,15 @@ async def stream_quotes(
# XXX: do we need to ack the unsub? # XXX: do we need to ack the unsub?
# await ws.recv_msg() # await ws.recv_msg()
# see the tips on reconnection logic: # see the tips on reonnection logic:
# https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds # https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds
ws: NoBsWs
async with open_autorecon_ws( async with open_autorecon_ws(
'wss://ws.kraken.com/', 'wss://ws.kraken.com/',
fixture=subscribe, fixture=subscribe,
) as ws: ) as ws:
# pull a first quote and deliver # pull a first quote and deliver
msg_gen = process_data_feed_msgs(ws) msg_gen = stream_messages(ws)
# TODO: use ``anext()`` when it lands in 3.10! # TODO: use ``anext()`` when it lands in 3.10!
typ, ohlc_last = await msg_gen.__anext__() typ, ohlc_last = await msg_gen.__anext__()
@ -1021,7 +696,6 @@ async def stream_quotes(
@tractor.context @tractor.context
async def open_symbol_search( async def open_symbol_search(
ctx: tractor.Context, ctx: tractor.Context,
) -> Client: ) -> Client:
async with open_cached_client('kraken') as client: async with open_cached_client('kraken') as client:

View File

@ -493,8 +493,7 @@ async def open_brokerd_trades_dialogue(
finally: finally:
# parent context must have been closed # parent context must have been closed
# remove from cache so next client will respawn if needed # remove from cache so next client will respawn if needed
## TODO: Maybe add a warning _router.relays.pop(broker)
_router.relays.pop(broker, None)
@tractor.context @tractor.context

View File

@ -389,7 +389,7 @@ async def handle_order_requests(
account = request_msg['account'] account = request_msg['account']
if account != 'paper': if account != 'paper':
log.error( log.error(
'This is a paper account, only a `paper` selection is valid' 'On a paper account, only a `paper` selection is valid'
) )
await ems_order_stream.send(BrokerdError( await ems_order_stream.send(BrokerdError(
oid=request_msg['oid'], oid=request_msg['oid'],
@ -463,8 +463,7 @@ async def trades_dialogue(
): ):
# TODO: load paper positions per broker from .toml config file # TODO: load paper positions per broker from .toml config file
# and pass as symbol to position data mapping: ``dict[str, dict]`` # and pass as symbol to position data mapping: ``dict[str, dict]``
# await ctx.started(all_positions) await ctx.started(({}, ['paper']))
await ctx.started(({}, {'paper',}))
async with ( async with (
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,

View File

@ -53,13 +53,11 @@ class NoBsWs:
def __init__( def __init__(
self, self,
url: str, url: str,
token: str,
stack: AsyncExitStack, stack: AsyncExitStack,
fixture: Callable, fixture: Callable,
serializer: ModuleType = json, serializer: ModuleType = json,
): ):
self.url = url self.url = url
self.token = token
self.fixture = fixture self.fixture = fixture
self._stack = stack self._stack = stack
self._ws: 'WebSocketConnection' = None # noqa self._ws: 'WebSocketConnection' = None # noqa
@ -83,15 +81,9 @@ class NoBsWs:
trio_websocket.open_websocket_url(self.url) trio_websocket.open_websocket_url(self.url)
) )
# rerun user code fixture # rerun user code fixture
if self.token == '':
ret = await self._stack.enter_async_context( ret = await self._stack.enter_async_context(
self.fixture(self) self.fixture(self)
) )
else:
ret = await self._stack.enter_async_context(
self.fixture(self, self.token)
)
assert ret is None assert ret is None
log.info(f'Connection success: {self.url}') log.info(f'Connection success: {self.url}')
@ -135,14 +127,12 @@ async def open_autorecon_ws(
# TODO: proper type annot smh # TODO: proper type annot smh
fixture: Callable, fixture: Callable,
# used for authenticated websockets
token: str = '',
) -> AsyncGenerator[tuple[...], NoBsWs]: ) -> AsyncGenerator[tuple[...], NoBsWs]:
"""Apparently we can QoS for all sorts of reasons..so catch em. """Apparently we can QoS for all sorts of reasons..so catch em.
""" """
async with AsyncExitStack() as stack: async with AsyncExitStack() as stack:
ws = NoBsWs(url, token, stack, fixture=fixture) ws = NoBsWs(url, stack, fixture=fixture)
await ws._connect() await ws._connect()
try: try: