diff --git a/config/brokers.toml b/config/brokers.toml index 7d288648..20216bde 100644 --- a/config/brokers.toml +++ b/config/brokers.toml @@ -8,8 +8,8 @@ expires_at = 1616095326.355846 [kraken] key_descr = "api_0" -public_key = "" -private_key = "" +api_key = "" +secret = "" [ib] host = "127.0.0.1" diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 3b729ae7..87a04ab4 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -138,6 +138,19 @@ class OHLC: ticks: List[Any] = field(default_factory=list) +def get_config() -> dict[str, Any]: + + conf, path = config.load() + + section = conf.get('kraken') + + if section is None: + log.warning(f'No config section found for kraken in {path}') + return {} + + return section + + def get_kraken_signature( urlpath: str, data: Dict[str, Any], @@ -170,6 +183,7 @@ class Client: 'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)' }) self._pairs: list[str] = [] + self._name = '' self._api_key = '' self._secret = '' @@ -258,7 +272,7 @@ class Client: else: positions[pair] /= asset_balance - return positions + return positions, vols async def symbol_info( self, @@ -373,16 +387,15 @@ class Client: async def get_client() -> Client: client = Client() - conf, path = config.load() - section = conf.get('kraken') + ## TODO: maybe add conditional based on section + section = get_config() + client._name = section['key_descr'] client._api_key = section['api_key'] client._secret = section['secret'] - data = { - # add non-nonce and non-ofs vars - } - # positions = await client.get_positions(data) - - # await tractor.breakpoint() + ## TODO: Add a client attribute to hold this info + #data = { + # # add non-nonce and non-ofs vars + #} # at startup, load all symbols locally for fast search await client.cache_symbols() @@ -390,6 +403,36 @@ async def get_client() -> Client: yield client +def pack_position( + acc: str, + symkey: str, + pos: float, + vol: float +) -> dict[str, Any]: + + return BrokerdPosition( + broker='kraken', + account=acc, + symbol=symkey, + currency=symkey[-3:], + size=float(vol), + avg_price=float(pos), + ) + + +def normalize_symbol( + ticker: str +) -> str: + symlen = len(ticker) + if symlen == 6: + return ticker.lower() + else: + for sym in ['XXBT', 'XXMR', 'ZEUR']: + if sym in ticker: + ticker = ticker.replace(sym, sym[1:]) + return ticker.lower() + + @tractor.context async def trades_dialogue( ctx: tractor.Context, @@ -399,23 +442,28 @@ async def trades_dialogue( # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) - # deliver positions to subscriber before anything else - # positions = await _trio_run_client_method(method='positions') + # Authenticated block + async with get_client() as client: + acc_name = 'kraken.' + client._name + positions, vols = await client.get_positions() - global _accounts2clients + all_positions = [] - positions = await client.get_positions() + for ticker, pos in positions.items(): + norm_sym = normalize_symbol(ticker) + if float(vols[ticker]) != 0: + msg = pack_position(acc_name, norm_sym, pos, vols[ticker]) + all_positions.append(msg.dict()) + + #await tractor.breakpoint() - await tractor.breakpoint() + await ctx.started((all_positions, (acc_name,))) + await trio.sleep_forever() - all_positions = {} - - for pos in positions: - msg = pack_position(pos) - all_positions[msg.symbol] = msg.dict() - - await ctx.started(all_positions) + # async with ( + # ctx.open_stream() as ems_stream, + # async def stream_messages(ws): diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index ee1ad8ac..630405ea 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -493,7 +493,8 @@ async def open_brokerd_trades_dialogue( finally: # parent context must have been closed # remove from cache so next client will respawn if needed - _router.relays.pop(broker) + ## TODO: Maybe add a warning + _router.relays.pop(broker, None) @tractor.context diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 698a928f..f87e2203 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -389,7 +389,7 @@ async def handle_order_requests( account = request_msg['account'] if account != 'paper': log.error( - 'On a paper account, only a `paper` selection is valid' + 'This is a paper account, only a `paper` selection is valid' ) await ems_order_stream.send(BrokerdError( oid=request_msg['oid'], @@ -463,7 +463,8 @@ async def trades_dialogue( ): # TODO: load paper positions per broker from .toml config file # and pass as symbol to position data mapping: ``dict[str, dict]`` - await ctx.started(({}, ['paper'])) + # await ctx.started(all_positions) + await ctx.started(({}, {'paper',})) async with ( ctx.open_stream() as ems_stream,