Compare commits
28 Commits
gitea_feat
...
dpbackup
| Author | SHA1 | Date |
|---|---|---|
|
|
72072b5737 | |
|
|
381b3121d6 | |
|
|
8984c1b60b | |
|
|
2e3cac1407 | |
|
|
b250a48b8d | |
|
|
3dfe7ef8dd | |
|
|
2774a7e6ec | |
|
|
7b6318f025 | |
|
|
b0ee764423 | |
|
|
a63f0ee1c0 | |
|
|
7d49335f8b | |
|
|
9d176c2dda | |
|
|
6cc02bd8f5 | |
|
|
693c7ce12a | |
|
|
5e60c79664 | |
|
|
7f779dda19 | |
|
|
5100036e10 | |
|
|
78b9d90202 | |
|
|
9300b3d6db | |
|
|
6d13c8255f | |
|
|
3765c61f2d | |
|
|
cb7a9b9449 | |
|
|
f1192dff09 | |
|
|
9e8d32cdff | |
|
|
c74741228f | |
|
|
f38eef2bf4 | |
|
|
e757e1f277 | |
|
|
4823f87422 |
|
|
@ -22,10 +22,10 @@ from typing import Optional, Union, Callable, Any
|
||||||
from contextlib import asynccontextmanager as acm
|
from contextlib import asynccontextmanager as acm
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
|
|
||||||
from pydantic import BaseModel
|
from msgspec import Struct
|
||||||
|
import tractor
|
||||||
import trio
|
import trio
|
||||||
from trio_typing import TaskStatus
|
from trio_typing import TaskStatus
|
||||||
import tractor
|
|
||||||
|
|
||||||
from .log import get_logger, get_console_log
|
from .log import get_logger, get_console_log
|
||||||
from .brokers import get_brokermod
|
from .brokers import get_brokermod
|
||||||
|
|
@ -47,16 +47,13 @@ _root_modules = [
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
class Services(BaseModel):
|
class Services(Struct):
|
||||||
|
|
||||||
actor_n: tractor._supervise.ActorNursery
|
actor_n: tractor._supervise.ActorNursery
|
||||||
service_n: trio.Nursery
|
service_n: trio.Nursery
|
||||||
debug_mode: bool # tractor sub-actor debug mode flag
|
debug_mode: bool # tractor sub-actor debug mode flag
|
||||||
service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {}
|
service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {}
|
||||||
|
|
||||||
class Config:
|
|
||||||
arbitrary_types_allowed = True
|
|
||||||
|
|
||||||
async def start_service_task(
|
async def start_service_task(
|
||||||
self,
|
self,
|
||||||
name: str,
|
name: str,
|
||||||
|
|
|
||||||
|
|
@ -34,13 +34,13 @@ from fuzzywuzzy import process as fuzzy
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import tractor
|
import tractor
|
||||||
from pydantic.dataclasses import dataclass
|
from pydantic.dataclasses import dataclass
|
||||||
from pydantic import BaseModel
|
|
||||||
import wsproto
|
import wsproto
|
||||||
|
|
||||||
from .._cacheables import open_cached_client
|
from .._cacheables import open_cached_client
|
||||||
from ._util import resproc, SymbolNotFound
|
from ._util import resproc, SymbolNotFound
|
||||||
from ..log import get_logger, get_console_log
|
from ..log import get_logger, get_console_log
|
||||||
from ..data import ShmArray
|
from ..data import ShmArray
|
||||||
|
from ..data.types import Struct
|
||||||
from ..data._web_bs import open_autorecon_ws, NoBsWs
|
from ..data._web_bs import open_autorecon_ws, NoBsWs
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
@ -79,12 +79,14 @@ _show_wap_in_history = False
|
||||||
|
|
||||||
|
|
||||||
# https://binance-docs.github.io/apidocs/spot/en/#exchange-information
|
# https://binance-docs.github.io/apidocs/spot/en/#exchange-information
|
||||||
class Pair(BaseModel):
|
class Pair(Struct, frozen=True):
|
||||||
symbol: str
|
symbol: str
|
||||||
status: str
|
status: str
|
||||||
|
|
||||||
baseAsset: str
|
baseAsset: str
|
||||||
baseAssetPrecision: int
|
baseAssetPrecision: int
|
||||||
|
cancelReplaceAllowed: bool
|
||||||
|
allowTrailingStop: bool
|
||||||
quoteAsset: str
|
quoteAsset: str
|
||||||
quotePrecision: int
|
quotePrecision: int
|
||||||
quoteAssetPrecision: int
|
quoteAssetPrecision: int
|
||||||
|
|
@ -287,7 +289,7 @@ async def get_client() -> Client:
|
||||||
|
|
||||||
|
|
||||||
# validation type
|
# validation type
|
||||||
class AggTrade(BaseModel):
|
class AggTrade(Struct):
|
||||||
e: str # Event type
|
e: str # Event type
|
||||||
E: int # Event time
|
E: int # Event time
|
||||||
s: str # Symbol
|
s: str # Symbol
|
||||||
|
|
@ -341,7 +343,9 @@ async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
|
||||||
|
|
||||||
elif msg.get('e') == 'aggTrade':
|
elif msg.get('e') == 'aggTrade':
|
||||||
|
|
||||||
# validate
|
# NOTE: this is purely for a definition, ``msgspec.Struct``
|
||||||
|
# does not runtime-validate until you decode/encode.
|
||||||
|
# see: https://jcristharif.com/msgspec/structs.html#type-validation
|
||||||
msg = AggTrade(**msg)
|
msg = AggTrade(**msg)
|
||||||
|
|
||||||
# TODO: type out and require this quote format
|
# TODO: type out and require this quote format
|
||||||
|
|
@ -352,8 +356,8 @@ async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
|
||||||
'brokerd_ts': time.time(),
|
'brokerd_ts': time.time(),
|
||||||
'ticks': [{
|
'ticks': [{
|
||||||
'type': 'trade',
|
'type': 'trade',
|
||||||
'price': msg.p,
|
'price': float(msg.p),
|
||||||
'size': msg.q,
|
'size': float(msg.q),
|
||||||
'broker_ts': msg.T,
|
'broker_ts': msg.T,
|
||||||
}],
|
}],
|
||||||
}
|
}
|
||||||
|
|
@ -448,7 +452,7 @@ async def stream_quotes(
|
||||||
d = cache[sym.upper()]
|
d = cache[sym.upper()]
|
||||||
syminfo = Pair(**d) # validation
|
syminfo = Pair(**d) # validation
|
||||||
|
|
||||||
si = sym_infos[sym] = syminfo.dict()
|
si = sym_infos[sym] = syminfo.to_dict()
|
||||||
|
|
||||||
# XXX: after manually inspecting the response format we
|
# XXX: after manually inspecting the response format we
|
||||||
# just directly pick out the info we need
|
# just directly pick out the info we need
|
||||||
|
|
|
||||||
|
|
@ -148,7 +148,7 @@ async def handle_order_requests(
|
||||||
oid=request_msg['oid'],
|
oid=request_msg['oid'],
|
||||||
symbol=request_msg['symbol'],
|
symbol=request_msg['symbol'],
|
||||||
reason=f'No account found: `{account}` ?',
|
reason=f'No account found: `{account}` ?',
|
||||||
).dict())
|
))
|
||||||
continue
|
continue
|
||||||
|
|
||||||
client = _accounts2clients.get(account)
|
client = _accounts2clients.get(account)
|
||||||
|
|
@ -161,7 +161,7 @@ async def handle_order_requests(
|
||||||
oid=request_msg['oid'],
|
oid=request_msg['oid'],
|
||||||
symbol=request_msg['symbol'],
|
symbol=request_msg['symbol'],
|
||||||
reason=f'No api client loaded for account: `{account}` ?',
|
reason=f'No api client loaded for account: `{account}` ?',
|
||||||
).dict())
|
))
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if action in {'buy', 'sell'}:
|
if action in {'buy', 'sell'}:
|
||||||
|
|
@ -188,7 +188,7 @@ async def handle_order_requests(
|
||||||
oid=request_msg['oid'],
|
oid=request_msg['oid'],
|
||||||
symbol=request_msg['symbol'],
|
symbol=request_msg['symbol'],
|
||||||
reason='Order already active?',
|
reason='Order already active?',
|
||||||
).dict())
|
))
|
||||||
|
|
||||||
# deliver ack that order has been submitted to broker routing
|
# deliver ack that order has been submitted to broker routing
|
||||||
await ems_order_stream.send(
|
await ems_order_stream.send(
|
||||||
|
|
@ -197,9 +197,8 @@ async def handle_order_requests(
|
||||||
oid=order.oid,
|
oid=order.oid,
|
||||||
# broker specific request id
|
# broker specific request id
|
||||||
reqid=reqid,
|
reqid=reqid,
|
||||||
time_ns=time.time_ns(),
|
|
||||||
account=account,
|
account=account,
|
||||||
).dict()
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
elif action == 'cancel':
|
elif action == 'cancel':
|
||||||
|
|
@ -559,7 +558,7 @@ async def trades_dialogue(
|
||||||
cids2pps,
|
cids2pps,
|
||||||
validate=True,
|
validate=True,
|
||||||
)
|
)
|
||||||
all_positions.extend(msg.dict() for msg in msgs)
|
all_positions.extend(msg for msg in msgs)
|
||||||
|
|
||||||
if not all_positions and cids2pps:
|
if not all_positions and cids2pps:
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
|
|
@ -665,7 +664,7 @@ async def emit_pp_update(
|
||||||
msg = msgs[0]
|
msg = msgs[0]
|
||||||
break
|
break
|
||||||
|
|
||||||
await ems_stream.send(msg.dict())
|
await ems_stream.send(msg)
|
||||||
|
|
||||||
|
|
||||||
async def deliver_trade_events(
|
async def deliver_trade_events(
|
||||||
|
|
@ -743,7 +742,7 @@ async def deliver_trade_events(
|
||||||
|
|
||||||
broker_details={'name': 'ib'},
|
broker_details={'name': 'ib'},
|
||||||
)
|
)
|
||||||
await ems_stream.send(msg.dict())
|
await ems_stream.send(msg)
|
||||||
|
|
||||||
case 'fill':
|
case 'fill':
|
||||||
|
|
||||||
|
|
@ -803,7 +802,7 @@ async def deliver_trade_events(
|
||||||
broker_time=trade_entry['broker_time'],
|
broker_time=trade_entry['broker_time'],
|
||||||
|
|
||||||
)
|
)
|
||||||
await ems_stream.send(msg.dict())
|
await ems_stream.send(msg)
|
||||||
|
|
||||||
# 2 cases:
|
# 2 cases:
|
||||||
# - fill comes first or
|
# - fill comes first or
|
||||||
|
|
@ -879,7 +878,7 @@ async def deliver_trade_events(
|
||||||
cid, msg = pack_position(item)
|
cid, msg = pack_position(item)
|
||||||
# acctid = msg.account = accounts_def.inverse[msg.account]
|
# acctid = msg.account = accounts_def.inverse[msg.account]
|
||||||
# cuck ib and it's shitty fifo sys for pps!
|
# cuck ib and it's shitty fifo sys for pps!
|
||||||
# await ems_stream.send(msg.dict())
|
# await ems_stream.send(msg)
|
||||||
|
|
||||||
case 'event':
|
case 'event':
|
||||||
|
|
||||||
|
|
@ -891,7 +890,7 @@ async def deliver_trade_events(
|
||||||
# level...
|
# level...
|
||||||
# reqid = item.get('reqid', 0)
|
# reqid = item.get('reqid', 0)
|
||||||
# if getattr(msg, 'reqid', 0) < -1:
|
# if getattr(msg, 'reqid', 0) < -1:
|
||||||
# log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
|
# log.info(f"TWS triggered trade\n{pformat(msg)}")
|
||||||
|
|
||||||
# msg.reqid = 'tws-' + str(-1 * reqid)
|
# msg.reqid = 'tws-' + str(-1 * reqid)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,64 @@
|
||||||
|
``kraken`` backend
|
||||||
|
------------------
|
||||||
|
though they don't have the most liquidity of all the cexes they sure are
|
||||||
|
accommodating to those of us who appreciate a little ``xmr``.
|
||||||
|
|
||||||
|
status
|
||||||
|
******
|
||||||
|
current support is *production grade* and both real-time data and order
|
||||||
|
management should be correct and fast. this backend is used by core devs
|
||||||
|
for live trading.
|
||||||
|
|
||||||
|
|
||||||
|
config
|
||||||
|
******
|
||||||
|
In order to get order mode support your ``brokers.toml``
|
||||||
|
needs to have something like the following:
|
||||||
|
|
||||||
|
.. code:: toml
|
||||||
|
|
||||||
|
[kraken]
|
||||||
|
accounts.spot = 'spot'
|
||||||
|
key_descr = "spot"
|
||||||
|
api_key = "69696969696969696696969696969696969696969696969696969696"
|
||||||
|
secret = "BOOBSBOOBSBOOBSBOOBSBOOBSSMBZ69696969696969669969696969696"
|
||||||
|
|
||||||
|
|
||||||
|
If everything works correctly you should see any current positions
|
||||||
|
loaded in the pps pane on chart load and you should also be able to
|
||||||
|
check your trade records in the file::
|
||||||
|
|
||||||
|
<pikerk_conf_dir>/ledgers/trades_kraken_spot.toml
|
||||||
|
|
||||||
|
|
||||||
|
An example ledger file will have entries written verbatim from the
|
||||||
|
trade events schema:
|
||||||
|
|
||||||
|
.. code:: toml
|
||||||
|
|
||||||
|
[TFJBKK-SMBZS-VJ4UWS]
|
||||||
|
ordertxid = "SMBZSA-7CNQU-3HWLNJ"
|
||||||
|
postxid = "SMBZSE-M7IF5-CFI7LT"
|
||||||
|
pair = "XXMRZEUR"
|
||||||
|
time = 1655691993.4133966
|
||||||
|
type = "buy"
|
||||||
|
ordertype = "limit"
|
||||||
|
price = "103.97000000"
|
||||||
|
cost = "499.99999977"
|
||||||
|
fee = "0.80000000"
|
||||||
|
vol = "4.80907954"
|
||||||
|
margin = "0.00000000"
|
||||||
|
misc = ""
|
||||||
|
|
||||||
|
|
||||||
|
your ``pps.toml`` file will have position entries like,
|
||||||
|
|
||||||
|
.. code:: toml
|
||||||
|
|
||||||
|
[kraken.spot."xmreur.kraken"]
|
||||||
|
size = 4.80907954
|
||||||
|
be_price = 103.97000000
|
||||||
|
bsuid = "XXMRZEUR"
|
||||||
|
clears = [
|
||||||
|
{ tid = "TFJBKK-SMBZS-VJ4UWS", cost = 0.8, price = 103.97, size = 4.80907954, dt = "2022-05-20T02:26:33.413397+00:00" },
|
||||||
|
]
|
||||||
|
|
@ -282,6 +282,7 @@ class Client:
|
||||||
"volume": str(size),
|
"volume": str(size),
|
||||||
}
|
}
|
||||||
return await self.endpoint('AddOrder', data)
|
return await self.endpoint('AddOrder', data)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# Edit order data for kraken api
|
# Edit order data for kraken api
|
||||||
data["txid"] = reqid
|
data["txid"] = reqid
|
||||||
|
|
|
||||||
|
|
@ -18,21 +18,23 @@
|
||||||
Order api and machinery
|
Order api and machinery
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from contextlib import asynccontextmanager as acm
|
from contextlib import (
|
||||||
|
asynccontextmanager as acm,
|
||||||
|
contextmanager as cm,
|
||||||
|
)
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from itertools import chain
|
from itertools import chain, count
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
import time
|
import time
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
AsyncIterator,
|
AsyncIterator,
|
||||||
# Callable,
|
Union,
|
||||||
# Optional,
|
|
||||||
# Union,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from async_generator import aclosing
|
||||||
|
from bidict import bidict
|
||||||
import pendulum
|
import pendulum
|
||||||
from pydantic import BaseModel
|
|
||||||
import trio
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
import wsproto
|
import wsproto
|
||||||
|
|
@ -61,179 +63,137 @@ from .feed import (
|
||||||
stream_messages,
|
stream_messages,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
MsgUnion = Union[
|
||||||
class Trade(BaseModel):
|
BrokerdCancel,
|
||||||
'''
|
BrokerdError,
|
||||||
Trade class that helps parse and validate ownTrades stream
|
BrokerdFill,
|
||||||
|
BrokerdOrder,
|
||||||
'''
|
BrokerdOrderAck,
|
||||||
reqid: str # kraken order transaction id
|
BrokerdPosition,
|
||||||
action: str # buy or sell
|
BrokerdStatus,
|
||||||
price: float # price of asset
|
]
|
||||||
size: float # vol of asset
|
|
||||||
broker_time: str # e.g GTC, GTD
|
|
||||||
|
|
||||||
|
|
||||||
async def handle_order_requests(
|
async def handle_order_requests(
|
||||||
|
|
||||||
|
ws: NoBsWs,
|
||||||
client: Client,
|
client: Client,
|
||||||
ems_order_stream: tractor.MsgStream,
|
ems_order_stream: tractor.MsgStream,
|
||||||
|
token: str,
|
||||||
|
emsflow: dict[str, list[MsgUnion]],
|
||||||
|
ids: bidict[str, int],
|
||||||
|
reqids2txids: dict[int, str],
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
'''
|
||||||
|
Process new order submission requests from the EMS
|
||||||
|
and deliver acks or errors.
|
||||||
|
|
||||||
request_msg: dict
|
'''
|
||||||
|
# XXX: UGH, let's unify this.. with ``msgspec``.
|
||||||
|
msg: dict[str, Any]
|
||||||
order: BrokerdOrder
|
order: BrokerdOrder
|
||||||
|
counter = count()
|
||||||
|
|
||||||
async for request_msg in ems_order_stream:
|
async for msg in ems_order_stream:
|
||||||
log.info(
|
log.info(f'Rx order msg:\n{pformat(msg)}')
|
||||||
'Received order request:\n'
|
match msg:
|
||||||
f'{pformat(request_msg)}'
|
case {
|
||||||
)
|
'action': 'cancel',
|
||||||
|
}:
|
||||||
|
cancel = BrokerdCancel(**msg)
|
||||||
|
last = emsflow[cancel.oid]
|
||||||
|
reqid = ids[cancel.oid]
|
||||||
|
txid = reqids2txids[reqid]
|
||||||
|
|
||||||
action = request_msg['action']
|
# call ws api to cancel:
|
||||||
|
# https://docs.kraken.com/websockets/#message-cancelOrder
|
||||||
|
await ws.send_msg({
|
||||||
|
'event': 'cancelOrder',
|
||||||
|
'token': token,
|
||||||
|
'reqid': reqid,
|
||||||
|
'txid': [txid], # should be txid from submission
|
||||||
|
})
|
||||||
|
|
||||||
if action in {'buy', 'sell'}:
|
case {
|
||||||
|
'account': 'kraken.spot' as account,
|
||||||
|
'action': action,
|
||||||
|
} if action in {'buy', 'sell'}:
|
||||||
|
|
||||||
account = request_msg['account']
|
# validate
|
||||||
if account != 'kraken.spot':
|
order = BrokerdOrder(**msg)
|
||||||
log.error(
|
|
||||||
'This is a kraken account, \
|
|
||||||
only a `kraken.spot` selection is valid'
|
|
||||||
)
|
|
||||||
await ems_order_stream.send(BrokerdError(
|
|
||||||
oid=request_msg['oid'],
|
|
||||||
symbol=request_msg['symbol'],
|
|
||||||
|
|
||||||
# reason=f'Kraken only, No account found: `{account}` ?',
|
# logic from old `Client.submit_limit()`
|
||||||
reason=(
|
if order.oid in ids:
|
||||||
'Kraken only, order mode disabled due to '
|
ep = 'editOrder'
|
||||||
'https://github.com/pikers/piker/issues/299'
|
reqid = ids[order.oid] # integer not txid
|
||||||
),
|
txid = reqids2txids[reqid]
|
||||||
|
extra = {
|
||||||
|
'orderid': txid, # txid
|
||||||
|
}
|
||||||
|
|
||||||
).dict())
|
|
||||||
continue
|
|
||||||
|
|
||||||
# validate
|
|
||||||
order = BrokerdOrder(**request_msg)
|
|
||||||
# call our client api to submit the order
|
|
||||||
resp = await client.submit_limit(
|
|
||||||
symbol=order.symbol,
|
|
||||||
price=order.price,
|
|
||||||
action=order.action,
|
|
||||||
size=order.size,
|
|
||||||
reqid=order.reqid,
|
|
||||||
)
|
|
||||||
|
|
||||||
err = resp['error']
|
|
||||||
if err:
|
|
||||||
oid = order.oid
|
|
||||||
log.error(f'Failed to submit order: {oid}')
|
|
||||||
|
|
||||||
await ems_order_stream.send(
|
|
||||||
BrokerdError(
|
|
||||||
oid=order.oid,
|
|
||||||
reqid=order.reqid,
|
|
||||||
symbol=order.symbol,
|
|
||||||
reason="Failed order submission",
|
|
||||||
broker_details=resp
|
|
||||||
).dict()
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
# TODO: handle multiple orders (cancels?)
|
|
||||||
# txid is an array of strings
|
|
||||||
if order.reqid is None:
|
|
||||||
reqid = resp['result']['txid'][0]
|
|
||||||
else:
|
else:
|
||||||
# update the internal pairing of oid to krakens
|
ep = 'addOrder'
|
||||||
# txid with the new txid that is returned on edit
|
reqid = next(counter)
|
||||||
reqid = resp['result']['txid']
|
ids[order.oid] = reqid
|
||||||
|
log.debug(
|
||||||
|
f"Adding order {reqid}\n"
|
||||||
|
f'{ids}'
|
||||||
|
)
|
||||||
|
extra = {
|
||||||
|
'ordertype': 'limit',
|
||||||
|
'type': order.action,
|
||||||
|
}
|
||||||
|
|
||||||
# deliver ack that order has been submitted to broker routing
|
psym = order.symbol.upper()
|
||||||
await ems_order_stream.send(
|
pair = f'{psym[:3]}/{psym[3:]}'
|
||||||
BrokerdOrderAck(
|
|
||||||
|
|
||||||
# ems order request id
|
# call ws api to submit the order:
|
||||||
oid=order.oid,
|
# https://docs.kraken.com/websockets/#message-addOrder
|
||||||
|
req = {
|
||||||
|
'event': ep,
|
||||||
|
'token': token,
|
||||||
|
|
||||||
# broker specific request id
|
'reqid': reqid, # remapped-to-int uid from ems
|
||||||
reqid=reqid,
|
'pair': pair,
|
||||||
|
'price': str(order.price),
|
||||||
|
'volume': str(order.size),
|
||||||
|
|
||||||
# account the made the order
|
# only ensures request is valid, nothing more
|
||||||
account=order.account
|
# validate: 'true',
|
||||||
|
|
||||||
).dict()
|
} | extra
|
||||||
|
log.info(f'Submitting WS order request:\n{pformat(req)}')
|
||||||
|
await ws.send_msg(req)
|
||||||
|
|
||||||
|
resp = BrokerdOrderAck(
|
||||||
|
oid=order.oid, # ems order request id
|
||||||
|
reqid=reqid, # our custom int mapping
|
||||||
|
account=account, # piker account
|
||||||
)
|
)
|
||||||
|
await ems_order_stream.send(resp)
|
||||||
|
|
||||||
elif action == 'cancel':
|
# placehold for sanity checking in relay loop
|
||||||
msg = BrokerdCancel(**request_msg)
|
emsflow.setdefault(order.oid, []).append(order)
|
||||||
|
|
||||||
# Send order cancellation to kraken
|
case _:
|
||||||
resp = await client.submit_cancel(
|
account = msg.get('account')
|
||||||
reqid=msg.reqid
|
if account != 'kraken.spot':
|
||||||
)
|
log.error(
|
||||||
|
'This is a kraken account, \
|
||||||
# Check to make sure there was no error returned by
|
only a `kraken.spot` selection is valid'
|
||||||
# the kraken endpoint. Assert one order was cancelled.
|
)
|
||||||
try:
|
|
||||||
result = resp['result']
|
|
||||||
count = result['count']
|
|
||||||
|
|
||||||
# check for 'error' key if we received no 'result'
|
|
||||||
except KeyError:
|
|
||||||
error = resp.get('error')
|
|
||||||
|
|
||||||
await ems_order_stream.send(
|
await ems_order_stream.send(
|
||||||
BrokerdError(
|
BrokerdError(
|
||||||
oid=msg.oid,
|
oid=msg['oid'],
|
||||||
reqid=msg.reqid,
|
symbol=msg['symbol'],
|
||||||
symbol=msg.symbol,
|
reason=(
|
||||||
reason="Failed order cancel",
|
'Invalid request msg:\n{msg}'
|
||||||
broker_details=resp
|
),
|
||||||
).dict()
|
|
||||||
)
|
|
||||||
|
|
||||||
if not error:
|
|
||||||
raise BrokerError(f'Unknown order cancel response: {resp}')
|
|
||||||
|
|
||||||
else:
|
|
||||||
if not count: # no orders were cancelled?
|
|
||||||
|
|
||||||
# XXX: what exactly is this from and why would we care?
|
|
||||||
# there doesn't seem to be any docs here?
|
|
||||||
# https://docs.kraken.com/rest/#operation/cancelOrder
|
|
||||||
|
|
||||||
# Check to make sure the cancellation is NOT pending,
|
|
||||||
# then send the confirmation to the ems order stream
|
|
||||||
pending = result.get('pending')
|
|
||||||
if pending:
|
|
||||||
log.error(f'Order {oid} cancel was not yet successful')
|
|
||||||
|
|
||||||
await ems_order_stream.send(
|
|
||||||
BrokerdError(
|
|
||||||
oid=msg.oid,
|
|
||||||
reqid=msg.reqid,
|
|
||||||
symbol=msg.symbol,
|
|
||||||
# TODO: maybe figure out if pending
|
|
||||||
# cancels will eventually get cancelled
|
|
||||||
reason="Order cancel is still pending?",
|
|
||||||
broker_details=resp
|
|
||||||
).dict()
|
|
||||||
)
|
|
||||||
|
|
||||||
else: # order cancel success case.
|
|
||||||
|
|
||||||
await ems_order_stream.send(
|
|
||||||
BrokerdStatus(
|
|
||||||
reqid=msg.reqid,
|
|
||||||
account=msg.account,
|
|
||||||
time_ns=time.time_ns(),
|
|
||||||
status='cancelled',
|
|
||||||
reason='Order cancelled',
|
|
||||||
broker_details={'name': 'kraken'}
|
|
||||||
).dict()
|
|
||||||
)
|
)
|
||||||
else:
|
)
|
||||||
log.error(f'Unknown order command: {request_msg}')
|
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
|
|
@ -310,13 +270,13 @@ async def trades_dialogue(
|
||||||
log.info(
|
log.info(
|
||||||
f'Loaded {len(trades)} trades from account `{acc_name}`'
|
f'Loaded {len(trades)} trades from account `{acc_name}`'
|
||||||
)
|
)
|
||||||
trans = await update_ledger(acctid, trades)
|
with open_ledger(acctid, trades) as trans:
|
||||||
active, closed = pp.update_pps_conf(
|
active, closed = pp.update_pps_conf(
|
||||||
'kraken',
|
'kraken',
|
||||||
acctid,
|
acctid,
|
||||||
trade_records=trans,
|
trade_records=trans,
|
||||||
ledger_reload={}.fromkeys(t.bsuid for t in trans),
|
ledger_reload={}.fromkeys(t.bsuid for t in trans),
|
||||||
)
|
)
|
||||||
|
|
||||||
position_msgs: list[dict] = []
|
position_msgs: list[dict] = []
|
||||||
pps: dict[int, pp.Position]
|
pps: dict[int, pp.Position]
|
||||||
|
|
@ -330,7 +290,7 @@ async def trades_dialogue(
|
||||||
avg_price=p.be_price,
|
avg_price=p.be_price,
|
||||||
currency='',
|
currency='',
|
||||||
)
|
)
|
||||||
position_msgs.append(msg.dict())
|
position_msgs.append(msg)
|
||||||
|
|
||||||
await ctx.started(
|
await ctx.started(
|
||||||
(position_msgs, [acc_name])
|
(position_msgs, [acc_name])
|
||||||
|
|
@ -357,135 +317,441 @@ async def trades_dialogue(
|
||||||
),
|
),
|
||||||
) as ws,
|
) as ws,
|
||||||
trio.open_nursery() as n,
|
trio.open_nursery() as n,
|
||||||
|
aclosing(stream_messages(ws)) as stream,
|
||||||
):
|
):
|
||||||
|
# task local msg dialog tracking
|
||||||
|
emsflow: dict[
|
||||||
|
str,
|
||||||
|
list[MsgUnion],
|
||||||
|
] = {}
|
||||||
|
|
||||||
|
# 2way map for ems ids to kraken int reqids..
|
||||||
|
ids: bidict[str, int] = bidict()
|
||||||
|
reqids2txids: dict[int, str] = {}
|
||||||
|
|
||||||
# task for processing inbound requests from ems
|
# task for processing inbound requests from ems
|
||||||
n.start_soon(handle_order_requests, client, ems_stream)
|
n.start_soon(
|
||||||
|
handle_order_requests,
|
||||||
|
ws,
|
||||||
|
client,
|
||||||
|
ems_stream,
|
||||||
|
token,
|
||||||
|
emsflow,
|
||||||
|
ids,
|
||||||
|
reqids2txids,
|
||||||
|
)
|
||||||
|
|
||||||
count: int = 0
|
# enter relay loop
|
||||||
|
await handle_order_updates(
|
||||||
|
ws,
|
||||||
|
stream,
|
||||||
|
ems_stream,
|
||||||
|
emsflow,
|
||||||
|
ids,
|
||||||
|
reqids2txids,
|
||||||
|
trans,
|
||||||
|
acctid,
|
||||||
|
acc_name,
|
||||||
|
token,
|
||||||
|
)
|
||||||
|
|
||||||
# process and relay trades events to ems
|
|
||||||
|
async def handle_order_updates(
|
||||||
|
ws: NoBsWs,
|
||||||
|
ws_stream: AsyncIterator,
|
||||||
|
ems_stream: tractor.MsgStream,
|
||||||
|
emsflow: dict[str, list[MsgUnion]],
|
||||||
|
ids: bidict[str, int],
|
||||||
|
reqids2txids: dict[int, str],
|
||||||
|
trans: list[pp.Transaction],
|
||||||
|
acctid: str,
|
||||||
|
acc_name: str,
|
||||||
|
token: str,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
Main msg handling loop for all things order management.
|
||||||
|
|
||||||
|
This code is broken out to make the context explicit and state variables
|
||||||
|
defined in the signature clear to the reader.
|
||||||
|
|
||||||
|
'''
|
||||||
|
# transaction records which will be updated
|
||||||
|
# on new trade clearing events (aka order "fills")
|
||||||
|
trans: list[pp.Transaction]
|
||||||
|
|
||||||
|
async for msg in ws_stream:
|
||||||
|
match msg:
|
||||||
|
# process and relay clearing trade events to ems
|
||||||
# https://docs.kraken.com/websockets/#message-ownTrades
|
# https://docs.kraken.com/websockets/#message-ownTrades
|
||||||
async for msg in stream_messages(ws):
|
case [
|
||||||
match msg:
|
trades_msgs,
|
||||||
case [
|
'ownTrades',
|
||||||
trades_msgs,
|
# won't exist for historical values?
|
||||||
'ownTrades',
|
# 'userref': reqid,
|
||||||
{'sequence': seq},
|
{'sequence': seq},
|
||||||
]:
|
]:
|
||||||
# XXX: do we actually need this orrr?
|
# flatten msgs to an {id -> data} table for processing
|
||||||
# ensure that we are only processing new trades?
|
trades = {
|
||||||
assert seq > count
|
tid: trade
|
||||||
count += 1
|
for entry in trades_msgs
|
||||||
|
for (tid, trade) in entry.items()
|
||||||
|
|
||||||
# flatten msgs for processing
|
# only emit entries which are already not-in-ledger
|
||||||
trades = {
|
if tid not in {r.tid for r in trans}
|
||||||
tid: trade
|
}
|
||||||
for entry in trades_msgs
|
for tid, trade in trades.items():
|
||||||
for (tid, trade) in entry.items()
|
|
||||||
|
|
||||||
# only emit entries which are already not-in-ledger
|
# NOTE: try to get the requid sent in the order
|
||||||
if tid not in {r.tid for r in trans}
|
# request message if posssible; it may not be
|
||||||
}
|
# provided since this sub also returns generic
|
||||||
for tid, trade in trades.items():
|
# historical trade events.
|
||||||
|
reqid = trade.get('userref', trade['ordertxid'])
|
||||||
|
|
||||||
# parse-cast
|
action = trade['type']
|
||||||
reqid = trade['ordertxid']
|
price = float(trade['price'])
|
||||||
action = trade['type']
|
size = float(trade['vol'])
|
||||||
price = float(trade['price'])
|
broker_time = float(trade['time'])
|
||||||
size = float(trade['vol'])
|
|
||||||
broker_time = float(trade['time'])
|
# send a fill msg for gui update
|
||||||
|
fill_msg = BrokerdFill(
|
||||||
|
reqid=reqid,
|
||||||
|
|
||||||
|
time_ns=time.time_ns(),
|
||||||
|
|
||||||
|
action=action,
|
||||||
|
size=size,
|
||||||
|
price=price,
|
||||||
|
# TODO: maybe capture more msg data
|
||||||
|
# i.e fees?
|
||||||
|
broker_details={'name': 'kraken'},
|
||||||
|
broker_time=broker_time
|
||||||
|
)
|
||||||
|
await ems_stream.send(fill_msg)
|
||||||
|
|
||||||
|
filled_msg = BrokerdStatus(
|
||||||
|
reqid=reqid,
|
||||||
|
time_ns=time.time_ns(),
|
||||||
|
|
||||||
|
account=acc_name,
|
||||||
|
status='filled',
|
||||||
|
filled=size,
|
||||||
|
reason='Order filled by kraken',
|
||||||
|
broker_details={
|
||||||
|
'name': 'kraken',
|
||||||
|
'broker_time': broker_time
|
||||||
|
},
|
||||||
|
|
||||||
|
# TODO: figure out if kraken gives a count
|
||||||
|
# of how many units of underlying were
|
||||||
|
# filled. Alternatively we can decrement
|
||||||
|
# this value ourselves by associating and
|
||||||
|
# calcing from the diff with the original
|
||||||
|
# client-side request, see:
|
||||||
|
# https://github.com/pikers/piker/issues/296
|
||||||
|
remaining=0,
|
||||||
|
)
|
||||||
|
await ems_stream.send(filled_msg)
|
||||||
|
|
||||||
|
# update ledger and position tracking
|
||||||
|
with open_ledger(acctid, trades) as trans:
|
||||||
|
active, closed = pp.update_pps_conf(
|
||||||
|
'kraken',
|
||||||
|
acctid,
|
||||||
|
trade_records=trans,
|
||||||
|
ledger_reload={}.fromkeys(
|
||||||
|
t.bsuid for t in trans),
|
||||||
|
)
|
||||||
|
|
||||||
|
# emit any new pp msgs to ems
|
||||||
|
for pos in filter(
|
||||||
|
bool,
|
||||||
|
chain(active.values(), closed.values()),
|
||||||
|
):
|
||||||
|
pp_msg = BrokerdPosition(
|
||||||
|
broker='kraken',
|
||||||
|
|
||||||
|
# XXX: ok so this is annoying, we're
|
||||||
|
# relaying an account name with the
|
||||||
|
# backend suffix prefixed but when
|
||||||
|
# reading accounts from ledgers we
|
||||||
|
# don't need it and/or it's prefixed
|
||||||
|
# in the section table.. we should
|
||||||
|
# just strip this from the message
|
||||||
|
# right since `.broker` is already
|
||||||
|
# included?
|
||||||
|
account=f'kraken.{acctid}',
|
||||||
|
symbol=pos.symbol.front_fqsn(),
|
||||||
|
size=pos.size,
|
||||||
|
avg_price=pos.be_price,
|
||||||
|
|
||||||
|
# TODO
|
||||||
|
# currency=''
|
||||||
|
)
|
||||||
|
await ems_stream.send(pp_msg)
|
||||||
|
|
||||||
|
# process and relay order state change events
|
||||||
|
# https://docs.kraken.com/websockets/#message-openOrders
|
||||||
|
case [
|
||||||
|
order_msgs,
|
||||||
|
'openOrders',
|
||||||
|
{'sequence': seq},
|
||||||
|
]:
|
||||||
|
for order_msg in order_msgs:
|
||||||
|
log.info(
|
||||||
|
f'Order msg update_{seq}:\n'
|
||||||
|
f'{pformat(order_msg)}'
|
||||||
|
)
|
||||||
|
txid, update_msg = list(order_msg.items())[0]
|
||||||
|
match update_msg:
|
||||||
|
|
||||||
|
# we ignore internal order updates triggered by
|
||||||
|
# kraken's "edit" endpoint.
|
||||||
|
case {
|
||||||
|
'cancel_reason': 'Order replaced',
|
||||||
|
'status': status,
|
||||||
|
'userref': reqid,
|
||||||
|
**rest,
|
||||||
|
}:
|
||||||
|
continue
|
||||||
|
|
||||||
|
case {
|
||||||
|
'status': status,
|
||||||
|
'userref': reqid,
|
||||||
|
**rest,
|
||||||
|
|
||||||
|
# XXX: eg. of remaining msg schema:
|
||||||
|
# 'avg_price': _,
|
||||||
|
# 'cost': _,
|
||||||
|
# 'descr': {
|
||||||
|
# 'close': None,
|
||||||
|
# 'leverage': None,
|
||||||
|
# 'order': descr,
|
||||||
|
# 'ordertype': 'limit',
|
||||||
|
# 'pair': 'XMR/EUR',
|
||||||
|
# 'price': '74.94000000',
|
||||||
|
# 'price2': '0.00000000',
|
||||||
|
# 'type': 'buy'
|
||||||
|
# },
|
||||||
|
# 'expiretm': None,
|
||||||
|
# 'fee': '0.00000000',
|
||||||
|
# 'limitprice': '0.00000000',
|
||||||
|
# 'misc': '',
|
||||||
|
# 'oflags': 'fciq',
|
||||||
|
# 'opentm': '1656966131.337344',
|
||||||
|
# 'refid': None,
|
||||||
|
# 'starttm': None,
|
||||||
|
# 'stopprice': '0.00000000',
|
||||||
|
# 'timeinforce': 'GTC',
|
||||||
|
# 'vol': submit_vlm, # '13.34400854',
|
||||||
|
# 'vol_exec': exec_vlm, # 0.0000
|
||||||
|
}:
|
||||||
|
ems_status = {
|
||||||
|
'open': 'submitted',
|
||||||
|
'closed': 'cancelled',
|
||||||
|
'canceled': 'cancelled',
|
||||||
|
# do we even need to forward
|
||||||
|
# this state to the ems?
|
||||||
|
'pending': 'pending',
|
||||||
|
}[status]
|
||||||
|
|
||||||
|
submit_vlm = rest.get('vol', 0)
|
||||||
|
exec_vlm = rest.get('vol_exec', 0)
|
||||||
|
|
||||||
|
reqids2txids[reqid] = txid
|
||||||
|
|
||||||
|
oid = ids.inverse.get(reqid)
|
||||||
|
if not oid:
|
||||||
|
# TODO: handle these and relay them
|
||||||
|
# through the EMS to the client / UI
|
||||||
|
# side!
|
||||||
|
log.warning(
|
||||||
|
f'Received active order {txid}:\n'
|
||||||
|
f'{update_msg}\n'
|
||||||
|
'Cancelling order for now!..'
|
||||||
|
)
|
||||||
|
|
||||||
|
# call ws api to cancel:
|
||||||
|
# https://docs.kraken.com/websockets/#message-cancelOrder
|
||||||
|
await ws.send_msg({
|
||||||
|
'event': 'cancelOrder',
|
||||||
|
'token': token,
|
||||||
|
'reqid': reqid,
|
||||||
|
'txid': [txid],
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
|
||||||
|
msgs = emsflow[oid]
|
||||||
|
|
||||||
|
# send BrokerdStatus messages for all
|
||||||
|
# order state updates
|
||||||
|
resp = BrokerdStatus(
|
||||||
|
|
||||||
# send a fill msg for gui update
|
|
||||||
fill_msg = BrokerdFill(
|
|
||||||
reqid=reqid,
|
reqid=reqid,
|
||||||
time_ns=time.time_ns(),
|
time_ns=time.time_ns(), # cuz why not
|
||||||
|
|
||||||
action=action,
|
|
||||||
size=size,
|
|
||||||
price=price,
|
|
||||||
# TODO: maybe capture more msg data
|
|
||||||
# i.e fees?
|
|
||||||
broker_details={'name': 'kraken'},
|
|
||||||
broker_time=broker_time
|
|
||||||
)
|
|
||||||
await ems_stream.send(fill_msg.dict())
|
|
||||||
|
|
||||||
filled_msg = BrokerdStatus(
|
|
||||||
reqid=reqid,
|
|
||||||
time_ns=time.time_ns(),
|
|
||||||
|
|
||||||
account=acc_name,
|
|
||||||
status='filled',
|
|
||||||
filled=size,
|
|
||||||
reason='Order filled by kraken',
|
|
||||||
broker_details={
|
|
||||||
'name': 'kraken',
|
|
||||||
'broker_time': broker_time
|
|
||||||
},
|
|
||||||
|
|
||||||
# TODO: figure out if kraken gives a count
|
|
||||||
# of how many units of underlying were
|
|
||||||
# filled. Alternatively we can decrement
|
|
||||||
# this value ourselves by associating and
|
|
||||||
# calcing from the diff with the original
|
|
||||||
# client-side request, see:
|
|
||||||
# https://github.com/pikers/piker/issues/296
|
|
||||||
remaining=0,
|
|
||||||
)
|
|
||||||
await ems_stream.send(filled_msg.dict())
|
|
||||||
|
|
||||||
# update ledger and position tracking
|
|
||||||
trans = await update_ledger(acctid, trades)
|
|
||||||
active, closed = pp.update_pps_conf(
|
|
||||||
'kraken',
|
|
||||||
acctid,
|
|
||||||
trade_records=trans,
|
|
||||||
ledger_reload={}.fromkeys(
|
|
||||||
t.bsuid for t in trans),
|
|
||||||
)
|
|
||||||
|
|
||||||
# emit pp msgs
|
|
||||||
for pos in filter(
|
|
||||||
bool,
|
|
||||||
chain(active.values(), closed.values()),
|
|
||||||
):
|
|
||||||
pp_msg = BrokerdPosition(
|
|
||||||
broker='kraken',
|
|
||||||
|
|
||||||
# XXX: ok so this is annoying, we're
|
|
||||||
# relaying an account name with the
|
|
||||||
# backend suffix prefixed but when
|
|
||||||
# reading accounts from ledgers we
|
|
||||||
# don't need it and/or it's prefixed
|
|
||||||
# in the section table.. we should
|
|
||||||
# just strip this from the message
|
|
||||||
# right since `.broker` is already
|
|
||||||
# included?
|
|
||||||
account=f'kraken.{acctid}',
|
account=f'kraken.{acctid}',
|
||||||
symbol=pos.symbol.front_fqsn(),
|
|
||||||
size=pos.size,
|
|
||||||
avg_price=pos.be_price,
|
|
||||||
|
|
||||||
# TODO
|
# everyone doin camel case..
|
||||||
# currency=''
|
status=ems_status, # force lower case
|
||||||
|
|
||||||
|
filled=exec_vlm,
|
||||||
|
reason='', # why held?
|
||||||
|
remaining=(
|
||||||
|
float(submit_vlm)
|
||||||
|
-
|
||||||
|
float(exec_vlm)
|
||||||
|
),
|
||||||
|
|
||||||
|
broker_details=dict(
|
||||||
|
{'name': 'kraken'}, **update_msg
|
||||||
|
),
|
||||||
)
|
)
|
||||||
await ems_stream.send(pp_msg.dict())
|
msgs.append(resp)
|
||||||
|
await ems_stream.send(resp)
|
||||||
|
|
||||||
case [
|
case _:
|
||||||
trades_msgs,
|
log.warning(
|
||||||
'openOrders',
|
'Unknown orders msg:\n'
|
||||||
{'sequence': seq},
|
f'{txid}:{order_msg}'
|
||||||
]:
|
)
|
||||||
# TODO: async order update handling which we
|
|
||||||
# should remove from `handle_order_requests()`
|
|
||||||
# above:
|
|
||||||
# https://github.com/pikers/piker/issues/293
|
|
||||||
# https://github.com/pikers/piker/issues/310
|
|
||||||
log.info(f'Order update {seq}:{trades_msgs}')
|
|
||||||
|
|
||||||
case _:
|
case {
|
||||||
log.warning(f'Unhandled trades msg: {msg}')
|
'event': etype,
|
||||||
await tractor.breakpoint()
|
'status': status,
|
||||||
|
'reqid': reqid,
|
||||||
|
**rest,
|
||||||
|
} as event if (
|
||||||
|
etype in {
|
||||||
|
'addOrderStatus',
|
||||||
|
'editOrderStatus',
|
||||||
|
'cancelOrderStatus',
|
||||||
|
}
|
||||||
|
):
|
||||||
|
oid = ids.inverse.get(reqid)
|
||||||
|
if not oid:
|
||||||
|
log.warning(
|
||||||
|
'Unknown order status update?:\n'
|
||||||
|
f'{event}'
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
txid = rest.get('txid')
|
||||||
|
if txid:
|
||||||
|
reqids2txids[reqid] = txid
|
||||||
|
|
||||||
|
msgs = emsflow[oid]
|
||||||
|
last = msgs[-1]
|
||||||
|
resps, errored = process_status(
|
||||||
|
event,
|
||||||
|
oid,
|
||||||
|
token,
|
||||||
|
msgs,
|
||||||
|
last,
|
||||||
|
)
|
||||||
|
if resps:
|
||||||
|
msgs.extend(resps)
|
||||||
|
for resp in resps:
|
||||||
|
await ems_stream.send(resp)
|
||||||
|
|
||||||
|
case _:
|
||||||
|
log.warning(f'Unhandled trades update msg: {msg}')
|
||||||
|
|
||||||
|
|
||||||
|
def process_status(
|
||||||
|
event: dict[str, str],
|
||||||
|
oid: str,
|
||||||
|
token: str,
|
||||||
|
msgs: list[MsgUnion],
|
||||||
|
last: MsgUnion,
|
||||||
|
|
||||||
|
) -> tuple[list[MsgUnion], bool]:
|
||||||
|
'''
|
||||||
|
Process `'[add/edit/cancel]OrderStatus'` events by translating to
|
||||||
|
and returning the equivalent EMS-msg responses.
|
||||||
|
|
||||||
|
'''
|
||||||
|
match event:
|
||||||
|
case {
|
||||||
|
'event': etype,
|
||||||
|
'status': 'error',
|
||||||
|
'reqid': reqid,
|
||||||
|
'errorMessage': errmsg,
|
||||||
|
}:
|
||||||
|
# any of ``{'add', 'edit', 'cancel'}``
|
||||||
|
action = etype.rstrip('OrderStatus')
|
||||||
|
log.error(
|
||||||
|
f'Failed to {action} order {reqid}:\n'
|
||||||
|
f'{errmsg}'
|
||||||
|
)
|
||||||
|
resp = BrokerdError(
|
||||||
|
oid=oid,
|
||||||
|
# XXX: use old reqid in case it changed?
|
||||||
|
reqid=reqid,
|
||||||
|
symbol=getattr(last, 'symbol', 'N/A'),
|
||||||
|
|
||||||
|
reason=f'Failed {action}:\n{errmsg}',
|
||||||
|
broker_details=event
|
||||||
|
)
|
||||||
|
return [resp], True
|
||||||
|
|
||||||
|
# successful request cases
|
||||||
|
case {
|
||||||
|
'event': 'addOrderStatus',
|
||||||
|
'status': "ok",
|
||||||
|
'reqid': reqid, # oid from ems side
|
||||||
|
'txid': txid,
|
||||||
|
'descr': descr, # only on success?
|
||||||
|
}:
|
||||||
|
log.info(
|
||||||
|
f'Submitting order: {descr}\n'
|
||||||
|
f'ems oid: {oid}\n'
|
||||||
|
f're-mapped reqid: {reqid}\n'
|
||||||
|
f'txid: {txid}\n'
|
||||||
|
)
|
||||||
|
return [], False
|
||||||
|
|
||||||
|
case {
|
||||||
|
'event': 'editOrderStatus',
|
||||||
|
'status': "ok",
|
||||||
|
'reqid': reqid, # oid from ems side
|
||||||
|
'descr': descr,
|
||||||
|
|
||||||
|
# NOTE: for edit request this is a new value
|
||||||
|
'txid': txid,
|
||||||
|
'originaltxid': origtxid,
|
||||||
|
}:
|
||||||
|
log.info(
|
||||||
|
f'Editting order {oid}[requid={reqid}]:\n'
|
||||||
|
f'txid: {origtxid} -> {txid}\n'
|
||||||
|
f'{descr}'
|
||||||
|
)
|
||||||
|
# deliver another ack to update the ems-side `.reqid`.
|
||||||
|
return [], False
|
||||||
|
|
||||||
|
case {
|
||||||
|
"event": "cancelOrderStatus",
|
||||||
|
"status": "ok",
|
||||||
|
'reqid': reqid,
|
||||||
|
|
||||||
|
# XXX: sometimes this isn't provided!?
|
||||||
|
# 'txid': txids,
|
||||||
|
**rest,
|
||||||
|
}:
|
||||||
|
# TODO: should we support "batch" acking of
|
||||||
|
# multiple cancels thus avoiding the below loop?
|
||||||
|
resps: list[MsgUnion] = []
|
||||||
|
for txid in rest.get('txid', [last.reqid]):
|
||||||
|
resp = BrokerdStatus(
|
||||||
|
reqid=reqid,
|
||||||
|
account=last.account,
|
||||||
|
time_ns=time.time_ns(),
|
||||||
|
status='cancelled',
|
||||||
|
reason='Cancel success: {oid}@{txid}',
|
||||||
|
broker_details=event,
|
||||||
|
)
|
||||||
|
resps.append(resp)
|
||||||
|
|
||||||
|
return resps, False
|
||||||
|
|
||||||
|
|
||||||
def norm_trade_records(
|
def norm_trade_records(
|
||||||
|
|
@ -494,10 +760,9 @@ def norm_trade_records(
|
||||||
) -> list[pp.Transaction]:
|
) -> list[pp.Transaction]:
|
||||||
|
|
||||||
records: list[pp.Transaction] = []
|
records: list[pp.Transaction] = []
|
||||||
|
|
||||||
for tid, record in ledger.items():
|
for tid, record in ledger.items():
|
||||||
|
|
||||||
size = record.get('vol') * {
|
size = float(record.get('vol')) * {
|
||||||
'buy': 1,
|
'buy': 1,
|
||||||
'sell': -1,
|
'sell': -1,
|
||||||
}[record['type']]
|
}[record['type']]
|
||||||
|
|
@ -508,7 +773,7 @@ def norm_trade_records(
|
||||||
pp.Transaction(
|
pp.Transaction(
|
||||||
fqsn=f'{norm_sym}.kraken',
|
fqsn=f'{norm_sym}.kraken',
|
||||||
tid=tid,
|
tid=tid,
|
||||||
size=float(size),
|
size=size,
|
||||||
price=float(record['price']),
|
price=float(record['price']),
|
||||||
cost=float(record['fee']),
|
cost=float(record['fee']),
|
||||||
dt=pendulum.from_timestamp(float(record['time'])),
|
dt=pendulum.from_timestamp(float(record['time'])),
|
||||||
|
|
@ -522,19 +787,24 @@ def norm_trade_records(
|
||||||
return records
|
return records
|
||||||
|
|
||||||
|
|
||||||
async def update_ledger(
|
@cm
|
||||||
|
def open_ledger(
|
||||||
acctid: str,
|
acctid: str,
|
||||||
trade_entries: list[dict[str, Any]],
|
trade_entries: list[dict[str, Any]],
|
||||||
|
|
||||||
) -> list[pp.Transaction]:
|
) -> list[pp.Transaction]:
|
||||||
|
'''
|
||||||
|
Write recent session's trades to the user's (local) ledger file.
|
||||||
|
|
||||||
# write recent session's trades to the user's (local) ledger file.
|
'''
|
||||||
with pp.open_trade_ledger(
|
with pp.open_trade_ledger(
|
||||||
'kraken',
|
'kraken',
|
||||||
acctid,
|
acctid,
|
||||||
) as ledger:
|
) as ledger:
|
||||||
ledger.update(trade_entries)
|
|
||||||
|
|
||||||
# normalize to transaction form
|
# normalize to transaction form
|
||||||
records = norm_trade_records(trade_entries)
|
records = norm_trade_records(trade_entries)
|
||||||
return records
|
yield records
|
||||||
|
|
||||||
|
# update on exit
|
||||||
|
ledger.update(trade_entries)
|
||||||
|
|
|
||||||
|
|
@ -31,7 +31,6 @@ import time
|
||||||
from fuzzywuzzy import process as fuzzy
|
from fuzzywuzzy import process as fuzzy
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import pendulum
|
import pendulum
|
||||||
from pydantic import BaseModel
|
|
||||||
from trio_typing import TaskStatus
|
from trio_typing import TaskStatus
|
||||||
import tractor
|
import tractor
|
||||||
import trio
|
import trio
|
||||||
|
|
@ -45,6 +44,7 @@ from piker.brokers._util import (
|
||||||
)
|
)
|
||||||
from piker.log import get_console_log
|
from piker.log import get_console_log
|
||||||
from piker.data import ShmArray
|
from piker.data import ShmArray
|
||||||
|
from piker.data.types import Struct
|
||||||
from piker.data._web_bs import open_autorecon_ws, NoBsWs
|
from piker.data._web_bs import open_autorecon_ws, NoBsWs
|
||||||
from . import log
|
from . import log
|
||||||
from .api import (
|
from .api import (
|
||||||
|
|
@ -54,7 +54,7 @@ from .api import (
|
||||||
|
|
||||||
|
|
||||||
# https://www.kraken.com/features/api#get-tradable-pairs
|
# https://www.kraken.com/features/api#get-tradable-pairs
|
||||||
class Pair(BaseModel):
|
class Pair(Struct):
|
||||||
altname: str # alternate pair name
|
altname: str # alternate pair name
|
||||||
wsname: str # WebSocket pair name (if available)
|
wsname: str # WebSocket pair name (if available)
|
||||||
aclass_base: str # asset class of base component
|
aclass_base: str # asset class of base component
|
||||||
|
|
@ -117,9 +117,8 @@ async def stream_messages(
|
||||||
too_slow_count = 0
|
too_slow_count = 0
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if isinstance(msg, dict):
|
match msg:
|
||||||
if msg.get('event') == 'heartbeat':
|
case {'event': 'heartbeat'}:
|
||||||
|
|
||||||
now = time.time()
|
now = time.time()
|
||||||
delay = now - last_hb
|
delay = now - last_hb
|
||||||
last_hb = now
|
last_hb = now
|
||||||
|
|
@ -130,11 +129,20 @@ async def stream_messages(
|
||||||
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
err = msg.get('errorMessage')
|
case {
|
||||||
if err:
|
'connectionID': _,
|
||||||
raise BrokerError(err)
|
'event': 'systemStatus',
|
||||||
else:
|
'status': 'online',
|
||||||
yield msg
|
'version': _,
|
||||||
|
} as msg:
|
||||||
|
log.info(
|
||||||
|
'WS connection is up:\n'
|
||||||
|
f'{msg}'
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
case _:
|
||||||
|
yield msg
|
||||||
|
|
||||||
|
|
||||||
async def process_data_feed_msgs(
|
async def process_data_feed_msgs(
|
||||||
|
|
@ -145,37 +153,60 @@ async def process_data_feed_msgs(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async for msg in stream_messages(ws):
|
async for msg in stream_messages(ws):
|
||||||
|
match msg:
|
||||||
|
case {
|
||||||
|
'errorMessage': errmsg
|
||||||
|
}:
|
||||||
|
raise BrokerError(errmsg)
|
||||||
|
|
||||||
chan_id, *payload_array, chan_name, pair = msg
|
case {
|
||||||
|
'event': 'subscriptionStatus',
|
||||||
|
} as sub:
|
||||||
|
log.info(
|
||||||
|
'WS subscription is active:\n'
|
||||||
|
f'{sub}'
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
if 'ohlc' in chan_name:
|
case [
|
||||||
|
chan_id,
|
||||||
|
*payload_array,
|
||||||
|
chan_name,
|
||||||
|
pair
|
||||||
|
]:
|
||||||
|
if 'ohlc' in chan_name:
|
||||||
|
yield 'ohlc', OHLC(
|
||||||
|
chan_id,
|
||||||
|
chan_name,
|
||||||
|
pair,
|
||||||
|
*payload_array[0]
|
||||||
|
)
|
||||||
|
|
||||||
yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0])
|
elif 'spread' in chan_name:
|
||||||
|
|
||||||
elif 'spread' in chan_name:
|
bid, ask, ts, bsize, asize = map(
|
||||||
|
float, payload_array[0])
|
||||||
|
|
||||||
bid, ask, ts, bsize, asize = map(float, payload_array[0])
|
# TODO: really makes you think IB has a horrible API...
|
||||||
|
quote = {
|
||||||
|
'symbol': pair.replace('/', ''),
|
||||||
|
'ticks': [
|
||||||
|
{'type': 'bid', 'price': bid, 'size': bsize},
|
||||||
|
{'type': 'bsize', 'price': bid, 'size': bsize},
|
||||||
|
|
||||||
# TODO: really makes you think IB has a horrible API...
|
{'type': 'ask', 'price': ask, 'size': asize},
|
||||||
quote = {
|
{'type': 'asize', 'price': ask, 'size': asize},
|
||||||
'symbol': pair.replace('/', ''),
|
],
|
||||||
'ticks': [
|
}
|
||||||
{'type': 'bid', 'price': bid, 'size': bsize},
|
yield 'l1', quote
|
||||||
{'type': 'bsize', 'price': bid, 'size': bsize},
|
|
||||||
|
|
||||||
{'type': 'ask', 'price': ask, 'size': asize},
|
# elif 'book' in msg[-2]:
|
||||||
{'type': 'asize', 'price': ask, 'size': asize},
|
# chan_id, *payload_array, chan_name, pair = msg
|
||||||
],
|
# print(msg)
|
||||||
}
|
|
||||||
yield 'l1', quote
|
|
||||||
|
|
||||||
# elif 'book' in msg[-2]:
|
case _:
|
||||||
# chan_id, *payload_array, chan_name, pair = msg
|
print(f'UNHANDLED MSG: {msg}')
|
||||||
# print(msg)
|
# yield msg
|
||||||
|
|
||||||
else:
|
|
||||||
print(f'UNHANDLED MSG: {msg}')
|
|
||||||
yield msg
|
|
||||||
|
|
||||||
|
|
||||||
def normalize(
|
def normalize(
|
||||||
|
|
@ -316,7 +347,7 @@ async def stream_quotes(
|
||||||
sym = sym.upper()
|
sym = sym.upper()
|
||||||
|
|
||||||
si = Pair(**await client.symbol_info(sym)) # validation
|
si = Pair(**await client.symbol_info(sym)) # validation
|
||||||
syminfo = si.dict()
|
syminfo = si.to_dict()
|
||||||
syminfo['price_tick_size'] = 1 / 10**si.pair_decimals
|
syminfo['price_tick_size'] = 1 / 10**si.pair_decimals
|
||||||
syminfo['lot_tick_size'] = 1 / 10**si.lot_decimals
|
syminfo['lot_tick_size'] = 1 / 10**si.lot_decimals
|
||||||
syminfo['asset_type'] = 'crypto'
|
syminfo['asset_type'] = 'crypto'
|
||||||
|
|
@ -385,7 +416,7 @@ async def stream_quotes(
|
||||||
msg_gen = process_data_feed_msgs(ws)
|
msg_gen = process_data_feed_msgs(ws)
|
||||||
|
|
||||||
# TODO: use ``anext()`` when it lands in 3.10!
|
# TODO: use ``anext()`` when it lands in 3.10!
|
||||||
typ, ohlc_last = await msg_gen.__anext__()
|
typ, ohlc_last = await anext(msg_gen)
|
||||||
|
|
||||||
topic, quote = normalize(ohlc_last)
|
topic, quote = normalize(ohlc_last)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -22,10 +22,9 @@ from enum import Enum
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
from bidict import bidict
|
from bidict import bidict
|
||||||
from pydantic import BaseModel, validator
|
|
||||||
# from msgspec import Struct
|
|
||||||
|
|
||||||
from ..data._source import Symbol
|
from ..data._source import Symbol
|
||||||
|
from ..data.types import Struct
|
||||||
from ..pp import Position
|
from ..pp import Position
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -41,33 +40,30 @@ SizeUnit = Enum(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class Allocator(BaseModel):
|
class Allocator(Struct):
|
||||||
|
|
||||||
class Config:
|
|
||||||
validate_assignment = True
|
|
||||||
copy_on_model_validation = False
|
|
||||||
arbitrary_types_allowed = True
|
|
||||||
|
|
||||||
# required to get the account validator lookup working?
|
|
||||||
extra = 'allow'
|
|
||||||
underscore_attrs_are_private = False
|
|
||||||
|
|
||||||
symbol: Symbol
|
symbol: Symbol
|
||||||
account: Optional[str] = 'paper'
|
account: Optional[str] = 'paper'
|
||||||
|
|
||||||
|
_size_units: bidict[str, Optional[str]] = _size_units
|
||||||
|
|
||||||
# TODO: for enums this clearly doesn't fucking work, you can't set
|
# TODO: for enums this clearly doesn't fucking work, you can't set
|
||||||
# a default at startup by passing in a `dict` but yet you can set
|
# a default at startup by passing in a `dict` but yet you can set
|
||||||
# that value through assignment..for wtv cucked reason.. honestly, pure
|
# that value through assignment..for wtv cucked reason.. honestly, pure
|
||||||
# unintuitive garbage.
|
# unintuitive garbage.
|
||||||
size_unit: str = 'currency'
|
_size_unit: str = 'currency'
|
||||||
_size_units: dict[str, Optional[str]] = _size_units
|
|
||||||
|
|
||||||
@validator('size_unit', pre=True)
|
@property
|
||||||
def maybe_lookup_key(cls, v):
|
def size_unit(self) -> str:
|
||||||
# apply the corresponding enum key for the text "description" value
|
return self._size_unit
|
||||||
|
|
||||||
|
@size_unit.setter
|
||||||
|
def size_unit(self, v: str) -> Optional[str]:
|
||||||
if v not in _size_units:
|
if v not in _size_units:
|
||||||
return _size_units.inverse[v]
|
v = _size_units.inverse[v]
|
||||||
|
|
||||||
assert v in _size_units
|
assert v in _size_units
|
||||||
|
self._size_unit = v
|
||||||
return v
|
return v
|
||||||
|
|
||||||
# TODO: if we ever want ot support non-uniform entry-slot-proportion
|
# TODO: if we ever want ot support non-uniform entry-slot-proportion
|
||||||
|
|
@ -262,7 +258,7 @@ def mk_allocator(
|
||||||
# default allocation settings
|
# default allocation settings
|
||||||
defaults: dict[str, float] = {
|
defaults: dict[str, float] = {
|
||||||
'account': None, # select paper by default
|
'account': None, # select paper by default
|
||||||
'size_unit': 'currency',
|
# 'size_unit': 'currency',
|
||||||
'units_limit': 400,
|
'units_limit': 400,
|
||||||
'currency_limit': 5e3,
|
'currency_limit': 5e3,
|
||||||
'slots': 4,
|
'slots': 4,
|
||||||
|
|
@ -301,6 +297,9 @@ def mk_allocator(
|
||||||
# entry step 1.0
|
# entry step 1.0
|
||||||
alloc.units_limit = alloc.slots
|
alloc.units_limit = alloc.slots
|
||||||
|
|
||||||
|
else:
|
||||||
|
alloc.size_unit = 'currency'
|
||||||
|
|
||||||
# if the current position is already greater then the limit
|
# if the current position is already greater then the limit
|
||||||
# settings, increase the limit to the current position
|
# settings, increase the limit to the current position
|
||||||
if alloc.size_unit == 'currency':
|
if alloc.size_unit == 'currency':
|
||||||
|
|
|
||||||
|
|
@ -58,11 +58,11 @@ class OrderBook:
|
||||||
|
|
||||||
def send(
|
def send(
|
||||||
self,
|
self,
|
||||||
msg: Order,
|
msg: Order | dict,
|
||||||
|
|
||||||
) -> dict:
|
) -> dict:
|
||||||
self._sent_orders[msg.oid] = msg
|
self._sent_orders[msg.oid] = msg
|
||||||
self._to_ems.send_nowait(msg.dict())
|
self._to_ems.send_nowait(msg)
|
||||||
return msg
|
return msg
|
||||||
|
|
||||||
def update(
|
def update(
|
||||||
|
|
@ -73,9 +73,8 @@ class OrderBook:
|
||||||
|
|
||||||
) -> dict:
|
) -> dict:
|
||||||
cmd = self._sent_orders[uuid]
|
cmd = self._sent_orders[uuid]
|
||||||
msg = cmd.dict()
|
msg = cmd.copy(update=data)
|
||||||
msg.update(data)
|
self._sent_orders[uuid] = msg
|
||||||
self._sent_orders[uuid] = Order(**msg)
|
|
||||||
self._to_ems.send_nowait(msg)
|
self._to_ems.send_nowait(msg)
|
||||||
return cmd
|
return cmd
|
||||||
|
|
||||||
|
|
@ -88,7 +87,7 @@ class OrderBook:
|
||||||
oid=uuid,
|
oid=uuid,
|
||||||
symbol=cmd.symbol,
|
symbol=cmd.symbol,
|
||||||
)
|
)
|
||||||
self._to_ems.send_nowait(msg.dict())
|
self._to_ems.send_nowait(msg)
|
||||||
|
|
||||||
|
|
||||||
_orders: OrderBook = None
|
_orders: OrderBook = None
|
||||||
|
|
@ -149,7 +148,7 @@ async def relay_order_cmds_from_sync_code(
|
||||||
book = get_orders()
|
book = get_orders()
|
||||||
async with book._from_order_book.subscribe() as orders_stream:
|
async with book._from_order_book.subscribe() as orders_stream:
|
||||||
async for cmd in orders_stream:
|
async for cmd in orders_stream:
|
||||||
if cmd['symbol'] == symbol_key:
|
if cmd.symbol == symbol_key:
|
||||||
log.info(f'Send order cmd:\n{pformat(cmd)}')
|
log.info(f'Send order cmd:\n{pformat(cmd)}')
|
||||||
# send msg over IPC / wire
|
# send msg over IPC / wire
|
||||||
await to_ems_stream.send(cmd)
|
await to_ems_stream.send(cmd)
|
||||||
|
|
|
||||||
|
|
@ -26,7 +26,6 @@ import time
|
||||||
from typing import AsyncIterator, Callable
|
from typing import AsyncIterator, Callable
|
||||||
|
|
||||||
from bidict import bidict
|
from bidict import bidict
|
||||||
from pydantic import BaseModel
|
|
||||||
import trio
|
import trio
|
||||||
from trio_typing import TaskStatus
|
from trio_typing import TaskStatus
|
||||||
import tractor
|
import tractor
|
||||||
|
|
@ -34,6 +33,7 @@ import tractor
|
||||||
from ..log import get_logger
|
from ..log import get_logger
|
||||||
from ..data._normalize import iterticks
|
from ..data._normalize import iterticks
|
||||||
from ..data.feed import Feed, maybe_open_feed
|
from ..data.feed import Feed, maybe_open_feed
|
||||||
|
from ..data.types import Struct
|
||||||
from .._daemon import maybe_spawn_brokerd
|
from .._daemon import maybe_spawn_brokerd
|
||||||
from . import _paper_engine as paper
|
from . import _paper_engine as paper
|
||||||
from ._messages import (
|
from ._messages import (
|
||||||
|
|
@ -88,7 +88,8 @@ def mk_check(
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class _DarkBook:
|
class _DarkBook:
|
||||||
'''EMS-trigger execution book.
|
'''
|
||||||
|
EMS-trigger execution book.
|
||||||
|
|
||||||
Contains conditions for executions (aka "orders" or "triggers")
|
Contains conditions for executions (aka "orders" or "triggers")
|
||||||
which are not exposed to brokers and thus the market; i.e. these are
|
which are not exposed to brokers and thus the market; i.e. these are
|
||||||
|
|
@ -231,7 +232,7 @@ async def clear_dark_triggers(
|
||||||
price=submit_price,
|
price=submit_price,
|
||||||
size=cmd['size'],
|
size=cmd['size'],
|
||||||
)
|
)
|
||||||
await brokerd_orders_stream.send(msg.dict())
|
await brokerd_orders_stream.send(msg)
|
||||||
|
|
||||||
# mark this entry as having sent an order
|
# mark this entry as having sent an order
|
||||||
# request. the entry will be replaced once the
|
# request. the entry will be replaced once the
|
||||||
|
|
@ -247,14 +248,11 @@ async def clear_dark_triggers(
|
||||||
|
|
||||||
msg = Status(
|
msg = Status(
|
||||||
oid=oid, # ems order id
|
oid=oid, # ems order id
|
||||||
resp=resp,
|
|
||||||
time_ns=time.time_ns(),
|
time_ns=time.time_ns(),
|
||||||
symbol=fqsn,
|
resp=resp,
|
||||||
trigger_price=price,
|
trigger_price=price,
|
||||||
broker_details={'name': broker},
|
brokerd_msg=cmd,
|
||||||
cmd=cmd, # original request message
|
)
|
||||||
|
|
||||||
).dict()
|
|
||||||
|
|
||||||
# remove exec-condition from set
|
# remove exec-condition from set
|
||||||
log.info(f'removing pred for {oid}')
|
log.info(f'removing pred for {oid}')
|
||||||
|
|
@ -303,7 +301,7 @@ class TradesRelay:
|
||||||
consumers: int = 0
|
consumers: int = 0
|
||||||
|
|
||||||
|
|
||||||
class Router(BaseModel):
|
class Router(Struct):
|
||||||
'''
|
'''
|
||||||
Order router which manages and tracks per-broker dark book,
|
Order router which manages and tracks per-broker dark book,
|
||||||
alerts, clearing and related data feed management.
|
alerts, clearing and related data feed management.
|
||||||
|
|
@ -324,10 +322,6 @@ class Router(BaseModel):
|
||||||
# brokername to trades-dialogues streams with ``brokerd`` actors
|
# brokername to trades-dialogues streams with ``brokerd`` actors
|
||||||
relays: dict[str, TradesRelay] = {}
|
relays: dict[str, TradesRelay] = {}
|
||||||
|
|
||||||
class Config:
|
|
||||||
arbitrary_types_allowed = True
|
|
||||||
underscore_attrs_are_private = False
|
|
||||||
|
|
||||||
def get_dark_book(
|
def get_dark_book(
|
||||||
self,
|
self,
|
||||||
brokername: str,
|
brokername: str,
|
||||||
|
|
@ -581,11 +575,11 @@ async def translate_and_relay_brokerd_events(
|
||||||
|
|
||||||
if name == 'position':
|
if name == 'position':
|
||||||
|
|
||||||
pos_msg = BrokerdPosition(**brokerd_msg).dict()
|
pos_msg = BrokerdPosition(**brokerd_msg)
|
||||||
|
|
||||||
# XXX: this will be useful for automatic strats yah?
|
# XXX: this will be useful for automatic strats yah?
|
||||||
# keep pps per account up to date locally in ``emsd`` mem
|
# keep pps per account up to date locally in ``emsd`` mem
|
||||||
sym, broker = pos_msg['symbol'], pos_msg['broker']
|
sym, broker = pos_msg.symbol, pos_msg.broker
|
||||||
|
|
||||||
relay.positions.setdefault(
|
relay.positions.setdefault(
|
||||||
# NOTE: translate to a FQSN!
|
# NOTE: translate to a FQSN!
|
||||||
|
|
@ -652,6 +646,13 @@ async def translate_and_relay_brokerd_events(
|
||||||
else:
|
else:
|
||||||
# check for existing live flow entry
|
# check for existing live flow entry
|
||||||
entry = book._ems_entries.get(oid)
|
entry = book._ems_entries.get(oid)
|
||||||
|
old_reqid = entry.reqid
|
||||||
|
|
||||||
|
if old_reqid and old_reqid != reqid:
|
||||||
|
log.warning(
|
||||||
|
f'Brokerd order id change for {oid}:\n'
|
||||||
|
f'{old_reqid} -> {reqid}'
|
||||||
|
)
|
||||||
|
|
||||||
# initial response to brokerd order request
|
# initial response to brokerd order request
|
||||||
if name == 'ack':
|
if name == 'ack':
|
||||||
|
|
@ -662,6 +663,10 @@ async def translate_and_relay_brokerd_events(
|
||||||
# a ``BrokerdOrderAck`` **must** be sent after an order
|
# a ``BrokerdOrderAck`` **must** be sent after an order
|
||||||
# request in order to establish this id mapping.
|
# request in order to establish this id mapping.
|
||||||
book._ems2brokerd_ids[oid] = reqid
|
book._ems2brokerd_ids[oid] = reqid
|
||||||
|
log.info(
|
||||||
|
'Rx ACK for order\n'
|
||||||
|
f'oid: {oid} -> reqid: {reqid}'
|
||||||
|
)
|
||||||
|
|
||||||
# new order which has not yet be registered into the
|
# new order which has not yet be registered into the
|
||||||
# local ems book, insert it now and handle 2 cases:
|
# local ems book, insert it now and handle 2 cases:
|
||||||
|
|
@ -676,7 +681,7 @@ async def translate_and_relay_brokerd_events(
|
||||||
entry.reqid = reqid
|
entry.reqid = reqid
|
||||||
|
|
||||||
# tell broker to cancel immediately
|
# tell broker to cancel immediately
|
||||||
await brokerd_trades_stream.send(entry.dict())
|
await brokerd_trades_stream.send(entry)
|
||||||
|
|
||||||
# - the order is now active and will be mirrored in
|
# - the order is now active and will be mirrored in
|
||||||
# our book -> registered as live flow
|
# our book -> registered as live flow
|
||||||
|
|
@ -716,7 +721,7 @@ async def translate_and_relay_brokerd_events(
|
||||||
# if 10147 in message: cancel
|
# if 10147 in message: cancel
|
||||||
|
|
||||||
resp = 'broker_errored'
|
resp = 'broker_errored'
|
||||||
broker_details = msg.dict()
|
broker_details = msg
|
||||||
|
|
||||||
# don't relay message to order requester client
|
# don't relay message to order requester client
|
||||||
# continue
|
# continue
|
||||||
|
|
@ -751,7 +756,7 @@ async def translate_and_relay_brokerd_events(
|
||||||
resp = 'broker_' + msg.status
|
resp = 'broker_' + msg.status
|
||||||
|
|
||||||
# pass the BrokerdStatus msg inside the broker details field
|
# pass the BrokerdStatus msg inside the broker details field
|
||||||
broker_details = msg.dict()
|
broker_details = msg
|
||||||
|
|
||||||
elif name in (
|
elif name in (
|
||||||
'fill',
|
'fill',
|
||||||
|
|
@ -760,7 +765,7 @@ async def translate_and_relay_brokerd_events(
|
||||||
|
|
||||||
# proxy through the "fill" result(s)
|
# proxy through the "fill" result(s)
|
||||||
resp = 'broker_filled'
|
resp = 'broker_filled'
|
||||||
broker_details = msg.dict()
|
broker_details = msg
|
||||||
|
|
||||||
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
|
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
|
||||||
|
|
||||||
|
|
@ -778,7 +783,7 @@ async def translate_and_relay_brokerd_events(
|
||||||
time_ns=time.time_ns(),
|
time_ns=time.time_ns(),
|
||||||
broker_reqid=reqid,
|
broker_reqid=reqid,
|
||||||
brokerd_msg=broker_details,
|
brokerd_msg=broker_details,
|
||||||
).dict()
|
)
|
||||||
)
|
)
|
||||||
except KeyError:
|
except KeyError:
|
||||||
log.error(
|
log.error(
|
||||||
|
|
@ -843,14 +848,14 @@ async def process_client_order_cmds(
|
||||||
|
|
||||||
# NOTE: cancel response will be relayed back in messages
|
# NOTE: cancel response will be relayed back in messages
|
||||||
# from corresponding broker
|
# from corresponding broker
|
||||||
if reqid:
|
if reqid is not None:
|
||||||
|
|
||||||
# send cancel to brokerd immediately!
|
# send cancel to brokerd immediately!
|
||||||
log.info(
|
log.info(
|
||||||
f'Submitting cancel for live order {reqid}'
|
f'Submitting cancel for live order {reqid}'
|
||||||
)
|
)
|
||||||
|
|
||||||
await brokerd_order_stream.send(msg.dict())
|
await brokerd_order_stream.send(msg)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# this might be a cancel for an order that hasn't been
|
# this might be a cancel for an order that hasn't been
|
||||||
|
|
@ -872,7 +877,7 @@ async def process_client_order_cmds(
|
||||||
resp='dark_cancelled',
|
resp='dark_cancelled',
|
||||||
oid=oid,
|
oid=oid,
|
||||||
time_ns=time.time_ns(),
|
time_ns=time.time_ns(),
|
||||||
).dict()
|
)
|
||||||
)
|
)
|
||||||
# de-register this client dialogue
|
# de-register this client dialogue
|
||||||
router.dialogues.pop(oid)
|
router.dialogues.pop(oid)
|
||||||
|
|
@ -927,7 +932,7 @@ async def process_client_order_cmds(
|
||||||
# handle relaying the ems side responses back to
|
# handle relaying the ems side responses back to
|
||||||
# the client/cmd sender from this request
|
# the client/cmd sender from this request
|
||||||
log.info(f'Sending live order to {broker}:\n{pformat(msg)}')
|
log.info(f'Sending live order to {broker}:\n{pformat(msg)}')
|
||||||
await brokerd_order_stream.send(msg.dict())
|
await brokerd_order_stream.send(msg)
|
||||||
|
|
||||||
# an immediate response should be ``BrokerdOrderAck``
|
# an immediate response should be ``BrokerdOrderAck``
|
||||||
# with ems order id from the ``trades_dialogue()``
|
# with ems order id from the ``trades_dialogue()``
|
||||||
|
|
@ -1007,7 +1012,7 @@ async def process_client_order_cmds(
|
||||||
resp=resp,
|
resp=resp,
|
||||||
oid=oid,
|
oid=oid,
|
||||||
time_ns=time.time_ns(),
|
time_ns=time.time_ns(),
|
||||||
).dict()
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
# piker: trading gear for hackers
|
# piker: trading gear for hackers
|
||||||
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
|
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
|
||||||
|
|
||||||
# This program is free software: you can redistribute it and/or modify
|
# This program is free software: you can redistribute it and/or modify
|
||||||
# it under the terms of the GNU Affero General Public License as published by
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
|
@ -15,21 +15,26 @@
|
||||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
Clearing system messagingn types and protocols.
|
Clearing sub-system message and protocols.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
from typing import Optional, Union
|
from typing import Optional, Union
|
||||||
|
|
||||||
# TODO: try out just encoding/send direction for now?
|
|
||||||
# import msgspec
|
|
||||||
from pydantic import BaseModel
|
|
||||||
|
|
||||||
from ..data._source import Symbol
|
from ..data._source import Symbol
|
||||||
|
from ..data.types import Struct
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: ``msgspec`` stuff worth paying attention to:
|
||||||
|
# - schema evolution: https://jcristharif.com/msgspec/usage.html#schema-evolution
|
||||||
|
# - use literals for a common msg determined by diff keys?
|
||||||
|
# - https://jcristharif.com/msgspec/usage.html#literal
|
||||||
|
# - for eg. ``BrokerdStatus``, instead just have separate messages?
|
||||||
|
|
||||||
|
# --------------
|
||||||
# Client -> emsd
|
# Client -> emsd
|
||||||
|
# --------------
|
||||||
|
|
||||||
|
class Cancel(Struct):
|
||||||
class Cancel(BaseModel):
|
|
||||||
'''Cancel msg for removing a dark (ems triggered) or
|
'''Cancel msg for removing a dark (ems triggered) or
|
||||||
broker-submitted (live) trigger/order.
|
broker-submitted (live) trigger/order.
|
||||||
|
|
||||||
|
|
@ -39,8 +44,10 @@ class Cancel(BaseModel):
|
||||||
symbol: str
|
symbol: str
|
||||||
|
|
||||||
|
|
||||||
class Order(BaseModel):
|
class Order(Struct):
|
||||||
|
|
||||||
|
# TODO: use ``msgspec.Literal``
|
||||||
|
# https://jcristharif.com/msgspec/usage.html#literal
|
||||||
action: str # {'buy', 'sell', 'alert'}
|
action: str # {'buy', 'sell', 'alert'}
|
||||||
# internal ``emdsd`` unique "order id"
|
# internal ``emdsd`` unique "order id"
|
||||||
oid: str # uuid4
|
oid: str # uuid4
|
||||||
|
|
@ -48,6 +55,9 @@ class Order(BaseModel):
|
||||||
account: str # should we set a default as '' ?
|
account: str # should we set a default as '' ?
|
||||||
|
|
||||||
price: float
|
price: float
|
||||||
|
# TODO: could we drop the ``.action`` field above and instead just
|
||||||
|
# use +/- values here? Would make the msg smaller at the sake of a
|
||||||
|
# teensie fp precision?
|
||||||
size: float
|
size: float
|
||||||
brokers: list[str]
|
brokers: list[str]
|
||||||
|
|
||||||
|
|
@ -59,20 +69,14 @@ class Order(BaseModel):
|
||||||
# the backend broker
|
# the backend broker
|
||||||
exec_mode: str # {'dark', 'live', 'paper'}
|
exec_mode: str # {'dark', 'live', 'paper'}
|
||||||
|
|
||||||
class Config:
|
|
||||||
# just for pre-loading a ``Symbol`` when used
|
|
||||||
# in the order mode staging process
|
|
||||||
arbitrary_types_allowed = True
|
|
||||||
# don't copy this model instance when used in
|
|
||||||
# a recursive model
|
|
||||||
copy_on_model_validation = False
|
|
||||||
|
|
||||||
|
# --------------
|
||||||
# Client <- emsd
|
# Client <- emsd
|
||||||
|
# --------------
|
||||||
# update msgs from ems which relay state change info
|
# update msgs from ems which relay state change info
|
||||||
# from the active clearing engine.
|
# from the active clearing engine.
|
||||||
|
|
||||||
|
class Status(Struct):
|
||||||
class Status(BaseModel):
|
|
||||||
|
|
||||||
name: str = 'status'
|
name: str = 'status'
|
||||||
oid: str # uuid4
|
oid: str # uuid4
|
||||||
|
|
@ -95,8 +99,6 @@ class Status(BaseModel):
|
||||||
# }
|
# }
|
||||||
resp: str # "response", see above
|
resp: str # "response", see above
|
||||||
|
|
||||||
# symbol: str
|
|
||||||
|
|
||||||
# trigger info
|
# trigger info
|
||||||
trigger_price: Optional[float] = None
|
trigger_price: Optional[float] = None
|
||||||
# price: float
|
# price: float
|
||||||
|
|
@ -111,10 +113,12 @@ class Status(BaseModel):
|
||||||
brokerd_msg: dict = {}
|
brokerd_msg: dict = {}
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------
|
||||||
# emsd -> brokerd
|
# emsd -> brokerd
|
||||||
|
# ---------------
|
||||||
# requests *sent* from ems to respective backend broker daemon
|
# requests *sent* from ems to respective backend broker daemon
|
||||||
|
|
||||||
class BrokerdCancel(BaseModel):
|
class BrokerdCancel(Struct):
|
||||||
|
|
||||||
action: str = 'cancel'
|
action: str = 'cancel'
|
||||||
oid: str # piker emsd order id
|
oid: str # piker emsd order id
|
||||||
|
|
@ -130,7 +134,7 @@ class BrokerdCancel(BaseModel):
|
||||||
reqid: Optional[Union[int, str]] = None
|
reqid: Optional[Union[int, str]] = None
|
||||||
|
|
||||||
|
|
||||||
class BrokerdOrder(BaseModel):
|
class BrokerdOrder(Struct):
|
||||||
|
|
||||||
action: str # {buy, sell}
|
action: str # {buy, sell}
|
||||||
oid: str
|
oid: str
|
||||||
|
|
@ -150,11 +154,12 @@ class BrokerdOrder(BaseModel):
|
||||||
size: float
|
size: float
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------
|
||||||
# emsd <- brokerd
|
# emsd <- brokerd
|
||||||
|
# ---------------
|
||||||
# requests *received* to ems from broker backend
|
# requests *received* to ems from broker backend
|
||||||
|
|
||||||
|
class BrokerdOrderAck(Struct):
|
||||||
class BrokerdOrderAck(BaseModel):
|
|
||||||
'''
|
'''
|
||||||
Immediate reponse to a brokerd order request providing the broker
|
Immediate reponse to a brokerd order request providing the broker
|
||||||
specific unique order id so that the EMS can associate this
|
specific unique order id so that the EMS can associate this
|
||||||
|
|
@ -172,7 +177,7 @@ class BrokerdOrderAck(BaseModel):
|
||||||
account: str = ''
|
account: str = ''
|
||||||
|
|
||||||
|
|
||||||
class BrokerdStatus(BaseModel):
|
class BrokerdStatus(Struct):
|
||||||
|
|
||||||
name: str = 'status'
|
name: str = 'status'
|
||||||
reqid: Union[int, str]
|
reqid: Union[int, str]
|
||||||
|
|
@ -205,7 +210,7 @@ class BrokerdStatus(BaseModel):
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
class BrokerdFill(BaseModel):
|
class BrokerdFill(Struct):
|
||||||
'''
|
'''
|
||||||
A single message indicating a "fill-details" event from the broker
|
A single message indicating a "fill-details" event from the broker
|
||||||
if avaiable.
|
if avaiable.
|
||||||
|
|
@ -230,7 +235,7 @@ class BrokerdFill(BaseModel):
|
||||||
broker_time: float
|
broker_time: float
|
||||||
|
|
||||||
|
|
||||||
class BrokerdError(BaseModel):
|
class BrokerdError(Struct):
|
||||||
'''
|
'''
|
||||||
Optional error type that can be relayed to emsd for error handling.
|
Optional error type that can be relayed to emsd for error handling.
|
||||||
|
|
||||||
|
|
@ -249,7 +254,7 @@ class BrokerdError(BaseModel):
|
||||||
broker_details: dict = {}
|
broker_details: dict = {}
|
||||||
|
|
||||||
|
|
||||||
class BrokerdPosition(BaseModel):
|
class BrokerdPosition(Struct):
|
||||||
'''Position update event from brokerd.
|
'''Position update event from brokerd.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
|
||||||
|
|
@ -117,7 +117,7 @@ class PaperBoi:
|
||||||
reason='paper_trigger',
|
reason='paper_trigger',
|
||||||
remaining=size,
|
remaining=size,
|
||||||
)
|
)
|
||||||
await self.ems_trades_stream.send(msg.dict())
|
await self.ems_trades_stream.send(msg)
|
||||||
|
|
||||||
# if we're already a clearing price simulate an immediate fill
|
# if we're already a clearing price simulate an immediate fill
|
||||||
if (
|
if (
|
||||||
|
|
@ -173,7 +173,7 @@ class PaperBoi:
|
||||||
broker=self.broker,
|
broker=self.broker,
|
||||||
time_ns=time.time_ns(),
|
time_ns=time.time_ns(),
|
||||||
)
|
)
|
||||||
await self.ems_trades_stream.send(msg.dict())
|
await self.ems_trades_stream.send(msg)
|
||||||
|
|
||||||
async def fake_fill(
|
async def fake_fill(
|
||||||
self,
|
self,
|
||||||
|
|
@ -216,7 +216,7 @@ class PaperBoi:
|
||||||
'name': self.broker + '_paper',
|
'name': self.broker + '_paper',
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
await self.ems_trades_stream.send(msg.dict())
|
await self.ems_trades_stream.send(msg)
|
||||||
|
|
||||||
if order_complete:
|
if order_complete:
|
||||||
|
|
||||||
|
|
@ -240,7 +240,7 @@ class PaperBoi:
|
||||||
'name': self.broker,
|
'name': self.broker,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
await self.ems_trades_stream.send(msg.dict())
|
await self.ems_trades_stream.send(msg)
|
||||||
|
|
||||||
# lookup any existing position
|
# lookup any existing position
|
||||||
token = f'{symbol}.{self.broker}'
|
token = f'{symbol}.{self.broker}'
|
||||||
|
|
@ -268,7 +268,7 @@ class PaperBoi:
|
||||||
)
|
)
|
||||||
pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price)
|
pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price)
|
||||||
|
|
||||||
await self.ems_trades_stream.send(pp_msg.dict())
|
await self.ems_trades_stream.send(pp_msg)
|
||||||
|
|
||||||
|
|
||||||
async def simulate_fills(
|
async def simulate_fills(
|
||||||
|
|
@ -384,7 +384,7 @@ async def handle_order_requests(
|
||||||
oid=request_msg['oid'],
|
oid=request_msg['oid'],
|
||||||
symbol=request_msg['symbol'],
|
symbol=request_msg['symbol'],
|
||||||
reason=f'Paper only. No account found: `{account}` ?',
|
reason=f'Paper only. No account found: `{account}` ?',
|
||||||
).dict())
|
))
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# validate
|
# validate
|
||||||
|
|
@ -416,7 +416,7 @@ async def handle_order_requests(
|
||||||
# broker specific request id
|
# broker specific request id
|
||||||
reqid=reqid,
|
reqid=reqid,
|
||||||
|
|
||||||
).dict()
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
elif action == 'cancel':
|
elif action == 'cancel':
|
||||||
|
|
|
||||||
|
|
@ -27,13 +27,14 @@ from multiprocessing.shared_memory import SharedMemory, _USE_POSIX
|
||||||
if _USE_POSIX:
|
if _USE_POSIX:
|
||||||
from _posixshmem import shm_unlink
|
from _posixshmem import shm_unlink
|
||||||
|
|
||||||
import tractor
|
# import msgspec
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from pydantic import BaseModel
|
|
||||||
from numpy.lib import recfunctions as rfn
|
from numpy.lib import recfunctions as rfn
|
||||||
|
import tractor
|
||||||
|
|
||||||
from ..log import get_logger
|
from ..log import get_logger
|
||||||
from ._source import base_iohlc_dtype
|
from ._source import base_iohlc_dtype
|
||||||
|
from .types import Struct
|
||||||
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
@ -107,15 +108,12 @@ class SharedInt:
|
||||||
log.warning(f'Shm for {name} already unlinked?')
|
log.warning(f'Shm for {name} already unlinked?')
|
||||||
|
|
||||||
|
|
||||||
class _Token(BaseModel):
|
class _Token(Struct, frozen=True):
|
||||||
'''
|
'''
|
||||||
Internal represenation of a shared memory "token"
|
Internal represenation of a shared memory "token"
|
||||||
which can be used to key a system wide post shm entry.
|
which can be used to key a system wide post shm entry.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
class Config:
|
|
||||||
frozen = True
|
|
||||||
|
|
||||||
shm_name: str # this servers as a "key" value
|
shm_name: str # this servers as a "key" value
|
||||||
shm_first_index_name: str
|
shm_first_index_name: str
|
||||||
shm_last_index_name: str
|
shm_last_index_name: str
|
||||||
|
|
@ -126,17 +124,22 @@ class _Token(BaseModel):
|
||||||
return np.dtype(list(map(tuple, self.dtype_descr))).descr
|
return np.dtype(list(map(tuple, self.dtype_descr))).descr
|
||||||
|
|
||||||
def as_msg(self):
|
def as_msg(self):
|
||||||
return self.dict()
|
return self.to_dict()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_msg(cls, msg: dict) -> _Token:
|
def from_msg(cls, msg: dict) -> _Token:
|
||||||
if isinstance(msg, _Token):
|
if isinstance(msg, _Token):
|
||||||
return msg
|
return msg
|
||||||
|
|
||||||
|
# TODO: native struct decoding
|
||||||
|
# return _token_dec.decode(msg)
|
||||||
|
|
||||||
msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr']))
|
msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr']))
|
||||||
return _Token(**msg)
|
return _Token(**msg)
|
||||||
|
|
||||||
|
|
||||||
|
# _token_dec = msgspec.msgpack.Decoder(_Token)
|
||||||
|
|
||||||
# TODO: this api?
|
# TODO: this api?
|
||||||
# _known_tokens = tractor.ActorVar('_shm_tokens', {})
|
# _known_tokens = tractor.ActorVar('_shm_tokens', {})
|
||||||
# _known_tokens = tractor.ContextStack('_known_tokens', )
|
# _known_tokens = tractor.ContextStack('_known_tokens', )
|
||||||
|
|
@ -167,7 +170,7 @@ def _make_token(
|
||||||
shm_name=key,
|
shm_name=key,
|
||||||
shm_first_index_name=key + "_first",
|
shm_first_index_name=key + "_first",
|
||||||
shm_last_index_name=key + "_last",
|
shm_last_index_name=key + "_last",
|
||||||
dtype_descr=np.dtype(dtype).descr
|
dtype_descr=tuple(np.dtype(dtype).descr)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -42,7 +42,6 @@ from trio_typing import TaskStatus
|
||||||
import trimeter
|
import trimeter
|
||||||
import tractor
|
import tractor
|
||||||
from tractor.trionics import maybe_open_context
|
from tractor.trionics import maybe_open_context
|
||||||
from pydantic import BaseModel
|
|
||||||
import pendulum
|
import pendulum
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
|
|
@ -59,6 +58,7 @@ from ._sharedmem import (
|
||||||
ShmArray,
|
ShmArray,
|
||||||
)
|
)
|
||||||
from .ingest import get_ingestormod
|
from .ingest import get_ingestormod
|
||||||
|
from .types import Struct
|
||||||
from ._source import (
|
from ._source import (
|
||||||
base_iohlc_dtype,
|
base_iohlc_dtype,
|
||||||
Symbol,
|
Symbol,
|
||||||
|
|
@ -84,7 +84,7 @@ if TYPE_CHECKING:
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class _FeedsBus(BaseModel):
|
class _FeedsBus(Struct):
|
||||||
'''
|
'''
|
||||||
Data feeds broadcaster and persistence management.
|
Data feeds broadcaster and persistence management.
|
||||||
|
|
||||||
|
|
@ -100,10 +100,6 @@ class _FeedsBus(BaseModel):
|
||||||
a dedicated cancel scope.
|
a dedicated cancel scope.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
class Config:
|
|
||||||
arbitrary_types_allowed = True
|
|
||||||
underscore_attrs_are_private = False
|
|
||||||
|
|
||||||
brokername: str
|
brokername: str
|
||||||
nursery: trio.Nursery
|
nursery: trio.Nursery
|
||||||
feeds: dict[str, tuple[dict, dict]] = {}
|
feeds: dict[str, tuple[dict, dict]] = {}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,68 @@
|
||||||
|
# piker: trading gear for hackers
|
||||||
|
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
|
||||||
|
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
"""
|
||||||
|
Built-in (extension) types.
|
||||||
|
|
||||||
|
"""
|
||||||
|
from typing import Optional
|
||||||
|
from pprint import pformat
|
||||||
|
|
||||||
|
import msgspec
|
||||||
|
|
||||||
|
|
||||||
|
class Struct(
|
||||||
|
msgspec.Struct,
|
||||||
|
|
||||||
|
# https://jcristharif.com/msgspec/structs.html#tagged-unions
|
||||||
|
# tag='pikerstruct',
|
||||||
|
# tag=True,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
A "human friendlier" (aka repl buddy) struct subtype.
|
||||||
|
|
||||||
|
'''
|
||||||
|
def to_dict(self) -> dict:
|
||||||
|
return {
|
||||||
|
f: getattr(self, f)
|
||||||
|
for f in self.__struct_fields__
|
||||||
|
}
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f'Struct({pformat(self.to_dict())})'
|
||||||
|
|
||||||
|
def copy(
|
||||||
|
self,
|
||||||
|
update: Optional[dict] = None,
|
||||||
|
|
||||||
|
) -> msgspec.Struct:
|
||||||
|
'''
|
||||||
|
Validate-typecast all self defined fields, return a copy of us
|
||||||
|
with all such fields.
|
||||||
|
|
||||||
|
This is kinda like the default behaviour in `pydantic.BaseModel`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
if update:
|
||||||
|
for k, v in update.items():
|
||||||
|
setattr(self, k, v)
|
||||||
|
|
||||||
|
# roundtrip serialize to validate
|
||||||
|
return msgspec.msgpack.Decoder(
|
||||||
|
type=type(self)
|
||||||
|
).decode(
|
||||||
|
msgspec.msgpack.Encoder().encode(self)
|
||||||
|
)
|
||||||
|
|
@ -21,7 +21,6 @@ Qt event proxying and processing using ``trio`` mem chans.
|
||||||
from contextlib import asynccontextmanager, AsyncExitStack
|
from contextlib import asynccontextmanager, AsyncExitStack
|
||||||
from typing import Callable
|
from typing import Callable
|
||||||
|
|
||||||
from pydantic import BaseModel
|
|
||||||
import trio
|
import trio
|
||||||
from PyQt5 import QtCore
|
from PyQt5 import QtCore
|
||||||
from PyQt5.QtCore import QEvent, pyqtBoundSignal
|
from PyQt5.QtCore import QEvent, pyqtBoundSignal
|
||||||
|
|
@ -30,6 +29,8 @@ from PyQt5.QtWidgets import (
|
||||||
QGraphicsSceneMouseEvent as gs_mouse,
|
QGraphicsSceneMouseEvent as gs_mouse,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from ..data.types import Struct
|
||||||
|
|
||||||
|
|
||||||
MOUSE_EVENTS = {
|
MOUSE_EVENTS = {
|
||||||
gs_mouse.GraphicsSceneMousePress,
|
gs_mouse.GraphicsSceneMousePress,
|
||||||
|
|
@ -43,13 +44,10 @@ MOUSE_EVENTS = {
|
||||||
# TODO: maybe consider some constrained ints down the road?
|
# TODO: maybe consider some constrained ints down the road?
|
||||||
# https://pydantic-docs.helpmanual.io/usage/types/#constrained-types
|
# https://pydantic-docs.helpmanual.io/usage/types/#constrained-types
|
||||||
|
|
||||||
class KeyboardMsg(BaseModel):
|
class KeyboardMsg(Struct):
|
||||||
'''Unpacked Qt keyboard event data.
|
'''Unpacked Qt keyboard event data.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
class Config:
|
|
||||||
arbitrary_types_allowed = True
|
|
||||||
|
|
||||||
event: QEvent
|
event: QEvent
|
||||||
etype: int
|
etype: int
|
||||||
key: int
|
key: int
|
||||||
|
|
@ -57,16 +55,13 @@ class KeyboardMsg(BaseModel):
|
||||||
txt: str
|
txt: str
|
||||||
|
|
||||||
def to_tuple(self) -> tuple:
|
def to_tuple(self) -> tuple:
|
||||||
return tuple(self.dict().values())
|
return tuple(self.to_dict().values())
|
||||||
|
|
||||||
|
|
||||||
class MouseMsg(BaseModel):
|
class MouseMsg(Struct):
|
||||||
'''Unpacked Qt keyboard event data.
|
'''Unpacked Qt keyboard event data.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
class Config:
|
|
||||||
arbitrary_types_allowed = True
|
|
||||||
|
|
||||||
event: QEvent
|
event: QEvent
|
||||||
etype: int
|
etype: int
|
||||||
button: int
|
button: int
|
||||||
|
|
|
||||||
|
|
@ -619,7 +619,7 @@ class FillStatusBar(QProgressBar):
|
||||||
# color: #19232D;
|
# color: #19232D;
|
||||||
# width: 10px;
|
# width: 10px;
|
||||||
|
|
||||||
self.setRange(0, slots)
|
self.setRange(0, int(slots))
|
||||||
self.setValue(value)
|
self.setValue(value)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -22,12 +22,9 @@ from __future__ import annotations
|
||||||
from typing import (
|
from typing import (
|
||||||
Optional, Generic,
|
Optional, Generic,
|
||||||
TypeVar, Callable,
|
TypeVar, Callable,
|
||||||
Literal,
|
|
||||||
)
|
)
|
||||||
import enum
|
|
||||||
import sys
|
|
||||||
|
|
||||||
from pydantic import BaseModel, validator
|
# from pydantic import BaseModel, validator
|
||||||
from pydantic.generics import GenericModel
|
from pydantic.generics import GenericModel
|
||||||
from PyQt5.QtWidgets import (
|
from PyQt5.QtWidgets import (
|
||||||
QWidget,
|
QWidget,
|
||||||
|
|
@ -38,6 +35,7 @@ from ._forms import (
|
||||||
# FontScaledDelegate,
|
# FontScaledDelegate,
|
||||||
Edit,
|
Edit,
|
||||||
)
|
)
|
||||||
|
from ..data.types import Struct
|
||||||
|
|
||||||
|
|
||||||
DataType = TypeVar('DataType')
|
DataType = TypeVar('DataType')
|
||||||
|
|
@ -62,7 +60,7 @@ class Selection(Field[DataType], Generic[DataType]):
|
||||||
options: dict[str, DataType]
|
options: dict[str, DataType]
|
||||||
# value: DataType = None
|
# value: DataType = None
|
||||||
|
|
||||||
@validator('value') # , always=True)
|
# @validator('value') # , always=True)
|
||||||
def set_value_first(
|
def set_value_first(
|
||||||
cls,
|
cls,
|
||||||
|
|
||||||
|
|
@ -100,7 +98,7 @@ class Edit(Field[DataType], Generic[DataType]):
|
||||||
widget_factory = Edit
|
widget_factory = Edit
|
||||||
|
|
||||||
|
|
||||||
class AllocatorPane(BaseModel):
|
class AllocatorPane(Struct):
|
||||||
|
|
||||||
account = Selection[str](
|
account = Selection[str](
|
||||||
options=dict.fromkeys(
|
options=dict.fromkeys(
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,6 @@ import time
|
||||||
from typing import Optional, Dict, Callable, Any
|
from typing import Optional, Dict, Callable, Any
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
from pydantic import BaseModel
|
|
||||||
import tractor
|
import tractor
|
||||||
import trio
|
import trio
|
||||||
from PyQt5.QtCore import Qt
|
from PyQt5.QtCore import Qt
|
||||||
|
|
@ -41,6 +40,7 @@ from ..clearing._allocate import (
|
||||||
from ._style import _font
|
from ._style import _font
|
||||||
from ..data._source import Symbol
|
from ..data._source import Symbol
|
||||||
from ..data.feed import Feed
|
from ..data.feed import Feed
|
||||||
|
from ..data.types import Struct
|
||||||
from ..log import get_logger
|
from ..log import get_logger
|
||||||
from ._editors import LineEditor, ArrowEditor
|
from ._editors import LineEditor, ArrowEditor
|
||||||
from ._lines import order_line, LevelLine
|
from ._lines import order_line, LevelLine
|
||||||
|
|
@ -58,7 +58,7 @@ from ._forms import open_form_input_handling
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class OrderDialog(BaseModel):
|
class OrderDialog(Struct):
|
||||||
'''
|
'''
|
||||||
Trade dialogue meta-data describing the lifetime
|
Trade dialogue meta-data describing the lifetime
|
||||||
of an order submission to ``emsd`` from a chart.
|
of an order submission to ``emsd`` from a chart.
|
||||||
|
|
@ -73,10 +73,6 @@ class OrderDialog(BaseModel):
|
||||||
msgs: dict[str, dict] = {}
|
msgs: dict[str, dict] = {}
|
||||||
fills: Dict[str, Any] = {}
|
fills: Dict[str, Any] = {}
|
||||||
|
|
||||||
class Config:
|
|
||||||
arbitrary_types_allowed = True
|
|
||||||
underscore_attrs_are_private = False
|
|
||||||
|
|
||||||
|
|
||||||
def on_level_change_update_next_order_info(
|
def on_level_change_update_next_order_info(
|
||||||
|
|
||||||
|
|
@ -268,7 +264,8 @@ class OrderMode:
|
||||||
self,
|
self,
|
||||||
|
|
||||||
) -> OrderDialog:
|
) -> OrderDialog:
|
||||||
'''Send execution order to EMS return a level line to
|
'''
|
||||||
|
Send execution order to EMS return a level line to
|
||||||
represent the order on a chart.
|
represent the order on a chart.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
@ -277,13 +274,9 @@ class OrderMode:
|
||||||
oid = str(uuid.uuid4())
|
oid = str(uuid.uuid4())
|
||||||
|
|
||||||
# format order data for ems
|
# format order data for ems
|
||||||
fqsn = symbol.front_fqsn()
|
order = staged.copy()
|
||||||
order = staged.copy(
|
order.oid = oid
|
||||||
update={
|
order.symbol = symbol.front_fqsn()
|
||||||
'symbol': fqsn,
|
|
||||||
'oid': oid,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
line = self.line_from_order(
|
line = self.line_from_order(
|
||||||
order,
|
order,
|
||||||
|
|
@ -858,7 +851,9 @@ async def process_trades_and_update_ui(
|
||||||
# delete level line from view
|
# delete level line from view
|
||||||
mode.on_cancel(oid)
|
mode.on_cancel(oid)
|
||||||
broker_msg = msg['brokerd_msg']
|
broker_msg = msg['brokerd_msg']
|
||||||
log.warning(f'Order {oid} failed with:\n{pformat(broker_msg)}')
|
log.warning(
|
||||||
|
f'Order {oid} failed with:\n{pformat(broker_msg)}'
|
||||||
|
)
|
||||||
|
|
||||||
elif resp in (
|
elif resp in (
|
||||||
'dark_triggered'
|
'dark_triggered'
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue