Compare commits

..

No commits in common. "869aa8251a9d5f26477c75b8630b663aa1e397b3" and "861826dd7bdd2967b2fb0e75251759c43dbfb97a" have entirely different histories.

14 changed files with 162 additions and 231 deletions

View File

@ -22,13 +22,13 @@ from typing import Optional, Union, Callable, Any
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from collections import defaultdict from collections import defaultdict
from msgspec import Struct
import tractor import tractor
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
from .log import get_logger, get_console_log from .log import get_logger, get_console_log
from .brokers import get_brokermod from .brokers import get_brokermod
from .data.types import Struct
log = get_logger(__name__) log = get_logger(__name__)
@ -204,9 +204,6 @@ async def open_piker_runtime(
assert _services is None assert _services is None
# XXX: this may open a root actor as well # XXX: this may open a root actor as well
with tractor.msg.configure_native_msgs(
[Struct],
):
async with ( async with (
tractor.open_root_actor( tractor.open_root_actor(
@ -263,10 +260,6 @@ async def maybe_open_pikerd(
if loglevel: if loglevel:
get_console_log(loglevel) get_console_log(loglevel)
# XXX: this may open a root actor as well
with tractor.msg.configure_native_msgs(
[Struct],
):
# subtle, we must have the runtime up here or portal lookup will fail # subtle, we must have the runtime up here or portal lookup will fail
async with maybe_open_runtime(loglevel, **kwargs): async with maybe_open_runtime(loglevel, **kwargs):
@ -449,7 +442,7 @@ async def spawn_brokerd(
) )
# non-blocking setup of brokerd service nursery # non-blocking setup of brokerd service nursery
from .data.feed import _setup_persistent_brokerd from .data import _setup_persistent_brokerd
await _services.start_service_task( await _services.start_service_task(
dname, dname,

View File

@ -148,7 +148,7 @@ async def handle_order_requests(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason=f'No account found: `{account}` ?', reason=f'No account found: `{account}` ?',
)) ).dict())
continue continue
client = _accounts2clients.get(account) client = _accounts2clients.get(account)
@ -161,7 +161,7 @@ async def handle_order_requests(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason=f'No api client loaded for account: `{account}` ?', reason=f'No api client loaded for account: `{account}` ?',
)) ).dict())
continue continue
if action in {'buy', 'sell'}: if action in {'buy', 'sell'}:
@ -188,7 +188,7 @@ async def handle_order_requests(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason='Order already active?', reason='Order already active?',
)) ).dict())
# deliver ack that order has been submitted to broker routing # deliver ack that order has been submitted to broker routing
await ems_order_stream.send( await ems_order_stream.send(
@ -197,8 +197,9 @@ async def handle_order_requests(
oid=order.oid, oid=order.oid,
# broker specific request id # broker specific request id
reqid=reqid, reqid=reqid,
time_ns=time.time_ns(),
account=account, account=account,
) ).dict()
) )
elif action == 'cancel': elif action == 'cancel':
@ -558,7 +559,7 @@ async def trades_dialogue(
cids2pps, cids2pps,
validate=True, validate=True,
) )
all_positions.extend(msg for msg in msgs) all_positions.extend(msg.dict() for msg in msgs)
if not all_positions and cids2pps: if not all_positions and cids2pps:
raise RuntimeError( raise RuntimeError(
@ -664,7 +665,7 @@ async def emit_pp_update(
msg = msgs[0] msg = msgs[0]
break break
await ems_stream.send(msg) await ems_stream.send(msg.dict())
async def deliver_trade_events( async def deliver_trade_events(
@ -742,7 +743,7 @@ async def deliver_trade_events(
broker_details={'name': 'ib'}, broker_details={'name': 'ib'},
) )
await ems_stream.send(msg) await ems_stream.send(msg.dict())
case 'fill': case 'fill':
@ -802,7 +803,7 @@ async def deliver_trade_events(
broker_time=trade_entry['broker_time'], broker_time=trade_entry['broker_time'],
) )
await ems_stream.send(msg) await ems_stream.send(msg.dict())
# 2 cases: # 2 cases:
# - fill comes first or # - fill comes first or
@ -878,7 +879,7 @@ async def deliver_trade_events(
cid, msg = pack_position(item) cid, msg = pack_position(item)
# acctid = msg.account = accounts_def.inverse[msg.account] # acctid = msg.account = accounts_def.inverse[msg.account]
# cuck ib and it's shitty fifo sys for pps! # cuck ib and it's shitty fifo sys for pps!
# await ems_stream.send(msg) # await ems_stream.send(msg.dict())
case 'event': case 'event':
@ -890,7 +891,7 @@ async def deliver_trade_events(
# level... # level...
# reqid = item.get('reqid', 0) # reqid = item.get('reqid', 0)
# if getattr(msg, 'reqid', 0) < -1: # if getattr(msg, 'reqid', 0) < -1:
# log.info(f"TWS triggered trade\n{pformat(msg)}") # log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
# msg.reqid = 'tws-' + str(-1 * reqid) # msg.reqid = 'tws-' + str(-1 * reqid)

View File

@ -184,7 +184,7 @@ async def handle_order_requests(
'Invalid request msg:\n{msg}' 'Invalid request msg:\n{msg}'
), ),
) ).dict()
) )
@ -282,7 +282,7 @@ async def trades_dialogue(
avg_price=p.be_price, avg_price=p.be_price,
currency='', currency='',
) )
position_msgs.append(msg) position_msgs.append(msg.dict())
await ctx.started( await ctx.started(
(position_msgs, [acc_name]) (position_msgs, [acc_name])
@ -405,7 +405,7 @@ async def handle_order_updates(
broker_details={'name': 'kraken'}, broker_details={'name': 'kraken'},
broker_time=broker_time broker_time=broker_time
) )
await ems_stream.send(fill_msg) await ems_stream.send(fill_msg.dict())
filled_msg = BrokerdStatus( filled_msg = BrokerdStatus(
reqid=reqid, reqid=reqid,
@ -429,7 +429,7 @@ async def handle_order_updates(
# https://github.com/pikers/piker/issues/296 # https://github.com/pikers/piker/issues/296
remaining=0, remaining=0,
) )
await ems_stream.send(filled_msg) await ems_stream.send(filled_msg.dict())
# update ledger and position tracking # update ledger and position tracking
with open_ledger(acctid, trades) as trans: with open_ledger(acctid, trades) as trans:
@ -466,7 +466,7 @@ async def handle_order_updates(
# TODO # TODO
# currency='' # currency=''
) )
await ems_stream.send(pp_msg) await ems_stream.send(pp_msg.dict())
# process and relay order state change events # process and relay order state change events
# https://docs.kraken.com/websockets/#message-openOrders # https://docs.kraken.com/websockets/#message-openOrders
@ -563,7 +563,7 @@ async def handle_order_updates(
), ),
) )
msgs.append(resp) msgs.append(resp)
await ems_stream.send(resp) await ems_stream.send(resp.dict())
case _: case _:
log.warning( log.warning(
@ -603,7 +603,7 @@ async def handle_order_updates(
msgs.extend(resps) msgs.extend(resps)
for resp in resps: for resp in resps:
await ems_stream.send(resp) await ems_stream.send(resp.dict())
case _: case _:
log.warning(f'Unhandled trades update msg: {msg}') log.warning(f'Unhandled trades update msg: {msg}')

View File

@ -22,9 +22,10 @@ from enum import Enum
from typing import Optional from typing import Optional
from bidict import bidict from bidict import bidict
from pydantic import BaseModel, validator
# from msgspec import Struct
from ..data._source import Symbol from ..data._source import Symbol
from ..data.types import Struct
from ..pp import Position from ..pp import Position
@ -40,30 +41,33 @@ SizeUnit = Enum(
) )
class Allocator(Struct): class Allocator(BaseModel):
class Config:
validate_assignment = True
copy_on_model_validation = False
arbitrary_types_allowed = True
# required to get the account validator lookup working?
extra = 'allow'
underscore_attrs_are_private = False
symbol: Symbol symbol: Symbol
account: Optional[str] = 'paper' account: Optional[str] = 'paper'
_size_units: bidict[str, Optional[str]] = _size_units
# TODO: for enums this clearly doesn't fucking work, you can't set # TODO: for enums this clearly doesn't fucking work, you can't set
# a default at startup by passing in a `dict` but yet you can set # a default at startup by passing in a `dict` but yet you can set
# that value through assignment..for wtv cucked reason.. honestly, pure # that value through assignment..for wtv cucked reason.. honestly, pure
# unintuitive garbage. # unintuitive garbage.
_size_unit: str = 'currency' size_unit: str = 'currency'
_size_units: dict[str, Optional[str]] = _size_units
@property @validator('size_unit', pre=True)
def size_unit(self) -> str: def maybe_lookup_key(cls, v):
return self._size_unit # apply the corresponding enum key for the text "description" value
@size_unit.setter
def size_unit(self, v: str) -> Optional[str]:
if v not in _size_units: if v not in _size_units:
v = _size_units.inverse[v] return _size_units.inverse[v]
assert v in _size_units assert v in _size_units
self._size_unit = v
return v return v
# TODO: if we ever want ot support non-uniform entry-slot-proportion # TODO: if we ever want ot support non-uniform entry-slot-proportion
@ -258,7 +262,7 @@ def mk_allocator(
# default allocation settings # default allocation settings
defaults: dict[str, float] = { defaults: dict[str, float] = {
'account': None, # select paper by default 'account': None, # select paper by default
# 'size_unit': 'currency', 'size_unit': 'currency',
'units_limit': 400, 'units_limit': 400,
'currency_limit': 5e3, 'currency_limit': 5e3,
'slots': 4, 'slots': 4,
@ -297,9 +301,6 @@ def mk_allocator(
# entry step 1.0 # entry step 1.0
alloc.units_limit = alloc.slots alloc.units_limit = alloc.slots
else:
alloc.size_unit = 'currency'
# if the current position is already greater then the limit # if the current position is already greater then the limit
# settings, increase the limit to the current position # settings, increase the limit to the current position
if alloc.size_unit == 'currency': if alloc.size_unit == 'currency':

View File

@ -58,11 +58,11 @@ class OrderBook:
def send( def send(
self, self,
msg: Order | dict, msg: Order,
) -> dict: ) -> dict:
self._sent_orders[msg.oid] = msg self._sent_orders[msg.oid] = msg
self._to_ems.send_nowait(msg) self._to_ems.send_nowait(msg.dict())
return msg return msg
def update( def update(
@ -73,8 +73,9 @@ class OrderBook:
) -> dict: ) -> dict:
cmd = self._sent_orders[uuid] cmd = self._sent_orders[uuid]
msg = cmd.copy(update=data) msg = cmd.dict()
self._sent_orders[uuid] = msg msg.update(data)
self._sent_orders[uuid] = Order(**msg)
self._to_ems.send_nowait(msg) self._to_ems.send_nowait(msg)
return cmd return cmd
@ -87,7 +88,7 @@ class OrderBook:
oid=uuid, oid=uuid,
symbol=cmd.symbol, symbol=cmd.symbol,
) )
self._to_ems.send_nowait(msg) self._to_ems.send_nowait(msg.dict())
_orders: OrderBook = None _orders: OrderBook = None
@ -148,7 +149,7 @@ async def relay_order_cmds_from_sync_code(
book = get_orders() book = get_orders()
async with book._from_order_book.subscribe() as orders_stream: async with book._from_order_book.subscribe() as orders_stream:
async for cmd in orders_stream: async for cmd in orders_stream:
if cmd.symbol == symbol_key: if cmd['symbol'] == symbol_key:
log.info(f'Send order cmd:\n{pformat(cmd)}') log.info(f'Send order cmd:\n{pformat(cmd)}')
# send msg over IPC / wire # send msg over IPC / wire
await to_ems_stream.send(cmd) await to_ems_stream.send(cmd)

View File

@ -20,7 +20,6 @@ In da suit parlances: "Execution management systems"
""" """
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dataclasses import dataclass, field from dataclasses import dataclass, field
from math import isnan
from pprint import pformat from pprint import pformat
import time import time
from typing import AsyncIterator, Callable from typing import AsyncIterator, Callable
@ -232,7 +231,7 @@ async def clear_dark_triggers(
price=submit_price, price=submit_price,
size=cmd['size'], size=cmd['size'],
) )
await brokerd_orders_stream.send(msg) await brokerd_orders_stream.send(msg.dict())
# mark this entry as having sent an order # mark this entry as having sent an order
# request. the entry will be replaced once the # request. the entry will be replaced once the
@ -248,11 +247,14 @@ async def clear_dark_triggers(
msg = Status( msg = Status(
oid=oid, # ems order id oid=oid, # ems order id
time_ns=time.time_ns(),
resp=resp, resp=resp,
time_ns=time.time_ns(),
symbol=fqsn,
trigger_price=price, trigger_price=price,
brokerd_msg=cmd, broker_details={'name': broker},
) cmd=cmd, # original request message
).dict()
# remove exec-condition from set # remove exec-condition from set
log.info(f'removing pred for {oid}') log.info(f'removing pred for {oid}')
@ -575,11 +577,11 @@ async def translate_and_relay_brokerd_events(
if name == 'position': if name == 'position':
pos_msg = BrokerdPosition(**brokerd_msg) pos_msg = BrokerdPosition(**brokerd_msg).dict()
# XXX: this will be useful for automatic strats yah? # XXX: this will be useful for automatic strats yah?
# keep pps per account up to date locally in ``emsd`` mem # keep pps per account up to date locally in ``emsd`` mem
sym, broker = pos_msg.symbol, pos_msg.broker sym, broker = pos_msg['symbol'], pos_msg['broker']
relay.positions.setdefault( relay.positions.setdefault(
# NOTE: translate to a FQSN! # NOTE: translate to a FQSN!
@ -681,7 +683,7 @@ async def translate_and_relay_brokerd_events(
entry.reqid = reqid entry.reqid = reqid
# tell broker to cancel immediately # tell broker to cancel immediately
await brokerd_trades_stream.send(entry) await brokerd_trades_stream.send(entry.dict())
# - the order is now active and will be mirrored in # - the order is now active and will be mirrored in
# our book -> registered as live flow # our book -> registered as live flow
@ -721,7 +723,7 @@ async def translate_and_relay_brokerd_events(
# if 10147 in message: cancel # if 10147 in message: cancel
resp = 'broker_errored' resp = 'broker_errored'
broker_details = msg broker_details = msg.dict()
# don't relay message to order requester client # don't relay message to order requester client
# continue # continue
@ -756,7 +758,7 @@ async def translate_and_relay_brokerd_events(
resp = 'broker_' + msg.status resp = 'broker_' + msg.status
# pass the BrokerdStatus msg inside the broker details field # pass the BrokerdStatus msg inside the broker details field
broker_details = msg broker_details = msg.dict()
elif name in ( elif name in (
'fill', 'fill',
@ -765,7 +767,7 @@ async def translate_and_relay_brokerd_events(
# proxy through the "fill" result(s) # proxy through the "fill" result(s)
resp = 'broker_filled' resp = 'broker_filled'
broker_details = msg broker_details = msg.dict()
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
@ -783,7 +785,7 @@ async def translate_and_relay_brokerd_events(
time_ns=time.time_ns(), time_ns=time.time_ns(),
broker_reqid=reqid, broker_reqid=reqid,
brokerd_msg=broker_details, brokerd_msg=broker_details,
) ).dict()
) )
except KeyError: except KeyError:
log.error( log.error(
@ -855,7 +857,7 @@ async def process_client_order_cmds(
f'Submitting cancel for live order {reqid}' f'Submitting cancel for live order {reqid}'
) )
await brokerd_order_stream.send(msg) await brokerd_order_stream.send(msg.dict())
else: else:
# this might be a cancel for an order that hasn't been # this might be a cancel for an order that hasn't been
@ -877,7 +879,7 @@ async def process_client_order_cmds(
resp='dark_cancelled', resp='dark_cancelled',
oid=oid, oid=oid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
) ).dict()
) )
# de-register this client dialogue # de-register this client dialogue
router.dialogues.pop(oid) router.dialogues.pop(oid)
@ -932,7 +934,7 @@ async def process_client_order_cmds(
# handle relaying the ems side responses back to # handle relaying the ems side responses back to
# the client/cmd sender from this request # the client/cmd sender from this request
log.info(f'Sending live order to {broker}:\n{pformat(msg)}') log.info(f'Sending live order to {broker}:\n{pformat(msg)}')
await brokerd_order_stream.send(msg) await brokerd_order_stream.send(msg.dict())
# an immediate response should be ``BrokerdOrderAck`` # an immediate response should be ``BrokerdOrderAck``
# with ems order id from the ``trades_dialogue()`` # with ems order id from the ``trades_dialogue()``
@ -957,12 +959,6 @@ async def process_client_order_cmds(
# like every other shitty tina platform that makes # like every other shitty tina platform that makes
# the user choose the predicate operator. # the user choose the predicate operator.
last = dark_book.lasts[fqsn] last = dark_book.lasts[fqsn]
# sometimes the real-time feed hasn't come up
# so just pull from the latest history.
if isnan(last):
last = feed.shm.array[-1]['close']
pred = mk_check(trigger_price, last, action) pred = mk_check(trigger_price, last, action)
spread_slap: float = 5 spread_slap: float = 5
@ -1012,7 +1008,7 @@ async def process_client_order_cmds(
resp=resp, resp=resp,
oid=oid, oid=oid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
) ).dict()
) )
@ -1150,14 +1146,8 @@ async def _emsd_main(
) )
finally: finally:
# try to remove client from "registry" # remove client from "registry"
try:
_router.clients.remove(ems_client_order_stream) _router.clients.remove(ems_client_order_stream)
except KeyError:
log.warning(
f'Stream {ems_client_order_stream._ctx.chan.uid}'
' was already dropped?'
)
dialogues = _router.dialogues dialogues = _router.dialogues

View File

@ -20,15 +20,16 @@ Clearing system messagingn types and protocols.
""" """
from typing import Optional, Union from typing import Optional, Union
# TODO: try out just encoding/send direction for now?
# import msgspec
from pydantic import BaseModel
from ..data._source import Symbol from ..data._source import Symbol
from ..data.types import Struct
# --------------
# Client -> emsd # Client -> emsd
# --------------
class Cancel(Struct):
class Cancel(BaseModel):
'''Cancel msg for removing a dark (ems triggered) or '''Cancel msg for removing a dark (ems triggered) or
broker-submitted (live) trigger/order. broker-submitted (live) trigger/order.
@ -38,7 +39,7 @@ class Cancel(Struct):
symbol: str symbol: str
class Order(Struct): class Order(BaseModel):
action: str # {'buy', 'sell', 'alert'} action: str # {'buy', 'sell', 'alert'}
# internal ``emdsd`` unique "order id" # internal ``emdsd`` unique "order id"
@ -58,14 +59,20 @@ class Order(Struct):
# the backend broker # the backend broker
exec_mode: str # {'dark', 'live', 'paper'} exec_mode: str # {'dark', 'live', 'paper'}
class Config:
# just for pre-loading a ``Symbol`` when used
# in the order mode staging process
arbitrary_types_allowed = True
# don't copy this model instance when used in
# a recursive model
copy_on_model_validation = False
# --------------
# Client <- emsd # Client <- emsd
# --------------
# update msgs from ems which relay state change info # update msgs from ems which relay state change info
# from the active clearing engine. # from the active clearing engine.
class Status(Struct):
class Status(BaseModel):
name: str = 'status' name: str = 'status'
oid: str # uuid4 oid: str # uuid4
@ -88,6 +95,8 @@ class Status(Struct):
# } # }
resp: str # "response", see above resp: str # "response", see above
# symbol: str
# trigger info # trigger info
trigger_price: Optional[float] = None trigger_price: Optional[float] = None
# price: float # price: float
@ -102,12 +111,10 @@ class Status(Struct):
brokerd_msg: dict = {} brokerd_msg: dict = {}
# ---------------
# emsd -> brokerd # emsd -> brokerd
# ---------------
# requests *sent* from ems to respective backend broker daemon # requests *sent* from ems to respective backend broker daemon
class BrokerdCancel(Struct): class BrokerdCancel(BaseModel):
action: str = 'cancel' action: str = 'cancel'
oid: str # piker emsd order id oid: str # piker emsd order id
@ -123,7 +130,7 @@ class BrokerdCancel(Struct):
reqid: Optional[Union[int, str]] = None reqid: Optional[Union[int, str]] = None
class BrokerdOrder(Struct): class BrokerdOrder(BaseModel):
action: str # {buy, sell} action: str # {buy, sell}
oid: str oid: str
@ -143,12 +150,11 @@ class BrokerdOrder(Struct):
size: float size: float
# ---------------
# emsd <- brokerd # emsd <- brokerd
# ---------------
# requests *received* to ems from broker backend # requests *received* to ems from broker backend
class BrokerdOrderAck(Struct):
class BrokerdOrderAck(BaseModel):
''' '''
Immediate reponse to a brokerd order request providing the broker Immediate reponse to a brokerd order request providing the broker
specific unique order id so that the EMS can associate this specific unique order id so that the EMS can associate this
@ -166,7 +172,7 @@ class BrokerdOrderAck(Struct):
account: str = '' account: str = ''
class BrokerdStatus(Struct): class BrokerdStatus(BaseModel):
name: str = 'status' name: str = 'status'
reqid: Union[int, str] reqid: Union[int, str]
@ -199,7 +205,7 @@ class BrokerdStatus(Struct):
} }
class BrokerdFill(Struct): class BrokerdFill(BaseModel):
''' '''
A single message indicating a "fill-details" event from the broker A single message indicating a "fill-details" event from the broker
if avaiable. if avaiable.
@ -224,7 +230,7 @@ class BrokerdFill(Struct):
broker_time: float broker_time: float
class BrokerdError(Struct): class BrokerdError(BaseModel):
''' '''
Optional error type that can be relayed to emsd for error handling. Optional error type that can be relayed to emsd for error handling.
@ -243,7 +249,7 @@ class BrokerdError(Struct):
broker_details: dict = {} broker_details: dict = {}
class BrokerdPosition(Struct): class BrokerdPosition(BaseModel):
'''Position update event from brokerd. '''Position update event from brokerd.
''' '''

View File

@ -30,8 +30,8 @@ import trio
import tractor import tractor
from dataclasses import dataclass from dataclasses import dataclass
from .. import data
from ..data._source import Symbol from ..data._source import Symbol
from ..data.feed import open_feed
from ..pp import Position from ..pp import Position
from ..data._normalize import iterticks from ..data._normalize import iterticks
from ..data._source import unpack_fqsn from ..data._source import unpack_fqsn
@ -117,7 +117,7 @@ class PaperBoi:
reason='paper_trigger', reason='paper_trigger',
remaining=size, remaining=size,
) )
await self.ems_trades_stream.send(msg) await self.ems_trades_stream.send(msg.dict())
# if we're already a clearing price simulate an immediate fill # if we're already a clearing price simulate an immediate fill
if ( if (
@ -173,7 +173,7 @@ class PaperBoi:
broker=self.broker, broker=self.broker,
time_ns=time.time_ns(), time_ns=time.time_ns(),
) )
await self.ems_trades_stream.send(msg) await self.ems_trades_stream.send(msg.dict())
async def fake_fill( async def fake_fill(
self, self,
@ -216,7 +216,7 @@ class PaperBoi:
'name': self.broker + '_paper', 'name': self.broker + '_paper',
}, },
) )
await self.ems_trades_stream.send(msg) await self.ems_trades_stream.send(msg.dict())
if order_complete: if order_complete:
@ -240,7 +240,7 @@ class PaperBoi:
'name': self.broker, 'name': self.broker,
}, },
) )
await self.ems_trades_stream.send(msg) await self.ems_trades_stream.send(msg.dict())
# lookup any existing position # lookup any existing position
token = f'{symbol}.{self.broker}' token = f'{symbol}.{self.broker}'
@ -268,7 +268,7 @@ class PaperBoi:
) )
pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price) pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price)
await self.ems_trades_stream.send(pp_msg) await self.ems_trades_stream.send(pp_msg.dict())
async def simulate_fills( async def simulate_fills(
@ -384,7 +384,7 @@ async def handle_order_requests(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason=f'Paper only. No account found: `{account}` ?', reason=f'Paper only. No account found: `{account}` ?',
)) ).dict())
continue continue
# validate # validate
@ -416,7 +416,7 @@ async def handle_order_requests(
# broker specific request id # broker specific request id
reqid=reqid, reqid=reqid,
) ).dict()
) )
elif action == 'cancel': elif action == 'cancel':
@ -441,11 +441,14 @@ async def trades_dialogue(
) -> None: ) -> None:
tractor.log.get_console_log(loglevel) tractor.log.get_console_log(loglevel)
async with open_feed( async with (
data.open_feed(
[fqsn], [fqsn],
loglevel=loglevel, loglevel=loglevel,
) as feed: ) as feed,
):
# TODO: load paper positions per broker from .toml config file # TODO: load paper positions per broker from .toml config file
# and pass as symbol to position data mapping: ``dict[str, dict]`` # and pass as symbol to position data mapping: ``dict[str, dict]``
# await ctx.started(all_positions) # await ctx.started(all_positions)

View File

@ -30,19 +30,19 @@ from ._sharedmem import (
get_shm_token, get_shm_token,
ShmArray, ShmArray,
) )
# from .feed import ( from .feed import (
# # open_feed, open_feed,
# _setup_persistent_brokerd, _setup_persistent_brokerd,
# ) )
__all__ = [ __all__ = [
# 'open_feed', 'open_feed',
'ShmArray', 'ShmArray',
'iterticks', 'iterticks',
'maybe_open_shm_array', 'maybe_open_shm_array',
'attach_shm_array', 'attach_shm_array',
'open_shm_array', 'open_shm_array',
'get_shm_token', 'get_shm_token',
# '_setup_persistent_brokerd', '_setup_persistent_brokerd',
] ]

View File

@ -51,6 +51,10 @@ class DockerNotStarted(Exception):
'Prolly you dint start da daemon bruh' 'Prolly you dint start da daemon bruh'
class ContainerError(RuntimeError):
'Error reported via app-container logging level'
@acm @acm
async def open_docker( async def open_docker(
url: Optional[str] = None, url: Optional[str] = None,
@ -185,7 +189,7 @@ class Container:
def hard_kill(self, start: float) -> None: def hard_kill(self, start: float) -> None:
delay = time.time() - start delay = time.time() - start
log.error( log.error(
f'Failed to kill container {self.cntr.id} after {delay}s\n' f'Failed to kill container {cid} after {delay}s\n'
'sending SIGKILL..' 'sending SIGKILL..'
) )
# get out the big guns, bc apparently marketstore # get out the big guns, bc apparently marketstore

View File

@ -128,14 +128,11 @@ class _Token(Struct, frozen=True):
@classmethod @classmethod
def from_msg(cls, msg: dict) -> _Token: def from_msg(cls, msg: dict) -> _Token:
# TODO: native struct decoding
# return _token_dec.decode(msg)
if isinstance(msg, _Token): if isinstance(msg, _Token):
return msg return msg
# assert 0 # TODO: native struct decoding
# return _token_dec.decode(msg)
msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr'])) msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr']))
return _Token(**msg) return _Token(**msg)

View File

@ -1,68 +0,0 @@
# piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Built-in (extension) types.
"""
from typing import Optional
from pprint import pformat
import msgspec
class Struct(
msgspec.Struct,
# https://jcristharif.com/msgspec/structs.html#tagged-unions
# tag='pikerstruct',
# tag=True,
):
'''
A "human friendlier" (aka repl buddy) struct subtype.
'''
def to_dict(self) -> dict:
return {
f: getattr(self, f)
for f in self.__struct_fields__
}
def __repr__(self):
return f'Struct({pformat(self.to_dict())})'
def copy(
self,
update: Optional[dict] = None,
) -> msgspec.Struct:
'''
Validate-typecast all self defined fields, return a copy of us
with all such fields.
This is kinda like the default behaviour in `pydantic.BaseModel`.
'''
if update:
for k, v in update.items():
setattr(self, k, v)
# roundtrip serialize to validate
return msgspec.msgpack.Decoder(
type=type(self)
).decode(
msgspec.msgpack.Encoder().encode(self)
)

View File

@ -619,7 +619,7 @@ class FillStatusBar(QProgressBar):
# color: #19232D; # color: #19232D;
# width: 10px; # width: 10px;
self.setRange(0, int(slots)) self.setRange(0, slots)
self.setValue(value) self.setValue(value)

View File

@ -264,8 +264,7 @@ class OrderMode:
self, self,
) -> OrderDialog: ) -> OrderDialog:
''' '''Send execution order to EMS return a level line to
Send execution order to EMS return a level line to
represent the order on a chart. represent the order on a chart.
''' '''
@ -274,9 +273,13 @@ class OrderMode:
oid = str(uuid.uuid4()) oid = str(uuid.uuid4())
# format order data for ems # format order data for ems
order = staged.copy() fqsn = symbol.front_fqsn()
order.oid = oid order = staged.copy(
order.symbol = symbol.front_fqsn() update={
'symbol': fqsn,
'oid': oid,
}
)
line = self.line_from_order( line = self.line_from_order(
order, order,