decimal_prices_thru_ems: yeah, just suck it up and do Order.price: Decimal for now.. #44

Open
goodboy wants to merge 13 commits from decimal_prices_thru_ems into mp_fomo_polish
14 changed files with 159 additions and 158 deletions

View File

@ -2,22 +2,29 @@
from decimal import ( from decimal import (
Decimal, Decimal,
) )
from pathlib import Path
import numpy as np import numpy as np
import polars as pl # import polars as pl
import trio import trio
import tractor import tractor
from datetime import datetime from datetime import datetime
from pprint import pformat # from pprint import pformat
from piker.brokers.deribit.api import ( from piker.brokers.deribit.api import (
get_client, get_client,
maybe_open_oi_feed, maybe_open_oi_feed,
) )
from piker.storage import open_storage_client, StorageClient from piker.storage import open_storage_client, StorageClient
from piker.log import get_logger
import sys import sys
import pyqtgraph as pg import pyqtgraph as pg
from PyQt6 import QtCore from PyQt6 import QtCore
from pyqtgraph import ScatterPlotItem, InfiniteLine from pyqtgraph import ScatterPlotItem, InfiniteLine
from PyQt6.QtWidgets import QApplication from PyQt6.QtWidgets import QApplication
from cryptofeed.symbols import Symbol
log = get_logger(__name__)
# XXX, use 2 newlines between top level LOC (even between these # XXX, use 2 newlines between top level LOC (even between these
# imports and the next function line ;) # imports and the next function line ;)
@ -192,10 +199,13 @@ async def max_pain_daemon(
), ),
], dtype=dtype) ], dtype=dtype)
path = await client.write_oi( path: Path = await client.write_oi(
col_sym_key, col_sym_key,
data, data,
) )
# TODO, use std logging like this throughout for status
# emissions on console!
log.info(f'Wrote OI history to {path}')
def get_max_pain( def get_max_pain(
oi_by_strikes: dict[str, dict[str, Decimal]] oi_by_strikes: dict[str, dict[str, Decimal]]
@ -258,7 +268,7 @@ async def max_pain_daemon(
# hardcoded to something, sorry.) # hardcoded to something, sorry.)
timestamp = msg[1]['timestamp'] timestamp = msg[1]['timestamp']
max_pain = get_max_pain(oi_by_strikes) max_pain = get_max_pain(oi_by_strikes)
intrinsic_values = get_total_intrinsic_values(oi_by_strikes) # intrinsic_values = get_total_intrinsic_values(oi_by_strikes)
# graph here # graph here
plot_graph(oi_by_strikes, plot) plot_graph(oi_by_strikes, plot)
@ -298,14 +308,27 @@ async def max_pain_daemon(
async def main(): async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery(
debug_mode=True,
loglevel='info',
) as an:
from tractor import log
log.get_console_log(level='info')
p: tractor.Portal = await n.start_actor( ptl: tractor.Portal = await an.start_actor(
'max_pain_daemon', 'max_pain_daemon',
enable_modules=[__name__], enable_modules=[__name__],
infect_asyncio=True, infect_asyncio=True,
# ^TODO, we can actually run this in the root-actor now
# if needed as per 2nd "section" in,
# https://pikers.dev/goodboy/tractor/pulls/2
#
# NOTE, will first require us porting to modern
# `tractor:main` though ofc!
) )
await p.run(max_pain_daemon) await ptl.run(max_pain_daemon)
if __name__ == '__main__': if __name__ == '__main__':
trio.run(main) trio.run(main)

View File

@ -42,7 +42,6 @@ from ._mktinfo import (
dec_digits, dec_digits,
digits_to_dec, digits_to_dec,
MktPair, MktPair,
Symbol,
unpack_fqme, unpack_fqme,
_derivs as DerivTypes, _derivs as DerivTypes,
) )
@ -60,7 +59,6 @@ __all__ = [
'Asset', 'Asset',
'MktPair', 'MktPair',
'Position', 'Position',
'Symbol',
'Transaction', 'Transaction',
'TransactionLedger', 'TransactionLedger',
'dec_digits', 'dec_digits',

View File

@ -390,8 +390,8 @@ class MktPair(Struct, frozen=True):
cls, cls,
fqme: str, fqme: str,
price_tick: float | str, price_tick: float|str,
size_tick: float | str, size_tick: float|str,
bs_mktid: str, bs_mktid: str,
broker: str | None = None, broker: str | None = None,
@ -677,90 +677,3 @@ def unpack_fqme(
# '.'.join([mkt_ep, venue]), # '.'.join([mkt_ep, venue]),
suffix, suffix,
) )
class Symbol(Struct):
'''
I guess this is some kinda container thing for dealing with
all the different meta-data formats from brokers?
'''
key: str
broker: str = ''
venue: str = ''
# precision descriptors for price and vlm
tick_size: Decimal = Decimal('0.01')
lot_tick_size: Decimal = Decimal('0.0')
suffix: str = ''
broker_info: dict[str, dict[str, Any]] = {}
@classmethod
def from_fqme(
cls,
fqsn: str,
info: dict[str, Any],
) -> Symbol:
broker, mktep, venue, suffix = unpack_fqme(fqsn)
tick_size = info.get('price_tick_size', 0.01)
lot_size = info.get('lot_tick_size', 0.0)
return Symbol(
broker=broker,
key=mktep,
tick_size=tick_size,
lot_tick_size=lot_size,
venue=venue,
suffix=suffix,
broker_info={broker: info},
)
@property
def type_key(self) -> str:
return list(self.broker_info.values())[0]['asset_type']
@property
def tick_size_digits(self) -> int:
return float_digits(self.tick_size)
@property
def lot_size_digits(self) -> int:
return float_digits(self.lot_tick_size)
@property
def price_tick(self) -> Decimal:
return Decimal(str(self.tick_size))
@property
def size_tick(self) -> Decimal:
return Decimal(str(self.lot_tick_size))
@property
def broker(self) -> str:
return list(self.broker_info.keys())[0]
@property
def fqme(self) -> str:
return maybe_cons_tokens([
self.key, # final "pair name" (eg. qqq[/usd], btcusdt)
self.venue,
self.suffix, # includes expiry and other con info
self.broker,
])
def quantize(
self,
size: float,
) -> Decimal:
digits = float_digits(self.lot_tick_size)
return Decimal(size).quantize(
Decimal(f'1.{"0".ljust(digits, "0")}'),
rounding=ROUND_HALF_EVEN
)
# NOTE: when cast to `str` return fqme
def __str__(self) -> str:
return self.fqme

View File

@ -374,9 +374,14 @@ class Client:
pair: Pair = pair_type(**item) pair: Pair = pair_type(**item)
except Exception as e: except Exception as e:
e.add_note( e.add_note(
"\nDon't panic, prolly stupid binance changed their symbology schema again..\n" f'\n'
'Check out their API docs here:\n\n' f'New or removed field we need to codify!\n'
'https://binance-docs.github.io/apidocs/spot/en/#exchange-information' f'pair-type: {pair_type!r}\n'
f'\n'
f"Don't panic, prolly stupid binance changed their symbology schema again..\n"
f'Check out their API docs here:\n'
f'\n'
f'https://binance-docs.github.io/apidocs/spot/en/#exchange-information\n'
) )
raise raise
pair_table[pair.symbol.upper()] = pair pair_table[pair.symbol.upper()] = pair

View File

@ -144,6 +144,11 @@ class SpotPair(Pair, frozen=True):
permissions: list[str] permissions: list[str]
permissionSets: list[list[str]] permissionSets: list[list[str]]
# can the paint botz creat liq gaps even easier on this asset?
# Bp
# https://developers.binance.com/docs/binance-spot-api-docs/faqs/order_amend_keep_priority
amendAllowed: bool
# NOTE: see `.data._symcache.SymbologyCache.load()` for why # NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.binance:SpotPair' ns_path: str = 'piker.brokers.binance:SpotPair'

View File

@ -328,7 +328,6 @@ class Client:
''' '''
Return the set of currencies for deribit. Return the set of currencies for deribit.
''' '''
assets = {}
resp = await self._json_rpc_auth_wrapper( resp = await self._json_rpc_auth_wrapper(
'public/get_currencies', 'public/get_currencies',
params={} params={}

View File

@ -175,9 +175,8 @@ async def handle_order_requests(
case { case {
'account': 'kraken.spot' as account, 'account': 'kraken.spot' as account,
'action': action, 'action': 'buy'|'sell',
} if action in {'buy', 'sell'}: }:
# validate # validate
order = BrokerdOrder(**msg) order = BrokerdOrder(**msg)
@ -262,6 +261,12 @@ async def handle_order_requests(
} | extra } | extra
log.info(f'Submitting WS order request:\n{pformat(req)}') log.info(f'Submitting WS order request:\n{pformat(req)}')
# NOTE HOWTO, debug order requests
#
# if 'XRP' in pair:
# await tractor.pause()
await ws.send_msg(req) await ws.send_msg(req)
# placehold for sanity checking in relay loop # placehold for sanity checking in relay loop
@ -1085,6 +1090,8 @@ async def handle_order_updates(
f'Failed to {action} order {reqid}:\n' f'Failed to {action} order {reqid}:\n'
f'{errmsg}' f'{errmsg}'
) )
# if tractor._state.debug_mode():
# await tractor.pause()
symbol: str = 'N/A' symbol: str = 'N/A'
if chain := apiflows.get(reqid): if chain := apiflows.get(reqid):

View File

@ -76,7 +76,6 @@ if TYPE_CHECKING:
# TODO: numba all of this # TODO: numba all of this
def mk_check( def mk_check(
trigger_price: float, trigger_price: float,
known_last: float, known_last: float,
action: str, action: str,
@ -162,7 +161,7 @@ async def clear_dark_triggers(
router: Router, router: Router,
brokerd_orders_stream: tractor.MsgStream, brokerd_orders_stream: tractor.MsgStream,
quote_stream: tractor.ReceiveMsgStream, # noqa quote_stream: tractor.MsgStream,
broker: str, broker: str,
fqme: str, fqme: str,
@ -178,6 +177,7 @@ async def clear_dark_triggers(
''' '''
# XXX: optimize this for speed! # XXX: optimize this for speed!
# TODO: # TODO:
# - port to the new ringbuf stuff in `tractor.ipc`!
# - numba all this! # - numba all this!
# - this stream may eventually contain multiple symbols # - this stream may eventually contain multiple symbols
quote_stream._raise_on_lag = False quote_stream._raise_on_lag = False
@ -1182,12 +1182,16 @@ async def process_client_order_cmds(
submitting live orders immediately if requested by the client. submitting live orders immediately if requested by the client.
''' '''
# cmd: dict # TODO, only allow `msgspec.Struct` form!
cmd: dict
async for cmd in client_order_stream: async for cmd in client_order_stream:
log.info(f'Received order cmd:\n{pformat(cmd)}') log.info(
f'Received order cmd:\n'
f'{pformat(cmd)}\n'
)
# CAWT DAMN we need struct support! # CAWT DAMN we need struct support!
oid = str(cmd['oid']) oid: str = str(cmd['oid'])
# register this stream as an active order dialog (msg flow) for # register this stream as an active order dialog (msg flow) for
# this order id such that translated message from the brokerd # this order id such that translated message from the brokerd
@ -1293,7 +1297,7 @@ async def process_client_order_cmds(
case { case {
'oid': oid, 'oid': oid,
'symbol': fqme, 'symbol': fqme,
'price': trigger_price, 'price': price,
'size': size, 'size': size,
'action': ('buy' | 'sell') as action, 'action': ('buy' | 'sell') as action,
'exec_mode': ('live' | 'paper'), 'exec_mode': ('live' | 'paper'),
@ -1325,7 +1329,7 @@ async def process_client_order_cmds(
symbol=sym, symbol=sym,
action=action, action=action,
price=trigger_price, price=price,
size=size, size=size,
account=req.account, account=req.account,
) )
@ -1347,7 +1351,11 @@ async def process_client_order_cmds(
# (``translate_and_relay_brokerd_events()`` above) will # (``translate_and_relay_brokerd_events()`` above) will
# handle relaying the ems side responses back to # handle relaying the ems side responses back to
# the client/cmd sender from this request # the client/cmd sender from this request
log.info(f'Sending live order to {broker}:\n{pformat(msg)}') log.info(
f'Sending live order to {broker}:\n'
f'{pformat(msg)}'
)
await brokerd_order_stream.send(msg) await brokerd_order_stream.send(msg)
# an immediate response should be ``BrokerdOrderAck`` # an immediate response should be ``BrokerdOrderAck``
@ -1363,7 +1371,7 @@ async def process_client_order_cmds(
case { case {
'oid': oid, 'oid': oid,
'symbol': fqme, 'symbol': fqme,
'price': trigger_price, 'price': price,
'size': size, 'size': size,
'exec_mode': exec_mode, 'exec_mode': exec_mode,
'action': action, 'action': action,
@ -1391,7 +1399,12 @@ async def process_client_order_cmds(
if isnan(last): if isnan(last):
last = flume.rt_shm.array[-1]['close'] last = flume.rt_shm.array[-1]['close']
pred = mk_check(trigger_price, last, action) trigger_price: float = float(price)
pred = mk_check(
trigger_price,
last,
action,
)
# NOTE: for dark orders currently we submit # NOTE: for dark orders currently we submit
# the triggered live order at a price 5 ticks # the triggered live order at a price 5 ticks
@ -1531,7 +1544,7 @@ async def _emsd_main(
ctx: tractor.Context, ctx: tractor.Context,
fqme: str, fqme: str,
exec_mode: str, # ('paper', 'live') exec_mode: str, # ('paper', 'live')
loglevel: str | None = None, loglevel: str|None = None,
) -> tuple[ ) -> tuple[
dict[ dict[

View File

@ -19,6 +19,7 @@ Clearing sub-system message and protocols.
""" """
from __future__ import annotations from __future__ import annotations
from decimal import Decimal
from typing import ( from typing import (
Literal, Literal,
) )
@ -71,7 +72,15 @@ class Order(Struct):
symbol: str # | MktPair symbol: str # | MktPair
account: str # should we set a default as '' ? account: str # should we set a default as '' ?
price: float # https://docs.python.org/3/library/decimal.html#decimal-objects
#
# ?TODO? decimal usage throughout?
# -[ ] possibly leverage the `Encoder(decimal_format='number')`
# bit?
# |_https://jcristharif.com/msgspec/supported-types.html#decimal
# -[ ] should we also use it for .size?
#
price: Decimal
size: float # -ve is "sell", +ve is "buy" size: float # -ve is "sell", +ve is "buy"
brokers: list[str] = [] brokers: list[str] = []
@ -178,7 +187,7 @@ class BrokerdOrder(Struct):
time_ns: int time_ns: int
symbol: str # fqme symbol: str # fqme
price: float price: Decimal
size: float size: float
# TODO: if we instead rely on a +ve/-ve size to determine # TODO: if we instead rely on a +ve/-ve size to determine

View File

@ -508,7 +508,7 @@ async def handle_order_requests(
reqid = await client.submit_limit( reqid = await client.submit_limit(
oid=order.oid, oid=order.oid,
symbol=f'{order.symbol}.{client.broker}', symbol=f'{order.symbol}.{client.broker}',
price=order.price, price=float(order.price),
action=order.action, action=order.action,
size=order.size, size=order.size,
# XXX: by default 0 tells ``ib_insync`` methods that # XXX: by default 0 tells ``ib_insync`` methods that

View File

@ -876,6 +876,7 @@ async def uniform_rate_send(
except tractor.RemoteActorError as rme: except tractor.RemoteActorError as rme:
if rme.type is not tractor._exceptions.StreamOverrun: if rme.type is not tractor._exceptions.StreamOverrun:
raise raise
ctx = stream._ctx ctx = stream._ctx
chan = ctx.chan chan = ctx.chan
log.warning( log.warning(
@ -892,6 +893,7 @@ async def uniform_rate_send(
trio.ClosedResourceError, trio.ClosedResourceError,
trio.BrokenResourceError, trio.BrokenResourceError,
ConnectionResetError, ConnectionResetError,
trio.EndOfChannel,
): ):
# if the feed consumer goes down then drop # if the feed consumer goes down then drop
# out of this rate limiter # out of this rate limiter

View File

@ -21,6 +21,7 @@ Chart trading, the only way to scalp.
from __future__ import annotations from __future__ import annotations
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dataclasses import dataclass, field from dataclasses import dataclass, field
from decimal import Decimal
from functools import partial from functools import partial
from pprint import pformat from pprint import pformat
import time import time
@ -41,7 +42,6 @@ from piker.accounting import (
Position, Position,
mk_allocator, mk_allocator,
MktPair, MktPair,
Symbol,
) )
from piker.clearing import ( from piker.clearing import (
open_ems, open_ems,
@ -143,6 +143,15 @@ class OrderMode:
} }
_staged_order: Order | None = None _staged_order: Order | None = None
@property
def curr_mkt(self) -> MktPair:
'''
Deliver the currently selected `MktPair` according
chart state.
'''
return self.chart.linked.mkt
def on_level_change_update_next_order_info( def on_level_change_update_next_order_info(
self, self,
level: float, level: float,
@ -172,7 +181,11 @@ class OrderMode:
line.update_labels(order_info) line.update_labels(order_info)
# update bound-in staged order # update bound-in staged order
order.price = level mkt: MktPair = self.curr_mkt
order.price: Decimal = mkt.quantize(
size=level,
quantity_type='price',
)
order.size = order_info['size'] order.size = order_info['size']
# when an order is changed we flip the settings side-pane to # when an order is changed we flip the settings side-pane to
@ -187,7 +200,9 @@ class OrderMode:
) -> LevelLine: ) -> LevelLine:
level = order.price # TODO, if we instead just always decimalize at the ems layer
# we can avoid this back-n-forth casting?
level = float(order.price)
line = order_line( line = order_line(
chart or self.chart, chart or self.chart,
@ -224,7 +239,11 @@ class OrderMode:
# the order mode allocator but we still need to update the # the order mode allocator but we still need to update the
# "staged" order message we'll send to the ems # "staged" order message we'll send to the ems
def update_order_price(y: float) -> None: def update_order_price(y: float) -> None:
order.price = y mkt: MktPair = self.curr_mkt
order.price: Decimal = mkt.quantize(
size=y,
quantity_type='price',
)
line._on_level_change = update_order_price line._on_level_change = update_order_price
@ -275,34 +294,31 @@ class OrderMode:
chart = cursor.linked.chart chart = cursor.linked.chart
if ( if (
not chart not chart
and cursor and
and cursor.active_plot cursor
and
cursor.active_plot
): ):
return return
chart = cursor.active_plot chart = cursor.active_plot
price = cursor._datum_xy[1] price: float = cursor._datum_xy[1]
if not price: if not price:
# zero prices are not supported by any means # zero prices are not supported by any means
# since that's illogical / a no-op. # since that's illogical / a no-op.
return return
mkt: MktPair = self.chart.linked.mkt
# NOTE : we could also use instead,
# mkt.quantize(price, quantity_type='price')
# but it returns a Decimal and it's probably gonna
# be slower?
# TODO: should we be enforcing this precision # TODO: should we be enforcing this precision
# at a different layer in the stack? right now # at a different layer in the stack?
# any precision error will literally be relayed # |_ might require `MktPair` tracking in the EMS?
# all the way back from the backend. # |_ right now any precision error will be relayed
# all the way back from the backend and vice-versa..
price = round( #
price, mkt: MktPair = self.curr_mkt
ndigits=mkt.price_tick_digits, price: Decimal = mkt.quantize(
size=price,
quantity_type='price',
) )
order = self._staged_order = Order( order = self._staged_order = Order(
action=action, action=action,
price=price, price=price,
@ -378,7 +394,7 @@ class OrderMode:
'oid': oid, 'oid': oid,
}) })
if order.price <= 0: if float(order.price) <= 0:
log.error( log.error(
'*!? Invalid `Order.price <= 0` ?!*\n' '*!? Invalid `Order.price <= 0` ?!*\n'
# TODO: make this present multi-line in object form # TODO: make this present multi-line in object form
@ -515,14 +531,15 @@ class OrderMode:
# if an order msg is provided update the line # if an order msg is provided update the line
# **from** that msg. # **from** that msg.
if order: if order:
if order.price <= 0: price: float = float(order.price)
if price <= 0:
log.error(f'Order has 0 price, cancelling..\n{order}') log.error(f'Order has 0 price, cancelling..\n{order}')
self.cancel_orders([order.oid]) self.cancel_orders([order.oid])
return None return None
line.set_level(order.price) line.set_level(price)
self.on_level_change_update_next_order_info( self.on_level_change_update_next_order_info(
level=order.price, level=price,
line=line, line=line,
order=order, order=order,
# use the corresponding position tracker for the # use the corresponding position tracker for the
@ -681,9 +698,9 @@ class OrderMode:
) -> Dialog | None: ) -> Dialog | None:
# NOTE: the `.order` attr **must** be set with the # NOTE: the `.order` attr **must** be set with the
# equivalent order msg in order to be loaded. # equivalent order msg in order to be loaded.
order = msg.req order: Order = msg.req
oid = str(msg.oid) oid = str(msg.oid)
symbol = order.symbol symbol: str = order.symbol
# TODO: MEGA UGGG ZONEEEE! # TODO: MEGA UGGG ZONEEEE!
src = msg.src src = msg.src
@ -702,13 +719,22 @@ class OrderMode:
order.oid = str(order.oid) order.oid = str(order.oid)
order.brokers = [brokername] order.brokers = [brokername]
# TODO: change this over to `MktPair`, but it's # ?TODO? change this over to `MktPair`, but it's gonna be
# gonna be tough since we don't have any such data # tough since we don't have any such data really in our
# really in our clearing msg schema.. # clearing msg schema..
order.symbol = Symbol.from_fqme( # BUT WAIT! WHY do we even want/need this!?
fqsn=fqme, #
info={}, # order.symbol = self.curr_mkt
) #
# XXX, the old approach.. which i don't quire member why..
# -[ ] verify we for sure don't require this any more!
# |_https://github.com/pikers/piker/issues/517
#
# order.symbol = Symbol.from_fqme(
# fqsn=fqme,
# info={},
# )
maybe_dialog: Dialog | None = self.submit_order( maybe_dialog: Dialog | None = self.submit_order(
send_msg=False, send_msg=False,
order=order, order=order,
@ -1101,7 +1127,7 @@ async def process_trade_msg(
) )
) )
): ):
msg.req = order msg.req: Order = order
dialog: ( dialog: (
Dialog Dialog
# NOTE: on an invalid order submission (eg. # NOTE: on an invalid order submission (eg.
@ -1166,7 +1192,7 @@ async def process_trade_msg(
tm = time.time() tm = time.time()
mode.on_fill( mode.on_fill(
oid, oid,
price=req.price, price=float(req.price),
time_s=tm, time_s=tm,
) )
mode.lines.remove_line(uuid=oid) mode.lines.remove_line(uuid=oid)
@ -1221,7 +1247,7 @@ async def process_trade_msg(
tm = details['broker_time'] tm = details['broker_time']
mode.on_fill( mode.on_fill(
oid, oid,
price=details['price'], price=float(details['price']),
time_s=tm, time_s=tm,
pointing='up' if action == 'buy' else 'down', pointing='up' if action == 'buy' else 'down',
) )

View File

@ -62,8 +62,9 @@ ignore-init-module-imports = false
fixable = ["ALL"] fixable = ["ALL"]
unfixable = [] unfixable = []
# TODO? uhh why no work!?
# Allow unused variables when underscore-prefixed. # Allow unused variables when underscore-prefixed.
dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$" # dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
[format] [format]
# Use single quotes in `ruff format`. # Use single quotes in `ruff format`.

View File

@ -179,7 +179,7 @@ def test_ems_err_on_bad_broker(
# NOTE: emsd should error on the actor's enabled modules # NOTE: emsd should error on the actor's enabled modules
# import phase, when looking for a backend named `doggy`. # import phase, when looking for a backend named `doggy`.
except tractor.RemoteActorError as re: except tractor.RemoteActorError as re:
assert re.type == ModuleNotFoundError assert re.type is ModuleNotFoundError
run_and_tollerate_cancels(load_bad_fqme) run_and_tollerate_cancels(load_bad_fqme)