Merge pull request #353 from pikers/drop_pydantic

Drop `pydantic`
paper_eng_msg_fixes
goodboy 2022-07-09 14:15:50 -04:00 committed by GitHub
commit 37f634a2ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 223 additions and 173 deletions

View File

@ -22,10 +22,10 @@ from typing import Optional, Union, Callable, Any
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from collections import defaultdict from collections import defaultdict
from pydantic import BaseModel from msgspec import Struct
import tractor
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor
from .log import get_logger, get_console_log from .log import get_logger, get_console_log
from .brokers import get_brokermod from .brokers import get_brokermod
@ -47,16 +47,13 @@ _root_modules = [
] ]
class Services(BaseModel): class Services(Struct):
actor_n: tractor._supervise.ActorNursery actor_n: tractor._supervise.ActorNursery
service_n: trio.Nursery service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag debug_mode: bool # tractor sub-actor debug mode flag
service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {} service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {}
class Config:
arbitrary_types_allowed = True
async def start_service_task( async def start_service_task(
self, self,
name: str, name: str,

View File

@ -34,13 +34,13 @@ from fuzzywuzzy import process as fuzzy
import numpy as np import numpy as np
import tractor import tractor
from pydantic.dataclasses import dataclass from pydantic.dataclasses import dataclass
from pydantic import BaseModel
import wsproto import wsproto
from .._cacheables import open_cached_client from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound from ._util import resproc, SymbolNotFound
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from ..data import ShmArray from ..data import ShmArray
from ..data.types import Struct
from ..data._web_bs import open_autorecon_ws, NoBsWs from ..data._web_bs import open_autorecon_ws, NoBsWs
log = get_logger(__name__) log = get_logger(__name__)
@ -79,12 +79,14 @@ _show_wap_in_history = False
# https://binance-docs.github.io/apidocs/spot/en/#exchange-information # https://binance-docs.github.io/apidocs/spot/en/#exchange-information
class Pair(BaseModel): class Pair(Struct, frozen=True):
symbol: str symbol: str
status: str status: str
baseAsset: str baseAsset: str
baseAssetPrecision: int baseAssetPrecision: int
cancelReplaceAllowed: bool
allowTrailingStop: bool
quoteAsset: str quoteAsset: str
quotePrecision: int quotePrecision: int
quoteAssetPrecision: int quoteAssetPrecision: int
@ -287,7 +289,7 @@ async def get_client() -> Client:
# validation type # validation type
class AggTrade(BaseModel): class AggTrade(Struct):
e: str # Event type e: str # Event type
E: int # Event time E: int # Event time
s: str # Symbol s: str # Symbol
@ -341,7 +343,9 @@ async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
elif msg.get('e') == 'aggTrade': elif msg.get('e') == 'aggTrade':
# validate # NOTE: this is purely for a definition, ``msgspec.Struct``
# does not runtime-validate until you decode/encode.
# see: https://jcristharif.com/msgspec/structs.html#type-validation
msg = AggTrade(**msg) msg = AggTrade(**msg)
# TODO: type out and require this quote format # TODO: type out and require this quote format
@ -352,8 +356,8 @@ async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
'brokerd_ts': time.time(), 'brokerd_ts': time.time(),
'ticks': [{ 'ticks': [{
'type': 'trade', 'type': 'trade',
'price': msg.p, 'price': float(msg.p),
'size': msg.q, 'size': float(msg.q),
'broker_ts': msg.T, 'broker_ts': msg.T,
}], }],
} }
@ -448,7 +452,7 @@ async def stream_quotes(
d = cache[sym.upper()] d = cache[sym.upper()]
syminfo = Pair(**d) # validation syminfo = Pair(**d) # validation
si = sym_infos[sym] = syminfo.dict() si = sym_infos[sym] = syminfo.to_dict()
# XXX: after manually inspecting the response format we # XXX: after manually inspecting the response format we
# just directly pick out the info we need # just directly pick out the info we need

View File

@ -148,7 +148,7 @@ async def handle_order_requests(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason=f'No account found: `{account}` ?', reason=f'No account found: `{account}` ?',
).dict()) ))
continue continue
client = _accounts2clients.get(account) client = _accounts2clients.get(account)
@ -161,7 +161,7 @@ async def handle_order_requests(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason=f'No api client loaded for account: `{account}` ?', reason=f'No api client loaded for account: `{account}` ?',
).dict()) ))
continue continue
if action in {'buy', 'sell'}: if action in {'buy', 'sell'}:
@ -188,7 +188,7 @@ async def handle_order_requests(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason='Order already active?', reason='Order already active?',
).dict()) ))
# deliver ack that order has been submitted to broker routing # deliver ack that order has been submitted to broker routing
await ems_order_stream.send( await ems_order_stream.send(
@ -197,9 +197,8 @@ async def handle_order_requests(
oid=order.oid, oid=order.oid,
# broker specific request id # broker specific request id
reqid=reqid, reqid=reqid,
time_ns=time.time_ns(),
account=account, account=account,
).dict() )
) )
elif action == 'cancel': elif action == 'cancel':
@ -559,7 +558,7 @@ async def trades_dialogue(
cids2pps, cids2pps,
validate=True, validate=True,
) )
all_positions.extend(msg.dict() for msg in msgs) all_positions.extend(msg for msg in msgs)
if not all_positions and cids2pps: if not all_positions and cids2pps:
raise RuntimeError( raise RuntimeError(
@ -665,7 +664,7 @@ async def emit_pp_update(
msg = msgs[0] msg = msgs[0]
break break
await ems_stream.send(msg.dict()) await ems_stream.send(msg)
async def deliver_trade_events( async def deliver_trade_events(
@ -743,7 +742,7 @@ async def deliver_trade_events(
broker_details={'name': 'ib'}, broker_details={'name': 'ib'},
) )
await ems_stream.send(msg.dict()) await ems_stream.send(msg)
case 'fill': case 'fill':
@ -803,7 +802,7 @@ async def deliver_trade_events(
broker_time=trade_entry['broker_time'], broker_time=trade_entry['broker_time'],
) )
await ems_stream.send(msg.dict()) await ems_stream.send(msg)
# 2 cases: # 2 cases:
# - fill comes first or # - fill comes first or
@ -879,7 +878,7 @@ async def deliver_trade_events(
cid, msg = pack_position(item) cid, msg = pack_position(item)
# acctid = msg.account = accounts_def.inverse[msg.account] # acctid = msg.account = accounts_def.inverse[msg.account]
# cuck ib and it's shitty fifo sys for pps! # cuck ib and it's shitty fifo sys for pps!
# await ems_stream.send(msg.dict()) # await ems_stream.send(msg)
case 'event': case 'event':
@ -891,7 +890,7 @@ async def deliver_trade_events(
# level... # level...
# reqid = item.get('reqid', 0) # reqid = item.get('reqid', 0)
# if getattr(msg, 'reqid', 0) < -1: # if getattr(msg, 'reqid', 0) < -1:
# log.info(f"TWS triggered trade\n{pformat(msg.dict())}") # log.info(f"TWS triggered trade\n{pformat(msg)}")
# msg.reqid = 'tws-' + str(-1 * reqid) # msg.reqid = 'tws-' + str(-1 * reqid)

View File

@ -32,7 +32,6 @@ from typing import (
) )
import pendulum import pendulum
from pydantic import BaseModel
import trio import trio
import tractor import tractor
import wsproto import wsproto
@ -47,6 +46,7 @@ from piker.clearing._messages import (
BrokerdPosition, BrokerdPosition,
BrokerdStatus, BrokerdStatus,
) )
from piker.data.types import Struct
from . import log from . import log
from .api import ( from .api import (
Client, Client,
@ -62,7 +62,7 @@ from .feed import (
) )
class Trade(BaseModel): class Trade(Struct):
''' '''
Trade class that helps parse and validate ownTrades stream Trade class that helps parse and validate ownTrades stream
@ -110,7 +110,7 @@ async def handle_order_requests(
'https://github.com/pikers/piker/issues/299' 'https://github.com/pikers/piker/issues/299'
), ),
).dict()) ))
continue continue
# validate # validate
@ -136,7 +136,7 @@ async def handle_order_requests(
symbol=order.symbol, symbol=order.symbol,
reason="Failed order submission", reason="Failed order submission",
broker_details=resp broker_details=resp
).dict() )
) )
else: else:
# TODO: handle multiple orders (cancels?) # TODO: handle multiple orders (cancels?)
@ -161,7 +161,7 @@ async def handle_order_requests(
# account the made the order # account the made the order
account=order.account account=order.account
).dict() )
) )
elif action == 'cancel': elif action == 'cancel':
@ -189,7 +189,7 @@ async def handle_order_requests(
symbol=msg.symbol, symbol=msg.symbol,
reason="Failed order cancel", reason="Failed order cancel",
broker_details=resp broker_details=resp
).dict() )
) )
if not error: if not error:
@ -217,7 +217,7 @@ async def handle_order_requests(
# cancels will eventually get cancelled # cancels will eventually get cancelled
reason="Order cancel is still pending?", reason="Order cancel is still pending?",
broker_details=resp broker_details=resp
).dict() )
) )
else: # order cancel success case. else: # order cancel success case.
@ -230,7 +230,7 @@ async def handle_order_requests(
status='cancelled', status='cancelled',
reason='Order cancelled', reason='Order cancelled',
broker_details={'name': 'kraken'} broker_details={'name': 'kraken'}
).dict() )
) )
else: else:
log.error(f'Unknown order command: {request_msg}') log.error(f'Unknown order command: {request_msg}')
@ -330,7 +330,7 @@ async def trades_dialogue(
avg_price=p.be_price, avg_price=p.be_price,
currency='', currency='',
) )
position_msgs.append(msg.dict()) position_msgs.append(msg)
await ctx.started( await ctx.started(
(position_msgs, [acc_name]) (position_msgs, [acc_name])
@ -408,7 +408,7 @@ async def trades_dialogue(
broker_details={'name': 'kraken'}, broker_details={'name': 'kraken'},
broker_time=broker_time broker_time=broker_time
) )
await ems_stream.send(fill_msg.dict()) await ems_stream.send(fill_msg)
filled_msg = BrokerdStatus( filled_msg = BrokerdStatus(
reqid=reqid, reqid=reqid,
@ -432,7 +432,7 @@ async def trades_dialogue(
# https://github.com/pikers/piker/issues/296 # https://github.com/pikers/piker/issues/296
remaining=0, remaining=0,
) )
await ems_stream.send(filled_msg.dict()) await ems_stream.send(filled_msg)
# update ledger and position tracking # update ledger and position tracking
trans = await update_ledger(acctid, trades) trans = await update_ledger(acctid, trades)
@ -469,7 +469,7 @@ async def trades_dialogue(
# TODO # TODO
# currency='' # currency=''
) )
await ems_stream.send(pp_msg.dict()) await ems_stream.send(pp_msg)
case [ case [
trades_msgs, trades_msgs,

View File

@ -31,7 +31,6 @@ import time
from fuzzywuzzy import process as fuzzy from fuzzywuzzy import process as fuzzy
import numpy as np import numpy as np
import pendulum import pendulum
from pydantic import BaseModel
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
import trio import trio
@ -45,6 +44,7 @@ from piker.brokers._util import (
) )
from piker.log import get_console_log from piker.log import get_console_log
from piker.data import ShmArray from piker.data import ShmArray
from piker.data.types import Struct
from piker.data._web_bs import open_autorecon_ws, NoBsWs from piker.data._web_bs import open_autorecon_ws, NoBsWs
from . import log from . import log
from .api import ( from .api import (
@ -54,7 +54,7 @@ from .api import (
# https://www.kraken.com/features/api#get-tradable-pairs # https://www.kraken.com/features/api#get-tradable-pairs
class Pair(BaseModel): class Pair(Struct):
altname: str # alternate pair name altname: str # alternate pair name
wsname: str # WebSocket pair name (if available) wsname: str # WebSocket pair name (if available)
aclass_base: str # asset class of base component aclass_base: str # asset class of base component
@ -316,7 +316,7 @@ async def stream_quotes(
sym = sym.upper() sym = sym.upper()
si = Pair(**await client.symbol_info(sym)) # validation si = Pair(**await client.symbol_info(sym)) # validation
syminfo = si.dict() syminfo = si.to_dict()
syminfo['price_tick_size'] = 1 / 10**si.pair_decimals syminfo['price_tick_size'] = 1 / 10**si.pair_decimals
syminfo['lot_tick_size'] = 1 / 10**si.lot_decimals syminfo['lot_tick_size'] = 1 / 10**si.lot_decimals
syminfo['asset_type'] = 'crypto' syminfo['asset_type'] = 'crypto'

View File

@ -22,10 +22,9 @@ from enum import Enum
from typing import Optional from typing import Optional
from bidict import bidict from bidict import bidict
from pydantic import BaseModel, validator
# from msgspec import Struct
from ..data._source import Symbol from ..data._source import Symbol
from ..data.types import Struct
from ..pp import Position from ..pp import Position
@ -41,33 +40,30 @@ SizeUnit = Enum(
) )
class Allocator(BaseModel): class Allocator(Struct):
class Config:
validate_assignment = True
copy_on_model_validation = False
arbitrary_types_allowed = True
# required to get the account validator lookup working?
extra = 'allow'
underscore_attrs_are_private = False
symbol: Symbol symbol: Symbol
account: Optional[str] = 'paper' account: Optional[str] = 'paper'
_size_units: bidict[str, Optional[str]] = _size_units
# TODO: for enums this clearly doesn't fucking work, you can't set # TODO: for enums this clearly doesn't fucking work, you can't set
# a default at startup by passing in a `dict` but yet you can set # a default at startup by passing in a `dict` but yet you can set
# that value through assignment..for wtv cucked reason.. honestly, pure # that value through assignment..for wtv cucked reason.. honestly, pure
# unintuitive garbage. # unintuitive garbage.
size_unit: str = 'currency' _size_unit: str = 'currency'
_size_units: dict[str, Optional[str]] = _size_units
@validator('size_unit', pre=True) @property
def maybe_lookup_key(cls, v): def size_unit(self) -> str:
# apply the corresponding enum key for the text "description" value return self._size_unit
@size_unit.setter
def size_unit(self, v: str) -> Optional[str]:
if v not in _size_units: if v not in _size_units:
return _size_units.inverse[v] v = _size_units.inverse[v]
assert v in _size_units assert v in _size_units
self._size_unit = v
return v return v
# TODO: if we ever want ot support non-uniform entry-slot-proportion # TODO: if we ever want ot support non-uniform entry-slot-proportion
@ -262,7 +258,7 @@ def mk_allocator(
# default allocation settings # default allocation settings
defaults: dict[str, float] = { defaults: dict[str, float] = {
'account': None, # select paper by default 'account': None, # select paper by default
'size_unit': 'currency', # 'size_unit': 'currency',
'units_limit': 400, 'units_limit': 400,
'currency_limit': 5e3, 'currency_limit': 5e3,
'slots': 4, 'slots': 4,
@ -301,6 +297,9 @@ def mk_allocator(
# entry step 1.0 # entry step 1.0
alloc.units_limit = alloc.slots alloc.units_limit = alloc.slots
else:
alloc.size_unit = 'currency'
# if the current position is already greater then the limit # if the current position is already greater then the limit
# settings, increase the limit to the current position # settings, increase the limit to the current position
if alloc.size_unit == 'currency': if alloc.size_unit == 'currency':

View File

@ -58,11 +58,11 @@ class OrderBook:
def send( def send(
self, self,
msg: Order, msg: Order | dict,
) -> dict: ) -> dict:
self._sent_orders[msg.oid] = msg self._sent_orders[msg.oid] = msg
self._to_ems.send_nowait(msg.dict()) self._to_ems.send_nowait(msg)
return msg return msg
def update( def update(
@ -73,9 +73,8 @@ class OrderBook:
) -> dict: ) -> dict:
cmd = self._sent_orders[uuid] cmd = self._sent_orders[uuid]
msg = cmd.dict() msg = cmd.copy(update=data)
msg.update(data) self._sent_orders[uuid] = msg
self._sent_orders[uuid] = Order(**msg)
self._to_ems.send_nowait(msg) self._to_ems.send_nowait(msg)
return cmd return cmd
@ -88,7 +87,7 @@ class OrderBook:
oid=uuid, oid=uuid,
symbol=cmd.symbol, symbol=cmd.symbol,
) )
self._to_ems.send_nowait(msg.dict()) self._to_ems.send_nowait(msg)
_orders: OrderBook = None _orders: OrderBook = None
@ -149,7 +148,7 @@ async def relay_order_cmds_from_sync_code(
book = get_orders() book = get_orders()
async with book._from_order_book.subscribe() as orders_stream: async with book._from_order_book.subscribe() as orders_stream:
async for cmd in orders_stream: async for cmd in orders_stream:
if cmd['symbol'] == symbol_key: if cmd.symbol == symbol_key:
log.info(f'Send order cmd:\n{pformat(cmd)}') log.info(f'Send order cmd:\n{pformat(cmd)}')
# send msg over IPC / wire # send msg over IPC / wire
await to_ems_stream.send(cmd) await to_ems_stream.send(cmd)

View File

@ -26,7 +26,6 @@ import time
from typing import AsyncIterator, Callable from typing import AsyncIterator, Callable
from bidict import bidict from bidict import bidict
from pydantic import BaseModel
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
@ -34,6 +33,7 @@ import tractor
from ..log import get_logger from ..log import get_logger
from ..data._normalize import iterticks from ..data._normalize import iterticks
from ..data.feed import Feed, maybe_open_feed from ..data.feed import Feed, maybe_open_feed
from ..data.types import Struct
from .._daemon import maybe_spawn_brokerd from .._daemon import maybe_spawn_brokerd
from . import _paper_engine as paper from . import _paper_engine as paper
from ._messages import ( from ._messages import (
@ -231,7 +231,7 @@ async def clear_dark_triggers(
price=submit_price, price=submit_price,
size=cmd['size'], size=cmd['size'],
) )
await brokerd_orders_stream.send(msg.dict()) await brokerd_orders_stream.send(msg)
# mark this entry as having sent an order # mark this entry as having sent an order
# request. the entry will be replaced once the # request. the entry will be replaced once the
@ -247,14 +247,11 @@ async def clear_dark_triggers(
msg = Status( msg = Status(
oid=oid, # ems order id oid=oid, # ems order id
resp=resp,
time_ns=time.time_ns(), time_ns=time.time_ns(),
symbol=fqsn, resp=resp,
trigger_price=price, trigger_price=price,
broker_details={'name': broker}, brokerd_msg=cmd,
cmd=cmd, # original request message )
).dict()
# remove exec-condition from set # remove exec-condition from set
log.info(f'removing pred for {oid}') log.info(f'removing pred for {oid}')
@ -303,7 +300,7 @@ class TradesRelay:
consumers: int = 0 consumers: int = 0
class Router(BaseModel): class Router(Struct):
''' '''
Order router which manages and tracks per-broker dark book, Order router which manages and tracks per-broker dark book,
alerts, clearing and related data feed management. alerts, clearing and related data feed management.
@ -324,10 +321,6 @@ class Router(BaseModel):
# brokername to trades-dialogues streams with ``brokerd`` actors # brokername to trades-dialogues streams with ``brokerd`` actors
relays: dict[str, TradesRelay] = {} relays: dict[str, TradesRelay] = {}
class Config:
arbitrary_types_allowed = True
underscore_attrs_are_private = False
def get_dark_book( def get_dark_book(
self, self,
brokername: str, brokername: str,
@ -581,11 +574,11 @@ async def translate_and_relay_brokerd_events(
if name == 'position': if name == 'position':
pos_msg = BrokerdPosition(**brokerd_msg).dict() pos_msg = BrokerdPosition(**brokerd_msg)
# XXX: this will be useful for automatic strats yah? # XXX: this will be useful for automatic strats yah?
# keep pps per account up to date locally in ``emsd`` mem # keep pps per account up to date locally in ``emsd`` mem
sym, broker = pos_msg['symbol'], pos_msg['broker'] sym, broker = pos_msg.symbol, pos_msg.broker
relay.positions.setdefault( relay.positions.setdefault(
# NOTE: translate to a FQSN! # NOTE: translate to a FQSN!
@ -676,7 +669,7 @@ async def translate_and_relay_brokerd_events(
entry.reqid = reqid entry.reqid = reqid
# tell broker to cancel immediately # tell broker to cancel immediately
await brokerd_trades_stream.send(entry.dict()) await brokerd_trades_stream.send(entry)
# - the order is now active and will be mirrored in # - the order is now active and will be mirrored in
# our book -> registered as live flow # our book -> registered as live flow
@ -716,7 +709,7 @@ async def translate_and_relay_brokerd_events(
# if 10147 in message: cancel # if 10147 in message: cancel
resp = 'broker_errored' resp = 'broker_errored'
broker_details = msg.dict() broker_details = msg
# don't relay message to order requester client # don't relay message to order requester client
# continue # continue
@ -751,7 +744,7 @@ async def translate_and_relay_brokerd_events(
resp = 'broker_' + msg.status resp = 'broker_' + msg.status
# pass the BrokerdStatus msg inside the broker details field # pass the BrokerdStatus msg inside the broker details field
broker_details = msg.dict() broker_details = msg
elif name in ( elif name in (
'fill', 'fill',
@ -760,7 +753,7 @@ async def translate_and_relay_brokerd_events(
# proxy through the "fill" result(s) # proxy through the "fill" result(s)
resp = 'broker_filled' resp = 'broker_filled'
broker_details = msg.dict() broker_details = msg
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
@ -778,7 +771,7 @@ async def translate_and_relay_brokerd_events(
time_ns=time.time_ns(), time_ns=time.time_ns(),
broker_reqid=reqid, broker_reqid=reqid,
brokerd_msg=broker_details, brokerd_msg=broker_details,
).dict() )
) )
except KeyError: except KeyError:
log.error( log.error(
@ -843,14 +836,14 @@ async def process_client_order_cmds(
# NOTE: cancel response will be relayed back in messages # NOTE: cancel response will be relayed back in messages
# from corresponding broker # from corresponding broker
if reqid: if reqid is not None:
# send cancel to brokerd immediately! # send cancel to brokerd immediately!
log.info( log.info(
f'Submitting cancel for live order {reqid}' f'Submitting cancel for live order {reqid}'
) )
await brokerd_order_stream.send(msg.dict()) await brokerd_order_stream.send(msg)
else: else:
# this might be a cancel for an order that hasn't been # this might be a cancel for an order that hasn't been
@ -872,7 +865,7 @@ async def process_client_order_cmds(
resp='dark_cancelled', resp='dark_cancelled',
oid=oid, oid=oid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
).dict() )
) )
# de-register this client dialogue # de-register this client dialogue
router.dialogues.pop(oid) router.dialogues.pop(oid)
@ -927,7 +920,7 @@ async def process_client_order_cmds(
# handle relaying the ems side responses back to # handle relaying the ems side responses back to
# the client/cmd sender from this request # the client/cmd sender from this request
log.info(f'Sending live order to {broker}:\n{pformat(msg)}') log.info(f'Sending live order to {broker}:\n{pformat(msg)}')
await brokerd_order_stream.send(msg.dict()) await brokerd_order_stream.send(msg)
# an immediate response should be ``BrokerdOrderAck`` # an immediate response should be ``BrokerdOrderAck``
# with ems order id from the ``trades_dialogue()`` # with ems order id from the ``trades_dialogue()``
@ -1007,7 +1000,7 @@ async def process_client_order_cmds(
resp=resp, resp=resp,
oid=oid, oid=oid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
).dict() )
) )

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers # piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0) # Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
@ -15,21 +15,26 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
""" """
Clearing system messagingn types and protocols. Clearing sub-system message and protocols.
""" """
from typing import Optional, Union from typing import Optional, Union
# TODO: try out just encoding/send direction for now?
# import msgspec
from pydantic import BaseModel
from ..data._source import Symbol from ..data._source import Symbol
from ..data.types import Struct
# TODO: ``msgspec`` stuff worth paying attention to:
# - schema evolution: https://jcristharif.com/msgspec/usage.html#schema-evolution
# - use literals for a common msg determined by diff keys?
# - https://jcristharif.com/msgspec/usage.html#literal
# - for eg. ``BrokerdStatus``, instead just have separate messages?
# --------------
# Client -> emsd # Client -> emsd
# --------------
class Cancel(Struct):
class Cancel(BaseModel):
'''Cancel msg for removing a dark (ems triggered) or '''Cancel msg for removing a dark (ems triggered) or
broker-submitted (live) trigger/order. broker-submitted (live) trigger/order.
@ -39,8 +44,10 @@ class Cancel(BaseModel):
symbol: str symbol: str
class Order(BaseModel): class Order(Struct):
# TODO: use ``msgspec.Literal``
# https://jcristharif.com/msgspec/usage.html#literal
action: str # {'buy', 'sell', 'alert'} action: str # {'buy', 'sell', 'alert'}
# internal ``emdsd`` unique "order id" # internal ``emdsd`` unique "order id"
oid: str # uuid4 oid: str # uuid4
@ -48,6 +55,9 @@ class Order(BaseModel):
account: str # should we set a default as '' ? account: str # should we set a default as '' ?
price: float price: float
# TODO: could we drop the ``.action`` field above and instead just
# use +/- values here? Would make the msg smaller at the sake of a
# teensie fp precision?
size: float size: float
brokers: list[str] brokers: list[str]
@ -59,20 +69,14 @@ class Order(BaseModel):
# the backend broker # the backend broker
exec_mode: str # {'dark', 'live', 'paper'} exec_mode: str # {'dark', 'live', 'paper'}
class Config:
# just for pre-loading a ``Symbol`` when used
# in the order mode staging process
arbitrary_types_allowed = True
# don't copy this model instance when used in
# a recursive model
copy_on_model_validation = False
# --------------
# Client <- emsd # Client <- emsd
# --------------
# update msgs from ems which relay state change info # update msgs from ems which relay state change info
# from the active clearing engine. # from the active clearing engine.
class Status(Struct):
class Status(BaseModel):
name: str = 'status' name: str = 'status'
oid: str # uuid4 oid: str # uuid4
@ -95,8 +99,6 @@ class Status(BaseModel):
# } # }
resp: str # "response", see above resp: str # "response", see above
# symbol: str
# trigger info # trigger info
trigger_price: Optional[float] = None trigger_price: Optional[float] = None
# price: float # price: float
@ -111,10 +113,12 @@ class Status(BaseModel):
brokerd_msg: dict = {} brokerd_msg: dict = {}
# ---------------
# emsd -> brokerd # emsd -> brokerd
# ---------------
# requests *sent* from ems to respective backend broker daemon # requests *sent* from ems to respective backend broker daemon
class BrokerdCancel(BaseModel): class BrokerdCancel(Struct):
action: str = 'cancel' action: str = 'cancel'
oid: str # piker emsd order id oid: str # piker emsd order id
@ -130,7 +134,7 @@ class BrokerdCancel(BaseModel):
reqid: Optional[Union[int, str]] = None reqid: Optional[Union[int, str]] = None
class BrokerdOrder(BaseModel): class BrokerdOrder(Struct):
action: str # {buy, sell} action: str # {buy, sell}
oid: str oid: str
@ -150,11 +154,12 @@ class BrokerdOrder(BaseModel):
size: float size: float
# ---------------
# emsd <- brokerd # emsd <- brokerd
# ---------------
# requests *received* to ems from broker backend # requests *received* to ems from broker backend
class BrokerdOrderAck(Struct):
class BrokerdOrderAck(BaseModel):
''' '''
Immediate reponse to a brokerd order request providing the broker Immediate reponse to a brokerd order request providing the broker
specific unique order id so that the EMS can associate this specific unique order id so that the EMS can associate this
@ -172,7 +177,7 @@ class BrokerdOrderAck(BaseModel):
account: str = '' account: str = ''
class BrokerdStatus(BaseModel): class BrokerdStatus(Struct):
name: str = 'status' name: str = 'status'
reqid: Union[int, str] reqid: Union[int, str]
@ -205,7 +210,7 @@ class BrokerdStatus(BaseModel):
} }
class BrokerdFill(BaseModel): class BrokerdFill(Struct):
''' '''
A single message indicating a "fill-details" event from the broker A single message indicating a "fill-details" event from the broker
if avaiable. if avaiable.
@ -230,7 +235,7 @@ class BrokerdFill(BaseModel):
broker_time: float broker_time: float
class BrokerdError(BaseModel): class BrokerdError(Struct):
''' '''
Optional error type that can be relayed to emsd for error handling. Optional error type that can be relayed to emsd for error handling.
@ -249,7 +254,7 @@ class BrokerdError(BaseModel):
broker_details: dict = {} broker_details: dict = {}
class BrokerdPosition(BaseModel): class BrokerdPosition(Struct):
'''Position update event from brokerd. '''Position update event from brokerd.
''' '''

View File

@ -117,7 +117,7 @@ class PaperBoi:
reason='paper_trigger', reason='paper_trigger',
remaining=size, remaining=size,
) )
await self.ems_trades_stream.send(msg.dict()) await self.ems_trades_stream.send(msg)
# if we're already a clearing price simulate an immediate fill # if we're already a clearing price simulate an immediate fill
if ( if (
@ -173,7 +173,7 @@ class PaperBoi:
broker=self.broker, broker=self.broker,
time_ns=time.time_ns(), time_ns=time.time_ns(),
) )
await self.ems_trades_stream.send(msg.dict()) await self.ems_trades_stream.send(msg)
async def fake_fill( async def fake_fill(
self, self,
@ -216,7 +216,7 @@ class PaperBoi:
'name': self.broker + '_paper', 'name': self.broker + '_paper',
}, },
) )
await self.ems_trades_stream.send(msg.dict()) await self.ems_trades_stream.send(msg)
if order_complete: if order_complete:
@ -240,7 +240,7 @@ class PaperBoi:
'name': self.broker, 'name': self.broker,
}, },
) )
await self.ems_trades_stream.send(msg.dict()) await self.ems_trades_stream.send(msg)
# lookup any existing position # lookup any existing position
token = f'{symbol}.{self.broker}' token = f'{symbol}.{self.broker}'
@ -268,7 +268,7 @@ class PaperBoi:
) )
pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price) pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price)
await self.ems_trades_stream.send(pp_msg.dict()) await self.ems_trades_stream.send(pp_msg)
async def simulate_fills( async def simulate_fills(
@ -384,7 +384,7 @@ async def handle_order_requests(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason=f'Paper only. No account found: `{account}` ?', reason=f'Paper only. No account found: `{account}` ?',
).dict()) ))
continue continue
# validate # validate
@ -416,7 +416,7 @@ async def handle_order_requests(
# broker specific request id # broker specific request id
reqid=reqid, reqid=reqid,
).dict() )
) )
elif action == 'cancel': elif action == 'cancel':

View File

@ -27,13 +27,14 @@ from multiprocessing.shared_memory import SharedMemory, _USE_POSIX
if _USE_POSIX: if _USE_POSIX:
from _posixshmem import shm_unlink from _posixshmem import shm_unlink
import tractor # import msgspec
import numpy as np import numpy as np
from pydantic import BaseModel
from numpy.lib import recfunctions as rfn from numpy.lib import recfunctions as rfn
import tractor
from ..log import get_logger from ..log import get_logger
from ._source import base_iohlc_dtype from ._source import base_iohlc_dtype
from .types import Struct
log = get_logger(__name__) log = get_logger(__name__)
@ -107,15 +108,12 @@ class SharedInt:
log.warning(f'Shm for {name} already unlinked?') log.warning(f'Shm for {name} already unlinked?')
class _Token(BaseModel): class _Token(Struct, frozen=True):
''' '''
Internal represenation of a shared memory "token" Internal represenation of a shared memory "token"
which can be used to key a system wide post shm entry. which can be used to key a system wide post shm entry.
''' '''
class Config:
frozen = True
shm_name: str # this servers as a "key" value shm_name: str # this servers as a "key" value
shm_first_index_name: str shm_first_index_name: str
shm_last_index_name: str shm_last_index_name: str
@ -126,17 +124,22 @@ class _Token(BaseModel):
return np.dtype(list(map(tuple, self.dtype_descr))).descr return np.dtype(list(map(tuple, self.dtype_descr))).descr
def as_msg(self): def as_msg(self):
return self.dict() return self.to_dict()
@classmethod @classmethod
def from_msg(cls, msg: dict) -> _Token: def from_msg(cls, msg: dict) -> _Token:
if isinstance(msg, _Token): if isinstance(msg, _Token):
return msg return msg
# TODO: native struct decoding
# return _token_dec.decode(msg)
msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr'])) msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr']))
return _Token(**msg) return _Token(**msg)
# _token_dec = msgspec.msgpack.Decoder(_Token)
# TODO: this api? # TODO: this api?
# _known_tokens = tractor.ActorVar('_shm_tokens', {}) # _known_tokens = tractor.ActorVar('_shm_tokens', {})
# _known_tokens = tractor.ContextStack('_known_tokens', ) # _known_tokens = tractor.ContextStack('_known_tokens', )
@ -167,7 +170,7 @@ def _make_token(
shm_name=key, shm_name=key,
shm_first_index_name=key + "_first", shm_first_index_name=key + "_first",
shm_last_index_name=key + "_last", shm_last_index_name=key + "_last",
dtype_descr=np.dtype(dtype).descr dtype_descr=tuple(np.dtype(dtype).descr)
) )

View File

@ -42,7 +42,6 @@ from trio_typing import TaskStatus
import trimeter import trimeter
import tractor import tractor
from tractor.trionics import maybe_open_context from tractor.trionics import maybe_open_context
from pydantic import BaseModel
import pendulum import pendulum
import numpy as np import numpy as np
@ -59,6 +58,7 @@ from ._sharedmem import (
ShmArray, ShmArray,
) )
from .ingest import get_ingestormod from .ingest import get_ingestormod
from .types import Struct
from ._source import ( from ._source import (
base_iohlc_dtype, base_iohlc_dtype,
Symbol, Symbol,
@ -84,7 +84,7 @@ if TYPE_CHECKING:
log = get_logger(__name__) log = get_logger(__name__)
class _FeedsBus(BaseModel): class _FeedsBus(Struct):
''' '''
Data feeds broadcaster and persistence management. Data feeds broadcaster and persistence management.
@ -100,10 +100,6 @@ class _FeedsBus(BaseModel):
a dedicated cancel scope. a dedicated cancel scope.
''' '''
class Config:
arbitrary_types_allowed = True
underscore_attrs_are_private = False
brokername: str brokername: str
nursery: trio.Nursery nursery: trio.Nursery
feeds: dict[str, tuple[dict, dict]] = {} feeds: dict[str, tuple[dict, dict]] = {}

View File

@ -0,0 +1,68 @@
# piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Built-in (extension) types.
"""
from typing import Optional
from pprint import pformat
import msgspec
class Struct(
msgspec.Struct,
# https://jcristharif.com/msgspec/structs.html#tagged-unions
# tag='pikerstruct',
# tag=True,
):
'''
A "human friendlier" (aka repl buddy) struct subtype.
'''
def to_dict(self) -> dict:
return {
f: getattr(self, f)
for f in self.__struct_fields__
}
def __repr__(self):
return f'Struct({pformat(self.to_dict())})'
def copy(
self,
update: Optional[dict] = None,
) -> msgspec.Struct:
'''
Validate-typecast all self defined fields, return a copy of us
with all such fields.
This is kinda like the default behaviour in `pydantic.BaseModel`.
'''
if update:
for k, v in update.items():
setattr(self, k, v)
# roundtrip serialize to validate
return msgspec.msgpack.Decoder(
type=type(self)
).decode(
msgspec.msgpack.Encoder().encode(self)
)

View File

@ -21,7 +21,6 @@ Qt event proxying and processing using ``trio`` mem chans.
from contextlib import asynccontextmanager, AsyncExitStack from contextlib import asynccontextmanager, AsyncExitStack
from typing import Callable from typing import Callable
from pydantic import BaseModel
import trio import trio
from PyQt5 import QtCore from PyQt5 import QtCore
from PyQt5.QtCore import QEvent, pyqtBoundSignal from PyQt5.QtCore import QEvent, pyqtBoundSignal
@ -30,6 +29,8 @@ from PyQt5.QtWidgets import (
QGraphicsSceneMouseEvent as gs_mouse, QGraphicsSceneMouseEvent as gs_mouse,
) )
from ..data.types import Struct
MOUSE_EVENTS = { MOUSE_EVENTS = {
gs_mouse.GraphicsSceneMousePress, gs_mouse.GraphicsSceneMousePress,
@ -43,13 +44,10 @@ MOUSE_EVENTS = {
# TODO: maybe consider some constrained ints down the road? # TODO: maybe consider some constrained ints down the road?
# https://pydantic-docs.helpmanual.io/usage/types/#constrained-types # https://pydantic-docs.helpmanual.io/usage/types/#constrained-types
class KeyboardMsg(BaseModel): class KeyboardMsg(Struct):
'''Unpacked Qt keyboard event data. '''Unpacked Qt keyboard event data.
''' '''
class Config:
arbitrary_types_allowed = True
event: QEvent event: QEvent
etype: int etype: int
key: int key: int
@ -57,16 +55,13 @@ class KeyboardMsg(BaseModel):
txt: str txt: str
def to_tuple(self) -> tuple: def to_tuple(self) -> tuple:
return tuple(self.dict().values()) return tuple(self.to_dict().values())
class MouseMsg(BaseModel): class MouseMsg(Struct):
'''Unpacked Qt keyboard event data. '''Unpacked Qt keyboard event data.
''' '''
class Config:
arbitrary_types_allowed = True
event: QEvent event: QEvent
etype: int etype: int
button: int button: int

View File

@ -619,7 +619,7 @@ class FillStatusBar(QProgressBar):
# color: #19232D; # color: #19232D;
# width: 10px; # width: 10px;
self.setRange(0, slots) self.setRange(0, int(slots))
self.setValue(value) self.setValue(value)

View File

@ -22,12 +22,9 @@ from __future__ import annotations
from typing import ( from typing import (
Optional, Generic, Optional, Generic,
TypeVar, Callable, TypeVar, Callable,
Literal,
) )
import enum
import sys
from pydantic import BaseModel, validator # from pydantic import BaseModel, validator
from pydantic.generics import GenericModel from pydantic.generics import GenericModel
from PyQt5.QtWidgets import ( from PyQt5.QtWidgets import (
QWidget, QWidget,
@ -38,6 +35,7 @@ from ._forms import (
# FontScaledDelegate, # FontScaledDelegate,
Edit, Edit,
) )
from ..data.types import Struct
DataType = TypeVar('DataType') DataType = TypeVar('DataType')
@ -62,7 +60,7 @@ class Selection(Field[DataType], Generic[DataType]):
options: dict[str, DataType] options: dict[str, DataType]
# value: DataType = None # value: DataType = None
@validator('value') # , always=True) # @validator('value') # , always=True)
def set_value_first( def set_value_first(
cls, cls,
@ -100,7 +98,7 @@ class Edit(Field[DataType], Generic[DataType]):
widget_factory = Edit widget_factory = Edit
class AllocatorPane(BaseModel): class AllocatorPane(Struct):
account = Selection[str]( account = Selection[str](
options=dict.fromkeys( options=dict.fromkeys(

View File

@ -27,7 +27,6 @@ import time
from typing import Optional, Dict, Callable, Any from typing import Optional, Dict, Callable, Any
import uuid import uuid
from pydantic import BaseModel
import tractor import tractor
import trio import trio
from PyQt5.QtCore import Qt from PyQt5.QtCore import Qt
@ -41,6 +40,7 @@ from ..clearing._allocate import (
from ._style import _font from ._style import _font
from ..data._source import Symbol from ..data._source import Symbol
from ..data.feed import Feed from ..data.feed import Feed
from ..data.types import Struct
from ..log import get_logger from ..log import get_logger
from ._editors import LineEditor, ArrowEditor from ._editors import LineEditor, ArrowEditor
from ._lines import order_line, LevelLine from ._lines import order_line, LevelLine
@ -58,7 +58,7 @@ from ._forms import open_form_input_handling
log = get_logger(__name__) log = get_logger(__name__)
class OrderDialog(BaseModel): class OrderDialog(Struct):
''' '''
Trade dialogue meta-data describing the lifetime Trade dialogue meta-data describing the lifetime
of an order submission to ``emsd`` from a chart. of an order submission to ``emsd`` from a chart.
@ -73,10 +73,6 @@ class OrderDialog(BaseModel):
msgs: dict[str, dict] = {} msgs: dict[str, dict] = {}
fills: Dict[str, Any] = {} fills: Dict[str, Any] = {}
class Config:
arbitrary_types_allowed = True
underscore_attrs_are_private = False
def on_level_change_update_next_order_info( def on_level_change_update_next_order_info(
@ -268,7 +264,8 @@ class OrderMode:
self, self,
) -> OrderDialog: ) -> OrderDialog:
'''Send execution order to EMS return a level line to '''
Send execution order to EMS return a level line to
represent the order on a chart. represent the order on a chart.
''' '''
@ -277,13 +274,9 @@ class OrderMode:
oid = str(uuid.uuid4()) oid = str(uuid.uuid4())
# format order data for ems # format order data for ems
fqsn = symbol.front_fqsn() order = staged.copy()
order = staged.copy( order.oid = oid
update={ order.symbol = symbol.front_fqsn()
'symbol': fqsn,
'oid': oid,
}
)
line = self.line_from_order( line = self.line_from_order(
order, order,
@ -858,7 +851,9 @@ async def process_trades_and_update_ui(
# delete level line from view # delete level line from view
mode.on_cancel(oid) mode.on_cancel(oid)
broker_msg = msg['brokerd_msg'] broker_msg = msg['brokerd_msg']
log.warning(f'Order {oid} failed with:\n{pformat(broker_msg)}') log.warning(
f'Order {oid} failed with:\n{pformat(broker_msg)}'
)
elif resp in ( elif resp in (
'dark_triggered' 'dark_triggered'

View File

@ -47,12 +47,11 @@ setup(
'attrs', 'attrs',
'pygments', 'pygments',
'colorama', # numba traceback coloring 'colorama', # numba traceback coloring
'pydantic', # structured data 'msgspec', # performant IPC messaging and structs
# async # async
'trio', 'trio',
'trio-websocket', 'trio-websocket',
'msgspec', # performant IPC messaging
'async_generator', 'async_generator',
# from github currently (see requirements.txt) # from github currently (see requirements.txt)