Compare commits

..

No commits in common. "dpbackup" and "310_plus" have entirely different histories.

40 changed files with 1942 additions and 4092 deletions

View File

@ -22,10 +22,10 @@ from typing import Optional, Union, Callable, Any
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from collections import defaultdict from collections import defaultdict
from msgspec import Struct from pydantic import BaseModel
import tractor
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor
from .log import get_logger, get_console_log from .log import get_logger, get_console_log
from .brokers import get_brokermod from .brokers import get_brokermod
@ -47,13 +47,16 @@ _root_modules = [
] ]
class Services(Struct): class Services(BaseModel):
actor_n: tractor._supervise.ActorNursery actor_n: tractor._supervise.ActorNursery
service_n: trio.Nursery service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag debug_mode: bool # tractor sub-actor debug mode flag
service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {} service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {}
class Config:
arbitrary_types_allowed = True
async def start_service_task( async def start_service_task(
self, self,
name: str, name: str,

View File

@ -34,13 +34,13 @@ from fuzzywuzzy import process as fuzzy
import numpy as np import numpy as np
import tractor import tractor
from pydantic.dataclasses import dataclass from pydantic.dataclasses import dataclass
from pydantic import BaseModel
import wsproto import wsproto
from .._cacheables import open_cached_client from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound from ._util import resproc, SymbolNotFound
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from ..data import ShmArray from ..data import ShmArray
from ..data.types import Struct
from ..data._web_bs import open_autorecon_ws, NoBsWs from ..data._web_bs import open_autorecon_ws, NoBsWs
log = get_logger(__name__) log = get_logger(__name__)
@ -79,14 +79,12 @@ _show_wap_in_history = False
# https://binance-docs.github.io/apidocs/spot/en/#exchange-information # https://binance-docs.github.io/apidocs/spot/en/#exchange-information
class Pair(Struct, frozen=True): class Pair(BaseModel):
symbol: str symbol: str
status: str status: str
baseAsset: str baseAsset: str
baseAssetPrecision: int baseAssetPrecision: int
cancelReplaceAllowed: bool
allowTrailingStop: bool
quoteAsset: str quoteAsset: str
quotePrecision: int quotePrecision: int
quoteAssetPrecision: int quoteAssetPrecision: int
@ -289,7 +287,7 @@ async def get_client() -> Client:
# validation type # validation type
class AggTrade(Struct): class AggTrade(BaseModel):
e: str # Event type e: str # Event type
E: int # Event time E: int # Event time
s: str # Symbol s: str # Symbol
@ -343,9 +341,7 @@ async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
elif msg.get('e') == 'aggTrade': elif msg.get('e') == 'aggTrade':
# NOTE: this is purely for a definition, ``msgspec.Struct`` # validate
# does not runtime-validate until you decode/encode.
# see: https://jcristharif.com/msgspec/structs.html#type-validation
msg = AggTrade(**msg) msg = AggTrade(**msg)
# TODO: type out and require this quote format # TODO: type out and require this quote format
@ -356,8 +352,8 @@ async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
'brokerd_ts': time.time(), 'brokerd_ts': time.time(),
'ticks': [{ 'ticks': [{
'type': 'trade', 'type': 'trade',
'price': float(msg.p), 'price': msg.p,
'size': float(msg.q), 'size': msg.q,
'broker_ts': msg.T, 'broker_ts': msg.T,
}], }],
} }
@ -452,7 +448,7 @@ async def stream_quotes(
d = cache[sym.upper()] d = cache[sym.upper()]
syminfo = Pair(**d) # validation syminfo = Pair(**d) # validation
si = sym_infos[sym] = syminfo.to_dict() si = sym_infos[sym] = syminfo.dict()
# XXX: after manually inspecting the response format we # XXX: after manually inspecting the response format we
# just directly pick out the info we need # just directly pick out the info we need

View File

@ -20,10 +20,15 @@ Interactive Brokers API backend.
Sub-modules within break into the core functionalities: Sub-modules within break into the core functionalities:
- ``broker.py`` part for orders / trading endpoints - ``broker.py`` part for orders / trading endpoints
- ``feed.py`` for real-time data feed endpoints - ``data.py`` for real-time data feed endpoints
- ``api.py`` for the core API machinery which is ``trio``-ized
- ``client.py`` for the core API machinery which is ``trio``-ized
wrapping around ``ib_insync``. wrapping around ``ib_insync``.
- ``report.py`` for the hackery to build manual pp calcs
to avoid ib's absolute bullshit FIFO style position
tracking..
""" """
from .api import ( from .api import (
get_client, get_client,
@ -33,10 +38,7 @@ from .feed import (
open_symbol_search, open_symbol_search,
stream_quotes, stream_quotes,
) )
from .broker import ( from .broker import trades_dialogue
trades_dialogue,
norm_trade_records,
)
__all__ = [ __all__ = [
'get_client', 'get_client',

View File

@ -38,21 +38,15 @@ import time
from types import SimpleNamespace from types import SimpleNamespace
from bidict import bidict
import trio import trio
import tractor import tractor
from tractor import to_asyncio from tractor import to_asyncio
import ib_insync as ibis
from ib_insync.wrapper import RequestError from ib_insync.wrapper import RequestError
from ib_insync.contract import Contract, ContractDetails from ib_insync.contract import Contract, ContractDetails
from ib_insync.order import Order from ib_insync.order import Order
from ib_insync.ticker import Ticker from ib_insync.ticker import Ticker
from ib_insync.objects import ( from ib_insync.objects import Position
Position, import ib_insync as ibis
Fill,
Execution,
CommissionReport,
)
from ib_insync.wrapper import Wrapper from ib_insync.wrapper import Wrapper
from ib_insync.client import Client as ib_Client from ib_insync.client import Client as ib_Client
import numpy as np import numpy as np
@ -161,23 +155,30 @@ class NonShittyIB(ibis.IB):
self.client.apiEnd += self.disconnectedEvent self.client.apiEnd += self.disconnectedEvent
# map of symbols to contract ids
_adhoc_cmdty_data_map = {
# https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924
# NOTE: some cmdtys/metals don't have trade data like gold/usd:
# https://groups.io/g/twsapi/message/44174
'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}),
}
_futes_venues = ( _futes_venues = (
'GLOBEX', 'GLOBEX',
'NYMEX', 'NYMEX',
'CME', 'CME',
'CMECRYPTO', 'CMECRYPTO',
'COMEX',
'CMDTY', # special name case..
) )
_adhoc_futes_set = { _adhoc_futes_set = {
# equities # equities
'nq.globex', 'nq.globex',
'mnq.globex', # micro 'mnq.globex',
'es.globex', 'es.globex',
'mes.globex', # micro 'mes.globex',
# cypto$ # cypto$
'brr.cmecrypto', 'brr.cmecrypto',
@ -194,46 +195,20 @@ _adhoc_futes_set = {
# metals # metals
'xauusd.cmdty', # gold spot 'xauusd.cmdty', # gold spot
'gc.nymex', 'gc.nymex',
'mgc.nymex', # micro 'mgc.nymex',
# oil & gas
'cl.nymex',
'xagusd.cmdty', # silver spot 'xagusd.cmdty', # silver spot
'ni.nymex', # silver futes 'ni.nymex', # silver futes
'qi.comex', # mini-silver futes 'qi.comex', # mini-silver futes
} }
# map of symbols to contract ids
_adhoc_symbol_map = {
# https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924
# NOTE: some cmdtys/metals don't have trade data like gold/usd:
# https://groups.io/g/twsapi/message/44174
'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}),
}
for qsn in _adhoc_futes_set:
sym, venue = qsn.split('.')
assert venue.upper() in _futes_venues, f'{venue}'
_adhoc_symbol_map[sym.upper()] = (
{'exchange': venue},
{},
)
# exchanges we don't support at the moment due to not knowing # exchanges we don't support at the moment due to not knowing
# how to do symbol-contract lookup correctly likely due # how to do symbol-contract lookup correctly likely due
# to not having the data feeds subscribed. # to not having the data feeds subscribed.
_exch_skip_list = { _exch_skip_list = {
'ASX', # aussie stocks 'ASX', # aussie stocks
'MEXI', # mexican stocks 'MEXI', # mexican stocks
'VALUE', # no idea
# no idea
'VALUE',
'FUNDSERV',
'SWB2',
} }
# https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924 # https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924
@ -286,29 +261,27 @@ class Client:
# NOTE: the ib.client here is "throttled" to 45 rps by default # NOTE: the ib.client here is "throttled" to 45 rps by default
async def trades(self) -> dict[str, Any]: async def trades(
''' self,
Return list of trade-fills from current session in ``dict``. # api_only: bool = False,
''' ) -> dict[str, Any]:
fills: list[Fill] = self.ib.fills()
norm_fills: list[dict] = [] # orders = await self.ib.reqCompletedOrdersAsync(
# apiOnly=api_only
# )
fills = await self.ib.reqExecutionsAsync()
norm_fills = []
for fill in fills: for fill in fills:
fill = fill._asdict() # namedtuple fill = fill._asdict() # namedtuple
for key, val in fill.items(): for key, val in fill.copy().items():
match val: if isinstance(val, Contract):
case Contract() | Execution() | CommissionReport(): fill[key] = asdict(val)
fill[key] = asdict(val)
norm_fills.append(fill) norm_fills.append(fill)
return norm_fills return norm_fills
async def orders(self) -> list[Order]:
return await self.ib.reqAllOpenOrdersAsync(
apiOnly=False,
)
async def bars( async def bars(
self, self,
fqsn: str, fqsn: str,
@ -510,14 +483,6 @@ class Client:
return con return con
async def get_con(
self,
conid: int,
) -> Contract:
return await self.ib.qualifyContractsAsync(
ibis.Contract(conId=conid)
)
async def find_contract( async def find_contract(
self, self,
pattern: str, pattern: str,
@ -588,7 +553,7 @@ class Client:
# commodities # commodities
elif exch == 'CMDTY': # eg. XAUUSD.CMDTY elif exch == 'CMDTY': # eg. XAUUSD.CMDTY
con_kwargs, bars_kwargs = _adhoc_symbol_map[sym] con_kwargs, bars_kwargs = _adhoc_cmdty_data_map[sym]
con = ibis.Commodity(**con_kwargs) con = ibis.Commodity(**con_kwargs)
con.bars_kwargs = bars_kwargs con.bars_kwargs = bars_kwargs
@ -846,23 +811,10 @@ _scan_ignore: set[tuple[str, int]] = set()
def get_config() -> dict[str, Any]: def get_config() -> dict[str, Any]:
conf, path = config.load('brokers') conf, path = config.load()
section = conf.get('ib') section = conf.get('ib')
accounts = section.get('accounts')
if not accounts:
raise ValueError(
'brokers.toml -> `ib.accounts` must be defined\n'
f'location: {path}'
)
names = list(accounts.keys())
accts = section['accounts'] = bidict(accounts)
log.info(
f'brokers.toml defines {len(accts)} accounts: '
f'{pformat(names)}'
)
if section is None: if section is None:
log.warning(f'No config section found for ib in {path}') log.warning(f'No config section found for ib in {path}')
return {} return {}
@ -1038,7 +990,7 @@ async def load_aio_clients(
for acct, client in _accounts2clients.items(): for acct, client in _accounts2clients.items():
log.info(f'Disconnecting {acct}@{client}') log.info(f'Disconnecting {acct}@{client}')
client.ib.disconnect() client.ib.disconnect()
_client_cache.pop((host, port), None) _client_cache.pop((host, port))
async def load_clients_for_trio( async def load_clients_for_trio(
@ -1067,6 +1019,9 @@ async def load_clients_for_trio(
await asyncio.sleep(float('inf')) await asyncio.sleep(float('inf'))
_proxies: dict[str, MethodProxy] = {}
@acm @acm
async def open_client_proxies() -> tuple[ async def open_client_proxies() -> tuple[
dict[str, MethodProxy], dict[str, MethodProxy],
@ -1089,14 +1044,13 @@ async def open_client_proxies() -> tuple[
if cache_hit: if cache_hit:
log.info(f'Re-using cached clients: {clients}') log.info(f'Re-using cached clients: {clients}')
proxies = {}
for acct_name, client in clients.items(): for acct_name, client in clients.items():
proxy = await stack.enter_async_context( proxy = await stack.enter_async_context(
open_client_proxy(client), open_client_proxy(client),
) )
proxies[acct_name] = proxy _proxies[acct_name] = proxy
yield proxies, clients yield _proxies, clients
def get_preferred_data_client( def get_preferred_data_client(
@ -1245,13 +1199,11 @@ async def open_client_proxy(
event_table = {} event_table = {}
async with ( async with (
to_asyncio.open_channel_from( to_asyncio.open_channel_from(
open_aio_client_method_relay, open_aio_client_method_relay,
client=client, client=client,
event_consumers=event_table, event_consumers=event_table,
) as (first, chan), ) as (first, chan),
trio.open_nursery() as relay_n, trio.open_nursery() as relay_n,
): ):

File diff suppressed because it is too large Load Diff

View File

@ -217,8 +217,8 @@ async def get_bars(
) )
elif ( elif (
err.code == 162 and err.code == 162
'HMDS query returned no data' in err.message and 'HMDS query returned no data' in err.message
): ):
# XXX: this is now done in the storage mgmt layer # XXX: this is now done in the storage mgmt layer
# and we shouldn't implicitly decrement the frame dt # and we shouldn't implicitly decrement the frame dt
@ -237,13 +237,6 @@ async def get_bars(
frame_size=2000, frame_size=2000,
) )
# elif (
# err.code == 162 and
# 'Trading TWS session is connected from a different IP address' in err.message
# ):
# log.warning("ignoring ip address warning")
# continue
elif _pacing in msg: elif _pacing in msg:
log.warning( log.warning(
@ -916,17 +909,17 @@ async def open_symbol_search(
# trigger async request # trigger async request
await trio.sleep(0) await trio.sleep(0)
# # match against our ad-hoc set immediately # match against our ad-hoc set immediately
# adhoc_matches = fuzzy.extractBests( adhoc_matches = fuzzy.extractBests(
# pattern, pattern,
# list(_adhoc_futes_set), list(_adhoc_futes_set),
# score_cutoff=90, score_cutoff=90,
# ) )
# log.info(f'fuzzy matched adhocs: {adhoc_matches}') log.info(f'fuzzy matched adhocs: {adhoc_matches}')
# adhoc_match_results = {} adhoc_match_results = {}
# if adhoc_matches: if adhoc_matches:
# # TODO: do we need to pull contract details? # TODO: do we need to pull contract details?
# adhoc_match_results = {i[0]: {} for i in adhoc_matches} adhoc_match_results = {i[0]: {} for i in adhoc_matches}
log.debug(f'fuzzy matching stocks {stock_results}') log.debug(f'fuzzy matching stocks {stock_results}')
stock_matches = fuzzy.extractBests( stock_matches = fuzzy.extractBests(
@ -935,8 +928,7 @@ async def open_symbol_search(
score_cutoff=50, score_cutoff=50,
) )
# matches = adhoc_match_results | { matches = adhoc_match_results | {
matches = {
item[0]: {} for item in stock_matches item[0]: {} for item in stock_matches
} }
# TODO: we used to deliver contract details # TODO: we used to deliver contract details

File diff suppressed because it is too large Load Diff

View File

@ -1,64 +0,0 @@
``kraken`` backend
------------------
though they don't have the most liquidity of all the cexes they sure are
accommodating to those of us who appreciate a little ``xmr``.
status
******
current support is *production grade* and both real-time data and order
management should be correct and fast. this backend is used by core devs
for live trading.
config
******
In order to get order mode support your ``brokers.toml``
needs to have something like the following:
.. code:: toml
[kraken]
accounts.spot = 'spot'
key_descr = "spot"
api_key = "69696969696969696696969696969696969696969696969696969696"
secret = "BOOBSBOOBSBOOBSBOOBSBOOBSSMBZ69696969696969669969696969696"
If everything works correctly you should see any current positions
loaded in the pps pane on chart load and you should also be able to
check your trade records in the file::
<pikerk_conf_dir>/ledgers/trades_kraken_spot.toml
An example ledger file will have entries written verbatim from the
trade events schema:
.. code:: toml
[TFJBKK-SMBZS-VJ4UWS]
ordertxid = "SMBZSA-7CNQU-3HWLNJ"
postxid = "SMBZSE-M7IF5-CFI7LT"
pair = "XXMRZEUR"
time = 1655691993.4133966
type = "buy"
ordertype = "limit"
price = "103.97000000"
cost = "499.99999977"
fee = "0.80000000"
vol = "4.80907954"
margin = "0.00000000"
misc = ""
your ``pps.toml`` file will have position entries like,
.. code:: toml
[kraken.spot."xmreur.kraken"]
size = 4.80907954
be_price = 103.97000000
bsuid = "XXMRZEUR"
clears = [
{ tid = "TFJBKK-SMBZS-VJ4UWS", cost = 0.8, price = 103.97, size = 4.80907954, dt = "2022-05-20T02:26:33.413397+00:00" },
]

View File

@ -1,61 +0,0 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Kraken backend.
Sub-modules within break into the core functionalities:
- ``broker.py`` part for orders / trading endpoints
- ``feed.py`` for real-time data feed endpoints
- ``api.py`` for the core API machinery which is ``trio``-ized
wrapping around ``ib_insync``.
'''
from piker.log import get_logger
log = get_logger(__name__)
from .api import (
get_client,
)
from .feed import (
open_history_client,
open_symbol_search,
stream_quotes,
)
from .broker import (
trades_dialogue,
norm_trade_records,
)
__all__ = [
'get_client',
'trades_dialogue',
'open_history_client',
'open_symbol_search',
'stream_quotes',
'norm_trade_records',
]
# tractor RPC enable arg
__enable_modules__: list[str] = [
'api',
'feed',
'broker',
]

View File

@ -1,469 +0,0 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Kraken web API wrapping.
'''
from contextlib import asynccontextmanager as acm
from dataclasses import field
from datetime import datetime
import itertools
from typing import (
Any,
Optional,
Union,
)
import time
# import trio
# import tractor
import pendulum
import asks
from fuzzywuzzy import process as fuzzy
import numpy as np
from pydantic.dataclasses import dataclass
import urllib.parse
import hashlib
import hmac
import base64
from piker import config
from piker.brokers._util import (
resproc,
SymbolNotFound,
BrokerError,
DataThrottle,
)
from . import log
# <uri>/<version>/
_url = 'https://api.kraken.com/0'
# Broker specific ohlc schema which includes a vwap field
_ohlc_dtype = [
('index', int),
('time', int),
('open', float),
('high', float),
('low', float),
('close', float),
('volume', float),
('count', int),
('bar_wap', float),
]
# UI components allow this to be declared such that additional
# (historical) fields can be exposed.
ohlc_dtype = np.dtype(_ohlc_dtype)
_show_wap_in_history = True
_symbol_info_translation: dict[str, str] = {
'tick_decimals': 'pair_decimals',
}
@dataclass
class OHLC:
'''
Description of the flattened OHLC quote format.
For schema details see:
https://docs.kraken.com/websockets/#message-ohlc
'''
chan_id: int # internal kraken id
chan_name: str # eg. ohlc-1 (name-interval)
pair: str # fx pair
time: float # Begin time of interval, in seconds since epoch
etime: float # End time of interval, in seconds since epoch
open: float # Open price of interval
high: float # High price within interval
low: float # Low price within interval
close: float # Close price of interval
vwap: float # Volume weighted average price within interval
volume: float # Accumulated volume **within interval**
count: int # Number of trades within interval
# (sampled) generated tick data
ticks: list[Any] = field(default_factory=list)
def get_config() -> dict[str, Any]:
conf, path = config.load()
section = conf.get('kraken')
if section is None:
log.warning(f'No config section found for kraken in {path}')
return {}
return section
def get_kraken_signature(
urlpath: str,
data: dict[str, Any],
secret: str
) -> str:
postdata = urllib.parse.urlencode(data)
encoded = (str(data['nonce']) + postdata).encode()
message = urlpath.encode() + hashlib.sha256(encoded).digest()
mac = hmac.new(base64.b64decode(secret), message, hashlib.sha512)
sigdigest = base64.b64encode(mac.digest())
return sigdigest.decode()
class InvalidKey(ValueError):
'''
EAPI:Invalid key
This error is returned when the API key used for the call is
either expired or disabled, please review the API key in your
Settings -> API tab of account management or generate a new one
and update your application.
'''
class Client:
def __init__(
self,
name: str = '',
api_key: str = '',
secret: str = ''
) -> None:
self._sesh = asks.Session(connections=4)
self._sesh.base_location = _url
self._sesh.headers.update({
'User-Agent':
'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)'
})
self._pairs: list[str] = []
self._name = name
self._api_key = api_key
self._secret = secret
@property
def pairs(self) -> dict[str, Any]:
if self._pairs is None:
raise RuntimeError(
"Make sure to run `cache_symbols()` on startup!"
)
# retreive and cache all symbols
return self._pairs
async def _public(
self,
method: str,
data: dict,
) -> dict[str, Any]:
resp = await self._sesh.post(
path=f'/public/{method}',
json=data,
timeout=float('inf')
)
return resproc(resp, log)
async def _private(
self,
method: str,
data: dict,
uri_path: str
) -> dict[str, Any]:
headers = {
'Content-Type':
'application/x-www-form-urlencoded',
'API-Key':
self._api_key,
'API-Sign':
get_kraken_signature(uri_path, data, self._secret)
}
resp = await self._sesh.post(
path=f'/private/{method}',
data=data,
headers=headers,
timeout=float('inf')
)
return resproc(resp, log)
async def endpoint(
self,
method: str,
data: dict[str, Any]
) -> dict[str, Any]:
uri_path = f'/0/private/{method}'
data['nonce'] = str(int(1000*time.time()))
return await self._private(method, data, uri_path)
async def get_trades(
self,
) -> dict[str, Any]:
'''
Get the trades (aka cleared orders) history from the rest endpoint:
https://docs.kraken.com/rest/#operation/getTradeHistory
'''
ofs = 0
trades_by_id: dict[str, Any] = {}
for i in itertools.count():
# increment 'ofs' pagination offset
ofs = i*50
resp = await self.endpoint(
'TradesHistory',
{'ofs': ofs},
)
by_id = resp['result']['trades']
trades_by_id.update(by_id)
# we can get up to 50 results per query
if (
len(by_id) < 50
):
err = resp.get('error')
if err:
raise BrokerError(err)
# we know we received the max amount of
# trade results so there may be more history.
# catch the end of the trades
count = resp['result']['count']
break
# santity check on update
assert count == len(trades_by_id.values())
return trades_by_id
async def submit_limit(
self,
symbol: str,
price: float,
action: str,
size: float,
reqid: str = None,
validate: bool = False # set True test call without a real submission
) -> dict:
'''
Place an order and return integer request id provided by client.
'''
# Build common data dict for common keys from both endpoints
data = {
"pair": symbol,
"price": str(price),
"validate": validate
}
if reqid is None:
# Build order data for kraken api
data |= {
"ordertype": "limit",
"type": action,
"volume": str(size),
}
return await self.endpoint('AddOrder', data)
else:
# Edit order data for kraken api
data["txid"] = reqid
return await self.endpoint('EditOrder', data)
async def submit_cancel(
self,
reqid: str,
) -> dict:
'''
Send cancel request for order id ``reqid``.
'''
# txid is a transaction id given by kraken
return await self.endpoint('CancelOrder', {"txid": reqid})
async def symbol_info(
self,
pair: Optional[str] = None,
):
if pair is not None:
pairs = {'pair': pair}
else:
pairs = None # get all pairs
resp = await self._public('AssetPairs', pairs)
err = resp['error']
if err:
symbolname = pairs['pair'] if pair else None
raise SymbolNotFound(f'{symbolname}.kraken')
pairs = resp['result']
if pair is not None:
_, data = next(iter(pairs.items()))
return data
else:
return pairs
async def cache_symbols(
self,
) -> dict:
if not self._pairs:
self._pairs = await self.symbol_info()
return self._pairs
async def search_symbols(
self,
pattern: str,
limit: int = None,
) -> dict[str, Any]:
if self._pairs is not None:
data = self._pairs
else:
data = await self.symbol_info()
matches = fuzzy.extractBests(
pattern,
data,
score_cutoff=50,
)
# repack in dict form
return {item[0]['altname']: item[0] for item in matches}
async def bars(
self,
symbol: str = 'XBTUSD',
# UTC 2017-07-02 12:53:20
since: Optional[Union[int, datetime]] = None,
count: int = 720, # <- max allowed per query
as_np: bool = True,
) -> dict:
if since is None:
since = pendulum.now('UTC').start_of('minute').subtract(
minutes=count).timestamp()
elif isinstance(since, int):
since = pendulum.from_timestamp(since).timestamp()
else: # presumably a pendulum datetime
since = since.timestamp()
# UTC 2017-07-02 12:53:20 is oldest seconds value
since = str(max(1499000000, int(since)))
json = await self._public(
'OHLC',
data={
'pair': symbol,
'since': since,
},
)
try:
res = json['result']
res.pop('last')
bars = next(iter(res.values()))
new_bars = []
first = bars[0]
last_nz_vwap = first[-3]
if last_nz_vwap == 0:
# use close if vwap is zero
last_nz_vwap = first[-4]
# convert all fields to native types
for i, bar in enumerate(bars):
# normalize weird zero-ed vwap values..cmon kraken..
# indicates vwap didn't change since last bar
vwap = float(bar.pop(-3))
if vwap != 0:
last_nz_vwap = vwap
if vwap == 0:
vwap = last_nz_vwap
# re-insert vwap as the last of the fields
bar.append(vwap)
new_bars.append(
(i,) + tuple(
ftype(bar[j]) for j, (name, ftype) in enumerate(
_ohlc_dtype[1:]
)
)
)
array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars
return array
except KeyError:
errmsg = json['error'][0]
if 'not found' in errmsg:
raise SymbolNotFound(errmsg + f': {symbol}')
elif 'Too many requests' in errmsg:
raise DataThrottle(f'{symbol}')
else:
raise BrokerError(errmsg)
@acm
async def get_client() -> Client:
section = get_config()
if section:
client = Client(
name=section['key_descr'],
api_key=section['api_key'],
secret=section['secret']
)
else:
client = Client()
# at startup, load all symbols locally for fast search
await client.cache_symbols()
yield client
def normalize_symbol(
ticker: str
) -> str:
'''
Normalize symbol names to to a 3x3 pair.
'''
remap = {
'XXBTZEUR': 'XBTEUR',
'XXMRZEUR': 'XMREUR',
# ws versions? pretty weird..
'XBT/EUR': 'XBTEUR',
'XMR/EUR': 'XMREUR',
}
symlen = len(ticker)
if symlen != 6:
ticker = remap[ticker]
else:
raise ValueError(f'Unhandled symbol: {ticker}')
return ticker.lower()

View File

@ -1,810 +0,0 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Order api and machinery
'''
from contextlib import (
asynccontextmanager as acm,
contextmanager as cm,
)
from functools import partial
from itertools import chain, count
from pprint import pformat
import time
from typing import (
Any,
AsyncIterator,
Union,
)
from async_generator import aclosing
from bidict import bidict
import pendulum
import trio
import tractor
import wsproto
from piker import pp
from piker.clearing._messages import (
BrokerdCancel,
BrokerdError,
BrokerdFill,
BrokerdOrder,
BrokerdOrderAck,
BrokerdPosition,
BrokerdStatus,
)
from . import log
from .api import (
Client,
BrokerError,
get_client,
normalize_symbol,
)
from .feed import (
get_console_log,
open_autorecon_ws,
NoBsWs,
stream_messages,
)
MsgUnion = Union[
BrokerdCancel,
BrokerdError,
BrokerdFill,
BrokerdOrder,
BrokerdOrderAck,
BrokerdPosition,
BrokerdStatus,
]
async def handle_order_requests(
ws: NoBsWs,
client: Client,
ems_order_stream: tractor.MsgStream,
token: str,
emsflow: dict[str, list[MsgUnion]],
ids: bidict[str, int],
reqids2txids: dict[int, str],
) -> None:
'''
Process new order submission requests from the EMS
and deliver acks or errors.
'''
# XXX: UGH, let's unify this.. with ``msgspec``.
msg: dict[str, Any]
order: BrokerdOrder
counter = count()
async for msg in ems_order_stream:
log.info(f'Rx order msg:\n{pformat(msg)}')
match msg:
case {
'action': 'cancel',
}:
cancel = BrokerdCancel(**msg)
last = emsflow[cancel.oid]
reqid = ids[cancel.oid]
txid = reqids2txids[reqid]
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [txid], # should be txid from submission
})
case {
'account': 'kraken.spot' as account,
'action': action,
} if action in {'buy', 'sell'}:
# validate
order = BrokerdOrder(**msg)
# logic from old `Client.submit_limit()`
if order.oid in ids:
ep = 'editOrder'
reqid = ids[order.oid] # integer not txid
txid = reqids2txids[reqid]
extra = {
'orderid': txid, # txid
}
else:
ep = 'addOrder'
reqid = next(counter)
ids[order.oid] = reqid
log.debug(
f"Adding order {reqid}\n"
f'{ids}'
)
extra = {
'ordertype': 'limit',
'type': order.action,
}
psym = order.symbol.upper()
pair = f'{psym[:3]}/{psym[3:]}'
# call ws api to submit the order:
# https://docs.kraken.com/websockets/#message-addOrder
req = {
'event': ep,
'token': token,
'reqid': reqid, # remapped-to-int uid from ems
'pair': pair,
'price': str(order.price),
'volume': str(order.size),
# only ensures request is valid, nothing more
# validate: 'true',
} | extra
log.info(f'Submitting WS order request:\n{pformat(req)}')
await ws.send_msg(req)
resp = BrokerdOrderAck(
oid=order.oid, # ems order request id
reqid=reqid, # our custom int mapping
account=account, # piker account
)
await ems_order_stream.send(resp)
# placehold for sanity checking in relay loop
emsflow.setdefault(order.oid, []).append(order)
case _:
account = msg.get('account')
if account != 'kraken.spot':
log.error(
'This is a kraken account, \
only a `kraken.spot` selection is valid'
)
await ems_order_stream.send(
BrokerdError(
oid=msg['oid'],
symbol=msg['symbol'],
reason=(
'Invalid request msg:\n{msg}'
),
)
)
@acm
async def subscribe(
ws: wsproto.WSConnection,
token: str,
subs: list[str] = ['ownTrades', 'openOrders'],
):
'''
Setup ws api subscriptions:
https://docs.kraken.com/websockets/#message-subscribe
By default we sign up for trade and order update events.
'''
# more specific logic for this in kraken's sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
assert token
for sub in subs:
msg = {
'event': 'subscribe',
'subscription': {
'name': sub,
'token': token,
}
}
# TODO: we want to eventually allow unsubs which should
# be completely fine to request from a separate task
# since internally the ws methods appear to be FIFO
# locked.
await ws.send_msg(msg)
yield
for sub in subs:
# unsub from all pairs on teardown
await ws.send_msg({
'event': 'unsubscribe',
'subscription': [sub],
})
# XXX: do we need to ack the unsub?
# await ws.recv_msg()
@tractor.context
async def trades_dialogue(
ctx: tractor.Context,
loglevel: str = None,
) -> AsyncIterator[dict[str, Any]]:
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
async with get_client() as client:
# TODO: make ems flip to paper mode via
# some returned signal if the user only wants to use
# the data feed or we return this?
# await ctx.started(({}, ['paper']))
if not client._api_key:
raise RuntimeError(
'Missing Kraken API key in `brokers.toml`!?!?')
# auth required block
acctid = client._name
acc_name = 'kraken.' + acctid
# pull and deliver trades ledger
trades = await client.get_trades()
log.info(
f'Loaded {len(trades)} trades from account `{acc_name}`'
)
with open_ledger(acctid, trades) as trans:
active, closed = pp.update_pps_conf(
'kraken',
acctid,
trade_records=trans,
ledger_reload={}.fromkeys(t.bsuid for t in trans),
)
position_msgs: list[dict] = []
pps: dict[int, pp.Position]
for pps in [active, closed]:
for tid, p in pps.items():
msg = BrokerdPosition(
broker='kraken',
account=acc_name,
symbol=p.symbol.front_fqsn(),
size=p.size,
avg_price=p.be_price,
currency='',
)
position_msgs.append(msg)
await ctx.started(
(position_msgs, [acc_name])
)
# Get websocket token for authenticated data stream
# Assert that a token was actually received.
resp = await client.endpoint('GetWebSocketsToken', {})
err = resp.get('error')
if err:
raise BrokerError(err)
token = resp['result']['token']
ws: NoBsWs
async with (
ctx.open_stream() as ems_stream,
open_autorecon_ws(
'wss://ws-auth.kraken.com/',
fixture=partial(
subscribe,
token=token,
),
) as ws,
trio.open_nursery() as n,
aclosing(stream_messages(ws)) as stream,
):
# task local msg dialog tracking
emsflow: dict[
str,
list[MsgUnion],
] = {}
# 2way map for ems ids to kraken int reqids..
ids: bidict[str, int] = bidict()
reqids2txids: dict[int, str] = {}
# task for processing inbound requests from ems
n.start_soon(
handle_order_requests,
ws,
client,
ems_stream,
token,
emsflow,
ids,
reqids2txids,
)
# enter relay loop
await handle_order_updates(
ws,
stream,
ems_stream,
emsflow,
ids,
reqids2txids,
trans,
acctid,
acc_name,
token,
)
async def handle_order_updates(
ws: NoBsWs,
ws_stream: AsyncIterator,
ems_stream: tractor.MsgStream,
emsflow: dict[str, list[MsgUnion]],
ids: bidict[str, int],
reqids2txids: dict[int, str],
trans: list[pp.Transaction],
acctid: str,
acc_name: str,
token: str,
) -> None:
'''
Main msg handling loop for all things order management.
This code is broken out to make the context explicit and state variables
defined in the signature clear to the reader.
'''
# transaction records which will be updated
# on new trade clearing events (aka order "fills")
trans: list[pp.Transaction]
async for msg in ws_stream:
match msg:
# process and relay clearing trade events to ems
# https://docs.kraken.com/websockets/#message-ownTrades
case [
trades_msgs,
'ownTrades',
# won't exist for historical values?
# 'userref': reqid,
{'sequence': seq},
]:
# flatten msgs to an {id -> data} table for processing
trades = {
tid: trade
for entry in trades_msgs
for (tid, trade) in entry.items()
# only emit entries which are already not-in-ledger
if tid not in {r.tid for r in trans}
}
for tid, trade in trades.items():
# NOTE: try to get the requid sent in the order
# request message if posssible; it may not be
# provided since this sub also returns generic
# historical trade events.
reqid = trade.get('userref', trade['ordertxid'])
action = trade['type']
price = float(trade['price'])
size = float(trade['vol'])
broker_time = float(trade['time'])
# send a fill msg for gui update
fill_msg = BrokerdFill(
reqid=reqid,
time_ns=time.time_ns(),
action=action,
size=size,
price=price,
# TODO: maybe capture more msg data
# i.e fees?
broker_details={'name': 'kraken'},
broker_time=broker_time
)
await ems_stream.send(fill_msg)
filled_msg = BrokerdStatus(
reqid=reqid,
time_ns=time.time_ns(),
account=acc_name,
status='filled',
filled=size,
reason='Order filled by kraken',
broker_details={
'name': 'kraken',
'broker_time': broker_time
},
# TODO: figure out if kraken gives a count
# of how many units of underlying were
# filled. Alternatively we can decrement
# this value ourselves by associating and
# calcing from the diff with the original
# client-side request, see:
# https://github.com/pikers/piker/issues/296
remaining=0,
)
await ems_stream.send(filled_msg)
# update ledger and position tracking
with open_ledger(acctid, trades) as trans:
active, closed = pp.update_pps_conf(
'kraken',
acctid,
trade_records=trans,
ledger_reload={}.fromkeys(
t.bsuid for t in trans),
)
# emit any new pp msgs to ems
for pos in filter(
bool,
chain(active.values(), closed.values()),
):
pp_msg = BrokerdPosition(
broker='kraken',
# XXX: ok so this is annoying, we're
# relaying an account name with the
# backend suffix prefixed but when
# reading accounts from ledgers we
# don't need it and/or it's prefixed
# in the section table.. we should
# just strip this from the message
# right since `.broker` is already
# included?
account=f'kraken.{acctid}',
symbol=pos.symbol.front_fqsn(),
size=pos.size,
avg_price=pos.be_price,
# TODO
# currency=''
)
await ems_stream.send(pp_msg)
# process and relay order state change events
# https://docs.kraken.com/websockets/#message-openOrders
case [
order_msgs,
'openOrders',
{'sequence': seq},
]:
for order_msg in order_msgs:
log.info(
f'Order msg update_{seq}:\n'
f'{pformat(order_msg)}'
)
txid, update_msg = list(order_msg.items())[0]
match update_msg:
# we ignore internal order updates triggered by
# kraken's "edit" endpoint.
case {
'cancel_reason': 'Order replaced',
'status': status,
'userref': reqid,
**rest,
}:
continue
case {
'status': status,
'userref': reqid,
**rest,
# XXX: eg. of remaining msg schema:
# 'avg_price': _,
# 'cost': _,
# 'descr': {
# 'close': None,
# 'leverage': None,
# 'order': descr,
# 'ordertype': 'limit',
# 'pair': 'XMR/EUR',
# 'price': '74.94000000',
# 'price2': '0.00000000',
# 'type': 'buy'
# },
# 'expiretm': None,
# 'fee': '0.00000000',
# 'limitprice': '0.00000000',
# 'misc': '',
# 'oflags': 'fciq',
# 'opentm': '1656966131.337344',
# 'refid': None,
# 'starttm': None,
# 'stopprice': '0.00000000',
# 'timeinforce': 'GTC',
# 'vol': submit_vlm, # '13.34400854',
# 'vol_exec': exec_vlm, # 0.0000
}:
ems_status = {
'open': 'submitted',
'closed': 'cancelled',
'canceled': 'cancelled',
# do we even need to forward
# this state to the ems?
'pending': 'pending',
}[status]
submit_vlm = rest.get('vol', 0)
exec_vlm = rest.get('vol_exec', 0)
reqids2txids[reqid] = txid
oid = ids.inverse.get(reqid)
if not oid:
# TODO: handle these and relay them
# through the EMS to the client / UI
# side!
log.warning(
f'Received active order {txid}:\n'
f'{update_msg}\n'
'Cancelling order for now!..'
)
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [txid],
})
continue
msgs = emsflow[oid]
# send BrokerdStatus messages for all
# order state updates
resp = BrokerdStatus(
reqid=reqid,
time_ns=time.time_ns(), # cuz why not
account=f'kraken.{acctid}',
# everyone doin camel case..
status=ems_status, # force lower case
filled=exec_vlm,
reason='', # why held?
remaining=(
float(submit_vlm)
-
float(exec_vlm)
),
broker_details=dict(
{'name': 'kraken'}, **update_msg
),
)
msgs.append(resp)
await ems_stream.send(resp)
case _:
log.warning(
'Unknown orders msg:\n'
f'{txid}:{order_msg}'
)
case {
'event': etype,
'status': status,
'reqid': reqid,
**rest,
} as event if (
etype in {
'addOrderStatus',
'editOrderStatus',
'cancelOrderStatus',
}
):
oid = ids.inverse.get(reqid)
if not oid:
log.warning(
'Unknown order status update?:\n'
f'{event}'
)
continue
txid = rest.get('txid')
if txid:
reqids2txids[reqid] = txid
msgs = emsflow[oid]
last = msgs[-1]
resps, errored = process_status(
event,
oid,
token,
msgs,
last,
)
if resps:
msgs.extend(resps)
for resp in resps:
await ems_stream.send(resp)
case _:
log.warning(f'Unhandled trades update msg: {msg}')
def process_status(
event: dict[str, str],
oid: str,
token: str,
msgs: list[MsgUnion],
last: MsgUnion,
) -> tuple[list[MsgUnion], bool]:
'''
Process `'[add/edit/cancel]OrderStatus'` events by translating to
and returning the equivalent EMS-msg responses.
'''
match event:
case {
'event': etype,
'status': 'error',
'reqid': reqid,
'errorMessage': errmsg,
}:
# any of ``{'add', 'edit', 'cancel'}``
action = etype.rstrip('OrderStatus')
log.error(
f'Failed to {action} order {reqid}:\n'
f'{errmsg}'
)
resp = BrokerdError(
oid=oid,
# XXX: use old reqid in case it changed?
reqid=reqid,
symbol=getattr(last, 'symbol', 'N/A'),
reason=f'Failed {action}:\n{errmsg}',
broker_details=event
)
return [resp], True
# successful request cases
case {
'event': 'addOrderStatus',
'status': "ok",
'reqid': reqid, # oid from ems side
'txid': txid,
'descr': descr, # only on success?
}:
log.info(
f'Submitting order: {descr}\n'
f'ems oid: {oid}\n'
f're-mapped reqid: {reqid}\n'
f'txid: {txid}\n'
)
return [], False
case {
'event': 'editOrderStatus',
'status': "ok",
'reqid': reqid, # oid from ems side
'descr': descr,
# NOTE: for edit request this is a new value
'txid': txid,
'originaltxid': origtxid,
}:
log.info(
f'Editting order {oid}[requid={reqid}]:\n'
f'txid: {origtxid} -> {txid}\n'
f'{descr}'
)
# deliver another ack to update the ems-side `.reqid`.
return [], False
case {
"event": "cancelOrderStatus",
"status": "ok",
'reqid': reqid,
# XXX: sometimes this isn't provided!?
# 'txid': txids,
**rest,
}:
# TODO: should we support "batch" acking of
# multiple cancels thus avoiding the below loop?
resps: list[MsgUnion] = []
for txid in rest.get('txid', [last.reqid]):
resp = BrokerdStatus(
reqid=reqid,
account=last.account,
time_ns=time.time_ns(),
status='cancelled',
reason='Cancel success: {oid}@{txid}',
broker_details=event,
)
resps.append(resp)
return resps, False
def norm_trade_records(
ledger: dict[str, Any],
) -> list[pp.Transaction]:
records: list[pp.Transaction] = []
for tid, record in ledger.items():
size = float(record.get('vol')) * {
'buy': 1,
'sell': -1,
}[record['type']]
bsuid = record['pair']
norm_sym = normalize_symbol(bsuid)
records.append(
pp.Transaction(
fqsn=f'{norm_sym}.kraken',
tid=tid,
size=size,
price=float(record['price']),
cost=float(record['fee']),
dt=pendulum.from_timestamp(float(record['time'])),
bsuid=bsuid,
# XXX: there are no derivs on kraken right?
# expiry=expiry,
)
)
return records
@cm
def open_ledger(
acctid: str,
trade_entries: list[dict[str, Any]],
) -> list[pp.Transaction]:
'''
Write recent session's trades to the user's (local) ledger file.
'''
with pp.open_trade_ledger(
'kraken',
acctid,
) as ledger:
# normalize to transaction form
records = norm_trade_records(trade_entries)
yield records
# update on exit
ledger.update(trade_entries)

View File

@ -1,495 +0,0 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Real-time and historical data feed endpoints.
'''
from contextlib import asynccontextmanager as acm
from dataclasses import asdict
from datetime import datetime
from typing import (
Any,
Optional,
Callable,
)
import time
from fuzzywuzzy import process as fuzzy
import numpy as np
import pendulum
from trio_typing import TaskStatus
import tractor
import trio
import wsproto
from piker._cacheables import open_cached_client
from piker.brokers._util import (
BrokerError,
DataThrottle,
DataUnavailable,
)
from piker.log import get_console_log
from piker.data import ShmArray
from piker.data.types import Struct
from piker.data._web_bs import open_autorecon_ws, NoBsWs
from . import log
from .api import (
Client,
OHLC,
)
# https://www.kraken.com/features/api#get-tradable-pairs
class Pair(Struct):
altname: str # alternate pair name
wsname: str # WebSocket pair name (if available)
aclass_base: str # asset class of base component
base: str # asset id of base component
aclass_quote: str # asset class of quote component
quote: str # asset id of quote component
lot: str # volume lot size
pair_decimals: int # scaling decimal places for pair
lot_decimals: int # scaling decimal places for volume
# amount to multiply lot volume by to get currency volume
lot_multiplier: float
# array of leverage amounts available when buying
leverage_buy: list[int]
# array of leverage amounts available when selling
leverage_sell: list[int]
# fee schedule array in [volume, percent fee] tuples
fees: list[tuple[int, float]]
# maker fee schedule array in [volume, percent fee] tuples (if on
# maker/taker)
fees_maker: list[tuple[int, float]]
fee_volume_currency: str # volume discount currency
margin_call: str # margin call level
margin_stop: str # stop-out/liquidation margin level
ordermin: float # minimum order volume for pair
async def stream_messages(
ws: NoBsWs,
):
'''
Message stream parser and heartbeat handler.
Deliver ws subscription messages as well as handle heartbeat logic
though a single async generator.
'''
too_slow_count = last_hb = 0
while True:
with trio.move_on_after(5) as cs:
msg = await ws.recv_msg()
# trigger reconnection if heartbeat is laggy
if cs.cancelled_caught:
too_slow_count += 1
if too_slow_count > 20:
log.warning(
"Heartbeat is too slow, resetting ws connection")
await ws._connect()
too_slow_count = 0
continue
match msg:
case {'event': 'heartbeat'}:
now = time.time()
delay = now - last_hb
last_hb = now
# XXX: why tf is this not printing without --tl flag?
log.debug(f"Heartbeat after {delay}")
# print(f"Heartbeat after {delay}")
continue
case {
'connectionID': _,
'event': 'systemStatus',
'status': 'online',
'version': _,
} as msg:
log.info(
'WS connection is up:\n'
f'{msg}'
)
continue
case _:
yield msg
async def process_data_feed_msgs(
ws: NoBsWs,
):
'''
Parse and pack data feed messages.
'''
async for msg in stream_messages(ws):
match msg:
case {
'errorMessage': errmsg
}:
raise BrokerError(errmsg)
case {
'event': 'subscriptionStatus',
} as sub:
log.info(
'WS subscription is active:\n'
f'{sub}'
)
continue
case [
chan_id,
*payload_array,
chan_name,
pair
]:
if 'ohlc' in chan_name:
yield 'ohlc', OHLC(
chan_id,
chan_name,
pair,
*payload_array[0]
)
elif 'spread' in chan_name:
bid, ask, ts, bsize, asize = map(
float, payload_array[0])
# TODO: really makes you think IB has a horrible API...
quote = {
'symbol': pair.replace('/', ''),
'ticks': [
{'type': 'bid', 'price': bid, 'size': bsize},
{'type': 'bsize', 'price': bid, 'size': bsize},
{'type': 'ask', 'price': ask, 'size': asize},
{'type': 'asize', 'price': ask, 'size': asize},
],
}
yield 'l1', quote
# elif 'book' in msg[-2]:
# chan_id, *payload_array, chan_name, pair = msg
# print(msg)
case _:
print(f'UNHANDLED MSG: {msg}')
# yield msg
def normalize(
ohlc: OHLC,
) -> dict:
quote = asdict(ohlc)
quote['broker_ts'] = quote['time']
quote['brokerd_ts'] = time.time()
quote['symbol'] = quote['pair'] = quote['pair'].replace('/', '')
quote['last'] = quote['close']
quote['bar_wap'] = ohlc.vwap
# seriously eh? what's with this non-symmetry everywhere
# in subscription systems...
# XXX: piker style is always lowercases symbols.
topic = quote['pair'].replace('/', '').lower()
# print(quote)
return topic, quote
def make_sub(pairs: list[str], data: dict[str, Any]) -> dict[str, str]:
'''
Create a request subscription packet dict.
https://docs.kraken.com/websockets/#message-subscribe
'''
# eg. specific logic for this in kraken's sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
return {
'pair': pairs,
'event': 'subscribe',
'subscription': data,
}
@acm
async def open_history_client(
symbol: str,
) -> tuple[Callable, int]:
# TODO implement history getter for the new storage layer.
async with open_cached_client('kraken') as client:
# lol, kraken won't send any more then the "last"
# 720 1m bars.. so we have to just ignore further
# requests of this type..
queries: int = 0
async def get_ohlc(
end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None,
) -> tuple[
np.ndarray,
datetime, # start
datetime, # end
]:
nonlocal queries
if queries > 0:
raise DataUnavailable
count = 0
while count <= 3:
try:
array = await client.bars(
symbol,
since=end_dt,
)
count += 1
queries += 1
break
except DataThrottle:
log.warning(f'kraken OHLC throttle for {symbol}')
await trio.sleep(1)
start_dt = pendulum.from_timestamp(array[0]['time'])
end_dt = pendulum.from_timestamp(array[-1]['time'])
return array, start_dt, end_dt
yield get_ohlc, {'erlangs': 1, 'rate': 1}
async def backfill_bars(
sym: str,
shm: ShmArray, # type: ignore # noqa
count: int = 10, # NOTE: any more and we'll overrun the underlying buffer
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Fill historical bars into shared mem / storage afap.
'''
with trio.CancelScope() as cs:
async with open_cached_client('kraken') as client:
bars = await client.bars(symbol=sym)
shm.push(bars)
task_status.started(cs)
async def stream_quotes(
send_chan: trio.abc.SendChannel,
symbols: list[str],
feed_is_live: trio.Event,
loglevel: str = None,
# backend specific
sub_type: str = 'ohlc',
# startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Subscribe for ohlc stream of quotes for ``pairs``.
``pairs`` must be formatted <crypto_symbol>/<fiat_symbol>.
'''
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
ws_pairs = {}
sym_infos = {}
async with open_cached_client('kraken') as client, send_chan as send_chan:
# keep client cached for real-time section
for sym in symbols:
# transform to upper since piker style is always lower
sym = sym.upper()
si = Pair(**await client.symbol_info(sym)) # validation
syminfo = si.to_dict()
syminfo['price_tick_size'] = 1 / 10**si.pair_decimals
syminfo['lot_tick_size'] = 1 / 10**si.lot_decimals
syminfo['asset_type'] = 'crypto'
sym_infos[sym] = syminfo
ws_pairs[sym] = si.wsname
symbol = symbols[0].lower()
init_msgs = {
# pass back token, and bool, signalling if we're the writer
# and that history has been written
symbol: {
'symbol_info': sym_infos[sym],
'shm_write_opts': {'sum_tick_vml': False},
'fqsn': sym,
},
}
@acm
async def subscribe(ws: wsproto.WSConnection):
# XXX: setup subs
# https://docs.kraken.com/websockets/#message-subscribe
# specific logic for this in kraken's shitty sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
ohlc_sub = make_sub(
list(ws_pairs.values()),
{'name': 'ohlc', 'interval': 1}
)
# TODO: we want to eventually allow unsubs which should
# be completely fine to request from a separate task
# since internally the ws methods appear to be FIFO
# locked.
await ws.send_msg(ohlc_sub)
# trade data (aka L1)
l1_sub = make_sub(
list(ws_pairs.values()),
{'name': 'spread'} # 'depth': 10}
)
# pull a first quote and deliver
await ws.send_msg(l1_sub)
yield
# unsub from all pairs on teardown
await ws.send_msg({
'pair': list(ws_pairs.values()),
'event': 'unsubscribe',
'subscription': ['ohlc', 'spread'],
})
# XXX: do we need to ack the unsub?
# await ws.recv_msg()
# see the tips on reconnection logic:
# https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds
ws: NoBsWs
async with open_autorecon_ws(
'wss://ws.kraken.com/',
fixture=subscribe,
) as ws:
# pull a first quote and deliver
msg_gen = process_data_feed_msgs(ws)
# TODO: use ``anext()`` when it lands in 3.10!
typ, ohlc_last = await anext(msg_gen)
topic, quote = normalize(ohlc_last)
task_status.started((init_msgs, quote))
# lol, only "closes" when they're margin squeezing clients ;P
feed_is_live.set()
# keep start of last interval for volume tracking
last_interval_start = ohlc_last.etime
# start streaming
async for typ, ohlc in msg_gen:
if typ == 'ohlc':
# TODO: can get rid of all this by using
# ``trades`` subscription...
# generate tick values to match time & sales pane:
# https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m
volume = ohlc.volume
# new OHLC sample interval
if ohlc.etime > last_interval_start:
last_interval_start = ohlc.etime
tick_volume = volume
else:
# this is the tick volume *within the interval*
tick_volume = volume - ohlc_last.volume
ohlc_last = ohlc
last = ohlc.close
if tick_volume:
ohlc.ticks.append({
'type': 'trade',
'price': last,
'size': tick_volume,
})
topic, quote = normalize(ohlc)
elif typ == 'l1':
quote = ohlc
topic = quote['symbol'].lower()
await send_chan.send({topic: quote})
@tractor.context
async def open_symbol_search(
ctx: tractor.Context,
) -> Client:
async with open_cached_client('kraken') as client:
# load all symbols locally for fast search
cache = await client.cache_symbols()
await ctx.started(cache)
async with ctx.open_stream() as stream:
async for pattern in stream:
matches = fuzzy.extractBests(
pattern,
cache,
score_cutoff=50,
)
# repack in dict form
await stream.send(
{item[0]['altname']: item[0]
for item in matches}
)

View File

@ -22,10 +22,54 @@ from enum import Enum
from typing import Optional from typing import Optional
from bidict import bidict from bidict import bidict
from pydantic import BaseModel, validator
from ..data._source import Symbol from ..data._source import Symbol
from ..data.types import Struct from ._messages import BrokerdPosition, Status
from ..pp import Position
class Position(BaseModel):
'''
Basic pp (personal position) model with attached fills history.
This type should be IPC wire ready?
'''
symbol: Symbol
# last size and avg entry price
size: float
avg_price: float # TODO: contextual pricing
# ordered record of known constituent trade messages
fills: list[Status] = []
def update_from_msg(
self,
msg: BrokerdPosition,
) -> None:
# XXX: better place to do this?
symbol = self.symbol
lot_size_digits = symbol.lot_size_digits
avg_price, size = (
round(msg['avg_price'], ndigits=symbol.tick_size_digits),
round(msg['size'], ndigits=lot_size_digits),
)
self.avg_price = avg_price
self.size = size
@property
def dsize(self) -> float:
'''
The "dollar" size of the pp, normally in trading (fiat) unit
terms.
'''
return self.avg_price * self.size
_size_units = bidict({ _size_units = bidict({
@ -40,30 +84,33 @@ SizeUnit = Enum(
) )
class Allocator(Struct): class Allocator(BaseModel):
class Config:
validate_assignment = True
copy_on_model_validation = False
arbitrary_types_allowed = True
# required to get the account validator lookup working?
extra = 'allow'
underscore_attrs_are_private = False
symbol: Symbol symbol: Symbol
account: Optional[str] = 'paper' account: Optional[str] = 'paper'
_size_units: bidict[str, Optional[str]] = _size_units
# TODO: for enums this clearly doesn't fucking work, you can't set # TODO: for enums this clearly doesn't fucking work, you can't set
# a default at startup by passing in a `dict` but yet you can set # a default at startup by passing in a `dict` but yet you can set
# that value through assignment..for wtv cucked reason.. honestly, pure # that value through assignment..for wtv cucked reason.. honestly, pure
# unintuitive garbage. # unintuitive garbage.
_size_unit: str = 'currency' size_unit: str = 'currency'
_size_units: dict[str, Optional[str]] = _size_units
@property @validator('size_unit', pre=True)
def size_unit(self) -> str: def maybe_lookup_key(cls, v):
return self._size_unit # apply the corresponding enum key for the text "description" value
@size_unit.setter
def size_unit(self, v: str) -> Optional[str]:
if v not in _size_units: if v not in _size_units:
v = _size_units.inverse[v] return _size_units.inverse[v]
assert v in _size_units assert v in _size_units
self._size_unit = v
return v return v
# TODO: if we ever want ot support non-uniform entry-slot-proportion # TODO: if we ever want ot support non-uniform entry-slot-proportion
@ -126,7 +173,7 @@ class Allocator(Struct):
l_sub_pp = self.units_limit - abs_live_size l_sub_pp = self.units_limit - abs_live_size
elif size_unit == 'currency': elif size_unit == 'currency':
live_cost_basis = abs_live_size * live_pp.be_price live_cost_basis = abs_live_size * live_pp.avg_price
slot_size = currency_per_slot / price slot_size = currency_per_slot / price
l_sub_pp = (self.currency_limit - live_cost_basis) / price l_sub_pp = (self.currency_limit - live_cost_basis) / price
@ -158,7 +205,7 @@ class Allocator(Struct):
if size_unit == 'currency': if size_unit == 'currency':
# compute the "projected" limit's worth of units at the # compute the "projected" limit's worth of units at the
# current pp (weighted) price: # current pp (weighted) price:
slot_size = currency_per_slot / live_pp.be_price slot_size = currency_per_slot / live_pp.avg_price
else: else:
slot_size = u_per_slot slot_size = u_per_slot
@ -197,12 +244,7 @@ class Allocator(Struct):
if order_size < slot_size: if order_size < slot_size:
# compute a fractional slots size to display # compute a fractional slots size to display
slots_used = self.slots_used( slots_used = self.slots_used(
Position( Position(symbol=sym, size=order_size, avg_price=price)
symbol=sym,
size=order_size,
be_price=price,
bsuid=sym,
)
) )
return { return {
@ -229,8 +271,8 @@ class Allocator(Struct):
abs_pp_size = abs(pp.size) abs_pp_size = abs(pp.size)
if self.size_unit == 'currency': if self.size_unit == 'currency':
# live_currency_size = size or (abs_pp_size * pp.be_price) # live_currency_size = size or (abs_pp_size * pp.avg_price)
live_currency_size = abs_pp_size * pp.be_price live_currency_size = abs_pp_size * pp.avg_price
prop = live_currency_size / self.currency_limit prop = live_currency_size / self.currency_limit
else: else:
@ -258,7 +300,7 @@ def mk_allocator(
# default allocation settings # default allocation settings
defaults: dict[str, float] = { defaults: dict[str, float] = {
'account': None, # select paper by default 'account': None, # select paper by default
# 'size_unit': 'currency', 'size_unit': 'currency',
'units_limit': 400, 'units_limit': 400,
'currency_limit': 5e3, 'currency_limit': 5e3,
'slots': 4, 'slots': 4,
@ -297,13 +339,10 @@ def mk_allocator(
# entry step 1.0 # entry step 1.0
alloc.units_limit = alloc.slots alloc.units_limit = alloc.slots
else:
alloc.size_unit = 'currency'
# if the current position is already greater then the limit # if the current position is already greater then the limit
# settings, increase the limit to the current position # settings, increase the limit to the current position
if alloc.size_unit == 'currency': if alloc.size_unit == 'currency':
startup_size = startup_pp.size * startup_pp.be_price startup_size = startup_pp.size * startup_pp.avg_price
if startup_size > alloc.currency_limit: if startup_size > alloc.currency_limit:
alloc.currency_limit = round(startup_size, ndigits=2) alloc.currency_limit = round(startup_size, ndigits=2)

View File

@ -58,11 +58,11 @@ class OrderBook:
def send( def send(
self, self,
msg: Order | dict, msg: Order,
) -> dict: ) -> dict:
self._sent_orders[msg.oid] = msg self._sent_orders[msg.oid] = msg
self._to_ems.send_nowait(msg) self._to_ems.send_nowait(msg.dict())
return msg return msg
def update( def update(
@ -73,8 +73,9 @@ class OrderBook:
) -> dict: ) -> dict:
cmd = self._sent_orders[uuid] cmd = self._sent_orders[uuid]
msg = cmd.copy(update=data) msg = cmd.dict()
self._sent_orders[uuid] = msg msg.update(data)
self._sent_orders[uuid] = Order(**msg)
self._to_ems.send_nowait(msg) self._to_ems.send_nowait(msg)
return cmd return cmd
@ -87,7 +88,7 @@ class OrderBook:
oid=uuid, oid=uuid,
symbol=cmd.symbol, symbol=cmd.symbol,
) )
self._to_ems.send_nowait(msg) self._to_ems.send_nowait(msg.dict())
_orders: OrderBook = None _orders: OrderBook = None
@ -148,7 +149,7 @@ async def relay_order_cmds_from_sync_code(
book = get_orders() book = get_orders()
async with book._from_order_book.subscribe() as orders_stream: async with book._from_order_book.subscribe() as orders_stream:
async for cmd in orders_stream: async for cmd in orders_stream:
if cmd.symbol == symbol_key: if cmd['symbol'] == symbol_key:
log.info(f'Send order cmd:\n{pformat(cmd)}') log.info(f'Send order cmd:\n{pformat(cmd)}')
# send msg over IPC / wire # send msg over IPC / wire
await to_ems_stream.send(cmd) await to_ems_stream.send(cmd)

View File

@ -20,12 +20,12 @@ In da suit parlances: "Execution management systems"
""" """
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dataclasses import dataclass, field from dataclasses import dataclass, field
from math import isnan
from pprint import pformat from pprint import pformat
import time import time
from typing import AsyncIterator, Callable from typing import AsyncIterator, Callable
from bidict import bidict from bidict import bidict
from pydantic import BaseModel
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
@ -33,7 +33,6 @@ import tractor
from ..log import get_logger from ..log import get_logger
from ..data._normalize import iterticks from ..data._normalize import iterticks
from ..data.feed import Feed, maybe_open_feed from ..data.feed import Feed, maybe_open_feed
from ..data.types import Struct
from .._daemon import maybe_spawn_brokerd from .._daemon import maybe_spawn_brokerd
from . import _paper_engine as paper from . import _paper_engine as paper
from ._messages import ( from ._messages import (
@ -88,8 +87,7 @@ def mk_check(
@dataclass @dataclass
class _DarkBook: class _DarkBook:
''' '''EMS-trigger execution book.
EMS-trigger execution book.
Contains conditions for executions (aka "orders" or "triggers") Contains conditions for executions (aka "orders" or "triggers")
which are not exposed to brokers and thus the market; i.e. these are which are not exposed to brokers and thus the market; i.e. these are
@ -232,7 +230,7 @@ async def clear_dark_triggers(
price=submit_price, price=submit_price,
size=cmd['size'], size=cmd['size'],
) )
await brokerd_orders_stream.send(msg) await brokerd_orders_stream.send(msg.dict())
# mark this entry as having sent an order # mark this entry as having sent an order
# request. the entry will be replaced once the # request. the entry will be replaced once the
@ -248,11 +246,14 @@ async def clear_dark_triggers(
msg = Status( msg = Status(
oid=oid, # ems order id oid=oid, # ems order id
time_ns=time.time_ns(),
resp=resp, resp=resp,
time_ns=time.time_ns(),
symbol=fqsn,
trigger_price=price, trigger_price=price,
brokerd_msg=cmd, broker_details={'name': broker},
) cmd=cmd, # original request message
).dict()
# remove exec-condition from set # remove exec-condition from set
log.info(f'removing pred for {oid}') log.info(f'removing pred for {oid}')
@ -288,11 +289,7 @@ class TradesRelay:
brokerd_dialogue: tractor.MsgStream brokerd_dialogue: tractor.MsgStream
# map of symbols to dicts of accounts to pp msgs # map of symbols to dicts of accounts to pp msgs
positions: dict[ positions: dict[str, dict[str, BrokerdPosition]]
# brokername, acctid
tuple[str, str],
list[BrokerdPosition],
]
# allowed account names # allowed account names
accounts: tuple[str] accounts: tuple[str]
@ -301,7 +298,7 @@ class TradesRelay:
consumers: int = 0 consumers: int = 0
class Router(Struct): class Router(BaseModel):
''' '''
Order router which manages and tracks per-broker dark book, Order router which manages and tracks per-broker dark book,
alerts, clearing and related data feed management. alerts, clearing and related data feed management.
@ -322,6 +319,10 @@ class Router(Struct):
# brokername to trades-dialogues streams with ``brokerd`` actors # brokername to trades-dialogues streams with ``brokerd`` actors
relays: dict[str, TradesRelay] = {} relays: dict[str, TradesRelay] = {}
class Config:
arbitrary_types_allowed = True
underscore_attrs_are_private = False
def get_dark_book( def get_dark_book(
self, self,
brokername: str, brokername: str,
@ -460,24 +461,18 @@ async def open_brokerd_trades_dialogue(
# normalizing them to EMS messages and relaying back to # normalizing them to EMS messages and relaying back to
# the piker order client set. # the piker order client set.
# locally cache and track positions per account with # locally cache and track positions per account.
# a table of (brokername, acctid) -> `BrokerdPosition`
# msgs.
pps = {} pps = {}
for msg in positions: for msg in positions:
log.info(f'loading pp: {msg}') log.info(f'loading pp: {msg}')
account = msg['account'] account = msg['account']
# TODO: better value error for this which
# dumps the account and message and states the
# mismatch..
assert account in accounts assert account in accounts
pps.setdefault( pps.setdefault(
(broker, account), f'{msg["symbol"]}.{broker}',
[], {}
).append(msg) )[account] = msg
relay = TradesRelay( relay = TradesRelay(
brokerd_dialogue=brokerd_trades_stream, brokerd_dialogue=brokerd_trades_stream,
@ -575,17 +570,19 @@ async def translate_and_relay_brokerd_events(
if name == 'position': if name == 'position':
pos_msg = BrokerdPosition(**brokerd_msg) pos_msg = BrokerdPosition(**brokerd_msg).dict()
# XXX: this will be useful for automatic strats yah? # XXX: this will be useful for automatic strats yah?
# keep pps per account up to date locally in ``emsd`` mem # keep pps per account up to date locally in ``emsd`` mem
sym, broker = pos_msg.symbol, pos_msg.broker sym, broker = pos_msg['symbol'], pos_msg['broker']
relay.positions.setdefault( relay.positions.setdefault(
# NOTE: translate to a FQSN! # NOTE: translate to a FQSN!
(broker, sym), f'{sym}.{broker}',
[] {}
).append(pos_msg) ).setdefault(
pos_msg['account'], {}
).update(pos_msg)
# fan-out-relay position msgs immediately by # fan-out-relay position msgs immediately by
# broadcasting updates on all client streams # broadcasting updates on all client streams
@ -638,21 +635,14 @@ async def translate_and_relay_brokerd_events(
# something is out of order, we don't have an oid for # something is out of order, we don't have an oid for
# this broker-side message. # this broker-side message.
log.error( log.error(
f'Unknown oid: {oid} for msg:\n' 'Unknown oid:{oid} for msg:\n'
f'{pformat(brokerd_msg)}\n' f'{pformat(brokerd_msg)}'
'Unable to relay message to client side!?' 'Unable to relay message to client side!?'
) )
else: else:
# check for existing live flow entry # check for existing live flow entry
entry = book._ems_entries.get(oid) entry = book._ems_entries.get(oid)
old_reqid = entry.reqid
if old_reqid and old_reqid != reqid:
log.warning(
f'Brokerd order id change for {oid}:\n'
f'{old_reqid} -> {reqid}'
)
# initial response to brokerd order request # initial response to brokerd order request
if name == 'ack': if name == 'ack':
@ -663,10 +653,6 @@ async def translate_and_relay_brokerd_events(
# a ``BrokerdOrderAck`` **must** be sent after an order # a ``BrokerdOrderAck`` **must** be sent after an order
# request in order to establish this id mapping. # request in order to establish this id mapping.
book._ems2brokerd_ids[oid] = reqid book._ems2brokerd_ids[oid] = reqid
log.info(
'Rx ACK for order\n'
f'oid: {oid} -> reqid: {reqid}'
)
# new order which has not yet be registered into the # new order which has not yet be registered into the
# local ems book, insert it now and handle 2 cases: # local ems book, insert it now and handle 2 cases:
@ -681,7 +667,7 @@ async def translate_and_relay_brokerd_events(
entry.reqid = reqid entry.reqid = reqid
# tell broker to cancel immediately # tell broker to cancel immediately
await brokerd_trades_stream.send(entry) await brokerd_trades_stream.send(entry.dict())
# - the order is now active and will be mirrored in # - the order is now active and will be mirrored in
# our book -> registered as live flow # our book -> registered as live flow
@ -721,7 +707,7 @@ async def translate_and_relay_brokerd_events(
# if 10147 in message: cancel # if 10147 in message: cancel
resp = 'broker_errored' resp = 'broker_errored'
broker_details = msg broker_details = msg.dict()
# don't relay message to order requester client # don't relay message to order requester client
# continue # continue
@ -756,7 +742,7 @@ async def translate_and_relay_brokerd_events(
resp = 'broker_' + msg.status resp = 'broker_' + msg.status
# pass the BrokerdStatus msg inside the broker details field # pass the BrokerdStatus msg inside the broker details field
broker_details = msg broker_details = msg.dict()
elif name in ( elif name in (
'fill', 'fill',
@ -765,7 +751,7 @@ async def translate_and_relay_brokerd_events(
# proxy through the "fill" result(s) # proxy through the "fill" result(s)
resp = 'broker_filled' resp = 'broker_filled'
broker_details = msg broker_details = msg.dict()
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
@ -783,7 +769,7 @@ async def translate_and_relay_brokerd_events(
time_ns=time.time_ns(), time_ns=time.time_ns(),
broker_reqid=reqid, broker_reqid=reqid,
brokerd_msg=broker_details, brokerd_msg=broker_details,
) ).dict()
) )
except KeyError: except KeyError:
log.error( log.error(
@ -848,14 +834,14 @@ async def process_client_order_cmds(
# NOTE: cancel response will be relayed back in messages # NOTE: cancel response will be relayed back in messages
# from corresponding broker # from corresponding broker
if reqid is not None: if reqid:
# send cancel to brokerd immediately! # send cancel to brokerd immediately!
log.info( log.info(
f'Submitting cancel for live order {reqid}' f'Submitting cancel for live order {reqid}'
) )
await brokerd_order_stream.send(msg) await brokerd_order_stream.send(msg.dict())
else: else:
# this might be a cancel for an order that hasn't been # this might be a cancel for an order that hasn't been
@ -877,7 +863,7 @@ async def process_client_order_cmds(
resp='dark_cancelled', resp='dark_cancelled',
oid=oid, oid=oid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
) ).dict()
) )
# de-register this client dialogue # de-register this client dialogue
router.dialogues.pop(oid) router.dialogues.pop(oid)
@ -932,7 +918,7 @@ async def process_client_order_cmds(
# handle relaying the ems side responses back to # handle relaying the ems side responses back to
# the client/cmd sender from this request # the client/cmd sender from this request
log.info(f'Sending live order to {broker}:\n{pformat(msg)}') log.info(f'Sending live order to {broker}:\n{pformat(msg)}')
await brokerd_order_stream.send(msg) await brokerd_order_stream.send(msg.dict())
# an immediate response should be ``BrokerdOrderAck`` # an immediate response should be ``BrokerdOrderAck``
# with ems order id from the ``trades_dialogue()`` # with ems order id from the ``trades_dialogue()``
@ -957,12 +943,6 @@ async def process_client_order_cmds(
# like every other shitty tina platform that makes # like every other shitty tina platform that makes
# the user choose the predicate operator. # the user choose the predicate operator.
last = dark_book.lasts[fqsn] last = dark_book.lasts[fqsn]
# sometimes the real-time feed hasn't come up
# so just pull from the latest history.
if isnan(last):
last = feed.shm.array[-1]['close']
pred = mk_check(trigger_price, last, action) pred = mk_check(trigger_price, last, action)
spread_slap: float = 5 spread_slap: float = 5
@ -1012,7 +992,7 @@ async def process_client_order_cmds(
resp=resp, resp=resp,
oid=oid, oid=oid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
) ).dict()
) )
@ -1108,12 +1088,15 @@ async def _emsd_main(
brokerd_stream = relay.brokerd_dialogue # .clone() brokerd_stream = relay.brokerd_dialogue # .clone()
# flatten out collected pps from brokerd for delivery
pp_msgs = {
fqsn: list(pps.values())
for fqsn, pps in relay.positions.items()
}
# signal to client that we're started and deliver # signal to client that we're started and deliver
# all known pps and accounts for this ``brokerd``. # all known pps and accounts for this ``brokerd``.
await ems_ctx.started(( await ems_ctx.started((pp_msgs, list(relay.accounts)))
relay.positions,
list(relay.accounts),
))
# establish 2-way stream with requesting order-client and # establish 2-way stream with requesting order-client and
# begin handling inbound order requests and updates # begin handling inbound order requests and updates
@ -1150,14 +1133,8 @@ async def _emsd_main(
) )
finally: finally:
# try to remove client from "registry" # remove client from "registry"
try: _router.clients.remove(ems_client_order_stream)
_router.clients.remove(ems_client_order_stream)
except KeyError:
log.warning(
f'Stream {ems_client_order_stream._ctx.chan.uid}'
' was already dropped?'
)
dialogues = _router.dialogues dialogues = _router.dialogues

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers # piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers) # Copyright (C) Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
@ -15,26 +15,21 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
""" """
Clearing sub-system message and protocols. Clearing system messagingn types and protocols.
""" """
from typing import Optional, Union from typing import Optional, Union
# TODO: try out just encoding/send direction for now?
# import msgspec
from pydantic import BaseModel
from ..data._source import Symbol from ..data._source import Symbol
from ..data.types import Struct
# TODO: ``msgspec`` stuff worth paying attention to:
# - schema evolution: https://jcristharif.com/msgspec/usage.html#schema-evolution
# - use literals for a common msg determined by diff keys?
# - https://jcristharif.com/msgspec/usage.html#literal
# - for eg. ``BrokerdStatus``, instead just have separate messages?
# --------------
# Client -> emsd # Client -> emsd
# --------------
class Cancel(Struct):
class Cancel(BaseModel):
'''Cancel msg for removing a dark (ems triggered) or '''Cancel msg for removing a dark (ems triggered) or
broker-submitted (live) trigger/order. broker-submitted (live) trigger/order.
@ -44,10 +39,8 @@ class Cancel(Struct):
symbol: str symbol: str
class Order(Struct): class Order(BaseModel):
# TODO: use ``msgspec.Literal``
# https://jcristharif.com/msgspec/usage.html#literal
action: str # {'buy', 'sell', 'alert'} action: str # {'buy', 'sell', 'alert'}
# internal ``emdsd`` unique "order id" # internal ``emdsd`` unique "order id"
oid: str # uuid4 oid: str # uuid4
@ -55,9 +48,6 @@ class Order(Struct):
account: str # should we set a default as '' ? account: str # should we set a default as '' ?
price: float price: float
# TODO: could we drop the ``.action`` field above and instead just
# use +/- values here? Would make the msg smaller at the sake of a
# teensie fp precision?
size: float size: float
brokers: list[str] brokers: list[str]
@ -69,14 +59,20 @@ class Order(Struct):
# the backend broker # the backend broker
exec_mode: str # {'dark', 'live', 'paper'} exec_mode: str # {'dark', 'live', 'paper'}
class Config:
# just for pre-loading a ``Symbol`` when used
# in the order mode staging process
arbitrary_types_allowed = True
# don't copy this model instance when used in
# a recursive model
copy_on_model_validation = False
# --------------
# Client <- emsd # Client <- emsd
# --------------
# update msgs from ems which relay state change info # update msgs from ems which relay state change info
# from the active clearing engine. # from the active clearing engine.
class Status(Struct):
class Status(BaseModel):
name: str = 'status' name: str = 'status'
oid: str # uuid4 oid: str # uuid4
@ -99,6 +95,8 @@ class Status(Struct):
# } # }
resp: str # "response", see above resp: str # "response", see above
# symbol: str
# trigger info # trigger info
trigger_price: Optional[float] = None trigger_price: Optional[float] = None
# price: float # price: float
@ -113,12 +111,10 @@ class Status(Struct):
brokerd_msg: dict = {} brokerd_msg: dict = {}
# ---------------
# emsd -> brokerd # emsd -> brokerd
# ---------------
# requests *sent* from ems to respective backend broker daemon # requests *sent* from ems to respective backend broker daemon
class BrokerdCancel(Struct): class BrokerdCancel(BaseModel):
action: str = 'cancel' action: str = 'cancel'
oid: str # piker emsd order id oid: str # piker emsd order id
@ -134,7 +130,7 @@ class BrokerdCancel(Struct):
reqid: Optional[Union[int, str]] = None reqid: Optional[Union[int, str]] = None
class BrokerdOrder(Struct): class BrokerdOrder(BaseModel):
action: str # {buy, sell} action: str # {buy, sell}
oid: str oid: str
@ -154,12 +150,11 @@ class BrokerdOrder(Struct):
size: float size: float
# ---------------
# emsd <- brokerd # emsd <- brokerd
# ---------------
# requests *received* to ems from broker backend # requests *received* to ems from broker backend
class BrokerdOrderAck(Struct):
class BrokerdOrderAck(BaseModel):
''' '''
Immediate reponse to a brokerd order request providing the broker Immediate reponse to a brokerd order request providing the broker
specific unique order id so that the EMS can associate this specific unique order id so that the EMS can associate this
@ -177,7 +172,7 @@ class BrokerdOrderAck(Struct):
account: str = '' account: str = ''
class BrokerdStatus(Struct): class BrokerdStatus(BaseModel):
name: str = 'status' name: str = 'status'
reqid: Union[int, str] reqid: Union[int, str]
@ -210,7 +205,7 @@ class BrokerdStatus(Struct):
} }
class BrokerdFill(Struct): class BrokerdFill(BaseModel):
''' '''
A single message indicating a "fill-details" event from the broker A single message indicating a "fill-details" event from the broker
if avaiable. if avaiable.
@ -235,7 +230,7 @@ class BrokerdFill(Struct):
broker_time: float broker_time: float
class BrokerdError(Struct): class BrokerdError(BaseModel):
''' '''
Optional error type that can be relayed to emsd for error handling. Optional error type that can be relayed to emsd for error handling.
@ -254,7 +249,7 @@ class BrokerdError(Struct):
broker_details: dict = {} broker_details: dict = {}
class BrokerdPosition(Struct): class BrokerdPosition(BaseModel):
'''Position update event from brokerd. '''Position update event from brokerd.
''' '''
@ -263,6 +258,6 @@ class BrokerdPosition(Struct):
broker: str broker: str
account: str account: str
symbol: str symbol: str
currency: str
size: float size: float
avg_price: float avg_price: float
currency: str = ''

View File

@ -31,8 +31,6 @@ import tractor
from dataclasses import dataclass from dataclasses import dataclass
from .. import data from .. import data
from ..data._source import Symbol
from ..pp import Position
from ..data._normalize import iterticks from ..data._normalize import iterticks
from ..data._source import unpack_fqsn from ..data._source import unpack_fqsn
from ..log import get_logger from ..log import get_logger
@ -117,7 +115,7 @@ class PaperBoi:
reason='paper_trigger', reason='paper_trigger',
remaining=size, remaining=size,
) )
await self.ems_trades_stream.send(msg) await self.ems_trades_stream.send(msg.dict())
# if we're already a clearing price simulate an immediate fill # if we're already a clearing price simulate an immediate fill
if ( if (
@ -173,7 +171,7 @@ class PaperBoi:
broker=self.broker, broker=self.broker,
time_ns=time.time_ns(), time_ns=time.time_ns(),
) )
await self.ems_trades_stream.send(msg) await self.ems_trades_stream.send(msg.dict())
async def fake_fill( async def fake_fill(
self, self,
@ -216,7 +214,7 @@ class PaperBoi:
'name': self.broker + '_paper', 'name': self.broker + '_paper',
}, },
) )
await self.ems_trades_stream.send(msg) await self.ems_trades_stream.send(msg.dict())
if order_complete: if order_complete:
@ -240,7 +238,7 @@ class PaperBoi:
'name': self.broker, 'name': self.broker,
}, },
) )
await self.ems_trades_stream.send(msg) await self.ems_trades_stream.send(msg.dict())
# lookup any existing position # lookup any existing position
token = f'{symbol}.{self.broker}' token = f'{symbol}.{self.broker}'
@ -259,16 +257,31 @@ class PaperBoi:
) )
) )
# delegate update to `.pp.Position.lifo_update()` # "avg position price" calcs
pp = Position( # TODO: eventually it'd be nice to have a small set of routines
Symbol(key=symbol), # to do this stuff from a sequence of cleared orders to enable
size=pp_msg.size, # so called "contextual positions".
be_price=pp_msg.avg_price, new_size = size + pp_msg.size
bsuid=symbol,
)
pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price)
await self.ems_trades_stream.send(pp_msg) # old size minus the new size gives us size differential with
# +ve -> increase in pp size
# -ve -> decrease in pp size
size_diff = abs(new_size) - abs(pp_msg.size)
if new_size == 0:
pp_msg.avg_price = 0
elif size_diff > 0:
# only update the "average position price" when the position
# size increases not when it decreases (i.e. the position is
# being made smaller)
pp_msg.avg_price = (
abs(size) * price + pp_msg.avg_price * abs(pp_msg.size)
) / abs(new_size)
pp_msg.size = new_size
await self.ems_trades_stream.send(pp_msg.dict())
async def simulate_fills( async def simulate_fills(
@ -377,14 +390,13 @@ async def handle_order_requests(
account = request_msg['account'] account = request_msg['account']
if account != 'paper': if account != 'paper':
log.error( log.error(
'This is a paper account,' 'This is a paper account, only a `paper` selection is valid'
' only a `paper` selection is valid'
) )
await ems_order_stream.send(BrokerdError( await ems_order_stream.send(BrokerdError(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason=f'Paper only. No account found: `{account}` ?', reason=f'Paper only. No account found: `{account}` ?',
)) ).dict())
continue continue
# validate # validate
@ -416,7 +428,7 @@ async def handle_order_requests(
# broker specific request id # broker specific request id
reqid=reqid, reqid=reqid,
) ).dict()
) )
elif action == 'cancel': elif action == 'cancel':
@ -452,7 +464,7 @@ async def trades_dialogue(
# TODO: load paper positions per broker from .toml config file # TODO: load paper positions per broker from .toml config file
# and pass as symbol to position data mapping: ``dict[str, dict]`` # and pass as symbol to position data mapping: ``dict[str, dict]``
# await ctx.started(all_positions) # await ctx.started(all_positions)
await ctx.started(({}, ['paper'])) await ctx.started(({}, {'paper',}))
async with ( async with (
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,

View File

@ -83,9 +83,9 @@ def pikerd(loglevel, host, tl, pdb, tsdb):
) )
log.info( log.info(
f'`marketstored` up!\n' f'`marketstore` up!\n'
f'pid: {pid}\n' f'`marketstored` pid: {pid}\n'
f'container id: {cid[:12]}\n' f'docker container id: {cid}\n'
f'config: {pformat(config)}' f'config: {pformat(config)}'
) )

View File

@ -21,7 +21,6 @@ Broker configuration mgmt.
import platform import platform
import sys import sys
import os import os
from os import path
from os.path import dirname from os.path import dirname
import shutil import shutil
from typing import Optional from typing import Optional
@ -112,7 +111,6 @@ if _parent_user:
_conf_names: set[str] = { _conf_names: set[str] = {
'brokers', 'brokers',
'pps',
'trades', 'trades',
'watchlists', 'watchlists',
} }
@ -149,21 +147,19 @@ def get_conf_path(
conf_name: str = 'brokers', conf_name: str = 'brokers',
) -> str: ) -> str:
''' """Return the default config path normally under
Return the top-level default config path normally under ``~/.config/piker`` on linux.
``~/.config/piker`` on linux for a given ``conf_name``, the config
name.
Contains files such as: Contains files such as:
- brokers.toml - brokers.toml
- pp.toml
- watchlists.toml - watchlists.toml
- trades.toml
# maybe coming soon ;) # maybe coming soon ;)
- signals.toml - signals.toml
- strats.toml - strats.toml
''' """
assert conf_name in _conf_names assert conf_name in _conf_names
fn = _conf_fn_w_ext(conf_name) fn = _conf_fn_w_ext(conf_name)
return os.path.join( return os.path.join(
@ -177,7 +173,7 @@ def repodir():
Return the abspath to the repo directory. Return the abspath to the repo directory.
''' '''
dirpath = path.abspath( dirpath = os.path.abspath(
# we're 3 levels down in **this** module file # we're 3 levels down in **this** module file
dirname(dirname(os.path.realpath(__file__))) dirname(dirname(os.path.realpath(__file__)))
) )
@ -186,9 +182,7 @@ def repodir():
def load( def load(
conf_name: str = 'brokers', conf_name: str = 'brokers',
path: str = None, path: str = None
**tomlkws,
) -> (dict, str): ) -> (dict, str):
''' '''
@ -196,7 +190,6 @@ def load(
''' '''
path = path or get_conf_path(conf_name) path = path or get_conf_path(conf_name)
if not os.path.isfile(path): if not os.path.isfile(path):
fn = _conf_fn_w_ext(conf_name) fn = _conf_fn_w_ext(conf_name)
@ -209,11 +202,8 @@ def load(
# if one exists. # if one exists.
if os.path.isfile(template): if os.path.isfile(template):
shutil.copyfile(template, path) shutil.copyfile(template, path)
else:
with open(path, 'w'):
pass # touch
config = toml.load(path, **tomlkws) config = toml.load(path)
log.debug(f"Read config file {path}") log.debug(f"Read config file {path}")
return config, path return config, path
@ -222,7 +212,6 @@ def write(
config: dict, # toml config as dict config: dict, # toml config as dict
name: str = 'brokers', name: str = 'brokers',
path: str = None, path: str = None,
**toml_kwargs,
) -> None: ) -> None:
'''' ''''
@ -246,14 +235,11 @@ def write(
f"{path}" f"{path}"
) )
with open(path, 'w') as cf: with open(path, 'w') as cf:
return toml.dump( return toml.dump(config, cf)
config,
cf,
**toml_kwargs,
)
def load_accounts( def load_accounts(
providers: Optional[list[str]] = None providers: Optional[list[str]] = None
) -> bidict[str, Optional[str]]: ) -> bidict[str, Optional[str]]:

View File

@ -37,7 +37,6 @@ from docker.models.containers import Container as DockerContainer
from docker.errors import ( from docker.errors import (
DockerException, DockerException,
APIError, APIError,
# ContainerError,
) )
from requests.exceptions import ConnectionError, ReadTimeout from requests.exceptions import ConnectionError, ReadTimeout
@ -51,8 +50,8 @@ class DockerNotStarted(Exception):
'Prolly you dint start da daemon bruh' 'Prolly you dint start da daemon bruh'
class ApplicationLogError(Exception): class ContainerError(RuntimeError):
'App in container reported an error in logs' 'Error reported via app-container logging level'
@acm @acm
@ -97,9 +96,9 @@ async def open_docker(
# not perms? # not perms?
raise raise
# finally: finally:
# if client: if client:
# client.close() client.close()
class Container: class Container:
@ -157,7 +156,7 @@ class Container:
# print(f'level: {level}') # print(f'level: {level}')
if level in ('error', 'fatal'): if level in ('error', 'fatal'):
raise ApplicationLogError(msg) raise ContainerError(msg)
if patt in msg: if patt in msg:
return True return True
@ -186,21 +185,6 @@ class Container:
if 'is not running' in err.explanation: if 'is not running' in err.explanation:
return False return False
def hard_kill(self, start: float) -> None:
delay = time.time() - start
log.error(
f'Failed to kill container {self.cntr.id} after {delay}s\n'
'sending SIGKILL..'
)
# get out the big guns, bc apparently marketstore
# doesn't actually know how to terminate gracefully
# :eyeroll:...
self.try_signal('SIGKILL')
self.cntr.wait(
timeout=3,
condition='not-running',
)
async def cancel( async def cancel(
self, self,
stop_msg: str, stop_msg: str,
@ -247,9 +231,21 @@ class Container:
ConnectionError, ConnectionError,
): ):
log.exception('Docker connection failure') log.exception('Docker connection failure')
self.hard_kill(start) break
else: else:
self.hard_kill(start) delay = time.time() - start
log.error(
f'Failed to kill container {cid} after {delay}s\n'
'sending SIGKILL..'
)
# get out the big guns, bc apparently marketstore
# doesn't actually know how to terminate gracefully
# :eyeroll:...
self.try_signal('SIGKILL')
self.cntr.wait(
timeout=3,
condition='not-running',
)
log.cancel(f'Container stopped: {cid}') log.cancel(f'Container stopped: {cid}')

View File

@ -27,14 +27,13 @@ from multiprocessing.shared_memory import SharedMemory, _USE_POSIX
if _USE_POSIX: if _USE_POSIX:
from _posixshmem import shm_unlink from _posixshmem import shm_unlink
# import msgspec
import numpy as np
from numpy.lib import recfunctions as rfn
import tractor import tractor
import numpy as np
from pydantic import BaseModel
from numpy.lib import recfunctions as rfn
from ..log import get_logger from ..log import get_logger
from ._source import base_iohlc_dtype from ._source import base_iohlc_dtype
from .types import Struct
log = get_logger(__name__) log = get_logger(__name__)
@ -108,12 +107,15 @@ class SharedInt:
log.warning(f'Shm for {name} already unlinked?') log.warning(f'Shm for {name} already unlinked?')
class _Token(Struct, frozen=True): class _Token(BaseModel):
''' '''
Internal represenation of a shared memory "token" Internal represenation of a shared memory "token"
which can be used to key a system wide post shm entry. which can be used to key a system wide post shm entry.
''' '''
class Config:
frozen = True
shm_name: str # this servers as a "key" value shm_name: str # this servers as a "key" value
shm_first_index_name: str shm_first_index_name: str
shm_last_index_name: str shm_last_index_name: str
@ -124,22 +126,17 @@ class _Token(Struct, frozen=True):
return np.dtype(list(map(tuple, self.dtype_descr))).descr return np.dtype(list(map(tuple, self.dtype_descr))).descr
def as_msg(self): def as_msg(self):
return self.to_dict() return self.dict()
@classmethod @classmethod
def from_msg(cls, msg: dict) -> _Token: def from_msg(cls, msg: dict) -> _Token:
if isinstance(msg, _Token): if isinstance(msg, _Token):
return msg return msg
# TODO: native struct decoding
# return _token_dec.decode(msg)
msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr'])) msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr']))
return _Token(**msg) return _Token(**msg)
# _token_dec = msgspec.msgpack.Decoder(_Token)
# TODO: this api? # TODO: this api?
# _known_tokens = tractor.ActorVar('_shm_tokens', {}) # _known_tokens = tractor.ActorVar('_shm_tokens', {})
# _known_tokens = tractor.ContextStack('_known_tokens', ) # _known_tokens = tractor.ContextStack('_known_tokens', )
@ -170,7 +167,7 @@ def _make_token(
shm_name=key, shm_name=key,
shm_first_index_name=key + "_first", shm_first_index_name=key + "_first",
shm_last_index_name=key + "_last", shm_last_index_name=key + "_last",
dtype_descr=tuple(np.dtype(dtype).descr) dtype_descr=np.dtype(dtype).descr
) )

View File

@ -23,7 +23,7 @@ import decimal
from bidict import bidict from bidict import bidict
import numpy as np import numpy as np
from msgspec import Struct from pydantic import BaseModel
# from numba import from_dtype # from numba import from_dtype
@ -126,7 +126,7 @@ def unpack_fqsn(fqsn: str) -> tuple[str, str, str]:
) )
class Symbol(Struct): class Symbol(BaseModel):
''' '''
I guess this is some kinda container thing for dealing with I guess this is some kinda container thing for dealing with
all the different meta-data formats from brokers? all the different meta-data formats from brokers?
@ -152,7 +152,9 @@ class Symbol(Struct):
info: dict[str, Any], info: dict[str, Any],
suffix: str = '', suffix: str = '',
) -> Symbol: # XXX: like wtf..
# ) -> 'Symbol':
) -> None:
tick_size = info.get('price_tick_size', 0.01) tick_size = info.get('price_tick_size', 0.01)
lot_tick_size = info.get('lot_tick_size', 0.0) lot_tick_size = info.get('lot_tick_size', 0.0)
@ -173,7 +175,9 @@ class Symbol(Struct):
fqsn: str, fqsn: str,
info: dict[str, Any], info: dict[str, Any],
) -> Symbol: # XXX: like wtf..
# ) -> 'Symbol':
) -> None:
broker, key, suffix = unpack_fqsn(fqsn) broker, key, suffix = unpack_fqsn(fqsn)
return cls.from_broker_info( return cls.from_broker_info(
broker, broker,
@ -236,7 +240,7 @@ class Symbol(Struct):
''' '''
tokens = self.tokens() tokens = self.tokens()
fqsn = '.'.join(map(str.lower, tokens)) fqsn = '.'.join(tokens)
return fqsn return fqsn
def iterfqsns(self) -> list[str]: def iterfqsns(self) -> list[str]:

View File

@ -53,11 +53,13 @@ class NoBsWs:
def __init__( def __init__(
self, self,
url: str, url: str,
token: str,
stack: AsyncExitStack, stack: AsyncExitStack,
fixture: Callable, fixture: Callable,
serializer: ModuleType = json, serializer: ModuleType = json,
): ):
self.url = url self.url = url
self.token = token
self.fixture = fixture self.fixture = fixture
self._stack = stack self._stack = stack
self._ws: 'WebSocketConnection' = None # noqa self._ws: 'WebSocketConnection' = None # noqa
@ -81,9 +83,14 @@ class NoBsWs:
trio_websocket.open_websocket_url(self.url) trio_websocket.open_websocket_url(self.url)
) )
# rerun user code fixture # rerun user code fixture
ret = await self._stack.enter_async_context( if self.token == '':
self.fixture(self) ret = await self._stack.enter_async_context(
) self.fixture(self)
)
else:
ret = await self._stack.enter_async_context(
self.fixture(self, self.token)
)
assert ret is None assert ret is None
@ -128,13 +135,14 @@ async def open_autorecon_ws(
# TODO: proper type annot smh # TODO: proper type annot smh
fixture: Callable, fixture: Callable,
# used for authenticated websockets
token: str = '',
) -> AsyncGenerator[tuple[...], NoBsWs]: ) -> AsyncGenerator[tuple[...], NoBsWs]:
"""Apparently we can QoS for all sorts of reasons..so catch em. """Apparently we can QoS for all sorts of reasons..so catch em.
""" """
async with AsyncExitStack() as stack: async with AsyncExitStack() as stack:
ws = NoBsWs(url, stack, fixture=fixture) ws = NoBsWs(url, token, stack, fixture=fixture)
await ws._connect() await ws._connect()
try: try:

View File

@ -42,6 +42,7 @@ from trio_typing import TaskStatus
import trimeter import trimeter
import tractor import tractor
from tractor.trionics import maybe_open_context from tractor.trionics import maybe_open_context
from pydantic import BaseModel
import pendulum import pendulum
import numpy as np import numpy as np
@ -58,7 +59,6 @@ from ._sharedmem import (
ShmArray, ShmArray,
) )
from .ingest import get_ingestormod from .ingest import get_ingestormod
from .types import Struct
from ._source import ( from ._source import (
base_iohlc_dtype, base_iohlc_dtype,
Symbol, Symbol,
@ -84,7 +84,7 @@ if TYPE_CHECKING:
log = get_logger(__name__) log = get_logger(__name__)
class _FeedsBus(Struct): class _FeedsBus(BaseModel):
''' '''
Data feeds broadcaster and persistence management. Data feeds broadcaster and persistence management.
@ -100,6 +100,10 @@ class _FeedsBus(Struct):
a dedicated cancel scope. a dedicated cancel scope.
''' '''
class Config:
arbitrary_types_allowed = True
underscore_attrs_are_private = False
brokername: str brokername: str
nursery: trio.Nursery nursery: trio.Nursery
feeds: dict[str, tuple[dict, dict]] = {} feeds: dict[str, tuple[dict, dict]] = {}

View File

@ -1,68 +0,0 @@
# piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Built-in (extension) types.
"""
from typing import Optional
from pprint import pformat
import msgspec
class Struct(
msgspec.Struct,
# https://jcristharif.com/msgspec/structs.html#tagged-unions
# tag='pikerstruct',
# tag=True,
):
'''
A "human friendlier" (aka repl buddy) struct subtype.
'''
def to_dict(self) -> dict:
return {
f: getattr(self, f)
for f in self.__struct_fields__
}
def __repr__(self):
return f'Struct({pformat(self.to_dict())})'
def copy(
self,
update: Optional[dict] = None,
) -> msgspec.Struct:
'''
Validate-typecast all self defined fields, return a copy of us
with all such fields.
This is kinda like the default behaviour in `pydantic.BaseModel`.
'''
if update:
for k, v in update.items():
setattr(self, k, v)
# roundtrip serialize to validate
return msgspec.msgpack.Decoder(
type=type(self)
).decode(
msgspec.msgpack.Encoder().encode(self)
)

View File

@ -114,7 +114,7 @@ async def fsp_compute(
dict[str, np.ndarray], # multi-output case dict[str, np.ndarray], # multi-output case
np.ndarray, # single output case np.ndarray, # single output case
] ]
history_output = await anext(out_stream) history_output = await out_stream.__anext__()
func_name = func.__name__ func_name = func.__name__
profiler(f'{func_name} generated history') profiler(f'{func_name} generated history')
@ -374,8 +374,7 @@ async def cascade(
'key': dst_shm_token, 'key': dst_shm_token,
'first': dst._first.value, 'first': dst._first.value,
'last': dst._last.value, 'last': dst._last.value,
} }})
})
return tracker, index return tracker, index
def is_synced( def is_synced(

View File

@ -1,788 +0,0 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Personal/Private position parsing, calculating, summarizing in a way
that doesn't try to cuk most humans who prefer to not lose their moneys..
(looking at you `ib` and dirt-bird friends)
'''
from collections import deque
from contextlib import contextmanager as cm
# from pprint import pformat
import os
from os import path
from math import copysign
import re
import time
from typing import (
Any,
Optional,
Union,
)
from msgspec import Struct
import pendulum
from pendulum import datetime, now
import tomli
import toml
from . import config
from .brokers import get_brokermod
from .clearing._messages import BrokerdPosition, Status
from .data._source import Symbol
from .log import get_logger
log = get_logger(__name__)
@cm
def open_trade_ledger(
broker: str,
account: str,
) -> str:
'''
Indempotently create and read in a trade log file from the
``<configuration_dir>/ledgers/`` directory.
Files are named per broker account of the form
``<brokername>_<accountname>.toml``. The ``accountname`` here is the
name as defined in the user's ``brokers.toml`` config.
'''
ldir = path.join(config._config_dir, 'ledgers')
if not path.isdir(ldir):
os.makedirs(ldir)
fname = f'trades_{broker}_{account}.toml'
tradesfile = path.join(ldir, fname)
if not path.isfile(tradesfile):
log.info(
f'Creating new local trades ledger: {tradesfile}'
)
with open(tradesfile, 'w') as cf:
pass # touch
with open(tradesfile, 'rb') as cf:
start = time.time()
ledger = tomli.load(cf)
print(f'Ledger load took {time.time() - start}s')
cpy = ledger.copy()
try:
yield cpy
finally:
if cpy != ledger:
# TODO: show diff output?
# https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries
print(f'Updating ledger for {tradesfile}:\n')
ledger.update(cpy)
# we write on close the mutated ledger data
with open(tradesfile, 'w') as cf:
return toml.dump(ledger, cf)
class Transaction(Struct):
# TODO: should this be ``.to`` (see below)?
fqsn: str
tid: Union[str, int] # unique transaction id
size: float
price: float
cost: float # commisions or other additional costs
dt: datetime
expiry: Optional[datetime] = None
# optional key normally derived from the broker
# backend which ensures the instrument-symbol this record
# is for is truly unique.
bsuid: Optional[Union[str, int]] = None
# optional fqsn for the source "asset"/money symbol?
# from: Optional[str] = None
class Position(Struct):
'''
Basic pp (personal/piker position) model with attached clearing
transaction history.
'''
symbol: Symbol
# can be +ve or -ve for long/short
size: float
# "breakeven price" above or below which pnl moves above and below
# zero for the entirety of the current "trade state".
be_price: float
# unique backend symbol id
bsuid: str
# ordered record of known constituent trade messages
clears: dict[
Union[str, int, Status], # trade id
dict[str, Any], # transaction history summaries
] = {}
expiry: Optional[datetime] = None
def to_dict(self) -> dict:
return {
f: getattr(self, f)
for f in self.__struct_fields__
}
def to_pretoml(self) -> dict:
'''
Prep this position's data contents for export to toml including
re-structuring of the ``.clears`` table to an array of
inline-subtables for better ``pps.toml`` compactness.
'''
d = self.to_dict()
clears = d.pop('clears')
expiry = d.pop('expiry')
if expiry:
d['expiry'] = str(expiry)
clears_list = []
for tid, data in clears.items():
inline_table = toml.TomlDecoder().get_empty_inline_table()
inline_table['tid'] = tid
for k, v in data.items():
inline_table[k] = v
clears_list.append(inline_table)
d['clears'] = clears_list
return d
def update_from_msg(
self,
msg: BrokerdPosition,
) -> None:
# XXX: better place to do this?
symbol = self.symbol
lot_size_digits = symbol.lot_size_digits
be_price, size = (
round(
msg['avg_price'],
ndigits=symbol.tick_size_digits
),
round(
msg['size'],
ndigits=lot_size_digits
),
)
self.be_price = be_price
self.size = size
@property
def dsize(self) -> float:
'''
The "dollar" size of the pp, normally in trading (fiat) unit
terms.
'''
return self.be_price * self.size
def update(
self,
t: Transaction,
) -> None:
self.clears[t.tid] = {
'cost': t.cost,
'price': t.price,
'size': t.size,
'dt': str(t.dt),
}
def lifo_update(
self,
size: float,
price: float,
cost: float = 0,
# TODO: idea: "real LIFO" dynamic positioning.
# - when a trade takes place where the pnl for
# the (set of) trade(s) is below the breakeven price
# it may be that the trader took a +ve pnl on a short(er)
# term trade in the same account.
# - in this case we could recalc the be price to
# be reverted back to it's prior value before the nearest term
# trade was opened.?
# dynamic_breakeven_price: bool = False,
) -> (float, float):
'''
Incremental update using a LIFO-style weighted mean.
'''
# "avg position price" calcs
# TODO: eventually it'd be nice to have a small set of routines
# to do this stuff from a sequence of cleared orders to enable
# so called "contextual positions".
new_size = self.size + size
# old size minus the new size gives us size diff with
# +ve -> increase in pp size
# -ve -> decrease in pp size
size_diff = abs(new_size) - abs(self.size)
if new_size == 0:
self.be_price = 0
elif size_diff > 0:
# XXX: LOFI incremental update:
# only update the "average price" when
# the size increases not when it decreases (i.e. the
# position is being made smaller)
self.be_price = (
# weight of current exec = (size * price) + cost
(abs(size) * price)
+
(copysign(1, new_size) * cost) # transaction cost
+
# weight of existing be price
self.be_price * abs(self.size) # weight of previous pp
) / abs(new_size) # normalized by the new size: weighted mean.
self.size = new_size
return new_size, self.be_price
def minimize_clears(
self,
) -> dict[str, dict]:
'''
Minimize the position's clears entries by removing
all transactions before the last net zero size to avoid
unecessary history irrelevant to the current pp state.
'''
size: float = self.size
clears_since_zero: deque[tuple(str, dict)] = deque()
# scan for the last "net zero" position by
# iterating clears in reverse.
for tid, clear in reversed(self.clears.items()):
size -= clear['size']
clears_since_zero.appendleft((tid, clear))
if size == 0:
break
self.clears = dict(clears_since_zero)
return self.clears
def update_pps(
records: dict[str, Transaction],
pps: Optional[dict[str, Position]] = None
) -> dict[str, Position]:
'''
Compile a set of positions from a trades ledger.
'''
pps: dict[str, Position] = pps or {}
# lifo update all pps from records
for r in records:
pp = pps.setdefault(
r.bsuid,
# if no existing pp, allocate fresh one.
Position(
Symbol.from_fqsn(
r.fqsn,
info={},
),
size=0.0,
be_price=0.0,
bsuid=r.bsuid,
expiry=r.expiry,
)
)
# don't do updates for ledger records we already have
# included in the current pps state.
if r.tid in pp.clears:
# NOTE: likely you'll see repeats of the same
# ``Transaction`` passed in here if/when you are restarting
# a ``brokerd.ib`` where the API will re-report trades from
# the current session, so we need to make sure we don't
# "double count" these in pp calculations.
continue
# lifo style "breakeven" price calc
pp.lifo_update(
r.size,
r.price,
# include transaction cost in breakeven price
# and presume the worst case of the same cost
# to exit this transaction (even though in reality
# it will be dynamic based on exit stratetgy).
cost=2*r.cost,
)
# track clearing data
pp.update(r)
return pps
def load_pps_from_ledger(
brokername: str,
acctname: str,
# post normalization filter on ledger entries to be processed
filter_by: Optional[list[dict]] = None,
) -> dict[str, Position]:
'''
Open a ledger file by broker name and account and read in and
process any trade records into our normalized ``Transaction``
form and then pass these into the position processing routine
and deliver the two dict-sets of the active and closed pps.
'''
with open_trade_ledger(
brokername,
acctname,
) as ledger:
if not ledger:
# null case, no ledger file with content
return {}
brokermod = get_brokermod(brokername)
src_records = brokermod.norm_trade_records(ledger)
if filter_by:
bsuids = set(filter_by)
records = list(filter(lambda r: r.bsuid in bsuids, src_records))
else:
records = src_records
return update_pps(records)
def get_pps(
brokername: str,
acctids: Optional[set[str]] = set(),
) -> dict[str, dict[str, Position]]:
'''
Read out broker-specific position entries from
incremental update file: ``pps.toml``.
'''
conf, path = config.load(
'pps',
# load dicts as inlines to preserve compactness
# _dict=toml.decoder.InlineTableDict,
)
all_active = {}
all_closed = {}
# try to load any ledgers if no section found
bconf, path = config.load('brokers')
accounts = bconf[brokername]['accounts']
for account in accounts:
# TODO: instead of this filter we could
# always send all known pps but just not audit
# them since an active client might not be up?
if (
acctids and
f'{brokername}.{account}' not in acctids
):
continue
active, closed = update_pps_conf(brokername, account)
all_active.setdefault(account, {}).update(active)
all_closed.setdefault(account, {}).update(closed)
return all_active, all_closed
# TODO: instead see if we can hack tomli and tomli-w to do the same:
# - https://github.com/hukkin/tomli
# - https://github.com/hukkin/tomli-w
class PpsEncoder(toml.TomlEncoder):
'''
Special "styled" encoder that makes a ``pps.toml`` redable and
compact by putting `.clears` tables inline and everything else
flat-ish.
'''
separator = ','
def dump_list(self, v):
'''
Dump an inline list with a newline after every element and
with consideration for denoted inline table types.
'''
retval = "[\n"
for u in v:
if isinstance(u, toml.decoder.InlineTableDict):
out = self.dump_inline_table(u)
else:
out = str(self.dump_value(u))
retval += " " + out + "," + "\n"
retval += "]"
return retval
def dump_inline_table(self, section):
"""Preserve inline table in its compact syntax instead of expanding
into subsection.
https://github.com/toml-lang/toml#user-content-inline-table
"""
val_list = []
for k, v in section.items():
# if isinstance(v, toml.decoder.InlineTableDict):
if isinstance(v, dict):
val = self.dump_inline_table(v)
else:
val = str(self.dump_value(v))
val_list.append(k + " = " + val)
retval = "{ " + ", ".join(val_list) + " }"
return retval
def dump_sections(self, o, sup):
retstr = ""
if sup != "" and sup[-1] != ".":
sup += '.'
retdict = self._dict()
arraystr = ""
for section in o:
qsection = str(section)
value = o[section]
if not re.match(r'^[A-Za-z0-9_-]+$', section):
qsection = toml.encoder._dump_str(section)
# arrayoftables = False
if (
self.preserve
and isinstance(value, toml.decoder.InlineTableDict)
):
retstr += (
qsection
+
" = "
+
self.dump_inline_table(o[section])
+
'\n' # only on the final terminating left brace
)
# XXX: this code i'm pretty sure is just blatantly bad
# and/or wrong..
# if isinstance(o[section], list):
# for a in o[section]:
# if isinstance(a, dict):
# arrayoftables = True
# if arrayoftables:
# for a in o[section]:
# arraytabstr = "\n"
# arraystr += "[[" + sup + qsection + "]]\n"
# s, d = self.dump_sections(a, sup + qsection)
# if s:
# if s[0] == "[":
# arraytabstr += s
# else:
# arraystr += s
# while d:
# newd = self._dict()
# for dsec in d:
# s1, d1 = self.dump_sections(d[dsec], sup +
# qsection + "." +
# dsec)
# if s1:
# arraytabstr += ("[" + sup + qsection +
# "." + dsec + "]\n")
# arraytabstr += s1
# for s1 in d1:
# newd[dsec + "." + s1] = d1[s1]
# d = newd
# arraystr += arraytabstr
elif isinstance(value, dict):
retdict[qsection] = o[section]
elif o[section] is not None:
retstr += (
qsection
+
" = "
+
str(self.dump_value(o[section]))
)
# if not isinstance(value, dict):
if not isinstance(value, toml.decoder.InlineTableDict):
# inline tables should not contain newlines:
# https://toml.io/en/v1.0.0#inline-table
retstr += '\n'
else:
raise ValueError(value)
retstr += arraystr
return (retstr, retdict)
def load_pps_from_toml(
brokername: str,
acctid: str,
# XXX: there is an edge case here where we may want to either audit
# the retrieved ``pps.toml`` output or reprocess it since there was
# an error on write on the last attempt to update the state file
# even though the ledger *was* updated. For this cases we allow the
# caller to pass in a symbol set they'd like to reload from the
# underlying ledger to be reprocessed in computing pps state.
reload_records: Optional[dict[str, str]] = None,
update_from_ledger: bool = False,
) -> tuple[dict, dict[str, Position]]:
'''
Load and marshal to objects all pps from either an existing
``pps.toml`` config, or from scratch from a ledger file when
none yet exists.
'''
conf, path = config.load('pps')
brokersection = conf.setdefault(brokername, {})
pps = brokersection.setdefault(acctid, {})
pp_objs = {}
# no pps entry yet for this broker/account so parse any available
# ledgers to build a brand new pps state.
if not pps or update_from_ledger:
pp_objs = load_pps_from_ledger(
brokername,
acctid,
)
# Reload symbol specific ledger entries if requested by the
# caller **AND** none exist in the current pps state table.
elif (
pps and reload_records
):
# no pps entry yet for this broker/account so parse
# any available ledgers to build a pps state.
pp_objs = load_pps_from_ledger(
brokername,
acctid,
filter_by=reload_records,
)
if not pps:
log.warning(
f'No `pps.toml` positions could be loaded {brokername}:{acctid}'
)
# unmarshal/load ``pps.toml`` config entries into object form.
for fqsn, entry in pps.items():
bsuid = entry['bsuid']
# convert clears sub-tables (only in this form
# for toml re-presentation) back into a master table.
clears_list = entry['clears']
# index clears entries in "object" form by tid in a top
# level dict instead of a list (as is presented in our
# ``pps.toml``).
pp = pp_objs.get(bsuid)
if pp:
clears = pp.clears
else:
clears = {}
for clears_table in clears_list:
tid = clears_table.pop('tid')
clears[tid] = clears_table
size = entry['size']
# TODO: an audit system for existing pps entries?
# if not len(clears) == abs(size):
# pp_objs = load_pps_from_ledger(
# brokername,
# acctid,
# filter_by=reload_records,
# )
# reason = 'size <-> len(clears) mismatch'
# raise ValueError(
# '`pps.toml` entry is invalid:\n'
# f'{fqsn}\n'
# f'{pformat(entry)}'
# )
expiry = entry.get('expiry')
if expiry:
expiry = pendulum.parse(expiry)
pp_objs[bsuid] = Position(
Symbol.from_fqsn(fqsn, info={}),
size=size,
be_price=entry['be_price'],
expiry=expiry,
bsuid=entry['bsuid'],
# XXX: super critical, we need to be sure to include
# all pps.toml clears to avoid reusing clears that were
# already included in the current incremental update
# state, since today's records may have already been
# processed!
clears=clears,
)
return conf, pp_objs
def update_pps_conf(
brokername: str,
acctid: str,
trade_records: Optional[list[Transaction]] = None,
ledger_reload: Optional[dict[str, str]] = None,
) -> tuple[
dict[str, Position],
dict[str, Position],
]:
# this maps `.bsuid` values to positions
pp_objs: dict[Union[str, int], Position]
if trade_records and ledger_reload:
for r in trade_records:
ledger_reload[r.bsuid] = r.fqsn
conf, pp_objs = load_pps_from_toml(
brokername,
acctid,
reload_records=ledger_reload,
)
# update all pp objects from any (new) trade records which
# were passed in (aka incremental update case).
if trade_records:
pp_objs = update_pps(
trade_records,
pps=pp_objs,
)
pp_entries = {} # dict-serialize all active pps
# NOTE: newly closed position are also important to report/return
# since a consumer, like an order mode UI ;), might want to react
# based on the closure.
closed_pp_objs: dict[str, Position] = {}
for bsuid in list(pp_objs):
pp = pp_objs[bsuid]
# XXX: debug hook for size mismatches
# if bsuid == 447767096:
# breakpoint()
pp.minimize_clears()
if (
pp.size == 0
# drop time-expired positions (normally derivatives)
or (pp.expiry and pp.expiry < now())
):
# if expired the position is closed
pp.size = 0
# position is already closed aka "net zero"
closed_pp = pp_objs.pop(bsuid, None)
if closed_pp:
closed_pp_objs[bsuid] = closed_pp
else:
# serialize to pre-toml form
asdict = pp.to_pretoml()
if pp.expiry is None:
asdict.pop('expiry', None)
# TODO: we need to figure out how to have one top level
# listing venue here even when the backend isn't providing
# it via the trades ledger..
# drop symbol obj in serialized form
s = asdict.pop('symbol')
fqsn = s.front_fqsn()
log.info(f'Updating active pp: {fqsn}')
# XXX: ugh, it's cuz we push the section under
# the broker name.. maybe we need to rethink this?
brokerless_key = fqsn.removeprefix(f'{brokername}.')
pp_entries[brokerless_key] = asdict
conf[brokername][acctid] = pp_entries
# TODO: why tf haven't they already done this for inline tables smh..
enc = PpsEncoder(preserve=True)
# table_bs_type = type(toml.TomlDecoder().get_empty_inline_table())
enc.dump_funcs[toml.decoder.InlineTableDict] = enc.dump_inline_table
config.write(
conf,
'pps',
encoder=enc,
)
# deliver object form of all pps in table to caller
return pp_objs, closed_pp_objs
if __name__ == '__main__':
import sys
args = sys.argv
assert len(args) > 1, 'Specifiy account(s) from `brokers.toml`'
args = args[1:]
for acctid in args:
broker, name = acctid.split('.')
update_pps_conf(broker, name)

View File

@ -230,26 +230,25 @@ class GodWidget(QWidget):
# - we'll probably want per-instrument/provider state here? # - we'll probably want per-instrument/provider state here?
# change the order config form over to the new chart # change the order config form over to the new chart
# XXX: since the pp config is a singleton widget we have to
# also switch it over to the new chart's interal-layout
# self.linkedsplits.chart.qframe.hbox.removeWidget(self.pp_pane)
chart = linkedsplits.chart
# chart is already in memory so just focus it # chart is already in memory so just focus it
linkedsplits.show() linkedsplits.show()
linkedsplits.focus() linkedsplits.focus()
linkedsplits.graphics_cycle() linkedsplits.graphics_cycle()
await trio.sleep(0) await trio.sleep(0)
# XXX: since the pp config is a singleton widget we have to
# also switch it over to the new chart's interal-layout
# self.linkedsplits.chart.qframe.hbox.removeWidget(self.pp_pane)
chart = linkedsplits.chart
# resume feeds *after* rendering chart view asap # resume feeds *after* rendering chart view asap
if chart: chart.resume_all_feeds()
chart.resume_all_feeds()
# TODO: we need a check to see if the chart # TODO: we need a check to see if the chart
# last had the xlast in view, if so then shift so it's # last had the xlast in view, if so then shift so it's
# still in view, if the user was viewing history then # still in view, if the user was viewing history then
# do nothing yah? # do nothing yah?
chart.default_view() chart.default_view()
self.linkedsplits = linkedsplits self.linkedsplits = linkedsplits
symbol = linkedsplits.symbol symbol = linkedsplits.symbol
@ -761,18 +760,9 @@ class ChartPlotWidget(pg.PlotWidget):
self.pi_overlay: PlotItemOverlay = PlotItemOverlay(self.plotItem) self.pi_overlay: PlotItemOverlay = PlotItemOverlay(self.plotItem)
# indempotent startup flag for auto-yrange subsys
# to detect the "first time" y-domain graphics begin
# to be shown in the (main) graphics view.
self._on_screen: bool = False
def resume_all_feeds(self): def resume_all_feeds(self):
try: for feed in self._feeds.values():
for feed in self._feeds.values(): self.linked.godwidget._root_n.start_soon(feed.resume)
self.linked.godwidget._root_n.start_soon(feed.resume)
except RuntimeError:
# TODO: cancel the qtractor runtime here?
raise
def pause_all_feeds(self): def pause_all_feeds(self):
for feed in self._feeds.values(): for feed in self._feeds.values():
@ -869,8 +859,7 @@ class ChartPlotWidget(pg.PlotWidget):
def default_view( def default_view(
self, self,
bars_from_y: int = 616, bars_from_y: int = 3000,
do_ds: bool = True,
) -> None: ) -> None:
''' '''
@ -931,11 +920,8 @@ class ChartPlotWidget(pg.PlotWidget):
max=end, max=end,
padding=0, padding=0,
) )
self.view.maybe_downsample_graphics()
if do_ds: view._set_yrange()
self.view.maybe_downsample_graphics()
view._set_yrange()
try: try:
self.linked.graphics_cycle() self.linked.graphics_cycle()
except IndexError: except IndexError:
@ -1269,6 +1255,7 @@ class ChartPlotWidget(pg.PlotWidget):
If ``bars_range`` is provided use that range. If ``bars_range`` is provided use that range.
''' '''
# print(f'Chart[{self.name}].maxmin()')
profiler = pg.debug.Profiler( profiler = pg.debug.Profiler(
msg=f'`{str(self)}.maxmin(name={name})`: `{self.name}`', msg=f'`{str(self)}.maxmin(name={name})`: `{self.name}`',
disabled=not pg_profile_enabled(), disabled=not pg_profile_enabled(),
@ -1300,18 +1287,11 @@ class ChartPlotWidget(pg.PlotWidget):
key = round(lbar), round(rbar) key = round(lbar), round(rbar)
res = flow.maxmin(*key) res = flow.maxmin(*key)
if res == (None, None):
if ( log.error(
res is None
):
log.warning(
f"{flow_key} no mxmn for bars_range => {key} !?" f"{flow_key} no mxmn for bars_range => {key} !?"
) )
res = 0, 0 res = 0, 0
if not self._on_screen:
self.default_view(do_ds=False)
self._on_screen = True
profiler(f'yrange mxmn: {key} -> {res}') profiler(f'yrange mxmn: {key} -> {res}')
# print(f'{flow_key} yrange mxmn: {key} -> {res}')
return res return res

View File

@ -223,20 +223,14 @@ def ds_m4(
assert frames >= (xrange / uppx) assert frames >= (xrange / uppx)
# call into ``numba`` # call into ``numba``
( nb, i_win, y_out = _m4(
nb,
x_out,
y_out,
ymn,
ymx,
) = _m4(
x, x,
y, y,
frames, frames,
# TODO: see func below.. # TODO: see func below..
# x_out, # i_win,
# y_out, # y_out,
# first index in x data to start at # first index in x data to start at
@ -249,11 +243,10 @@ def ds_m4(
# filter out any overshoot in the input allocation arrays by # filter out any overshoot in the input allocation arrays by
# removing zero-ed tail entries which should start at a certain # removing zero-ed tail entries which should start at a certain
# index. # index.
x_out = x_out[x_out != 0] i_win = i_win[i_win != 0]
y_out = y_out[:x_out.size] y_out = y_out[:i_win.size]
# print(f'M4 output ymn, ymx: {ymn},{ymx}') return nb, i_win, y_out
return nb, x_out, y_out, ymn, ymx
@jit( @jit(
@ -267,8 +260,8 @@ def _m4(
frames: int, frames: int,
# TODO: using this approach, having the ``.zeros()`` alloc lines # TODO: using this approach by having the ``.zeros()`` alloc lines
# below in pure python, there were segs faults and alloc crashes.. # below, in put python was causing segs faults and alloc crashes..
# we might need to see how it behaves with shm arrays and consider # we might need to see how it behaves with shm arrays and consider
# allocating them once at startup? # allocating them once at startup?
@ -281,22 +274,14 @@ def _m4(
x_start: int, x_start: int,
step: float, step: float,
) -> tuple[ ) -> int:
int, # nbins = len(i_win)
np.ndarray, # count = len(xs)
np.ndarray,
float,
float,
]:
'''
Implementation of the m4 algorithm in ``numba``:
http://www.vldb.org/pvldb/vol7/p797-jugel.pdf
'''
# these are pre-allocated and mutated by ``numba`` # these are pre-allocated and mutated by ``numba``
# code in-place. # code in-place.
y_out = np.zeros((frames, 4), ys.dtype) y_out = np.zeros((frames, 4), ys.dtype)
x_out = np.zeros(frames, xs.dtype) i_win = np.zeros(frames, xs.dtype)
bincount = 0 bincount = 0
x_left = x_start x_left = x_start
@ -310,34 +295,24 @@ def _m4(
# set all bins in the left-most entry to the starting left-most x value # set all bins in the left-most entry to the starting left-most x value
# (aka a row broadcast). # (aka a row broadcast).
x_out[bincount] = x_left i_win[bincount] = x_left
# set all y-values to the first value passed in. # set all y-values to the first value passed in.
y_out[bincount] = ys[0] y_out[bincount] = ys[0]
# full input y-data mx and mn
mx: float = -np.inf
mn: float = np.inf
# compute OHLC style max / min values per window sized x-frame.
for i in range(len(xs)): for i in range(len(xs)):
x = xs[i] x = xs[i]
y = ys[i] y = ys[i]
if x < x_left + step: # the current window "step" is [bin, bin+1) if x < x_left + step: # the current window "step" is [bin, bin+1)
ymn = y_out[bincount, 1] = min(y, y_out[bincount, 1]) y_out[bincount, 1] = min(y, y_out[bincount, 1])
ymx = y_out[bincount, 2] = max(y, y_out[bincount, 2]) y_out[bincount, 2] = max(y, y_out[bincount, 2])
y_out[bincount, 3] = y y_out[bincount, 3] = y
mx = max(mx, ymx)
mn = min(mn, ymn)
else: else:
# Find the next bin # Find the next bin
while x >= x_left + step: while x >= x_left + step:
x_left += step x_left += step
bincount += 1 bincount += 1
x_out[bincount] = x_left i_win[bincount] = x_left
y_out[bincount] = y y_out[bincount] = y
return bincount, x_out, y_out, mn, mx return bincount, i_win, y_out

View File

@ -105,10 +105,6 @@ def chart_maxmin(
mn, mx = out mn, mx = out
mx_vlm_in_view = 0 mx_vlm_in_view = 0
# TODO: we need to NOT call this to avoid a manual
# np.max/min trigger and especially on the vlm_chart
# flows which aren't shown.. like vlm?
if vlm_chart: if vlm_chart:
out = vlm_chart.maxmin() out = vlm_chart.maxmin()
if out: if out:
@ -226,9 +222,33 @@ async def graphics_update_loop(
tick_margin = 3 * tick_size tick_margin = 3 * tick_size
chart.show() chart.show()
# view = chart.view
last_quote = time.time() last_quote = time.time()
i_last = ohlcv.index i_last = ohlcv.index
# async def iter_drain_quotes():
# # NOTE: all code below this loop is expected to be synchronous
# # and thus draw instructions are not picked up jntil the next
# # wait / iteration.
# async for quotes in stream:
# while True:
# try:
# moar = stream.receive_nowait()
# except trio.WouldBlock:
# yield quotes
# break
# else:
# for sym, quote in moar.items():
# ticks_frame = quote.get('ticks')
# if ticks_frame:
# quotes[sym].setdefault(
# 'ticks', []).extend(ticks_frame)
# print('pulled extra')
# yield quotes
# async for quotes in iter_drain_quotes():
ds = linked.display_state = DisplayState(**{ ds = linked.display_state = DisplayState(**{
'quotes': {}, 'quotes': {},
'linked': linked, 'linked': linked,
@ -273,7 +293,6 @@ async def graphics_update_loop(
# chart isn't active/shown so skip render cycle and pause feed(s) # chart isn't active/shown so skip render cycle and pause feed(s)
if chart.linked.isHidden(): if chart.linked.isHidden():
print('skipping update')
chart.pause_all_feeds() chart.pause_all_feeds()
continue continue
@ -397,8 +416,10 @@ def graphics_update_cycle(
) )
or trigger_all or trigger_all
): ):
# TODO: we should track and compute whether the last
# pixel in a curve should show new data based on uppx
# and then iff update curves and shift?
chart.increment_view(steps=i_diff) chart.increment_view(steps=i_diff)
# chart.increment_view(steps=i_diff + round(append_diff - uppx))
if vlm_chart: if vlm_chart:
vlm_chart.increment_view(steps=i_diff) vlm_chart.increment_view(steps=i_diff)
@ -456,6 +477,7 @@ def graphics_update_cycle(
): ):
chart.update_graphics_from_flow( chart.update_graphics_from_flow(
chart.name, chart.name,
# do_append=uppx < update_uppx,
do_append=do_append, do_append=do_append,
) )

View File

@ -21,6 +21,7 @@ Qt event proxying and processing using ``trio`` mem chans.
from contextlib import asynccontextmanager, AsyncExitStack from contextlib import asynccontextmanager, AsyncExitStack
from typing import Callable from typing import Callable
from pydantic import BaseModel
import trio import trio
from PyQt5 import QtCore from PyQt5 import QtCore
from PyQt5.QtCore import QEvent, pyqtBoundSignal from PyQt5.QtCore import QEvent, pyqtBoundSignal
@ -29,8 +30,6 @@ from PyQt5.QtWidgets import (
QGraphicsSceneMouseEvent as gs_mouse, QGraphicsSceneMouseEvent as gs_mouse,
) )
from ..data.types import Struct
MOUSE_EVENTS = { MOUSE_EVENTS = {
gs_mouse.GraphicsSceneMousePress, gs_mouse.GraphicsSceneMousePress,
@ -44,10 +43,13 @@ MOUSE_EVENTS = {
# TODO: maybe consider some constrained ints down the road? # TODO: maybe consider some constrained ints down the road?
# https://pydantic-docs.helpmanual.io/usage/types/#constrained-types # https://pydantic-docs.helpmanual.io/usage/types/#constrained-types
class KeyboardMsg(Struct): class KeyboardMsg(BaseModel):
'''Unpacked Qt keyboard event data. '''Unpacked Qt keyboard event data.
''' '''
class Config:
arbitrary_types_allowed = True
event: QEvent event: QEvent
etype: int etype: int
key: int key: int
@ -55,13 +57,16 @@ class KeyboardMsg(Struct):
txt: str txt: str
def to_tuple(self) -> tuple: def to_tuple(self) -> tuple:
return tuple(self.to_dict().values()) return tuple(self.dict().values())
class MouseMsg(Struct): class MouseMsg(BaseModel):
'''Unpacked Qt keyboard event data. '''Unpacked Qt keyboard event data.
''' '''
class Config:
arbitrary_types_allowed = True
event: QEvent event: QEvent
etype: int etype: int
button: int button: int

View File

@ -337,7 +337,6 @@ class Flow(msgspec.Struct): # , frozen=True):
name: str name: str
plot: pg.PlotItem plot: pg.PlotItem
graphics: Union[Curve, BarItems] graphics: Union[Curve, BarItems]
yrange: tuple[float, float] = None
# in some cases a flow may want to change its # in some cases a flow may want to change its
# graphical "type" or, "form" when downsampling, # graphical "type" or, "form" when downsampling,
@ -387,11 +386,10 @@ class Flow(msgspec.Struct): # , frozen=True):
lbar: int, lbar: int,
rbar: int, rbar: int,
) -> Optional[tuple[float, float]]: ) -> tuple[float, float]:
''' '''
Compute the cached max and min y-range values for a given Compute the cached max and min y-range values for a given
x-range determined by ``lbar`` and ``rbar`` or ``None`` x-range determined by ``lbar`` and ``rbar``.
if no range can be determined (yet).
''' '''
rkey = (lbar, rbar) rkey = (lbar, rbar)
@ -401,44 +399,40 @@ class Flow(msgspec.Struct): # , frozen=True):
shm = self.shm shm = self.shm
if shm is None: if shm is None:
return None mxmn = None
arr = shm.array else: # new block for profiling?..
arr = shm.array
# build relative indexes into shm array # build relative indexes into shm array
# TODO: should we just add/use a method # TODO: should we just add/use a method
# on the shm to do this? # on the shm to do this?
ifirst = arr[0]['index'] ifirst = arr[0]['index']
slice_view = arr[ slice_view = arr[
lbar - ifirst: lbar - ifirst:
(rbar - ifirst) + 1 (rbar - ifirst) + 1
] ]
if not slice_view.size: if not slice_view.size:
return None mxmn = None
elif self.yrange:
mxmn = self.yrange
# print(f'{self.name} M4 maxmin: {mxmn}')
else:
if self.is_ohlc:
ylow = np.min(slice_view['low'])
yhigh = np.max(slice_view['high'])
else: else:
view = slice_view[self.name] if self.is_ohlc:
ylow = np.min(view) ylow = np.min(slice_view['low'])
yhigh = np.max(view) yhigh = np.max(slice_view['high'])
mxmn = ylow, yhigh else:
# print(f'{self.name} MANUAL maxmin: {mxmin}') view = slice_view[self.name]
ylow = np.min(view)
yhigh = np.max(view)
# cache result for input range mxmn = ylow, yhigh
assert mxmn
self._mxmns[rkey] = mxmn
return mxmn if mxmn is not None:
# cache new mxmn result
self._mxmns[rkey] = mxmn
return mxmn
def view_range(self) -> tuple[int, int]: def view_range(self) -> tuple[int, int]:
''' '''
@ -634,13 +628,10 @@ class Flow(msgspec.Struct): # , frozen=True):
# source data so we clear our path data in prep # source data so we clear our path data in prep
# to generate a new one from original source data. # to generate a new one from original source data.
new_sample_rate = True new_sample_rate = True
showing_src_data = True
should_ds = False should_ds = False
should_redraw = True should_redraw = True
showing_src_data = True
# reset yrange to be computed from source data
self.yrange = None
# MAIN RENDER LOGIC: # MAIN RENDER LOGIC:
# - determine in view data and redraw on range change # - determine in view data and redraw on range change
# - determine downsampling ops if needed # - determine downsampling ops if needed
@ -666,10 +657,6 @@ class Flow(msgspec.Struct): # , frozen=True):
**rkwargs, **rkwargs,
) )
if showing_src_data:
# print(f"{self.name} SHOWING SOURCE")
# reset yrange to be computed from source data
self.yrange = None
if not out: if not out:
log.warning(f'{self.name} failed to render!?') log.warning(f'{self.name} failed to render!?')
@ -677,9 +664,6 @@ class Flow(msgspec.Struct): # , frozen=True):
path, data, reset = out path, data, reset = out
# if self.yrange:
# print(f'flow {self.name} yrange from m4: {self.yrange}')
# XXX: SUPER UGGGHHH... without this we get stale cache # XXX: SUPER UGGGHHH... without this we get stale cache
# graphics that don't update until you downsampler again.. # graphics that don't update until you downsampler again..
if reset: if reset:
@ -1074,7 +1058,6 @@ class Renderer(msgspec.Struct):
# xy-path data transform: convert source data to a format # xy-path data transform: convert source data to a format
# able to be passed to a `QPainterPath` rendering routine. # able to be passed to a `QPainterPath` rendering routine.
if not len(hist): if not len(hist):
# XXX: this might be why the profiler only has exits?
return return
x_out, y_out, connect = self.format_xy( x_out, y_out, connect = self.format_xy(
@ -1161,14 +1144,11 @@ class Renderer(msgspec.Struct):
elif should_ds and uppx > 1: elif should_ds and uppx > 1:
x_out, y_out, ymn, ymx = xy_downsample( x_out, y_out = xy_downsample(
x_out, x_out,
y_out, y_out,
uppx, uppx,
) )
self.flow.yrange = ymn, ymx
# print(f'{self.flow.name} post ds: ymn, ymx: {ymn},{ymx}')
reset = True reset = True
profiler(f'FULL PATH downsample redraw={should_ds}') profiler(f'FULL PATH downsample redraw={should_ds}')
self._in_ds = True self._in_ds = True

View File

@ -619,7 +619,7 @@ class FillStatusBar(QProgressBar):
# color: #19232D; # color: #19232D;
# width: 10px; # width: 10px;
self.setRange(0, int(slots)) self.setRange(0, slots)
self.setValue(value) self.setValue(value)

View File

@ -639,25 +639,20 @@ async def open_vlm_displays(
names: list[str], names: list[str],
) -> tuple[float, float]: ) -> tuple[float, float]:
'''
Flows "group" maxmin loop; assumes all named flows
are in the same co-domain and thus can be sorted
as one set.
Iterates all the named flows and calls the chart
api to find their range values and return.
TODO: really we should probably have a more built-in API
for this?
'''
mx = 0 mx = 0
for name in names: for name in names:
ymn, ymx = chart.maxmin(name=name)
mx = max(mx, ymx) mxmn = chart.maxmin(name=name)
if mxmn:
ymax = mxmn[1]
if ymax > mx:
mx = ymax
return 0, mx return 0, mx
chart.view.maxmin = partial(multi_maxmin, names=['volume'])
# TODO: fix the x-axis label issue where if you put # TODO: fix the x-axis label issue where if you put
# the axis on the left it's totally not lined up... # the axis on the left it's totally not lined up...
# show volume units value on LHS (for dinkus) # show volume units value on LHS (for dinkus)
@ -781,7 +776,6 @@ async def open_vlm_displays(
) -> None: ) -> None:
for name in names: for name in names:
if 'dark' in name: if 'dark' in name:
color = dark_vlm_color color = dark_vlm_color
elif 'rate' in name: elif 'rate' in name:

View File

@ -923,7 +923,6 @@ class ChartView(ViewBox):
# XXX: super important to be aware of this. # XXX: super important to be aware of this.
# or not flow.graphics.isVisible() # or not flow.graphics.isVisible()
): ):
# print(f'skipping {flow.name}')
continue continue
# pass in no array which will read and render from the last # pass in no array which will read and render from the last

View File

@ -22,9 +22,12 @@ from __future__ import annotations
from typing import ( from typing import (
Optional, Generic, Optional, Generic,
TypeVar, Callable, TypeVar, Callable,
Literal,
) )
import enum
import sys
# from pydantic import BaseModel, validator from pydantic import BaseModel, validator
from pydantic.generics import GenericModel from pydantic.generics import GenericModel
from PyQt5.QtWidgets import ( from PyQt5.QtWidgets import (
QWidget, QWidget,
@ -35,7 +38,6 @@ from ._forms import (
# FontScaledDelegate, # FontScaledDelegate,
Edit, Edit,
) )
from ..data.types import Struct
DataType = TypeVar('DataType') DataType = TypeVar('DataType')
@ -60,7 +62,7 @@ class Selection(Field[DataType], Generic[DataType]):
options: dict[str, DataType] options: dict[str, DataType]
# value: DataType = None # value: DataType = None
# @validator('value') # , always=True) @validator('value') # , always=True)
def set_value_first( def set_value_first(
cls, cls,
@ -98,7 +100,7 @@ class Edit(Field[DataType], Generic[DataType]):
widget_factory = Edit widget_factory = Edit
class AllocatorPane(Struct): class AllocatorPane(BaseModel):
account = Selection[str]( account = Selection[str](
options=dict.fromkeys( options=dict.fromkeys(

View File

@ -49,17 +49,12 @@ def xy_downsample(
x_spacer: float = 0.5, x_spacer: float = 0.5,
) -> tuple[ ) -> tuple[np.ndarray, np.ndarray]:
np.ndarray,
np.ndarray,
float,
float,
]:
# downsample whenever more then 1 pixels per datum can be shown. # downsample whenever more then 1 pixels per datum can be shown.
# always refresh data bounds until we get diffing # always refresh data bounds until we get diffing
# working properly, see above.. # working properly, see above..
bins, x, y, ymn, ymx = ds_m4( bins, x, y = ds_m4(
x, x,
y, y,
uppx, uppx,
@ -72,7 +67,7 @@ def xy_downsample(
)).flatten() )).flatten()
y = y.flatten() y = y.flatten()
return x, y, ymn, ymx return x, y
@njit( @njit(

View File

@ -19,7 +19,6 @@ Position info and display
""" """
from __future__ import annotations from __future__ import annotations
from copy import copy
from dataclasses import dataclass from dataclasses import dataclass
from functools import partial from functools import partial
from math import floor, copysign from math import floor, copysign
@ -106,8 +105,8 @@ async def update_pnl_from_feed(
# compute and display pnl status # compute and display pnl status
order_mode.pane.pnl_label.format( order_mode.pane.pnl_label.format(
pnl=copysign(1, size) * pnl( pnl=copysign(1, size) * pnl(
# live.be_price, # live.avg_price,
order_mode.current_pp.live_pp.be_price, order_mode.current_pp.live_pp.avg_price,
tick['price'], tick['price'],
), ),
) )
@ -357,7 +356,7 @@ class SettingsPane:
# last historical close price # last historical close price
last = feed.shm.array[-1][['close']][0] last = feed.shm.array[-1][['close']][0]
pnl_value = copysign(1, size) * pnl( pnl_value = copysign(1, size) * pnl(
tracker.live_pp.be_price, tracker.live_pp.avg_price,
last, last,
) )
@ -477,7 +476,7 @@ class PositionTracker:
self.alloc = alloc self.alloc = alloc
self.startup_pp = startup_pp self.startup_pp = startup_pp
self.live_pp = copy(startup_pp) self.live_pp = startup_pp.copy()
view = chart.getViewBox() view = chart.getViewBox()
@ -557,7 +556,7 @@ class PositionTracker:
pp = position or self.live_pp pp = position or self.live_pp
self.update_line( self.update_line(
pp.be_price, pp.avg_price,
pp.size, pp.size,
self.chart.linked.symbol.lot_size_digits, self.chart.linked.symbol.lot_size_digits,
) )
@ -571,7 +570,7 @@ class PositionTracker:
self.hide() self.hide()
else: else:
self._level_marker.level = pp.be_price self._level_marker.level = pp.avg_price
# these updates are critical to avoid lag on view/scene changes # these updates are critical to avoid lag on view/scene changes
self._level_marker.update() # trigger paint self._level_marker.update() # trigger paint

View File

@ -27,20 +27,20 @@ import time
from typing import Optional, Dict, Callable, Any from typing import Optional, Dict, Callable, Any
import uuid import uuid
from pydantic import BaseModel
import tractor import tractor
import trio import trio
from PyQt5.QtCore import Qt from PyQt5.QtCore import Qt
from .. import config from .. import config
from ..pp import Position
from ..clearing._client import open_ems, OrderBook from ..clearing._client import open_ems, OrderBook
from ..clearing._allocate import ( from ..clearing._allocate import (
mk_allocator, mk_allocator,
Position,
) )
from ._style import _font from ._style import _font
from ..data._source import Symbol from ..data._source import Symbol
from ..data.feed import Feed from ..data.feed import Feed
from ..data.types import Struct
from ..log import get_logger from ..log import get_logger
from ._editors import LineEditor, ArrowEditor from ._editors import LineEditor, ArrowEditor
from ._lines import order_line, LevelLine from ._lines import order_line, LevelLine
@ -58,9 +58,8 @@ from ._forms import open_form_input_handling
log = get_logger(__name__) log = get_logger(__name__)
class OrderDialog(Struct): class OrderDialog(BaseModel):
''' '''Trade dialogue meta-data describing the lifetime
Trade dialogue meta-data describing the lifetime
of an order submission to ``emsd`` from a chart. of an order submission to ``emsd`` from a chart.
''' '''
@ -73,6 +72,10 @@ class OrderDialog(Struct):
msgs: dict[str, dict] = {} msgs: dict[str, dict] = {}
fills: Dict[str, Any] = {} fills: Dict[str, Any] = {}
class Config:
arbitrary_types_allowed = True
underscore_attrs_are_private = False
def on_level_change_update_next_order_info( def on_level_change_update_next_order_info(
@ -84,8 +87,7 @@ def on_level_change_update_next_order_info(
tracker: PositionTracker, tracker: PositionTracker,
) -> None: ) -> None:
''' '''A callback applied for each level change to the line
A callback applied for each level change to the line
which will recompute the order size based on allocator which will recompute the order size based on allocator
settings. this is assigned inside settings. this is assigned inside
``OrderMode.line_from_order()`` ``OrderMode.line_from_order()``
@ -264,8 +266,7 @@ class OrderMode:
self, self,
) -> OrderDialog: ) -> OrderDialog:
''' '''Send execution order to EMS return a level line to
Send execution order to EMS return a level line to
represent the order on a chart. represent the order on a chart.
''' '''
@ -274,9 +275,13 @@ class OrderMode:
oid = str(uuid.uuid4()) oid = str(uuid.uuid4())
# format order data for ems # format order data for ems
order = staged.copy() fqsn = symbol.front_fqsn()
order.oid = oid order = staged.copy(
order.symbol = symbol.front_fqsn() update={
'symbol': fqsn,
'oid': oid,
}
)
line = self.line_from_order( line = self.line_from_order(
order, order,
@ -572,9 +577,9 @@ async def open_order_mode(
providers=symbol.brokers providers=symbol.brokers
) )
# XXX: ``brokerd`` delivers a set of account names that it # XXX: ``brokerd`` delivers a set of account names that it allows
# allows use of but the user also can define the accounts they'd # use of but the user also can define the accounts they'd like
# like to use, in order, in their `brokers.toml` file. # to use, in order, in their `brokers.toml` file.
accounts = {} accounts = {}
for name in brokerd_accounts: for name in brokerd_accounts:
# ensure name is in ``brokers.toml`` # ensure name is in ``brokers.toml``
@ -587,21 +592,10 @@ async def open_order_mode(
iter(accounts.keys()) iter(accounts.keys())
) if accounts else 'paper' ) if accounts else 'paper'
# Pack position messages by account, should only be one-to-one.
# NOTE: requires the backend exactly specifies # NOTE: requires the backend exactly specifies
# the expected symbol key in its positions msg. # the expected symbol key in its positions msg.
pps_by_account = {} pp_msgs = position_msgs.get(symkey, ())
for (broker, acctid), msgs in position_msgs.items(): pps_by_account = {msg['account']: msg for msg in pp_msgs}
for msg in msgs:
sym = msg['symbol']
if (
sym == symkey or
# mega-UGH, i think we need to fix the FQSN stuff sooner
# then later..
sym == symkey.removesuffix(f'.{broker}')
):
pps_by_account[acctid] = msg
# update pp trackers with data relayed from ``brokerd``. # update pp trackers with data relayed from ``brokerd``.
for account_name in accounts: for account_name in accounts:
@ -610,10 +604,7 @@ async def open_order_mode(
startup_pp = Position( startup_pp = Position(
symbol=symbol, symbol=symbol,
size=0, size=0,
be_price=0, avg_price=0,
# XXX: BLEH, do we care about this on the client side?
bsuid=symbol,
) )
msg = pps_by_account.get(account_name) msg = pps_by_account.get(account_name)
if msg: if msg:
@ -851,9 +842,7 @@ async def process_trades_and_update_ui(
# delete level line from view # delete level line from view
mode.on_cancel(oid) mode.on_cancel(oid)
broker_msg = msg['brokerd_msg'] broker_msg = msg['brokerd_msg']
log.warning( log.warning(f'Order {oid} failed with:\n{pformat(broker_msg)}')
f'Order {oid} failed with:\n{pformat(broker_msg)}'
)
elif resp in ( elif resp in (
'dark_triggered' 'dark_triggered'

View File

@ -41,7 +41,6 @@ setup(
}, },
install_requires=[ install_requires=[
'toml', 'toml',
'tomli', # fastest pure py reader
'click', 'click',
'colorlog', 'colorlog',
'attrs', 'attrs',