Compare commits

...

28 Commits

Author SHA1 Message Date
Tyler Goodlet 869aa8251a Import adjustments to allow msg codec overriding in `tractor` 2022-07-08 10:56:50 -04:00
Tyler Goodlet d31c38ef51 Mucking with custom `msgspec.Struct` codecs
Syncs with https://github.com/goodboy/tractor/pull/311
which is nowhere near ready and this approach didn't end up being
as straight forward as hoped. We're going to need a top level
`Msg`-boxing type/protocol in `tractor` first...
2022-07-08 10:56:50 -04:00
Tyler Goodlet de91c2196d Drop remaining `BaseModel` api usage from rest of codebase 2022-07-08 10:55:02 -04:00
Tyler Goodlet 583fa79e5e Add `Struct.copy()` which does a rountrip validate 2022-07-08 10:54:04 -04:00
Tyler Goodlet 6887d4d1b0 Change all clearing msgs over to `msgspec` 2022-07-08 10:53:33 -04:00
Tyler Goodlet c87704e593 Cast slots to `int` before range set 2022-07-07 21:08:46 -04:00
Tyler Goodlet cfc08a5814 Drop pydantic from allocator 2022-07-07 21:04:53 -04:00
Tyler Goodlet c10a85a8f3 Add a custom `msgspec.Struct` with some humanizing 2022-07-07 19:36:39 -04:00
Tyler Goodlet 4e9ff65465 Finally solve the last-price-is-`nan` issue..
Not sure why I put this off for so long but the check is in now such
that if the market isn't open or no rt quote comes in from the first
query, we just pull from the last shm history 'close' value.
Includes another fix to avoid raising when a double remove on the client
side stream from the registry sometimes happens.
2022-07-07 19:15:01 -04:00
Tyler Goodlet bf5fcfe896 Fix missing container id, drop custom exception 2022-07-07 17:10:06 -04:00
Tyler Goodlet 861826dd7b Lol, gotta `float()` that vlm before `*` XD 2022-07-07 16:27:14 -04:00
Tyler Goodlet 2664360474 Remove `BaseModel` use from all dataclass-like uses 2022-07-06 12:54:03 -04:00
Tyler Goodlet 5f24c57220 Use struct for shm tokens 2022-07-06 12:42:46 -04:00
Tyler Goodlet 4660b57f3c Use our struct for kraken `Pair` type 2022-07-06 10:30:32 -04:00
Tyler Goodlet da54c80cac Hard kill container on both a timeout or connection error 2022-07-06 10:13:27 -04:00
Tyler Goodlet 7ec95ec00a Drop `pydantic` from service mngr 2022-07-06 08:58:52 -04:00
Tyler Goodlet 92d24b7123 Use our struct in binance backend 2022-07-06 08:58:07 -04:00
Tyler Goodlet de1b4739f8 Add ledger and `pps.toml` snippets 2022-07-05 20:58:55 -04:00
Tyler Goodlet 2c3307a879 Try out a backend readme 2022-07-05 20:58:55 -04:00
Tyler Goodlet ef36c5ff52 Don't require an ems msg symbol on error statuses 2022-07-05 20:58:55 -04:00
Tyler Goodlet 65ff9a1fa1 Update ledger *after* pps updates from new trades
Addressing same issue as in #350 where we need to compute position
updates using the *first read* from the ledger **before** we update it
to make sure `Position.lifo_update()` gets called and **not skipped**
because new trades were read as clears entries but haven't actually been
included in update calcs yet.. aka we call `Position.lifo_update()`.

Main change here is to convert `update_ledger()` into a context mngr so
that the ledger write is committed after pps updates using
`pp.update_pps_conf()`..

This is basically a hotfix to #346 as well.
2022-07-05 20:58:55 -04:00
Tyler Goodlet e901547e9f Factor status handling into a new `process_status()` helper 2022-07-05 20:58:55 -04:00
Tyler Goodlet 4d19c0f910 Factor msg loop into new func: `handle_order_updates()` 2022-07-05 20:58:55 -04:00
Tyler Goodlet 9723d2737a Drop uneeded count-sequencec verification 2022-07-05 20:58:55 -04:00
Tyler Goodlet 204d3b7214 Get order "editing" working fully
Turns out the EMS can support this as originally expected: you can
update a `brokerd`-side `.reqid` through a `BrokerdAck` msg and the ems
which update its cross-dialog (leg) tracking correctly! The issue was
a bug in the `editOrderStatus` msg handling and appropriate tracking
of the correct `.oid` (ems uid) on the kraken side. This unfortunately
required adding a `emsflow: dict[str, list[BrokerdOrder]]` msg flow
tracing table which means the broker daemon is tracking all the msg flow
with the ems, though I'm wondering now if this is just good practise
anyway and maybe we should offer a small primitive type from our msging
utils to aid with this? I've used such constructs in event handling
systems prior.

There's a lot more factoring that can be done after these changes as
well but the quick detailed summary is,
- rework the `handle_order_requests()` loop to use `match:` syntax and
  update the new `emsflow` table on every new request from the ems.
- fix the `editOrderStatus` case pattern to not include an error msg and
  thus actually be triggered to respond to the ems with a `BrokerdAck`
  containing the new `.reqid`, the new kraken side `txid`.
- skip any `openOrders` msgs which are detected as being kraken's
  internal order "edits" by matching on the `cancel_reason` field.
- update the `emsflow` table in all ws-stream msg handling blocks
  with responses sent to the ems.

Relates to #290
2022-07-05 20:58:55 -04:00
Tyler Goodlet d41d140265 Make ems relay loop report on brokerd `.reqid` changes 2022-07-05 20:58:55 -04:00
Tyler Goodlet 78a78f5edb Use `match:` syntax in data feed subs processing 2022-07-05 20:58:55 -04:00
Tyler Goodlet ea5004c8d3 First draft, working WS based order management
Move to using the websocket API for all order control ops and dropping
the sync rest api approach which resulted in a bunch of buggy races.
Further this gets us must faster (batch) order cancellation for free
and a simpler ems request handler loop. We now heavily leverage the new
py3.10 `match:` syntax for all kraken-side API msg parsing and
processing and handle both the `openOrders` and `ownTrades` subscription
streams.

We also block "order editing" (by immediate cancellation) for now since
the EMS isn't entirely yet equipped to handle brokerd side `.reqid`
changes (which is how kraken implements so called order "updates" or
"edits") for a given order-request dialog and we may want to even
consider just implementing "updates" ourselves via independent cancel
and submit requests? Definitely something to ponder. Alternatively we
can "masquerade" such updates behind the count-style `.oid` remapping we
had to implement anyway (kraken's limitation) and maybe everything will
just work?

Further details in this patch:
- create 2 tables for tracking the EMS's `.oid` (uui4) value to `int`s
  that kraken expects (for `reqid`s): `ids` and `reqmsgs` which enable
  local lookup of ems uids to piker-backend-client-side request ids and
  received order messages.
- add `openOrders` sub support which more or less directly relays to
  equivalent `BrokerdStatus` updates and calc the `.filled` and
  `.remaining` values based on cleared vlm updates.
- add handler blocks for `[add/edit/cancel]OrderStatus` events including
  error msg cases.
- don't do any order request response processing in
  `handle_order_requests()` since responses are always received via one
  (or both?) of the new ws subs: `ownTrades` and `openOrders` and thus
  such msgs are now handled in the response relay loop.

Relates to #290
Resolves #310, #296
2022-07-05 20:58:55 -04:00
21 changed files with 935 additions and 535 deletions

View File

@ -22,13 +22,13 @@ from typing import Optional, Union, Callable, Any
from contextlib import asynccontextmanager as acm
from collections import defaultdict
from pydantic import BaseModel
import tractor
import trio
from trio_typing import TaskStatus
import tractor
from .log import get_logger, get_console_log
from .brokers import get_brokermod
from .data.types import Struct
log = get_logger(__name__)
@ -47,16 +47,13 @@ _root_modules = [
]
class Services(BaseModel):
class Services(Struct):
actor_n: tractor._supervise.ActorNursery
service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag
service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {}
class Config:
arbitrary_types_allowed = True
async def start_service_task(
self,
name: str,
@ -207,23 +204,26 @@ async def open_piker_runtime(
assert _services is None
# XXX: this may open a root actor as well
async with (
tractor.open_root_actor(
# passed through to ``open_root_actor``
arbiter_addr=_registry_addr,
name=name,
loglevel=loglevel,
debug_mode=debug_mode,
start_method=start_method,
# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
enable_modules=_root_modules,
) as _,
with tractor.msg.configure_native_msgs(
[Struct],
):
yield tractor.current_actor()
async with (
tractor.open_root_actor(
# passed through to ``open_root_actor``
arbiter_addr=_registry_addr,
name=name,
loglevel=loglevel,
debug_mode=debug_mode,
start_method=start_method,
# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
enable_modules=_root_modules,
) as _,
):
yield tractor.current_actor()
@acm
@ -263,27 +263,31 @@ async def maybe_open_pikerd(
if loglevel:
get_console_log(loglevel)
# subtle, we must have the runtime up here or portal lookup will fail
async with maybe_open_runtime(loglevel, **kwargs):
# XXX: this may open a root actor as well
with tractor.msg.configure_native_msgs(
[Struct],
):
# subtle, we must have the runtime up here or portal lookup will fail
async with maybe_open_runtime(loglevel, **kwargs):
async with tractor.find_actor(_root_dname) as portal:
# assert portal is not None
if portal is not None:
yield portal
return
async with tractor.find_actor(_root_dname) as portal:
# assert portal is not None
if portal is not None:
yield portal
return
# presume pikerd role since no daemon could be found at
# configured address
async with open_pikerd(
# presume pikerd role since no daemon could be found at
# configured address
async with open_pikerd(
loglevel=loglevel,
debug_mode=kwargs.get('debug_mode', False),
loglevel=loglevel,
debug_mode=kwargs.get('debug_mode', False),
) as _:
# in the case where we're starting up the
# tractor-piker runtime stack in **this** process
# we return no portal to self.
yield None
) as _:
# in the case where we're starting up the
# tractor-piker runtime stack in **this** process
# we return no portal to self.
yield None
# brokerd enabled modules
@ -445,7 +449,7 @@ async def spawn_brokerd(
)
# non-blocking setup of brokerd service nursery
from .data import _setup_persistent_brokerd
from .data.feed import _setup_persistent_brokerd
await _services.start_service_task(
dname,

View File

@ -34,13 +34,13 @@ from fuzzywuzzy import process as fuzzy
import numpy as np
import tractor
from pydantic.dataclasses import dataclass
from pydantic import BaseModel
import wsproto
from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound
from ..log import get_logger, get_console_log
from ..data import ShmArray
from ..data.types import Struct
from ..data._web_bs import open_autorecon_ws, NoBsWs
log = get_logger(__name__)
@ -79,12 +79,14 @@ _show_wap_in_history = False
# https://binance-docs.github.io/apidocs/spot/en/#exchange-information
class Pair(BaseModel):
class Pair(Struct, frozen=True):
symbol: str
status: str
baseAsset: str
baseAssetPrecision: int
cancelReplaceAllowed: bool
allowTrailingStop: bool
quoteAsset: str
quotePrecision: int
quoteAssetPrecision: int
@ -287,7 +289,7 @@ async def get_client() -> Client:
# validation type
class AggTrade(BaseModel):
class AggTrade(Struct):
e: str # Event type
E: int # Event time
s: str # Symbol
@ -341,7 +343,9 @@ async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
elif msg.get('e') == 'aggTrade':
# validate
# NOTE: this is purely for a definition, ``msgspec.Struct``
# does not runtime-validate until you decode/encode.
# see: https://jcristharif.com/msgspec/structs.html#type-validation
msg = AggTrade(**msg)
# TODO: type out and require this quote format
@ -352,8 +356,8 @@ async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
'brokerd_ts': time.time(),
'ticks': [{
'type': 'trade',
'price': msg.p,
'size': msg.q,
'price': float(msg.p),
'size': float(msg.q),
'broker_ts': msg.T,
}],
}
@ -448,7 +452,7 @@ async def stream_quotes(
d = cache[sym.upper()]
syminfo = Pair(**d) # validation
si = sym_infos[sym] = syminfo.dict()
si = sym_infos[sym] = syminfo.to_dict()
# XXX: after manually inspecting the response format we
# just directly pick out the info we need

View File

@ -148,7 +148,7 @@ async def handle_order_requests(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'No account found: `{account}` ?',
).dict())
))
continue
client = _accounts2clients.get(account)
@ -161,7 +161,7 @@ async def handle_order_requests(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'No api client loaded for account: `{account}` ?',
).dict())
))
continue
if action in {'buy', 'sell'}:
@ -188,7 +188,7 @@ async def handle_order_requests(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason='Order already active?',
).dict())
))
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
@ -197,9 +197,8 @@ async def handle_order_requests(
oid=order.oid,
# broker specific request id
reqid=reqid,
time_ns=time.time_ns(),
account=account,
).dict()
)
)
elif action == 'cancel':
@ -559,7 +558,7 @@ async def trades_dialogue(
cids2pps,
validate=True,
)
all_positions.extend(msg.dict() for msg in msgs)
all_positions.extend(msg for msg in msgs)
if not all_positions and cids2pps:
raise RuntimeError(
@ -665,7 +664,7 @@ async def emit_pp_update(
msg = msgs[0]
break
await ems_stream.send(msg.dict())
await ems_stream.send(msg)
async def deliver_trade_events(
@ -743,7 +742,7 @@ async def deliver_trade_events(
broker_details={'name': 'ib'},
)
await ems_stream.send(msg.dict())
await ems_stream.send(msg)
case 'fill':
@ -803,7 +802,7 @@ async def deliver_trade_events(
broker_time=trade_entry['broker_time'],
)
await ems_stream.send(msg.dict())
await ems_stream.send(msg)
# 2 cases:
# - fill comes first or
@ -879,7 +878,7 @@ async def deliver_trade_events(
cid, msg = pack_position(item)
# acctid = msg.account = accounts_def.inverse[msg.account]
# cuck ib and it's shitty fifo sys for pps!
# await ems_stream.send(msg.dict())
# await ems_stream.send(msg)
case 'event':
@ -891,7 +890,7 @@ async def deliver_trade_events(
# level...
# reqid = item.get('reqid', 0)
# if getattr(msg, 'reqid', 0) < -1:
# log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
# log.info(f"TWS triggered trade\n{pformat(msg)}")
# msg.reqid = 'tws-' + str(-1 * reqid)

View File

@ -0,0 +1,64 @@
``kraken`` backend
------------------
though they don't have the most liquidity of all the cexes they sure are
accommodating to those of us who appreciate a little ``xmr``.
status
******
current support is *production grade* and both real-time data and order
management should be correct and fast. this backend is used by core devs
for live trading.
config
******
In order to get order mode support your ``brokers.toml``
needs to have something like the following:
.. code:: toml
[kraken]
accounts.spot = 'spot'
key_descr = "spot"
api_key = "69696969696969696696969696969696969696969696969696969696"
secret = "BOOBSBOOBSBOOBSBOOBSBOOBSSMBZ69696969696969669969696969696"
If everything works correctly you should see any current positions
loaded in the pps pane on chart load and you should also be able to
check your trade records in the file::
<pikerk_conf_dir>/ledgers/trades_kraken_spot.toml
An example ledger file will have entries written verbatim from the
trade events schema:
.. code:: toml
[TFJBKK-SMBZS-VJ4UWS]
ordertxid = "SMBZSA-7CNQU-3HWLNJ"
postxid = "SMBZSE-M7IF5-CFI7LT"
pair = "XXMRZEUR"
time = 1655691993.4133966
type = "buy"
ordertype = "limit"
price = "103.97000000"
cost = "499.99999977"
fee = "0.80000000"
vol = "4.80907954"
margin = "0.00000000"
misc = ""
your ``pps.toml`` file will have position entries like,
.. code:: toml
[kraken.spot."xmreur.kraken"]
size = 4.80907954
be_price = 103.97000000
bsuid = "XXMRZEUR"
clears = [
{ tid = "TFJBKK-SMBZS-VJ4UWS", cost = 0.8, price = 103.97, size = 4.80907954, dt = "2022-05-20T02:26:33.413397+00:00" },
]

View File

@ -282,6 +282,7 @@ class Client:
"volume": str(size),
}
return await self.endpoint('AddOrder', data)
else:
# Edit order data for kraken api
data["txid"] = reqid

View File

@ -18,21 +18,22 @@
Order api and machinery
'''
from contextlib import asynccontextmanager as acm
from contextlib import (
asynccontextmanager as acm,
contextmanager as cm,
)
from functools import partial
from itertools import chain
from itertools import chain, count
from pprint import pformat
import time
from typing import (
Any,
AsyncIterator,
# Callable,
# Optional,
# Union,
Union,
)
from bidict import bidict
import pendulum
from pydantic import BaseModel
import trio
import tractor
import wsproto
@ -61,179 +62,130 @@ from .feed import (
stream_messages,
)
class Trade(BaseModel):
'''
Trade class that helps parse and validate ownTrades stream
'''
reqid: str # kraken order transaction id
action: str # buy or sell
price: float # price of asset
size: float # vol of asset
broker_time: str # e.g GTC, GTD
MsgUnion = Union[
BrokerdCancel,
BrokerdError,
BrokerdFill,
BrokerdOrder,
BrokerdOrderAck,
BrokerdPosition,
BrokerdStatus,
]
async def handle_order_requests(
ws: NoBsWs,
client: Client,
ems_order_stream: tractor.MsgStream,
token: str,
emsflow: dict[str, list[MsgUnion]],
ids: bidict[str, int],
) -> None:
'''
Process new order submission requests from the EMS
and deliver acks or errors.
request_msg: dict
'''
# XXX: UGH, let's unify this.. with ``msgspec``.
msg: dict[str, Any]
order: BrokerdOrder
counter = count()
async for request_msg in ems_order_stream:
log.info(
'Received order request:\n'
f'{pformat(request_msg)}'
)
async for msg in ems_order_stream:
log.info(f'Rx order msg:\n{pformat(msg)}')
match msg:
case {
'account': 'kraken.spot',
'action': action,
} if action in {'buy', 'sell'}:
action = request_msg['action']
# validate
order = BrokerdOrder(**msg)
if action in {'buy', 'sell'}:
# logic from old `Client.submit_limit()`
if order.oid in ids:
ep = 'editOrder'
reqid = ids[order.oid] # integer not txid
last = emsflow[order.oid][-1]
assert last.reqid == order.reqid
extra = {
'orderid': last.reqid, # txid
}
account = request_msg['account']
if account != 'kraken.spot':
log.error(
'This is a kraken account, \
only a `kraken.spot` selection is valid'
)
await ems_order_stream.send(BrokerdError(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
# reason=f'Kraken only, No account found: `{account}` ?',
reason=(
'Kraken only, order mode disabled due to '
'https://github.com/pikers/piker/issues/299'
),
).dict())
continue
# validate
order = BrokerdOrder(**request_msg)
# call our client api to submit the order
resp = await client.submit_limit(
symbol=order.symbol,
price=order.price,
action=order.action,
size=order.size,
reqid=order.reqid,
)
err = resp['error']
if err:
oid = order.oid
log.error(f'Failed to submit order: {oid}')
await ems_order_stream.send(
BrokerdError(
oid=order.oid,
reqid=order.reqid,
symbol=order.symbol,
reason="Failed order submission",
broker_details=resp
).dict()
)
else:
# TODO: handle multiple orders (cancels?)
# txid is an array of strings
if order.reqid is None:
reqid = resp['result']['txid'][0]
else:
# update the internal pairing of oid to krakens
# txid with the new txid that is returned on edit
reqid = resp['result']['txid']
ep = 'addOrder'
reqid = next(counter)
ids[order.oid] = reqid
log.debug(
f"GENERATED ORDER {reqid}\n"
f'{ids}'
)
extra = {
'ordertype': 'limit',
'type': order.action,
}
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
BrokerdOrderAck(
psym = order.symbol.upper()
pair = f'{psym[:3]}/{psym[3:]}'
# ems order request id
oid=order.oid,
# call ws api to submit the order:
# https://docs.kraken.com/websockets/#message-addOrder
req = {
'event': ep,
'token': token,
# broker specific request id
reqid=reqid,
'reqid': reqid, # remapped-to-int uid from ems
'pair': pair,
'price': str(order.price),
'volume': str(order.size),
# account the made the order
account=order.account
# only ensures request is valid, nothing more
# validate: 'true',
).dict()
)
} | extra
log.info(f'Submitting WS order request:\n{pformat(req)}')
await ws.send_msg(req)
elif action == 'cancel':
msg = BrokerdCancel(**request_msg)
# placehold for sanity checking in relay loop
emsflow.setdefault(order.oid, []).append(order)
# Send order cancellation to kraken
resp = await client.submit_cancel(
reqid=msg.reqid
)
case {
'account': 'kraken.spot',
'action': 'cancel',
}:
cancel = BrokerdCancel(**msg)
assert cancel.oid in emsflow
reqid = ids[cancel.oid]
# Check to make sure there was no error returned by
# the kraken endpoint. Assert one order was cancelled.
try:
result = resp['result']
count = result['count']
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [cancel.reqid], # should be txid from submission
})
# check for 'error' key if we received no 'result'
except KeyError:
error = resp.get('error')
case _:
account = msg.get('account')
if account != 'kraken.spot':
log.error(
'This is a kraken account, \
only a `kraken.spot` selection is valid'
)
await ems_order_stream.send(
BrokerdError(
oid=msg.oid,
reqid=msg.reqid,
symbol=msg.symbol,
reason="Failed order cancel",
broker_details=resp
).dict()
)
oid=msg['oid'],
symbol=msg['symbol'],
reason=(
'Invalid request msg:\n{msg}'
),
if not error:
raise BrokerError(f'Unknown order cancel response: {resp}')
else:
if not count: # no orders were cancelled?
# XXX: what exactly is this from and why would we care?
# there doesn't seem to be any docs here?
# https://docs.kraken.com/rest/#operation/cancelOrder
# Check to make sure the cancellation is NOT pending,
# then send the confirmation to the ems order stream
pending = result.get('pending')
if pending:
log.error(f'Order {oid} cancel was not yet successful')
await ems_order_stream.send(
BrokerdError(
oid=msg.oid,
reqid=msg.reqid,
symbol=msg.symbol,
# TODO: maybe figure out if pending
# cancels will eventually get cancelled
reason="Order cancel is still pending?",
broker_details=resp
).dict()
)
else: # order cancel success case.
await ems_order_stream.send(
BrokerdStatus(
reqid=msg.reqid,
account=msg.account,
time_ns=time.time_ns(),
status='cancelled',
reason='Order cancelled',
broker_details={'name': 'kraken'}
).dict()
)
else:
log.error(f'Unknown order command: {request_msg}')
)
@acm
@ -310,13 +262,13 @@ async def trades_dialogue(
log.info(
f'Loaded {len(trades)} trades from account `{acc_name}`'
)
trans = await update_ledger(acctid, trades)
active, closed = pp.update_pps_conf(
'kraken',
acctid,
trade_records=trans,
ledger_reload={}.fromkeys(t.bsuid for t in trans),
)
with open_ledger(acctid, trades) as trans:
active, closed = pp.update_pps_conf(
'kraken',
acctid,
trade_records=trans,
ledger_reload={}.fromkeys(t.bsuid for t in trans),
)
position_msgs: list[dict] = []
pps: dict[int, pp.Position]
@ -330,7 +282,7 @@ async def trades_dialogue(
avg_price=p.be_price,
currency='',
)
position_msgs.append(msg.dict())
position_msgs.append(msg)
await ctx.started(
(position_msgs, [acc_name])
@ -358,134 +310,410 @@ async def trades_dialogue(
) as ws,
trio.open_nursery() as n,
):
# task local msg dialog tracking
emsflow: dict[
str,
list[MsgUnion],
] = {}
# 2way map for ems ids to kraken int reqids..
ids: bidict[str, int] = bidict()
# task for processing inbound requests from ems
n.start_soon(handle_order_requests, client, ems_stream)
n.start_soon(
handle_order_requests,
ws,
client,
ems_stream,
token,
emsflow,
ids,
)
count: int = 0
# enter relay loop
await handle_order_updates(
ws,
ems_stream,
emsflow,
ids,
trans,
acctid,
acc_name,
token,
)
# process and relay trades events to ems
async def handle_order_updates(
ws: NoBsWs,
ems_stream: tractor.MsgStream,
emsflow: dict[str, list[MsgUnion]],
ids: bidict[str, int],
trans: list[pp.Transaction],
acctid: str,
acc_name: str,
token: str,
) -> None:
'''
Main msg handling loop for all things order management.
This code is broken out to make the context explicit and state variables
defined in the signature clear to the reader.
'''
# transaction records which will be updated
# on new trade clearing events (aka order "fills")
trans: list[pp.Transaction]
async for msg in stream_messages(ws):
match msg:
# process and relay clearing trade events to ems
# https://docs.kraken.com/websockets/#message-ownTrades
async for msg in stream_messages(ws):
match msg:
case [
trades_msgs,
'ownTrades',
{'sequence': seq},
]:
# XXX: do we actually need this orrr?
# ensure that we are only processing new trades?
assert seq > count
count += 1
case [
trades_msgs,
'ownTrades',
{'sequence': seq},
]:
# flatten msgs to an {id -> data} table for processing
trades = {
tid: trade
for entry in trades_msgs
for (tid, trade) in entry.items()
# flatten msgs for processing
trades = {
tid: trade
for entry in trades_msgs
for (tid, trade) in entry.items()
# only emit entries which are already not-in-ledger
if tid not in {r.tid for r in trans}
}
for tid, trade in trades.items():
# only emit entries which are already not-in-ledger
if tid not in {r.tid for r in trans}
}
for tid, trade in trades.items():
# parse-cast
reqid = trade['ordertxid']
action = trade['type']
price = float(trade['price'])
size = float(trade['vol'])
broker_time = float(trade['time'])
# parse-cast
reqid = trade['ordertxid']
action = trade['type']
price = float(trade['price'])
size = float(trade['vol'])
broker_time = float(trade['time'])
# send a fill msg for gui update
fill_msg = BrokerdFill(
reqid=reqid,
time_ns=time.time_ns(),
# send a fill msg for gui update
fill_msg = BrokerdFill(
reqid=reqid,
time_ns=time.time_ns(),
action=action,
size=size,
price=price,
# TODO: maybe capture more msg data
# i.e fees?
broker_details={'name': 'kraken'},
broker_time=broker_time
)
await ems_stream.send(fill_msg)
action=action,
size=size,
price=price,
# TODO: maybe capture more msg data
# i.e fees?
broker_details={'name': 'kraken'},
broker_time=broker_time
)
await ems_stream.send(fill_msg.dict())
filled_msg = BrokerdStatus(
reqid=reqid,
time_ns=time.time_ns(),
filled_msg = BrokerdStatus(
reqid=reqid,
time_ns=time.time_ns(),
account=acc_name,
status='filled',
filled=size,
reason='Order filled by kraken',
broker_details={
'name': 'kraken',
'broker_time': broker_time
},
account=acc_name,
status='filled',
filled=size,
reason='Order filled by kraken',
broker_details={
'name': 'kraken',
'broker_time': broker_time
},
# TODO: figure out if kraken gives a count
# of how many units of underlying were
# filled. Alternatively we can decrement
# this value ourselves by associating and
# calcing from the diff with the original
# client-side request, see:
# https://github.com/pikers/piker/issues/296
remaining=0,
)
await ems_stream.send(filled_msg)
# TODO: figure out if kraken gives a count
# of how many units of underlying were
# filled. Alternatively we can decrement
# this value ourselves by associating and
# calcing from the diff with the original
# client-side request, see:
# https://github.com/pikers/piker/issues/296
remaining=0,
)
await ems_stream.send(filled_msg.dict())
# update ledger and position tracking
with open_ledger(acctid, trades) as trans:
active, closed = pp.update_pps_conf(
'kraken',
acctid,
trade_records=trans,
ledger_reload={}.fromkeys(
t.bsuid for t in trans),
)
# update ledger and position tracking
trans = await update_ledger(acctid, trades)
active, closed = pp.update_pps_conf(
'kraken',
acctid,
trade_records=trans,
ledger_reload={}.fromkeys(
t.bsuid for t in trans),
)
# emit any new pp msgs to ems
for pos in filter(
bool,
chain(active.values(), closed.values()),
):
pp_msg = BrokerdPosition(
broker='kraken',
# emit pp msgs
for pos in filter(
bool,
chain(active.values(), closed.values()),
):
pp_msg = BrokerdPosition(
broker='kraken',
# XXX: ok so this is annoying, we're
# relaying an account name with the
# backend suffix prefixed but when
# reading accounts from ledgers we
# don't need it and/or it's prefixed
# in the section table.. we should
# just strip this from the message
# right since `.broker` is already
# included?
account=f'kraken.{acctid}',
symbol=pos.symbol.front_fqsn(),
size=pos.size,
avg_price=pos.be_price,
# XXX: ok so this is annoying, we're
# relaying an account name with the
# backend suffix prefixed but when
# reading accounts from ledgers we
# don't need it and/or it's prefixed
# in the section table.. we should
# just strip this from the message
# right since `.broker` is already
# included?
# TODO
# currency=''
)
await ems_stream.send(pp_msg)
# process and relay order state change events
# https://docs.kraken.com/websockets/#message-openOrders
case [
order_msgs,
'openOrders',
{'sequence': seq},
]:
for order_msg in order_msgs:
log.info(
f'Order msg update_{seq}:\n'
f'{pformat(order_msg)}'
)
txid, update_msg = list(order_msg.items())[0]
match update_msg:
# we ignore internal order updates triggered by
# kraken's "edit" endpoint.
case {
'cancel_reason': 'Order replaced',
'status': status,
'userref': reqid,
**rest,
}:
continue
case {
'status': status,
'userref': reqid,
**rest,
# XXX: eg. of remaining msg schema:
# 'avg_price': _,
# 'cost': _,
# 'descr': {
# 'close': None,
# 'leverage': None,
# 'order': descr,
# 'ordertype': 'limit',
# 'pair': 'XMR/EUR',
# 'price': '74.94000000',
# 'price2': '0.00000000',
# 'type': 'buy'
# },
# 'expiretm': None,
# 'fee': '0.00000000',
# 'limitprice': '0.00000000',
# 'misc': '',
# 'oflags': 'fciq',
# 'opentm': '1656966131.337344',
# 'refid': None,
# 'starttm': None,
# 'stopprice': '0.00000000',
# 'timeinforce': 'GTC',
# 'vol': submit_vlm, # '13.34400854',
# 'vol_exec': exec_vlm, # 0.0000
}:
ems_status = {
'open': 'submitted',
'closed': 'cancelled',
'canceled': 'cancelled',
# do we even need to forward
# this state to the ems?
'pending': 'pending',
}[status]
submit_vlm = rest.get('vol', 0)
exec_vlm = rest.get('vol_exec', 0)
oid = ids.inverse[reqid]
msgs = emsflow[oid]
# send BrokerdStatus messages for all
# order state updates
resp = BrokerdStatus(
reqid=txid,
time_ns=time.time_ns(), # cuz why not
account=f'kraken.{acctid}',
symbol=pos.symbol.front_fqsn(),
size=pos.size,
avg_price=pos.be_price,
# TODO
# currency=''
# everyone doin camel case..
status=ems_status, # force lower case
filled=exec_vlm,
reason='', # why held?
remaining=(
float(submit_vlm)
-
float(exec_vlm)
),
broker_details=dict(
{'name': 'kraken'}, **update_msg
),
)
await ems_stream.send(pp_msg.dict())
msgs.append(resp)
await ems_stream.send(resp)
case [
trades_msgs,
'openOrders',
{'sequence': seq},
]:
# TODO: async order update handling which we
# should remove from `handle_order_requests()`
# above:
# https://github.com/pikers/piker/issues/293
# https://github.com/pikers/piker/issues/310
log.info(f'Order update {seq}:{trades_msgs}')
case _:
log.warning(
'Unknown orders msg:\n'
f'{txid}:{order_msg}'
)
case _:
log.warning(f'Unhandled trades msg: {msg}')
await tractor.breakpoint()
case {
'event': etype,
'status': status,
'reqid': reqid,
} as event if (
etype in {
'addOrderStatus',
'editOrderStatus',
'cancelOrderStatus',
}
):
oid = ids.inverse[reqid]
msgs = emsflow[oid]
last = msgs[-1]
resps, errored = process_status(
event,
oid,
token,
msgs,
last,
)
# if errored:
# if we rx any error cancel the order again
# await ws.send_msg({
# 'event': 'cancelOrder',
# 'token': token,
# 'reqid': reqid,
# 'txid': [last.reqid], # txid from submission
# })
msgs.extend(resps)
for resp in resps:
await ems_stream.send(resp)
case _:
log.warning(f'Unhandled trades update msg: {msg}')
def process_status(
event: dict[str, str],
oid: str,
token: str,
msgs: list[MsgUnion],
last: MsgUnion,
) -> tuple[list[MsgUnion], bool]:
'''
Process `'[add/edit/cancel]OrderStatus'` events by translating to
and returning the equivalent EMS-msg responses.
'''
match event:
case {
'event': etype,
'status': 'error',
'reqid': reqid,
'errorMessage': errmsg,
}:
# any of ``{'add', 'edit', 'cancel'}``
action = etype.rstrip('OrderStatus')
log.error(
f'Failed to {action} order {reqid}:\n'
f'{errmsg}'
)
resp = BrokerdError(
oid=oid,
# XXX: use old reqid in case it changed?
reqid=last.reqid,
symbol=getattr(last, 'symbol', 'N/A'),
reason=f'Failed {action}:\n{errmsg}',
broker_details=event
)
return [resp], True
# successful request cases
case {
'event': 'addOrderStatus',
'status': "ok",
'reqid': reqid, # oid from ems side
'txid': txid,
'descr': descr, # only on success?
}:
log.info(
f'Submitting order: {descr}\n'
f'ems oid: {oid}\n'
f're-mapped reqid: {reqid}\n'
f'txid: {txid}\n'
)
resp = BrokerdOrderAck(
oid=oid, # ems order request id
reqid=txid, # kraken unique order id
account=last.account, # piker account
)
return [resp], False
case {
'event': 'editOrderStatus',
'status': "ok",
'reqid': reqid, # oid from ems side
'descr': descr,
# NOTE: for edit request this is a new value
'txid': txid,
'originaltxid': origtxid,
}:
log.info(
f'Editting order {oid}[requid={reqid}]:\n'
f'txid: {origtxid} -> {txid}\n'
f'{descr}'
)
# deliver another ack to update the ems-side `.reqid`.
resp = BrokerdOrderAck(
oid=oid, # ems order request id
reqid=txid, # kraken unique order id
account=last.account, # piker account
)
return [resp], False
case {
"event": "cancelOrderStatus",
"status": "ok",
'reqid': reqid,
# XXX: sometimes this isn't provided!?
# 'txid': txids,
**rest,
}:
# TODO: should we support "batch" acking of
# multiple cancels thus avoiding the below loop?
resps: list[MsgUnion] = []
for txid in rest.get('txid', [last.reqid]):
resp = BrokerdStatus(
reqid=txid,
account=last.account,
time_ns=time.time_ns(),
status='cancelled',
reason='Cancel success: {oid}@{txid}',
broker_details=event,
)
resps.append(resp)
return resps, False
def norm_trade_records(
@ -494,10 +722,9 @@ def norm_trade_records(
) -> list[pp.Transaction]:
records: list[pp.Transaction] = []
for tid, record in ledger.items():
size = record.get('vol') * {
size = float(record.get('vol')) * {
'buy': 1,
'sell': -1,
}[record['type']]
@ -508,7 +735,7 @@ def norm_trade_records(
pp.Transaction(
fqsn=f'{norm_sym}.kraken',
tid=tid,
size=float(size),
size=size,
price=float(record['price']),
cost=float(record['fee']),
dt=pendulum.from_timestamp(float(record['time'])),
@ -522,19 +749,24 @@ def norm_trade_records(
return records
async def update_ledger(
@cm
def open_ledger(
acctid: str,
trade_entries: list[dict[str, Any]],
) -> list[pp.Transaction]:
'''
Write recent session's trades to the user's (local) ledger file.
# write recent session's trades to the user's (local) ledger file.
'''
with pp.open_trade_ledger(
'kraken',
acctid,
) as ledger:
ledger.update(trade_entries)
# normalize to transaction form
records = norm_trade_records(trade_entries)
return records
# normalize to transaction form
records = norm_trade_records(trade_entries)
yield records
# update on exit
ledger.update(trade_entries)

View File

@ -31,7 +31,6 @@ import time
from fuzzywuzzy import process as fuzzy
import numpy as np
import pendulum
from pydantic import BaseModel
from trio_typing import TaskStatus
import tractor
import trio
@ -45,6 +44,7 @@ from piker.brokers._util import (
)
from piker.log import get_console_log
from piker.data import ShmArray
from piker.data.types import Struct
from piker.data._web_bs import open_autorecon_ws, NoBsWs
from . import log
from .api import (
@ -54,7 +54,7 @@ from .api import (
# https://www.kraken.com/features/api#get-tradable-pairs
class Pair(BaseModel):
class Pair(Struct):
altname: str # alternate pair name
wsname: str # WebSocket pair name (if available)
aclass_base: str # asset class of base component
@ -117,9 +117,8 @@ async def stream_messages(
too_slow_count = 0
continue
if isinstance(msg, dict):
if msg.get('event') == 'heartbeat':
match msg:
case {'event': 'heartbeat'}:
now = time.time()
delay = now - last_hb
last_hb = now
@ -130,11 +129,20 @@ async def stream_messages(
continue
err = msg.get('errorMessage')
if err:
raise BrokerError(err)
else:
yield msg
case {
'connectionID': _,
'event': 'systemStatus',
'status': 'online',
'version': _,
} as msg:
log.info(
'WS connection is up:\n'
f'{msg}'
)
continue
case _:
yield msg
async def process_data_feed_msgs(
@ -145,37 +153,60 @@ async def process_data_feed_msgs(
'''
async for msg in stream_messages(ws):
match msg:
case {
'errorMessage': errmsg
}:
raise BrokerError(errmsg)
chan_id, *payload_array, chan_name, pair = msg
case {
'event': 'subscriptionStatus',
} as sub:
log.info(
'WS subscription is active:\n'
f'{sub}'
)
continue
if 'ohlc' in chan_name:
case [
chan_id,
*payload_array,
chan_name,
pair
]:
if 'ohlc' in chan_name:
yield 'ohlc', OHLC(
chan_id,
chan_name,
pair,
*payload_array[0]
)
yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0])
elif 'spread' in chan_name:
elif 'spread' in chan_name:
bid, ask, ts, bsize, asize = map(
float, payload_array[0])
bid, ask, ts, bsize, asize = map(float, payload_array[0])
# TODO: really makes you think IB has a horrible API...
quote = {
'symbol': pair.replace('/', ''),
'ticks': [
{'type': 'bid', 'price': bid, 'size': bsize},
{'type': 'bsize', 'price': bid, 'size': bsize},
# TODO: really makes you think IB has a horrible API...
quote = {
'symbol': pair.replace('/', ''),
'ticks': [
{'type': 'bid', 'price': bid, 'size': bsize},
{'type': 'bsize', 'price': bid, 'size': bsize},
{'type': 'ask', 'price': ask, 'size': asize},
{'type': 'asize', 'price': ask, 'size': asize},
],
}
yield 'l1', quote
{'type': 'ask', 'price': ask, 'size': asize},
{'type': 'asize', 'price': ask, 'size': asize},
],
}
yield 'l1', quote
# elif 'book' in msg[-2]:
# chan_id, *payload_array, chan_name, pair = msg
# print(msg)
# elif 'book' in msg[-2]:
# chan_id, *payload_array, chan_name, pair = msg
# print(msg)
else:
print(f'UNHANDLED MSG: {msg}')
yield msg
case _:
print(f'UNHANDLED MSG: {msg}')
# yield msg
def normalize(
@ -316,7 +347,7 @@ async def stream_quotes(
sym = sym.upper()
si = Pair(**await client.symbol_info(sym)) # validation
syminfo = si.dict()
syminfo = si.to_dict()
syminfo['price_tick_size'] = 1 / 10**si.pair_decimals
syminfo['lot_tick_size'] = 1 / 10**si.lot_decimals
syminfo['asset_type'] = 'crypto'
@ -385,7 +416,7 @@ async def stream_quotes(
msg_gen = process_data_feed_msgs(ws)
# TODO: use ``anext()`` when it lands in 3.10!
typ, ohlc_last = await msg_gen.__anext__()
typ, ohlc_last = await anext(msg_gen)
topic, quote = normalize(ohlc_last)

View File

@ -22,10 +22,9 @@ from enum import Enum
from typing import Optional
from bidict import bidict
from pydantic import BaseModel, validator
# from msgspec import Struct
from ..data._source import Symbol
from ..data.types import Struct
from ..pp import Position
@ -41,33 +40,30 @@ SizeUnit = Enum(
)
class Allocator(BaseModel):
class Config:
validate_assignment = True
copy_on_model_validation = False
arbitrary_types_allowed = True
# required to get the account validator lookup working?
extra = 'allow'
underscore_attrs_are_private = False
class Allocator(Struct):
symbol: Symbol
account: Optional[str] = 'paper'
_size_units: bidict[str, Optional[str]] = _size_units
# TODO: for enums this clearly doesn't fucking work, you can't set
# a default at startup by passing in a `dict` but yet you can set
# that value through assignment..for wtv cucked reason.. honestly, pure
# unintuitive garbage.
size_unit: str = 'currency'
_size_units: dict[str, Optional[str]] = _size_units
_size_unit: str = 'currency'
@validator('size_unit', pre=True)
def maybe_lookup_key(cls, v):
# apply the corresponding enum key for the text "description" value
@property
def size_unit(self) -> str:
return self._size_unit
@size_unit.setter
def size_unit(self, v: str) -> Optional[str]:
if v not in _size_units:
return _size_units.inverse[v]
v = _size_units.inverse[v]
assert v in _size_units
self._size_unit = v
return v
# TODO: if we ever want ot support non-uniform entry-slot-proportion
@ -262,7 +258,7 @@ def mk_allocator(
# default allocation settings
defaults: dict[str, float] = {
'account': None, # select paper by default
'size_unit': 'currency',
# 'size_unit': 'currency',
'units_limit': 400,
'currency_limit': 5e3,
'slots': 4,
@ -301,6 +297,9 @@ def mk_allocator(
# entry step 1.0
alloc.units_limit = alloc.slots
else:
alloc.size_unit = 'currency'
# if the current position is already greater then the limit
# settings, increase the limit to the current position
if alloc.size_unit == 'currency':

View File

@ -58,11 +58,11 @@ class OrderBook:
def send(
self,
msg: Order,
msg: Order | dict,
) -> dict:
self._sent_orders[msg.oid] = msg
self._to_ems.send_nowait(msg.dict())
self._to_ems.send_nowait(msg)
return msg
def update(
@ -73,9 +73,8 @@ class OrderBook:
) -> dict:
cmd = self._sent_orders[uuid]
msg = cmd.dict()
msg.update(data)
self._sent_orders[uuid] = Order(**msg)
msg = cmd.copy(update=data)
self._sent_orders[uuid] = msg
self._to_ems.send_nowait(msg)
return cmd
@ -88,7 +87,7 @@ class OrderBook:
oid=uuid,
symbol=cmd.symbol,
)
self._to_ems.send_nowait(msg.dict())
self._to_ems.send_nowait(msg)
_orders: OrderBook = None
@ -149,7 +148,7 @@ async def relay_order_cmds_from_sync_code(
book = get_orders()
async with book._from_order_book.subscribe() as orders_stream:
async for cmd in orders_stream:
if cmd['symbol'] == symbol_key:
if cmd.symbol == symbol_key:
log.info(f'Send order cmd:\n{pformat(cmd)}')
# send msg over IPC / wire
await to_ems_stream.send(cmd)

View File

@ -20,12 +20,12 @@ In da suit parlances: "Execution management systems"
"""
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from math import isnan
from pprint import pformat
import time
from typing import AsyncIterator, Callable
from bidict import bidict
from pydantic import BaseModel
import trio
from trio_typing import TaskStatus
import tractor
@ -33,6 +33,7 @@ import tractor
from ..log import get_logger
from ..data._normalize import iterticks
from ..data.feed import Feed, maybe_open_feed
from ..data.types import Struct
from .._daemon import maybe_spawn_brokerd
from . import _paper_engine as paper
from ._messages import (
@ -87,7 +88,8 @@ def mk_check(
@dataclass
class _DarkBook:
'''EMS-trigger execution book.
'''
EMS-trigger execution book.
Contains conditions for executions (aka "orders" or "triggers")
which are not exposed to brokers and thus the market; i.e. these are
@ -230,7 +232,7 @@ async def clear_dark_triggers(
price=submit_price,
size=cmd['size'],
)
await brokerd_orders_stream.send(msg.dict())
await brokerd_orders_stream.send(msg)
# mark this entry as having sent an order
# request. the entry will be replaced once the
@ -246,14 +248,11 @@ async def clear_dark_triggers(
msg = Status(
oid=oid, # ems order id
resp=resp,
time_ns=time.time_ns(),
symbol=fqsn,
resp=resp,
trigger_price=price,
broker_details={'name': broker},
cmd=cmd, # original request message
).dict()
brokerd_msg=cmd,
)
# remove exec-condition from set
log.info(f'removing pred for {oid}')
@ -302,7 +301,7 @@ class TradesRelay:
consumers: int = 0
class Router(BaseModel):
class Router(Struct):
'''
Order router which manages and tracks per-broker dark book,
alerts, clearing and related data feed management.
@ -323,10 +322,6 @@ class Router(BaseModel):
# brokername to trades-dialogues streams with ``brokerd`` actors
relays: dict[str, TradesRelay] = {}
class Config:
arbitrary_types_allowed = True
underscore_attrs_are_private = False
def get_dark_book(
self,
brokername: str,
@ -580,11 +575,11 @@ async def translate_and_relay_brokerd_events(
if name == 'position':
pos_msg = BrokerdPosition(**brokerd_msg).dict()
pos_msg = BrokerdPosition(**brokerd_msg)
# XXX: this will be useful for automatic strats yah?
# keep pps per account up to date locally in ``emsd`` mem
sym, broker = pos_msg['symbol'], pos_msg['broker']
sym, broker = pos_msg.symbol, pos_msg.broker
relay.positions.setdefault(
# NOTE: translate to a FQSN!
@ -651,6 +646,13 @@ async def translate_and_relay_brokerd_events(
else:
# check for existing live flow entry
entry = book._ems_entries.get(oid)
old_reqid = entry.reqid
if old_reqid and old_reqid != reqid:
log.warning(
f'Brokerd order id change for {oid}:\n'
f'{old_reqid} -> {reqid}'
)
# initial response to brokerd order request
if name == 'ack':
@ -661,6 +663,10 @@ async def translate_and_relay_brokerd_events(
# a ``BrokerdOrderAck`` **must** be sent after an order
# request in order to establish this id mapping.
book._ems2brokerd_ids[oid] = reqid
log.info(
'Rx ACK for order\n'
f'oid: {oid} -> reqid: {reqid}'
)
# new order which has not yet be registered into the
# local ems book, insert it now and handle 2 cases:
@ -675,7 +681,7 @@ async def translate_and_relay_brokerd_events(
entry.reqid = reqid
# tell broker to cancel immediately
await brokerd_trades_stream.send(entry.dict())
await brokerd_trades_stream.send(entry)
# - the order is now active and will be mirrored in
# our book -> registered as live flow
@ -715,7 +721,7 @@ async def translate_and_relay_brokerd_events(
# if 10147 in message: cancel
resp = 'broker_errored'
broker_details = msg.dict()
broker_details = msg
# don't relay message to order requester client
# continue
@ -750,7 +756,7 @@ async def translate_and_relay_brokerd_events(
resp = 'broker_' + msg.status
# pass the BrokerdStatus msg inside the broker details field
broker_details = msg.dict()
broker_details = msg
elif name in (
'fill',
@ -759,7 +765,7 @@ async def translate_and_relay_brokerd_events(
# proxy through the "fill" result(s)
resp = 'broker_filled'
broker_details = msg.dict()
broker_details = msg
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
@ -777,7 +783,7 @@ async def translate_and_relay_brokerd_events(
time_ns=time.time_ns(),
broker_reqid=reqid,
brokerd_msg=broker_details,
).dict()
)
)
except KeyError:
log.error(
@ -849,7 +855,7 @@ async def process_client_order_cmds(
f'Submitting cancel for live order {reqid}'
)
await brokerd_order_stream.send(msg.dict())
await brokerd_order_stream.send(msg)
else:
# this might be a cancel for an order that hasn't been
@ -871,7 +877,7 @@ async def process_client_order_cmds(
resp='dark_cancelled',
oid=oid,
time_ns=time.time_ns(),
).dict()
)
)
# de-register this client dialogue
router.dialogues.pop(oid)
@ -926,7 +932,7 @@ async def process_client_order_cmds(
# handle relaying the ems side responses back to
# the client/cmd sender from this request
log.info(f'Sending live order to {broker}:\n{pformat(msg)}')
await brokerd_order_stream.send(msg.dict())
await brokerd_order_stream.send(msg)
# an immediate response should be ``BrokerdOrderAck``
# with ems order id from the ``trades_dialogue()``
@ -951,6 +957,12 @@ async def process_client_order_cmds(
# like every other shitty tina platform that makes
# the user choose the predicate operator.
last = dark_book.lasts[fqsn]
# sometimes the real-time feed hasn't come up
# so just pull from the latest history.
if isnan(last):
last = feed.shm.array[-1]['close']
pred = mk_check(trigger_price, last, action)
spread_slap: float = 5
@ -1000,7 +1012,7 @@ async def process_client_order_cmds(
resp=resp,
oid=oid,
time_ns=time.time_ns(),
).dict()
)
)
@ -1138,8 +1150,14 @@ async def _emsd_main(
)
finally:
# remove client from "registry"
_router.clients.remove(ems_client_order_stream)
# try to remove client from "registry"
try:
_router.clients.remove(ems_client_order_stream)
except KeyError:
log.warning(
f'Stream {ems_client_order_stream._ctx.chan.uid}'
' was already dropped?'
)
dialogues = _router.dialogues

View File

@ -20,16 +20,15 @@ Clearing system messagingn types and protocols.
"""
from typing import Optional, Union
# TODO: try out just encoding/send direction for now?
# import msgspec
from pydantic import BaseModel
from ..data._source import Symbol
from ..data.types import Struct
# --------------
# Client -> emsd
# --------------
class Cancel(BaseModel):
class Cancel(Struct):
'''Cancel msg for removing a dark (ems triggered) or
broker-submitted (live) trigger/order.
@ -39,7 +38,7 @@ class Cancel(BaseModel):
symbol: str
class Order(BaseModel):
class Order(Struct):
action: str # {'buy', 'sell', 'alert'}
# internal ``emdsd`` unique "order id"
@ -59,20 +58,14 @@ class Order(BaseModel):
# the backend broker
exec_mode: str # {'dark', 'live', 'paper'}
class Config:
# just for pre-loading a ``Symbol`` when used
# in the order mode staging process
arbitrary_types_allowed = True
# don't copy this model instance when used in
# a recursive model
copy_on_model_validation = False
# --------------
# Client <- emsd
# --------------
# update msgs from ems which relay state change info
# from the active clearing engine.
class Status(BaseModel):
class Status(Struct):
name: str = 'status'
oid: str # uuid4
@ -95,8 +88,6 @@ class Status(BaseModel):
# }
resp: str # "response", see above
# symbol: str
# trigger info
trigger_price: Optional[float] = None
# price: float
@ -111,10 +102,12 @@ class Status(BaseModel):
brokerd_msg: dict = {}
# ---------------
# emsd -> brokerd
# ---------------
# requests *sent* from ems to respective backend broker daemon
class BrokerdCancel(BaseModel):
class BrokerdCancel(Struct):
action: str = 'cancel'
oid: str # piker emsd order id
@ -130,7 +123,7 @@ class BrokerdCancel(BaseModel):
reqid: Optional[Union[int, str]] = None
class BrokerdOrder(BaseModel):
class BrokerdOrder(Struct):
action: str # {buy, sell}
oid: str
@ -150,11 +143,12 @@ class BrokerdOrder(BaseModel):
size: float
# ---------------
# emsd <- brokerd
# ---------------
# requests *received* to ems from broker backend
class BrokerdOrderAck(BaseModel):
class BrokerdOrderAck(Struct):
'''
Immediate reponse to a brokerd order request providing the broker
specific unique order id so that the EMS can associate this
@ -172,7 +166,7 @@ class BrokerdOrderAck(BaseModel):
account: str = ''
class BrokerdStatus(BaseModel):
class BrokerdStatus(Struct):
name: str = 'status'
reqid: Union[int, str]
@ -205,7 +199,7 @@ class BrokerdStatus(BaseModel):
}
class BrokerdFill(BaseModel):
class BrokerdFill(Struct):
'''
A single message indicating a "fill-details" event from the broker
if avaiable.
@ -230,7 +224,7 @@ class BrokerdFill(BaseModel):
broker_time: float
class BrokerdError(BaseModel):
class BrokerdError(Struct):
'''
Optional error type that can be relayed to emsd for error handling.
@ -249,7 +243,7 @@ class BrokerdError(BaseModel):
broker_details: dict = {}
class BrokerdPosition(BaseModel):
class BrokerdPosition(Struct):
'''Position update event from brokerd.
'''

View File

@ -30,8 +30,8 @@ import trio
import tractor
from dataclasses import dataclass
from .. import data
from ..data._source import Symbol
from ..data.feed import open_feed
from ..pp import Position
from ..data._normalize import iterticks
from ..data._source import unpack_fqsn
@ -117,7 +117,7 @@ class PaperBoi:
reason='paper_trigger',
remaining=size,
)
await self.ems_trades_stream.send(msg.dict())
await self.ems_trades_stream.send(msg)
# if we're already a clearing price simulate an immediate fill
if (
@ -173,7 +173,7 @@ class PaperBoi:
broker=self.broker,
time_ns=time.time_ns(),
)
await self.ems_trades_stream.send(msg.dict())
await self.ems_trades_stream.send(msg)
async def fake_fill(
self,
@ -216,7 +216,7 @@ class PaperBoi:
'name': self.broker + '_paper',
},
)
await self.ems_trades_stream.send(msg.dict())
await self.ems_trades_stream.send(msg)
if order_complete:
@ -240,7 +240,7 @@ class PaperBoi:
'name': self.broker,
},
)
await self.ems_trades_stream.send(msg.dict())
await self.ems_trades_stream.send(msg)
# lookup any existing position
token = f'{symbol}.{self.broker}'
@ -268,7 +268,7 @@ class PaperBoi:
)
pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price)
await self.ems_trades_stream.send(pp_msg.dict())
await self.ems_trades_stream.send(pp_msg)
async def simulate_fills(
@ -384,7 +384,7 @@ async def handle_order_requests(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'Paper only. No account found: `{account}` ?',
).dict())
))
continue
# validate
@ -416,7 +416,7 @@ async def handle_order_requests(
# broker specific request id
reqid=reqid,
).dict()
)
)
elif action == 'cancel':
@ -441,14 +441,11 @@ async def trades_dialogue(
) -> None:
tractor.log.get_console_log(loglevel)
async with (
async with open_feed(
[fqsn],
loglevel=loglevel,
) as feed:
data.open_feed(
[fqsn],
loglevel=loglevel,
) as feed,
):
# TODO: load paper positions per broker from .toml config file
# and pass as symbol to position data mapping: ``dict[str, dict]``
# await ctx.started(all_positions)

View File

@ -30,19 +30,19 @@ from ._sharedmem import (
get_shm_token,
ShmArray,
)
from .feed import (
open_feed,
_setup_persistent_brokerd,
)
# from .feed import (
# # open_feed,
# _setup_persistent_brokerd,
# )
__all__ = [
'open_feed',
# 'open_feed',
'ShmArray',
'iterticks',
'maybe_open_shm_array',
'attach_shm_array',
'open_shm_array',
'get_shm_token',
'_setup_persistent_brokerd',
# '_setup_persistent_brokerd',
]

View File

@ -37,6 +37,7 @@ from docker.models.containers import Container as DockerContainer
from docker.errors import (
DockerException,
APIError,
ContainerError,
)
from requests.exceptions import ConnectionError, ReadTimeout
@ -50,10 +51,6 @@ class DockerNotStarted(Exception):
'Prolly you dint start da daemon bruh'
class ContainerError(RuntimeError):
'Error reported via app-container logging level'
@acm
async def open_docker(
url: Optional[str] = None,
@ -96,9 +93,9 @@ async def open_docker(
# not perms?
raise
finally:
if client:
client.close()
# finally:
# if client:
# client.close()
class Container:
@ -185,6 +182,21 @@ class Container:
if 'is not running' in err.explanation:
return False
def hard_kill(self, start: float) -> None:
delay = time.time() - start
log.error(
f'Failed to kill container {self.cntr.id} after {delay}s\n'
'sending SIGKILL..'
)
# get out the big guns, bc apparently marketstore
# doesn't actually know how to terminate gracefully
# :eyeroll:...
self.try_signal('SIGKILL')
self.cntr.wait(
timeout=3,
condition='not-running',
)
async def cancel(
self,
stop_msg: str,
@ -231,21 +243,9 @@ class Container:
ConnectionError,
):
log.exception('Docker connection failure')
break
self.hard_kill(start)
else:
delay = time.time() - start
log.error(
f'Failed to kill container {cid} after {delay}s\n'
'sending SIGKILL..'
)
# get out the big guns, bc apparently marketstore
# doesn't actually know how to terminate gracefully
# :eyeroll:...
self.try_signal('SIGKILL')
self.cntr.wait(
timeout=3,
condition='not-running',
)
self.hard_kill(start)
log.cancel(f'Container stopped: {cid}')

View File

@ -27,13 +27,14 @@ from multiprocessing.shared_memory import SharedMemory, _USE_POSIX
if _USE_POSIX:
from _posixshmem import shm_unlink
import tractor
# import msgspec
import numpy as np
from pydantic import BaseModel
from numpy.lib import recfunctions as rfn
import tractor
from ..log import get_logger
from ._source import base_iohlc_dtype
from .types import Struct
log = get_logger(__name__)
@ -107,15 +108,12 @@ class SharedInt:
log.warning(f'Shm for {name} already unlinked?')
class _Token(BaseModel):
class _Token(Struct, frozen=True):
'''
Internal represenation of a shared memory "token"
which can be used to key a system wide post shm entry.
'''
class Config:
frozen = True
shm_name: str # this servers as a "key" value
shm_first_index_name: str
shm_last_index_name: str
@ -126,17 +124,25 @@ class _Token(BaseModel):
return np.dtype(list(map(tuple, self.dtype_descr))).descr
def as_msg(self):
return self.dict()
return self.to_dict()
@classmethod
def from_msg(cls, msg: dict) -> _Token:
# TODO: native struct decoding
# return _token_dec.decode(msg)
if isinstance(msg, _Token):
return msg
# assert 0
msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr']))
return _Token(**msg)
# _token_dec = msgspec.msgpack.Decoder(_Token)
# TODO: this api?
# _known_tokens = tractor.ActorVar('_shm_tokens', {})
# _known_tokens = tractor.ContextStack('_known_tokens', )
@ -167,7 +173,7 @@ def _make_token(
shm_name=key,
shm_first_index_name=key + "_first",
shm_last_index_name=key + "_last",
dtype_descr=np.dtype(dtype).descr
dtype_descr=tuple(np.dtype(dtype).descr)
)

View File

@ -42,7 +42,6 @@ from trio_typing import TaskStatus
import trimeter
import tractor
from tractor.trionics import maybe_open_context
from pydantic import BaseModel
import pendulum
import numpy as np
@ -59,6 +58,7 @@ from ._sharedmem import (
ShmArray,
)
from .ingest import get_ingestormod
from .types import Struct
from ._source import (
base_iohlc_dtype,
Symbol,
@ -84,7 +84,7 @@ if TYPE_CHECKING:
log = get_logger(__name__)
class _FeedsBus(BaseModel):
class _FeedsBus(Struct):
'''
Data feeds broadcaster and persistence management.
@ -100,10 +100,6 @@ class _FeedsBus(BaseModel):
a dedicated cancel scope.
'''
class Config:
arbitrary_types_allowed = True
underscore_attrs_are_private = False
brokername: str
nursery: trio.Nursery
feeds: dict[str, tuple[dict, dict]] = {}

View File

@ -0,0 +1,68 @@
# piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Built-in (extension) types.
"""
from typing import Optional
from pprint import pformat
import msgspec
class Struct(
msgspec.Struct,
# https://jcristharif.com/msgspec/structs.html#tagged-unions
# tag='pikerstruct',
# tag=True,
):
'''
A "human friendlier" (aka repl buddy) struct subtype.
'''
def to_dict(self) -> dict:
return {
f: getattr(self, f)
for f in self.__struct_fields__
}
def __repr__(self):
return f'Struct({pformat(self.to_dict())})'
def copy(
self,
update: Optional[dict] = None,
) -> msgspec.Struct:
'''
Validate-typecast all self defined fields, return a copy of us
with all such fields.
This is kinda like the default behaviour in `pydantic.BaseModel`.
'''
if update:
for k, v in update.items():
setattr(self, k, v)
# roundtrip serialize to validate
return msgspec.msgpack.Decoder(
type=type(self)
).decode(
msgspec.msgpack.Encoder().encode(self)
)

View File

@ -21,7 +21,6 @@ Qt event proxying and processing using ``trio`` mem chans.
from contextlib import asynccontextmanager, AsyncExitStack
from typing import Callable
from pydantic import BaseModel
import trio
from PyQt5 import QtCore
from PyQt5.QtCore import QEvent, pyqtBoundSignal
@ -30,6 +29,8 @@ from PyQt5.QtWidgets import (
QGraphicsSceneMouseEvent as gs_mouse,
)
from ..data.types import Struct
MOUSE_EVENTS = {
gs_mouse.GraphicsSceneMousePress,
@ -43,13 +44,10 @@ MOUSE_EVENTS = {
# TODO: maybe consider some constrained ints down the road?
# https://pydantic-docs.helpmanual.io/usage/types/#constrained-types
class KeyboardMsg(BaseModel):
class KeyboardMsg(Struct):
'''Unpacked Qt keyboard event data.
'''
class Config:
arbitrary_types_allowed = True
event: QEvent
etype: int
key: int
@ -57,16 +55,13 @@ class KeyboardMsg(BaseModel):
txt: str
def to_tuple(self) -> tuple:
return tuple(self.dict().values())
return tuple(self.to_dict().values())
class MouseMsg(BaseModel):
class MouseMsg(Struct):
'''Unpacked Qt keyboard event data.
'''
class Config:
arbitrary_types_allowed = True
event: QEvent
etype: int
button: int

View File

@ -619,7 +619,7 @@ class FillStatusBar(QProgressBar):
# color: #19232D;
# width: 10px;
self.setRange(0, slots)
self.setRange(0, int(slots))
self.setValue(value)

View File

@ -22,12 +22,9 @@ from __future__ import annotations
from typing import (
Optional, Generic,
TypeVar, Callable,
Literal,
)
import enum
import sys
from pydantic import BaseModel, validator
# from pydantic import BaseModel, validator
from pydantic.generics import GenericModel
from PyQt5.QtWidgets import (
QWidget,
@ -38,6 +35,7 @@ from ._forms import (
# FontScaledDelegate,
Edit,
)
from ..data.types import Struct
DataType = TypeVar('DataType')
@ -62,7 +60,7 @@ class Selection(Field[DataType], Generic[DataType]):
options: dict[str, DataType]
# value: DataType = None
@validator('value') # , always=True)
# @validator('value') # , always=True)
def set_value_first(
cls,
@ -100,7 +98,7 @@ class Edit(Field[DataType], Generic[DataType]):
widget_factory = Edit
class AllocatorPane(BaseModel):
class AllocatorPane(Struct):
account = Selection[str](
options=dict.fromkeys(

View File

@ -27,7 +27,6 @@ import time
from typing import Optional, Dict, Callable, Any
import uuid
from pydantic import BaseModel
import tractor
import trio
from PyQt5.QtCore import Qt
@ -41,6 +40,7 @@ from ..clearing._allocate import (
from ._style import _font
from ..data._source import Symbol
from ..data.feed import Feed
from ..data.types import Struct
from ..log import get_logger
from ._editors import LineEditor, ArrowEditor
from ._lines import order_line, LevelLine
@ -58,7 +58,7 @@ from ._forms import open_form_input_handling
log = get_logger(__name__)
class OrderDialog(BaseModel):
class OrderDialog(Struct):
'''
Trade dialogue meta-data describing the lifetime
of an order submission to ``emsd`` from a chart.
@ -73,10 +73,6 @@ class OrderDialog(BaseModel):
msgs: dict[str, dict] = {}
fills: Dict[str, Any] = {}
class Config:
arbitrary_types_allowed = True
underscore_attrs_are_private = False
def on_level_change_update_next_order_info(
@ -268,7 +264,8 @@ class OrderMode:
self,
) -> OrderDialog:
'''Send execution order to EMS return a level line to
'''
Send execution order to EMS return a level line to
represent the order on a chart.
'''
@ -277,13 +274,9 @@ class OrderMode:
oid = str(uuid.uuid4())
# format order data for ems
fqsn = symbol.front_fqsn()
order = staged.copy(
update={
'symbol': fqsn,
'oid': oid,
}
)
order = staged.copy()
order.oid = oid
order.symbol = symbol.front_fqsn()
line = self.line_from_order(
order,
@ -858,7 +851,9 @@ async def process_trades_and_update_ui(
# delete level line from view
mode.on_cancel(oid)
broker_msg = msg['brokerd_msg']
log.warning(f'Order {oid} failed with:\n{pformat(broker_msg)}')
log.warning(
f'Order {oid} failed with:\n{pformat(broker_msg)}'
)
elif resp in (
'dark_triggered'