Add sym registry to PaperBoi as well as a sym ref on Transaction
Add decimal quantize API to Symbol to simplify by-broker truncation Add symbol info to `pps.toml` Move _assert call to outside the _async_main context manager Minor indentation and styling changes, also convert a few prints to log calls Fix multi write / race condition on open_pps call Switch open_pps to not write by default Fix integer math kraken syminfo _tick_size initializationdecimalization_take_2
parent
dc78994dcf
commit
f5b8b9a14f
|
@ -469,8 +469,7 @@ async def trades_dialogue(
|
||||||
with (
|
with (
|
||||||
open_pps(
|
open_pps(
|
||||||
'kraken',
|
'kraken',
|
||||||
acctid,
|
acctid
|
||||||
write_on_exit=True,
|
|
||||||
) as table,
|
) as table,
|
||||||
|
|
||||||
open_trade_ledger(
|
open_trade_ledger(
|
||||||
|
|
|
@ -345,8 +345,8 @@ async def stream_quotes(
|
||||||
f'Missing msg fields {fields_diff}'
|
f'Missing msg fields {fields_diff}'
|
||||||
)
|
)
|
||||||
syminfo = si.to_dict()
|
syminfo = si.to_dict()
|
||||||
syminfo['price_tick_size'] = 1 / 10**si.pair_decimals
|
syminfo['price_tick_size'] = 1. / 10**si.pair_decimals
|
||||||
syminfo['lot_tick_size'] = 1 / 10**si.lot_decimals
|
syminfo['lot_tick_size'] = 1. / 10**si.lot_decimals
|
||||||
syminfo['asset_type'] = 'crypto'
|
syminfo['asset_type'] = 'crypto'
|
||||||
sym_infos[sym] = syminfo
|
sym_infos[sym] = syminfo
|
||||||
ws_pairs[sym] = si.wsname
|
ws_pairs[sym] = si.wsname
|
||||||
|
|
|
@ -38,6 +38,7 @@ import tractor
|
||||||
|
|
||||||
from .. import data
|
from .. import data
|
||||||
from ..data.types import Struct
|
from ..data.types import Struct
|
||||||
|
from ..data._source import Symbol
|
||||||
from ..pp import (
|
from ..pp import (
|
||||||
Position,
|
Position,
|
||||||
Transaction,
|
Transaction,
|
||||||
|
@ -81,6 +82,7 @@ class PaperBoi(Struct):
|
||||||
_reqids: bidict
|
_reqids: bidict
|
||||||
_positions: dict[str, Position]
|
_positions: dict[str, Position]
|
||||||
_trade_ledger: dict[str, Any]
|
_trade_ledger: dict[str, Any]
|
||||||
|
_syms: dict[str, Symbol] = {}
|
||||||
|
|
||||||
# init edge case L1 spread
|
# init edge case L1 spread
|
||||||
last_ask: tuple[float, float] = (float('inf'), 0) # price, size
|
last_ask: tuple[float, float] = (float('inf'), 0) # price, size
|
||||||
|
@ -252,6 +254,7 @@ class PaperBoi(Struct):
|
||||||
key = fqsn.rstrip(f'.{self.broker}')
|
key = fqsn.rstrip(f'.{self.broker}')
|
||||||
t = Transaction(
|
t = Transaction(
|
||||||
fqsn=fqsn,
|
fqsn=fqsn,
|
||||||
|
sym=self._syms[fqsn],
|
||||||
tid=oid,
|
tid=oid,
|
||||||
size=size,
|
size=size,
|
||||||
price=price,
|
price=price,
|
||||||
|
@ -261,27 +264,29 @@ class PaperBoi(Struct):
|
||||||
)
|
)
|
||||||
|
|
||||||
with (
|
with (
|
||||||
open_trade_ledger(self.broker, 'paper') as ledger,
|
open_trade_ledger(self.broker, 'paper') as ledger,
|
||||||
open_pps(self.broker, 'paper', True) as table
|
open_pps(self.broker, 'paper', write_on_exit=True) as table
|
||||||
):
|
):
|
||||||
ledger.update({oid: t.to_dict()})
|
tx = t.to_dict()
|
||||||
# Write to pps toml right now
|
tx.pop('sym')
|
||||||
table.update_from_trans({oid: t})
|
ledger.update({oid: tx})
|
||||||
|
# Write to pps toml right now
|
||||||
|
table.update_from_trans({oid: t})
|
||||||
|
|
||||||
pp = table.pps[key]
|
pp = table.pps[key]
|
||||||
pp_msg = BrokerdPosition(
|
pp_msg = BrokerdPosition(
|
||||||
broker=self.broker,
|
broker=self.broker,
|
||||||
account='paper',
|
account='paper',
|
||||||
symbol=fqsn,
|
symbol=fqsn,
|
||||||
# TODO: we need to look up the asset currency from
|
# TODO: we need to look up the asset currency from
|
||||||
# broker info. i guess for crypto this can be
|
# broker info. i guess for crypto this can be
|
||||||
# inferred from the pair?
|
# inferred from the pair?
|
||||||
currency=key,
|
currency=key,
|
||||||
size=pp.size,
|
size=pp.size,
|
||||||
avg_price=pp.ppu,
|
avg_price=pp.ppu,
|
||||||
)
|
)
|
||||||
|
|
||||||
await self.ems_trades_stream.send(pp_msg)
|
await self.ems_trades_stream.send(pp_msg)
|
||||||
|
|
||||||
|
|
||||||
async def simulate_fills(
|
async def simulate_fills(
|
||||||
|
@ -567,6 +572,10 @@ async def trades_dialogue(
|
||||||
|
|
||||||
# TODO: load postions from ledger file
|
# TODO: load postions from ledger file
|
||||||
_trade_ledger={},
|
_trade_ledger={},
|
||||||
|
_syms={
|
||||||
|
fqsn: flume.symbol
|
||||||
|
for fqsn, flume in feed.flumes.items()
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
n.start_soon(
|
n.start_soon(
|
||||||
|
|
|
@ -18,8 +18,8 @@
|
||||||
numpy data source coversion helpers.
|
numpy data source coversion helpers.
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
from decimal import Decimal, ROUND_HALF_EVEN
|
||||||
from typing import Any
|
from typing import Any
|
||||||
import decimal
|
|
||||||
|
|
||||||
from bidict import bidict
|
from bidict import bidict
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
@ -80,7 +80,7 @@ def float_digits(
|
||||||
if value == 0:
|
if value == 0:
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
return int(-decimal.Decimal(str(value)).as_tuple().exponent)
|
return int(-Decimal(str(value)).as_tuple().exponent)
|
||||||
|
|
||||||
|
|
||||||
def ohlc_zeros(length: int) -> np.ndarray:
|
def ohlc_zeros(length: int) -> np.ndarray:
|
||||||
|
@ -156,14 +156,14 @@ class Symbol(Struct):
|
||||||
) -> Symbol:
|
) -> Symbol:
|
||||||
|
|
||||||
tick_size = info.get('price_tick_size', 0.01)
|
tick_size = info.get('price_tick_size', 0.01)
|
||||||
lot_tick_size = info.get('lot_tick_size', 0.0)
|
lot_size = info.get('lot_tick_size', 0.0)
|
||||||
|
|
||||||
return Symbol(
|
return Symbol(
|
||||||
key=symbol,
|
key=symbol,
|
||||||
tick_size=tick_size,
|
tick_size=tick_size,
|
||||||
lot_tick_size=lot_tick_size,
|
lot_tick_size=lot_size,
|
||||||
tick_size_digits=float_digits(tick_size),
|
tick_size_digits=float_digits(tick_size),
|
||||||
lot_size_digits=float_digits(lot_tick_size),
|
lot_size_digits=float_digits(lot_size),
|
||||||
suffix=suffix,
|
suffix=suffix,
|
||||||
broker_info={broker: info},
|
broker_info={broker: info},
|
||||||
)
|
)
|
||||||
|
@ -254,6 +254,12 @@ class Symbol(Struct):
|
||||||
|
|
||||||
return keys
|
return keys
|
||||||
|
|
||||||
|
def decimal_quant(self, d: Decimal):
|
||||||
|
digits = self.lot_size_digits
|
||||||
|
return d.quantize(
|
||||||
|
Decimal(f'1.{"0".ljust(digits, "0")}'),
|
||||||
|
rounding=ROUND_HALF_EVEN
|
||||||
|
)
|
||||||
|
|
||||||
def _nan_to_closest_num(array: np.ndarray):
|
def _nan_to_closest_num(array: np.ndarray):
|
||||||
"""Return interpolated values instead of NaN.
|
"""Return interpolated values instead of NaN.
|
||||||
|
|
41
piker/pp.py
41
piker/pp.py
|
@ -45,7 +45,7 @@ import toml
|
||||||
from . import config
|
from . import config
|
||||||
from .brokers import get_brokermod
|
from .brokers import get_brokermod
|
||||||
from .clearing._messages import BrokerdPosition, Status
|
from .clearing._messages import BrokerdPosition, Status
|
||||||
from .data._source import Symbol
|
from .data._source import Symbol, unpack_fqsn
|
||||||
from .log import get_logger
|
from .log import get_logger
|
||||||
from .data.types import Struct
|
from .data.types import Struct
|
||||||
|
|
||||||
|
@ -83,7 +83,7 @@ def open_trade_ledger(
|
||||||
with open(tradesfile, 'rb') as cf:
|
with open(tradesfile, 'rb') as cf:
|
||||||
start = time.time()
|
start = time.time()
|
||||||
ledger = tomli.load(cf)
|
ledger = tomli.load(cf)
|
||||||
print(f'Ledger load took {time.time() - start}s')
|
log.info(f'Ledger load took {time.time() - start}s')
|
||||||
cpy = ledger.copy()
|
cpy = ledger.copy()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
@ -92,7 +92,7 @@ def open_trade_ledger(
|
||||||
if cpy != ledger:
|
if cpy != ledger:
|
||||||
# TODO: show diff output?
|
# TODO: show diff output?
|
||||||
# https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries
|
# https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries
|
||||||
print(f'Updating ledger for {tradesfile}:\n')
|
log.info(f'Updating ledger for {tradesfile}:\n')
|
||||||
ledger.update(cpy)
|
ledger.update(cpy)
|
||||||
# we write on close the mutated ledger data
|
# we write on close the mutated ledger data
|
||||||
with open(tradesfile, 'w') as cf:
|
with open(tradesfile, 'w') as cf:
|
||||||
|
@ -103,17 +103,18 @@ class Transaction(Struct, frozen=True):
|
||||||
# TODO: should this be ``.to`` (see below)?
|
# TODO: should this be ``.to`` (see below)?
|
||||||
fqsn: str
|
fqsn: str
|
||||||
|
|
||||||
|
sym: Symbol
|
||||||
tid: Union[str, int] # unique transaction id
|
tid: Union[str, int] # unique transaction id
|
||||||
size: float
|
size: float
|
||||||
price: float
|
price: float
|
||||||
cost: float # commisions or other additional costs
|
cost: float # commisions or other additional costs
|
||||||
dt: datetime
|
dt: datetime
|
||||||
expiry: Optional[datetime] = None
|
expiry: datetime | None = None
|
||||||
|
|
||||||
# optional key normally derived from the broker
|
# optional key normally derived from the broker
|
||||||
# backend which ensures the instrument-symbol this record
|
# backend which ensures the instrument-symbol this record
|
||||||
# is for is truly unique.
|
# is for is truly unique.
|
||||||
bsuid: Optional[Union[str, int]] = None
|
bsuid: Union[str, int] | None = None
|
||||||
|
|
||||||
# optional fqsn for the source "asset"/money symbol?
|
# optional fqsn for the source "asset"/money symbol?
|
||||||
# from: Optional[str] = None
|
# from: Optional[str] = None
|
||||||
|
@ -190,9 +191,20 @@ class Position(Struct):
|
||||||
# listing venue here even when the backend isn't providing
|
# listing venue here even when the backend isn't providing
|
||||||
# it via the trades ledger..
|
# it via the trades ledger..
|
||||||
# drop symbol obj in serialized form
|
# drop symbol obj in serialized form
|
||||||
s = d.pop('symbol')
|
s = d.get('symbol')
|
||||||
fqsn = s.front_fqsn()
|
fqsn = s.front_fqsn()
|
||||||
|
|
||||||
|
broker, key, suffix = unpack_fqsn(fqsn)
|
||||||
|
sym_info = s.broker_info[broker]
|
||||||
|
|
||||||
|
d['symbol'] = {
|
||||||
|
'info': {
|
||||||
|
'asset_type': sym_info['asset_type'],
|
||||||
|
'price_tick_size': sym_info['price_tick_size'],
|
||||||
|
'lot_tick_size': sym_info['lot_tick_size']
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if self.expiry is None:
|
if self.expiry is None:
|
||||||
d.pop('expiry', None)
|
d.pop('expiry', None)
|
||||||
elif expiry:
|
elif expiry:
|
||||||
|
@ -467,8 +479,7 @@ class Position(Struct):
|
||||||
if self.split_ratio is not None:
|
if self.split_ratio is not None:
|
||||||
size = round(size * self.split_ratio)
|
size = round(size * self.split_ratio)
|
||||||
|
|
||||||
return float(Decimal(size).quantize(
|
return float(self.symbol.decimal_quant(Decimal(size)))
|
||||||
Decimal('1.0000'), rounding=ROUND_HALF_EVEN))
|
|
||||||
|
|
||||||
def minimize_clears(
|
def minimize_clears(
|
||||||
self,
|
self,
|
||||||
|
@ -512,7 +523,7 @@ class Position(Struct):
|
||||||
'cost': t.cost,
|
'cost': t.cost,
|
||||||
'price': t.price,
|
'price': t.price,
|
||||||
'size': t.size,
|
'size': t.size,
|
||||||
'dt': t.dt,
|
'dt': t.dt
|
||||||
}
|
}
|
||||||
|
|
||||||
# TODO: compute these incrementally instead
|
# TODO: compute these incrementally instead
|
||||||
|
@ -559,7 +570,7 @@ class PpTable(Struct):
|
||||||
Symbol.from_fqsn(
|
Symbol.from_fqsn(
|
||||||
t.fqsn,
|
t.fqsn,
|
||||||
info={},
|
info={},
|
||||||
),
|
) if not t.sym else t.sym,
|
||||||
size=0.0,
|
size=0.0,
|
||||||
ppu=0.0,
|
ppu=0.0,
|
||||||
bsuid=t.bsuid,
|
bsuid=t.bsuid,
|
||||||
|
@ -620,7 +631,7 @@ class PpTable(Struct):
|
||||||
# XXX: debug hook for size mismatches
|
# XXX: debug hook for size mismatches
|
||||||
# qqqbsuid = 320227571
|
# qqqbsuid = 320227571
|
||||||
# if bsuid == qqqbsuid:
|
# if bsuid == qqqbsuid:
|
||||||
# breakpoint()
|
# xbreakpoint()
|
||||||
|
|
||||||
pp.ensure_state()
|
pp.ensure_state()
|
||||||
|
|
||||||
|
@ -682,10 +693,10 @@ class PpTable(Struct):
|
||||||
'''
|
'''
|
||||||
# TODO: show diff output?
|
# TODO: show diff output?
|
||||||
# https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries
|
# https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries
|
||||||
print(f'Updating ``pps.toml`` for {path}:\n')
|
log.info(f'Updating ``pps.toml`` for {path}:\n')
|
||||||
|
|
||||||
# active, closed_pp_objs = table.dump_active()
|
# active, closed_pp_objs = table.dump_active()
|
||||||
pp_entries = self.to_toml()
|
pp_entries = self.to_toml()
|
||||||
|
log.info(pp_entries)
|
||||||
self.conf[self.brokername][self.acctid] = pp_entries
|
self.conf[self.brokername][self.acctid] = pp_entries
|
||||||
|
|
||||||
# TODO: why tf haven't they already done this for inline
|
# TODO: why tf haven't they already done this for inline
|
||||||
|
@ -883,7 +894,6 @@ def open_pps(
|
||||||
brokername: str,
|
brokername: str,
|
||||||
acctid: str,
|
acctid: str,
|
||||||
write_on_exit: bool = False,
|
write_on_exit: bool = False,
|
||||||
|
|
||||||
) -> Generator[PpTable, None, None]:
|
) -> Generator[PpTable, None, None]:
|
||||||
'''
|
'''
|
||||||
Read out broker-specific position entries from
|
Read out broker-specific position entries from
|
||||||
|
@ -937,8 +947,11 @@ def open_pps(
|
||||||
dtstr = clears_table['dt']
|
dtstr = clears_table['dt']
|
||||||
dt = pendulum.parse(dtstr)
|
dt = pendulum.parse(dtstr)
|
||||||
clears_table['dt'] = dt
|
clears_table['dt'] = dt
|
||||||
|
|
||||||
trans.append(Transaction(
|
trans.append(Transaction(
|
||||||
fqsn=bsuid,
|
fqsn=bsuid,
|
||||||
|
sym=Symbol.from_fqsn(
|
||||||
|
fqsn, entry['symbol']),
|
||||||
bsuid=bsuid,
|
bsuid=bsuid,
|
||||||
tid=tid,
|
tid=tid,
|
||||||
size=clears_table['size'],
|
size=clears_table['size'],
|
||||||
|
|
|
@ -84,25 +84,20 @@ async def _async_main(
|
||||||
case {'name': 'position'}:
|
case {'name': 'position'}:
|
||||||
break
|
break
|
||||||
|
|
||||||
if (
|
|
||||||
assert_entries
|
|
||||||
or assert_pps
|
|
||||||
or assert_zeroed_pps
|
|
||||||
or assert_msg
|
|
||||||
):
|
|
||||||
_assert(
|
|
||||||
assert_entries,
|
|
||||||
assert_pps,
|
|
||||||
assert_zeroed_pps,
|
|
||||||
pps,
|
|
||||||
last_msg,
|
|
||||||
size,
|
|
||||||
executions,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Teardown piker like a user would
|
# Teardown piker like a user would
|
||||||
raise KeyboardInterrupt
|
raise KeyboardInterrupt
|
||||||
|
|
||||||
|
if assert_entries or assert_pps or assert_zeroed_pps or assert_msg:
|
||||||
|
_assert(
|
||||||
|
assert_entries,
|
||||||
|
assert_pps,
|
||||||
|
assert_zeroed_pps,
|
||||||
|
pps,
|
||||||
|
last_msg,
|
||||||
|
size,
|
||||||
|
executions,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def _assert(
|
def _assert(
|
||||||
assert_entries,
|
assert_entries,
|
||||||
|
|
Loading…
Reference in New Issue