From f5b8b9a14fbb2cec35eff4813b8e7e5301c8b3ff Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Tue, 28 Feb 2023 01:51:03 -0300 Subject: [PATCH] Add sym registry to PaperBoi as well as a sym ref on Transaction Add decimal quantize API to Symbol to simplify by-broker truncation Add symbol info to `pps.toml` Move _assert call to outside the _async_main context manager Minor indentation and styling changes, also convert a few prints to log calls Fix multi write / race condition on open_pps call Switch open_pps to not write by default Fix integer math kraken syminfo _tick_size initialization --- piker/brokers/kraken/broker.py | 3 +-- piker/brokers/kraken/feed.py | 4 +-- piker/clearing/_paper_engine.py | 47 ++++++++++++++++++++------------- piker/data/_source.py | 16 +++++++---- piker/pp.py | 41 ++++++++++++++++++---------- tests/test_paper.py | 27 ++++++++----------- 6 files changed, 80 insertions(+), 58 deletions(-) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 9378db17..7700d86b 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -469,8 +469,7 @@ async def trades_dialogue( with ( open_pps( 'kraken', - acctid, - write_on_exit=True, + acctid ) as table, open_trade_ledger( diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index 57fc0126..3d795098 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -345,8 +345,8 @@ async def stream_quotes( f'Missing msg fields {fields_diff}' ) syminfo = si.to_dict() - syminfo['price_tick_size'] = 1 / 10**si.pair_decimals - syminfo['lot_tick_size'] = 1 / 10**si.lot_decimals + syminfo['price_tick_size'] = 1. / 10**si.pair_decimals + syminfo['lot_tick_size'] = 1. / 10**si.lot_decimals syminfo['asset_type'] = 'crypto' sym_infos[sym] = syminfo ws_pairs[sym] = si.wsname diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index fc37f1e4..7a093ad4 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -38,6 +38,7 @@ import tractor from .. import data from ..data.types import Struct +from ..data._source import Symbol from ..pp import ( Position, Transaction, @@ -81,6 +82,7 @@ class PaperBoi(Struct): _reqids: bidict _positions: dict[str, Position] _trade_ledger: dict[str, Any] + _syms: dict[str, Symbol] = {} # init edge case L1 spread last_ask: tuple[float, float] = (float('inf'), 0) # price, size @@ -252,6 +254,7 @@ class PaperBoi(Struct): key = fqsn.rstrip(f'.{self.broker}') t = Transaction( fqsn=fqsn, + sym=self._syms[fqsn], tid=oid, size=size, price=price, @@ -261,27 +264,29 @@ class PaperBoi(Struct): ) with ( - open_trade_ledger(self.broker, 'paper') as ledger, - open_pps(self.broker, 'paper', True) as table - ): - ledger.update({oid: t.to_dict()}) - # Write to pps toml right now - table.update_from_trans({oid: t}) + open_trade_ledger(self.broker, 'paper') as ledger, + open_pps(self.broker, 'paper', write_on_exit=True) as table + ): + tx = t.to_dict() + tx.pop('sym') + ledger.update({oid: tx}) + # Write to pps toml right now + table.update_from_trans({oid: t}) - pp = table.pps[key] - pp_msg = BrokerdPosition( - broker=self.broker, - account='paper', - symbol=fqsn, - # TODO: we need to look up the asset currency from - # broker info. i guess for crypto this can be - # inferred from the pair? - currency=key, - size=pp.size, - avg_price=pp.ppu, - ) + pp = table.pps[key] + pp_msg = BrokerdPosition( + broker=self.broker, + account='paper', + symbol=fqsn, + # TODO: we need to look up the asset currency from + # broker info. i guess for crypto this can be + # inferred from the pair? + currency=key, + size=pp.size, + avg_price=pp.ppu, + ) - await self.ems_trades_stream.send(pp_msg) + await self.ems_trades_stream.send(pp_msg) async def simulate_fills( @@ -567,6 +572,10 @@ async def trades_dialogue( # TODO: load postions from ledger file _trade_ledger={}, + _syms={ + fqsn: flume.symbol + for fqsn, flume in feed.flumes.items() + } ) n.start_soon( diff --git a/piker/data/_source.py b/piker/data/_source.py index 87ba74a3..e8f09484 100644 --- a/piker/data/_source.py +++ b/piker/data/_source.py @@ -18,8 +18,8 @@ numpy data source coversion helpers. """ from __future__ import annotations +from decimal import Decimal, ROUND_HALF_EVEN from typing import Any -import decimal from bidict import bidict import numpy as np @@ -80,7 +80,7 @@ def float_digits( if value == 0: return 0 - return int(-decimal.Decimal(str(value)).as_tuple().exponent) + return int(-Decimal(str(value)).as_tuple().exponent) def ohlc_zeros(length: int) -> np.ndarray: @@ -156,14 +156,14 @@ class Symbol(Struct): ) -> Symbol: tick_size = info.get('price_tick_size', 0.01) - lot_tick_size = info.get('lot_tick_size', 0.0) + lot_size = info.get('lot_tick_size', 0.0) return Symbol( key=symbol, tick_size=tick_size, - lot_tick_size=lot_tick_size, + lot_tick_size=lot_size, tick_size_digits=float_digits(tick_size), - lot_size_digits=float_digits(lot_tick_size), + lot_size_digits=float_digits(lot_size), suffix=suffix, broker_info={broker: info}, ) @@ -254,6 +254,12 @@ class Symbol(Struct): return keys + def decimal_quant(self, d: Decimal): + digits = self.lot_size_digits + return d.quantize( + Decimal(f'1.{"0".ljust(digits, "0")}'), + rounding=ROUND_HALF_EVEN + ) def _nan_to_closest_num(array: np.ndarray): """Return interpolated values instead of NaN. diff --git a/piker/pp.py b/piker/pp.py index 1e4cd014..83e43731 100644 --- a/piker/pp.py +++ b/piker/pp.py @@ -45,7 +45,7 @@ import toml from . import config from .brokers import get_brokermod from .clearing._messages import BrokerdPosition, Status -from .data._source import Symbol +from .data._source import Symbol, unpack_fqsn from .log import get_logger from .data.types import Struct @@ -83,7 +83,7 @@ def open_trade_ledger( with open(tradesfile, 'rb') as cf: start = time.time() ledger = tomli.load(cf) - print(f'Ledger load took {time.time() - start}s') + log.info(f'Ledger load took {time.time() - start}s') cpy = ledger.copy() try: @@ -92,7 +92,7 @@ def open_trade_ledger( if cpy != ledger: # TODO: show diff output? # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries - print(f'Updating ledger for {tradesfile}:\n') + log.info(f'Updating ledger for {tradesfile}:\n') ledger.update(cpy) # we write on close the mutated ledger data with open(tradesfile, 'w') as cf: @@ -103,17 +103,18 @@ class Transaction(Struct, frozen=True): # TODO: should this be ``.to`` (see below)? fqsn: str + sym: Symbol tid: Union[str, int] # unique transaction id size: float price: float cost: float # commisions or other additional costs dt: datetime - expiry: Optional[datetime] = None + expiry: datetime | None = None # optional key normally derived from the broker # backend which ensures the instrument-symbol this record # is for is truly unique. - bsuid: Optional[Union[str, int]] = None + bsuid: Union[str, int] | None = None # optional fqsn for the source "asset"/money symbol? # from: Optional[str] = None @@ -190,9 +191,20 @@ class Position(Struct): # listing venue here even when the backend isn't providing # it via the trades ledger.. # drop symbol obj in serialized form - s = d.pop('symbol') + s = d.get('symbol') fqsn = s.front_fqsn() + broker, key, suffix = unpack_fqsn(fqsn) + sym_info = s.broker_info[broker] + + d['symbol'] = { + 'info': { + 'asset_type': sym_info['asset_type'], + 'price_tick_size': sym_info['price_tick_size'], + 'lot_tick_size': sym_info['lot_tick_size'] + } + } + if self.expiry is None: d.pop('expiry', None) elif expiry: @@ -467,8 +479,7 @@ class Position(Struct): if self.split_ratio is not None: size = round(size * self.split_ratio) - return float(Decimal(size).quantize( - Decimal('1.0000'), rounding=ROUND_HALF_EVEN)) + return float(self.symbol.decimal_quant(Decimal(size))) def minimize_clears( self, @@ -512,7 +523,7 @@ class Position(Struct): 'cost': t.cost, 'price': t.price, 'size': t.size, - 'dt': t.dt, + 'dt': t.dt } # TODO: compute these incrementally instead @@ -559,7 +570,7 @@ class PpTable(Struct): Symbol.from_fqsn( t.fqsn, info={}, - ), + ) if not t.sym else t.sym, size=0.0, ppu=0.0, bsuid=t.bsuid, @@ -620,7 +631,7 @@ class PpTable(Struct): # XXX: debug hook for size mismatches # qqqbsuid = 320227571 # if bsuid == qqqbsuid: - # breakpoint() + # xbreakpoint() pp.ensure_state() @@ -682,10 +693,10 @@ class PpTable(Struct): ''' # TODO: show diff output? # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries - print(f'Updating ``pps.toml`` for {path}:\n') - + log.info(f'Updating ``pps.toml`` for {path}:\n') # active, closed_pp_objs = table.dump_active() pp_entries = self.to_toml() + log.info(pp_entries) self.conf[self.brokername][self.acctid] = pp_entries # TODO: why tf haven't they already done this for inline @@ -883,7 +894,6 @@ def open_pps( brokername: str, acctid: str, write_on_exit: bool = False, - ) -> Generator[PpTable, None, None]: ''' Read out broker-specific position entries from @@ -937,8 +947,11 @@ def open_pps( dtstr = clears_table['dt'] dt = pendulum.parse(dtstr) clears_table['dt'] = dt + trans.append(Transaction( fqsn=bsuid, + sym=Symbol.from_fqsn( + fqsn, entry['symbol']), bsuid=bsuid, tid=tid, size=clears_table['size'], diff --git a/tests/test_paper.py b/tests/test_paper.py index f8a7fd24..8da1cf12 100644 --- a/tests/test_paper.py +++ b/tests/test_paper.py @@ -84,25 +84,20 @@ async def _async_main( case {'name': 'position'}: break - if ( - assert_entries - or assert_pps - or assert_zeroed_pps - or assert_msg - ): - _assert( - assert_entries, - assert_pps, - assert_zeroed_pps, - pps, - last_msg, - size, - executions, - ) - # Teardown piker like a user would raise KeyboardInterrupt + if assert_entries or assert_pps or assert_zeroed_pps or assert_msg: + _assert( + assert_entries, + assert_pps, + assert_zeroed_pps, + pps, + last_msg, + size, + executions, + ) + def _assert( assert_entries,