Merge pull request #190 from pikers/ems_to_bidir_streaming

Ems to bidir streaming
ems_hotfixes
goodboy 2021-06-10 08:45:44 -04:00 committed by GitHub
commit 689bc0cde0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 1349 additions and 677 deletions

View File

@ -102,7 +102,9 @@ async def open_pikerd(
assert _services is None
# XXX: this may open a root actor as well
async with tractor.open_root_actor(
async with (
tractor.open_root_actor(
# passed through to ``open_root_actor``
arbiter_addr=_tractor_kwargs['arbiter_addr'],
name=_root_dname,
@ -113,10 +115,10 @@ async def open_pikerd(
# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
# enable_modules=[__name__],
enable_modules=_root_modules,
) as _, tractor.open_nursery() as actor_nursery:
) as _,
tractor.open_nursery() as actor_nursery,
):
async with trio.open_nursery() as service_nursery:
# setup service mngr singleton instance
@ -137,6 +139,7 @@ async def open_pikerd(
async def maybe_open_runtime(
loglevel: Optional[str] = None,
**kwargs,
) -> None:
"""
Start the ``tractor`` runtime (a root actor) if none exists.
@ -159,6 +162,7 @@ async def maybe_open_runtime(
async def maybe_open_pikerd(
loglevel: Optional[str] = None,
**kwargs,
) -> Union[tractor._portal.Portal, Services]:
"""If no ``pikerd`` daemon-root-actor can be found start it and
yield up (we should probably figure out returning a portal to self
@ -197,6 +201,66 @@ _data_mods = [
]
class Brokerd:
locks = defaultdict(trio.Lock)
@asynccontextmanager
async def maybe_spawn_daemon(
service_name: str,
spawn_func: Callable,
spawn_args: dict[str, Any],
loglevel: Optional[str] = None,
**kwargs,
) -> tractor.Portal:
"""
If no ``service_name`` daemon-actor can be found,
spawn one in a local subactor and return a portal to it.
"""
if loglevel:
get_console_log(loglevel)
# serialize access to this section to avoid
# 2 or more tasks racing to create a daemon
lock = Brokerd.locks[service_name]
await lock.acquire()
# attach to existing brokerd if possible
async with tractor.find_actor(service_name) as portal:
if portal is not None:
lock.release()
yield portal
return
# ask root ``pikerd`` daemon to spawn the daemon we need if
# pikerd is not live we now become the root of the
# process tree
async with maybe_open_pikerd(
loglevel=loglevel,
**kwargs,
) as pikerd_portal:
if pikerd_portal is None:
# we are root so spawn brokerd directly in our tree
# the root nursery is accessed through process global state
await spawn_func(**spawn_args)
else:
await pikerd_portal.run(
spawn_func,
**spawn_args,
)
async with tractor.wait_for_actor(service_name) as portal:
lock.release()
yield portal
async def spawn_brokerd(
brokername: str,
@ -205,8 +269,6 @@ async def spawn_brokerd(
) -> tractor._portal.Portal:
from .data import _setup_persistent_brokerd
log.info(f'Spawning {brokername} broker daemon')
brokermod = get_brokermod(brokername)
@ -226,13 +288,9 @@ async def spawn_brokerd(
**tractor_kwargs
)
# TODO: so i think this is the perfect use case for supporting
# a cross-actor async context manager api instead of this
# shoort-and-forget task spawned in the root nursery, we'd have an
# async exit stack that we'd register the `portal.open_context()`
# call with and then have the ability to unwind the call whenevs.
# non-blocking setup of brokerd service nursery
from .data import _setup_persistent_brokerd
await _services.open_remote_ctx(
portal,
_setup_persistent_brokerd,
@ -242,10 +300,6 @@ async def spawn_brokerd(
return dname
class Brokerd:
locks = defaultdict(trio.Lock)
@asynccontextmanager
async def maybe_spawn_brokerd(
@ -253,57 +307,24 @@ async def maybe_spawn_brokerd(
loglevel: Optional[str] = None,
**kwargs,
) -> tractor._portal.Portal:
"""
If no ``brokerd.{brokername}`` daemon-actor can be found,
spawn one in a local subactor and return a portal to it.
) -> tractor.Portal:
'''Helper to spawn a brokerd service.
"""
if loglevel:
get_console_log(loglevel)
'''
async with maybe_spawn_daemon(
dname = f'brokerd.{brokername}'
# serialize access to this section to avoid
# 2 or more tasks racing to create a daemon
lock = Brokerd.locks[brokername]
await lock.acquire()
# attach to existing brokerd if possible
async with tractor.find_actor(dname) as portal:
if portal is not None:
lock.release()
yield portal
return
# ask root ``pikerd`` daemon to spawn the daemon we need if
# pikerd is not live we now become the root of the
# process tree
async with maybe_open_pikerd(
f'brokerd.{brokername}',
spawn_func=spawn_brokerd,
spawn_args={'brokername': brokername, 'loglevel': loglevel},
loglevel=loglevel,
**kwargs,
) as pikerd_portal:
if pikerd_portal is None:
# we are root so spawn brokerd directly in our tree
# the root nursery is accessed through process global state
await spawn_brokerd(brokername, loglevel=loglevel)
else:
await pikerd_portal.run(
spawn_brokerd,
brokername=brokername,
loglevel=loglevel,
)
async with tractor.wait_for_actor(dname) as portal:
lock.release()
yield portal
) as portal:
yield portal
async def spawn_emsd(
brokername: str,
loglevel: Optional[str] = None,
**extra_tractor_kwargs
@ -314,10 +335,10 @@ async def spawn_emsd(
"""
log.info('Spawning emsd')
# TODO: raise exception when _services == None?
global _services
assert _services
await _services.actor_n.start_actor(
portal = await _services.actor_n.start_actor(
'emsd',
enable_modules=[
'piker.clearing._ems',
@ -327,4 +348,34 @@ async def spawn_emsd(
debug_mode=_services.debug_mode, # set by pikerd flag
**extra_tractor_kwargs
)
# non-blocking setup of clearing service
from .clearing._ems import _setup_persistent_emsd
await _services.open_remote_ctx(
portal,
_setup_persistent_emsd,
)
return 'emsd'
@asynccontextmanager
async def maybe_open_emsd(
brokername: str,
loglevel: Optional[str] = None,
**kwargs,
) -> tractor._portal.Portal: # noqa
async with maybe_spawn_daemon(
'emsd',
spawn_func=spawn_emsd,
spawn_args={'loglevel': loglevel},
loglevel=loglevel,
**kwargs,
) as portal:
yield portal

View File

@ -25,7 +25,7 @@ from contextlib import asynccontextmanager
from dataclasses import asdict
from datetime import datetime
from functools import partial
from typing import List, Dict, Any, Tuple, Optional, AsyncIterator, Callable
from typing import List, Dict, Any, Tuple, Optional, AsyncIterator
import asyncio
from pprint import pformat
import inspect
@ -39,7 +39,8 @@ import tractor
from async_generator import aclosing
from ib_insync.wrapper import RequestError
from ib_insync.contract import Contract, ContractDetails, Option
from ib_insync.order import Order
from ib_insync.order import Order, Trade, OrderStatus
from ib_insync.objects import Fill, Execution
from ib_insync.ticker import Ticker
from ib_insync.objects import Position
import ib_insync as ibis
@ -53,6 +54,12 @@ from .._daemon import maybe_spawn_brokerd
from ..data._source import from_df
from ..data._sharedmem import ShmArray
from ._util import SymbolNotFound, NoData
from ..clearing._messages import (
BrokerdOrder, BrokerdOrderAck, BrokerdStatus,
BrokerdPosition, BrokerdCancel,
BrokerdFill,
# BrokerdError,
)
log = get_logger(__name__)
@ -472,7 +479,7 @@ class Client:
# XXX: by default 0 tells ``ib_insync`` methods that there is no
# existing order so ask the client to create a new one (which it
# seems to do by allocating an int counter - collision prone..)
brid: int = None,
reqid: int = None,
) -> int:
"""Place an order and return integer request id provided by client.
@ -488,7 +495,7 @@ class Client:
trade = self.ib.placeOrder(
contract,
Order(
orderId=brid or 0, # stupid api devs..
orderId=reqid or 0, # stupid api devs..
action=action.upper(), # BUY/SELL
orderType='LMT',
lmtPrice=price,
@ -582,6 +589,7 @@ class Client:
self,
to_trio: trio.abc.SendChannel,
) -> None:
# connect error msgs
def push_err(
reqId: int,
@ -589,13 +597,16 @@ class Client:
errorString: str,
contract: Contract,
) -> None:
log.error(errorString)
try:
to_trio.send_nowait((
'error',
# error "object"
{'reqid': reqId,
'message': errorString,
'reason': errorString,
'contract': contract}
))
except trio.BrokenResourceError:
@ -635,6 +646,8 @@ async def _aio_get_client(
"""Return an ``ib_insync.IB`` instance wrapped in our client API.
Client instances are cached for later use.
TODO: consider doing this with a ctx mngr eventually?
"""
# first check cache for existing client
@ -738,7 +751,7 @@ async def _trio_run_client_method(
class _MethodProxy:
def __init__(
self,
portal: tractor._portal.Portal
portal: tractor.Portal
) -> None:
self._portal = portal
@ -755,7 +768,12 @@ class _MethodProxy:
)
def get_client_proxy(portal, target=Client) -> _MethodProxy:
def get_client_proxy(
portal: tractor.Portal,
target=Client,
) -> _MethodProxy:
proxy = _MethodProxy(portal)
@ -843,7 +861,7 @@ async def get_bars(
end_dt: str = "",
) -> (dict, np.ndarray):
_err = None
_err: Optional[Exception] = None
fails = 0
for _ in range(2):
@ -880,12 +898,12 @@ async def get_bars(
raise NoData(f'Symbol: {sym}')
break
else:
log.exception(
"Data query rate reached: Press `ctrl-alt-f`"
"in TWS"
)
print(_err)
# TODO: should probably create some alert on screen
# and then somehow get that to trigger an event here
@ -932,7 +950,7 @@ async def backfill_bars(
if fails is None or fails > 1:
break
if out is (None, None):
if out == (None, None):
# could be trying to retreive bars over weekend
# TODO: add logic here to handle tradable hours and only grab
# valid bars in the range
@ -1183,112 +1201,213 @@ def pack_position(pos: Position) -> Dict[str, Any]:
else:
symbol = con.symbol
return {
'broker': 'ib',
'account': pos.account,
'symbol': symbol,
'currency': con.currency,
'size': float(pos.position),
'avg_price': float(pos.avgCost) / float(con.multiplier or 1.0),
}
return BrokerdPosition(
broker='ib',
account=pos.account,
symbol=symbol,
currency=con.currency,
size=float(pos.position),
avg_price=float(pos.avgCost) / float(con.multiplier or 1.0),
)
@tractor.msg.pub(
send_on_connect={'local_trades': 'start'}
)
async def stream_trades(
async def handle_order_requests(
ems_order_stream: tractor.MsgStream,
) -> None:
# request_msg: dict
async for request_msg in ems_order_stream:
log.info(f'Received order request {request_msg}')
action = request_msg['action']
if action in {'buy', 'sell'}:
# validate
order = BrokerdOrder(**request_msg)
# call our client api to submit the order
reqid = await _trio_run_client_method(
method='submit_limit',
oid=order.oid,
symbol=order.symbol,
price=order.price,
action=order.action,
size=order.size,
# XXX: by default 0 tells ``ib_insync`` methods that
# there is no existing order so ask the client to create
# a new one (which it seems to do by allocating an int
# counter - collision prone..)
reqid=order.reqid,
)
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
BrokerdOrderAck(
# ems order request id
oid=order.oid,
# broker specific request id
reqid=reqid,
time_ns=time.time_ns(),
).dict()
)
elif action == 'cancel':
msg = BrokerdCancel(**request_msg)
await _trio_run_client_method(
method='submit_cancel',
reqid=msg.reqid
)
else:
log.error(f'Unknown order command: {request_msg}')
@tractor.context
async def trades_dialogue(
ctx: tractor.Context,
loglevel: str = None,
get_topics: Callable = None,
) -> AsyncIterator[Dict[str, Any]]:
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
stream = await _trio_run_client_method(
ib_trade_events_stream = await _trio_run_client_method(
method='recv_trade_updates',
)
# deliver positions to subscriber before anything else
positions = await _trio_run_client_method(method='positions')
all_positions = {}
for pos in positions:
yield {'local_trades': ('position', pack_position(pos))}
msg = pack_position(pos)
all_positions[msg.symbol] = msg.dict()
await ctx.started(all_positions)
action_map = {'BOT': 'buy', 'SLD': 'sell'}
async for event_name, item in stream:
async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
):
# start order request handler **before** local trades event loop
n.start_soon(handle_order_requests, ems_stream)
# XXX: begin normalization of nonsense ib_insync internal
# object-state tracking representations...
async for event_name, item in ib_trade_events_stream:
if event_name == 'status':
# XXX: begin normalization of nonsense ib_insync internal
# object-state tracking representations...
# unwrap needed data from ib_insync internal objects
trade = item
status = trade.orderStatus
if event_name == 'status':
# skip duplicate filled updates - we get the deats
# from the execution details event
msg = {
'reqid': trade.order.orderId,
'status': status.status,
'filled': status.filled,
'reason': status.whyHeld,
# unwrap needed data from ib_insync internal types
trade: Trade = item
status: OrderStatus = trade.orderStatus
# this seems to not be necessarily up to date in the
# execDetails event.. so we have to send it here I guess?
'remaining': status.remaining,
}
# skip duplicate filled updates - we get the deats
# from the execution details event
msg = BrokerdStatus(
elif event_name == 'fill':
reqid=trade.order.orderId,
time_ns=time.time_ns(), # cuz why not
status=status.status.lower(), # force lower case
trade, fill = item
execu = fill.execution
filled=status.filled,
reason=status.whyHeld,
# this seems to not be necessarily up to date in the
# execDetails event.. so we have to send it here I guess?
remaining=status.remaining,
msg = {
'reqid': execu.orderId,
'execid': execu.execId,
broker_details={'name': 'ib'},
)
# supposedly IB server fill time
'broker_time': execu.time, # converted to float by us
# ns from main TCP handler by us inside ``ib_insync`` override
'time': fill.time,
'time_ns': time.time_ns(), # cuz why not
'action': action_map[execu.side],
'size': execu.shares,
'price': execu.price,
}
elif event_name == 'fill':
elif event_name == 'error':
msg = item
# for wtv reason this is a separate event type
# from IB, not sure why it's needed other then for extra
# complexity and over-engineering :eyeroll:.
# we may just end up dropping these events (or
# translating them to ``Status`` msgs) if we can
# show the equivalent status events are no more latent.
# f$#$% gawd dammit insync..
con = msg['contract']
if isinstance(con, Contract):
msg['contract'] = asdict(con)
# unpack ib_insync types
# pep-0526 style:
# https://www.python.org/dev/peps/pep-0526/#global-and-local-variable-annotations
trade: Trade
fill: Fill
trade, fill = item
execu: Execution = fill.execution
if msg['reqid'] == -1:
log.error(pformat(msg))
# TODO: normalize out commissions details?
details = {
'contract': asdict(fill.contract),
'execution': asdict(fill.execution),
'commissions': asdict(fill.commissionReport),
'broker_time': execu.time, # supposedly IB server fill time
'name': 'ib',
}
# don't forward, it's pointless..
continue
msg = BrokerdFill(
# should match the value returned from `.submit_limit()`
reqid=execu.orderId,
time_ns=time.time_ns(), # cuz why not
elif event_name == 'position':
msg = pack_position(item)
action=action_map[execu.side],
size=execu.shares,
price=execu.price,
if msg.get('reqid', 0) < -1:
# it's a trade event generated by TWS usage.
log.warning(f"TWS triggered trade:\n{pformat(msg)}")
broker_details=details,
# XXX: required by order mode currently
broker_time=details['broker_time'],
msg['reqid'] = 'tws-' + str(-1 * msg['reqid'])
)
# mark msg as from "external system"
# TODO: probably something better then this..
msg['external'] = True
elif event_name == 'error':
yield {'remote_trades': (event_name, msg)}
continue
err: dict = item
yield {'local_trades': (event_name, msg)}
# f$#$% gawd dammit insync..
con = err['contract']
if isinstance(con, Contract):
err['contract'] = asdict(con)
if err['reqid'] == -1:
log.error(f'TWS external order error:\n{pformat(err)}')
# don't forward for now, it's unecessary.. but if we wanted to,
# msg = BrokerdError(**err)
continue
elif event_name == 'position':
msg = pack_position(item)
if getattr(msg, 'reqid', 0) < -1:
# it's a trade event generated by TWS usage.
log.warning(f"TWS triggered trade:\n{pformat(msg)}")
msg.reqid = 'tws-' + str(-1 * msg.reqid)
# mark msg as from "external system"
# TODO: probably something better then this.. and start
# considering multiplayer/group trades tracking
msg['external'] = True
continue
# XXX: we always serialize to a dict for msgpack
# translations, ideally we can move to an msgspec (or other)
# encoder # that can be enabled in ``tractor`` ahead of
# time so we can pass through the message types directly.
await ems_stream.send(msg.dict())
@tractor.context

View File

@ -19,32 +19,23 @@ Orders and execution client API.
"""
from contextlib import asynccontextmanager
from typing import Dict, Tuple, List
from typing import Dict
from pprint import pformat
from dataclasses import dataclass, field
import trio
import tractor
# import msgspec
from ..data._source import Symbol
from ..log import get_logger
from ._ems import _emsd_main
from .._daemon import maybe_open_emsd
from ._messages import Order, Cancel
log = get_logger(__name__)
# class Order(msgspec.Struct):
# action: str
# price: float
# size: float
# symbol: str
# brokers: List[str]
# oid: str
# exec_mode: str
@dataclass
class OrderBook:
"""Buy-side (client-side ?) order book ctl and tracking.
@ -62,31 +53,34 @@ class OrderBook:
_to_ems: trio.abc.SendChannel
_from_order_book: trio.abc.ReceiveChannel
_sent_orders: Dict[str, dict] = field(default_factory=dict)
_sent_orders: Dict[str, Order] = field(default_factory=dict)
_ready_to_receive: trio.Event = trio.Event()
def send(
self,
uuid: str,
symbol: str,
brokers: List[str],
brokers: list[str],
price: float,
size: float,
action: str,
exec_mode: str,
) -> dict:
cmd = {
'action': action,
'price': price,
'size': size,
'symbol': symbol,
'brokers': brokers,
'oid': uuid,
'exec_mode': exec_mode, # dark or live
}
self._sent_orders[uuid] = cmd
self._to_ems.send_nowait(cmd)
return cmd
msg = Order(
action=action,
price=price,
size=size,
symbol=symbol,
brokers=brokers,
oid=uuid,
exec_mode=exec_mode, # dark or live
)
self._sent_orders[uuid] = msg
self._to_ems.send_nowait(msg.dict())
return msg
def update(
self,
@ -94,29 +88,29 @@ class OrderBook:
**data: dict,
) -> dict:
cmd = self._sent_orders[uuid]
cmd.update(data)
self._sent_orders[uuid] = cmd
self._to_ems.send_nowait(cmd)
msg = cmd.dict()
msg.update(data)
self._sent_orders[uuid] = Order(**msg)
self._to_ems.send_nowait(msg)
return cmd
def cancel(self, uuid: str) -> bool:
"""Cancel an order (or alert) from the EMS.
"""Cancel an order (or alert) in the EMS.
"""
cmd = self._sent_orders[uuid]
msg = {
'action': 'cancel',
'oid': uuid,
'symbol': cmd['symbol'],
}
self._to_ems.send_nowait(msg)
msg = Cancel(
oid=uuid,
symbol=cmd.symbol,
)
self._to_ems.send_nowait(msg.dict())
_orders: OrderBook = None
def get_orders(
emsd_uid: Tuple[str, str] = None
emsd_uid: tuple[str, str] = None
) -> OrderBook:
""""
OrderBook singleton factory per actor.
@ -136,7 +130,14 @@ def get_orders(
return _orders
async def send_order_cmds(symbol_key: str):
# TODO: we can get rid of this relay loop once we move
# order_mode inputs to async code!
async def relay_order_cmds_from_sync_code(
symbol_key: str,
to_ems_stream: tractor.MsgStream,
) -> None:
"""
Order streaming task: deliver orders transmitted from UI
to downstream consumers.
@ -156,16 +157,15 @@ async def send_order_cmds(symbol_key: str):
book = get_orders()
orders_stream = book._from_order_book
# signal that ems connection is up and ready
book._ready_to_receive.set()
async for cmd in orders_stream:
print(cmd)
if cmd['symbol'] == symbol_key:
# send msg over IPC / wire
log.info(f'Send order cmd:\n{pformat(cmd)}')
yield cmd
await to_ems_stream.send(cmd)
else:
# XXX BRUTAL HACKZORZES !!!
# re-insert for another consumer
@ -174,36 +174,12 @@ async def send_order_cmds(symbol_key: str):
book._to_ems.send_nowait(cmd)
@asynccontextmanager
async def maybe_open_emsd(
brokername: str,
) -> tractor._portal.Portal: # noqa
async with tractor.find_actor('emsd') as portal:
if portal is not None:
yield portal
return
# ask remote daemon tree to spawn it
from .._daemon import spawn_emsd
async with tractor.find_actor('pikerd') as portal:
assert portal
name = await portal.run(
spawn_emsd,
brokername=brokername,
)
async with tractor.wait_for_actor(name) as portal:
yield portal
@asynccontextmanager
async def open_ems(
broker: str,
symbol: Symbol,
) -> None:
) -> (OrderBook, tractor.MsgStream, dict):
"""Spawn an EMS daemon and begin sending orders and receiving
alerts.
@ -237,32 +213,31 @@ async def open_ems(
- 'broker_filled'
"""
actor = tractor.current_actor()
# wait for service to connect back to us signalling
# ready for order commands
book = get_orders()
async with maybe_open_emsd(broker) as portal:
async with portal.open_stream_from(
async with (
_emsd_main,
client_actor_name=actor.name,
broker=broker,
symbol=symbol.key,
# connect to emsd
portal.open_context(
) as trades_stream:
with trio.fail_after(10):
await book._ready_to_receive.wait()
_emsd_main,
broker=broker,
symbol=symbol.key,
try:
yield book, trades_stream
) as (ctx, positions),
finally:
# TODO: we want to eventually keep this up (by having
# the exec loop keep running in the pikerd tree) but for
# now we have to kill the context to avoid backpressure
# build-up on the shm write loop.
with trio.CancelScope(shield=True):
await trades_stream.aclose()
# open 2-way trade command stream
ctx.open_stream() as trades_stream,
):
async with trio.open_nursery() as n:
n.start_soon(
relay_order_cmds_from_sync_code,
symbol.key,
trades_stream
)
yield book, trades_stream, positions

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,238 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Clearing system messagingn types and protocols.
"""
from typing import Optional, Union
# TODO: try out just encoding/send direction for now?
# import msgspec
from pydantic import BaseModel
# Client -> emsd
class Cancel(BaseModel):
'''Cancel msg for removing a dark (ems triggered) or
broker-submitted (live) trigger/order.
'''
action: str = 'cancel'
oid: str # uuid4
symbol: str
class Order(BaseModel):
action: str # {'buy', 'sell', 'alert'}
# internal ``emdsd`` unique "order id"
oid: str # uuid4
symbol: str
price: float
size: float
brokers: list[str]
# Assigned once initial ack is received
# ack_time_ns: Optional[int] = None
# determines whether the create execution
# will be submitted to the ems or directly to
# the backend broker
exec_mode: str # {'dark', 'live', 'paper'}
# Client <- emsd
# update msgs from ems which relay state change info
# from the active clearing engine.
class Status(BaseModel):
name: str = 'status'
oid: str # uuid4
time_ns: int
# {
# 'dark_submitted',
# 'dark_cancelled',
# 'dark_triggered',
# 'broker_submitted',
# 'broker_cancelled',
# 'broker_executed',
# 'broker_filled',
# 'alert_submitted',
# 'alert_triggered',
# 'position',
# }
resp: str # "response", see above
# symbol: str
# trigger info
trigger_price: Optional[float] = None
# price: float
# broker: Optional[str] = None
# this maps normally to the ``BrokerdOrder.reqid`` below, an id
# normally allocated internally by the backend broker routing system
broker_reqid: Optional[Union[int, str]] = None
# for relaying backend msg data "through" the ems layer
brokerd_msg: dict = {}
# emsd -> brokerd
# requests *sent* from ems to respective backend broker daemon
class BrokerdCancel(BaseModel):
action: str = 'cancel'
oid: str # piker emsd order id
time_ns: int
# "broker request id": broker specific/internal order id if this is
# None, creates a new order otherwise if the id is valid the backend
# api must modify the existing matching order. If the broker allows
# for setting a unique order id then this value will be relayed back
# on the emsd order request stream as the ``BrokerdOrderAck.reqid``
# field
reqid: Optional[Union[int, str]] = None
class BrokerdOrder(BaseModel):
action: str # {buy, sell}
oid: str
time_ns: int
# "broker request id": broker specific/internal order id if this is
# None, creates a new order otherwise if the id is valid the backend
# api must modify the existing matching order. If the broker allows
# for setting a unique order id then this value will be relayed back
# on the emsd order request stream as the ``BrokerdOrderAck.reqid``
# field
reqid: Optional[Union[int, str]] = None
symbol: str # symbol.<providername> ?
price: float
size: float
# emsd <- brokerd
# requests *received* to ems from broker backend
class BrokerdOrderAck(BaseModel):
'''Immediate reponse to a brokerd order request providing
the broker specifci unique order id.
'''
name: str = 'ack'
# defined and provided by backend
reqid: Union[int, str]
# emsd id originally sent in matching request msg
oid: str
class BrokerdStatus(BaseModel):
name: str = 'status'
reqid: Union[int, str]
time_ns: int
# {
# 'submitted',
# 'cancelled',
# 'executed',
# }
status: str
filled: float = 0.0
reason: str = ''
remaining: float = 0.0
# XXX: better design/name here?
# flag that can be set to indicate a message for an order
# event that wasn't originated by piker's emsd (eg. some external
# trading system which does it's own order control but that you
# might want to "track" using piker UIs/systems).
external: bool = False
# XXX: not required schema as of yet
broker_details: dict = {
'name': '',
}
class BrokerdFill(BaseModel):
'''A single message indicating a "fill-details" event from the broker
if avaiable.
'''
name: str = 'fill'
reqid: Union[int, str]
time_ns: int
# order exeuction related
action: str
size: float
price: float
broker_details: dict = {} # meta-data (eg. commisions etc.)
# brokerd timestamp required for order mode arrow placement on x-axis
# TODO: maybe int if we force ns?
# we need to normalize this somehow since backends will use their
# own format and likely across many disparate epoch clocks...
broker_time: float
class BrokerdError(BaseModel):
'''Optional error type that can be relayed to emsd for error handling.
This is still a TODO thing since we're not sure how to employ it yet.
'''
name: str = 'error'
reqid: Union[int, str]
symbol: str
reason: str
broker_details: dict = {}
class BrokerdPosition(BaseModel):
'''Position update event from brokerd.
'''
name: str = 'position'
broker: str
account: str
symbol: str
currency: str
size: float
avg_price: float

View File

@ -18,17 +18,28 @@
Fake trading for forward testing.
"""
from contextlib import asynccontextmanager
from datetime import datetime
from operator import itemgetter
import time
from typing import Tuple, Optional
from typing import Tuple, Optional, Callable
import uuid
from bidict import bidict
import trio
import tractor
from dataclasses import dataclass
from .. import data
from ..data._normalize import iterticks
from ..log import get_logger
from ._messages import (
BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus,
BrokerdFill,
)
log = get_logger(__name__)
@dataclass
@ -41,8 +52,8 @@ class PaperBoi:
"""
broker: str
_to_trade_stream: trio.abc.SendChannel
trade_stream: trio.abc.ReceiveChannel
ems_trades_stream: tractor.MsgStream
# map of paper "live" orders which be used
# to simulate fills based on paper engine settings
@ -61,20 +72,20 @@ class PaperBoi:
price: float,
action: str,
size: float,
brid: Optional[str],
reqid: Optional[str],
) -> int:
"""Place an order and return integer request id provided by client.
"""
if brid is None:
is_modify: bool = False
if reqid is None:
reqid = str(uuid.uuid4())
else:
# order is already existing, this is a modify
(oid, symbol, action, old_price) = self._reqids[brid]
(oid, symbol, action, old_price) = self._reqids[reqid]
assert old_price != price
reqid = brid
is_modify = True
# register order internally
self._reqids[reqid] = (oid, symbol, action, price)
@ -90,22 +101,16 @@ class PaperBoi:
# in the broker trades event processing loop
await trio.sleep(0.05)
await self._to_trade_stream.send({
'local_trades': ('status', {
'time_ns': time.time_ns(),
'reqid': reqid,
'status': 'submitted',
'broker': self.broker,
# 'cmd': cmd, # original request message
'paper_info': {
'oid': oid,
},
}),
})
msg = BrokerdStatus(
status='submitted',
reqid=reqid,
broker=self.broker,
time_ns=time.time_ns(),
filled=0.0,
reason='paper_trigger',
remaining=size,
)
await self.ems_trades_stream.send(msg.dict())
# if we're already a clearing price simulate an immediate fill
if (
@ -129,7 +134,7 @@ class PaperBoi:
# and trigger by the simulated clearing task normally
# running ``simulate_fills()``.
if brid is not None:
if is_modify:
# remove any existing order for the old price
orders[symbol].pop((oid, old_price))
@ -144,7 +149,6 @@ class PaperBoi:
) -> None:
# TODO: fake market simulation effects
# await self._to_trade_stream.send(
oid, symbol, action, price = self._reqids[reqid]
if action == 'buy':
@ -155,21 +159,14 @@ class PaperBoi:
# TODO: net latency model
await trio.sleep(0.05)
await self._to_trade_stream.send({
'local_trades': ('status', {
'time_ns': time.time_ns(),
'oid': oid,
'reqid': reqid,
'status': 'cancelled',
'broker': self.broker,
# 'cmd': cmd, # original request message
'paper': True,
}),
})
msg = BrokerdStatus(
status='cancelled',
oid=oid,
reqid=reqid,
broker=self.broker,
time_ns=time.time_ns(),
)
await self.ems_trades_stream.send(msg.dict())
async def fake_fill(
self,
@ -191,56 +188,49 @@ class PaperBoi:
# TODO: net latency model
await trio.sleep(0.05)
# the trades stream expects events in the form
# {'local_trades': (event_name, msg)}
await self._to_trade_stream.send({
msg = BrokerdFill(
'local_trades': ('fill', {
reqid=reqid,
time_ns=time.time_ns(),
'status': 'filled',
'broker': self.broker,
# converted to float by us in ib backend
'broker_time': datetime.now().timestamp(),
'action': action,
'size': size,
'price': price,
'remaining': 0 if order_complete else remaining,
# normally filled by real `brokerd` daemon
'time': time.time_ns(),
'time_ns': time.time_ns(), # cuz why not
# fake ids
'reqid': reqid,
action=action,
size=size,
price=price,
broker_time=datetime.now().timestamp(),
broker_details={
'paper_info': {
'oid': oid,
},
# mocking ib
'name': self.broker + '_paper',
},
)
await self.ems_trades_stream.send(msg.dict())
# XXX: fields we might not need to emulate?
# execution id from broker
# 'execid': execu.execId,
# 'cmd': cmd, # original request message?
}),
})
if order_complete:
await self._to_trade_stream.send({
'local_trades': ('status', {
'reqid': reqid,
'status': 'filled',
'broker': self.broker,
'filled': size,
'remaining': 0 if order_complete else remaining,
msg = BrokerdStatus(
# converted to float by us in ib backend
'broker_time': datetime.now().timestamp(),
reqid=reqid,
time_ns=time.time_ns(),
status='filled',
filled=size,
remaining=0 if order_complete else remaining,
action=action,
size=size,
price=price,
broker_details={
'paper_info': {
'oid': oid,
},
}),
})
'name': self.broker,
},
)
await self.ems_trades_stream.send(msg.dict())
async def simulate_fills(
@ -327,3 +317,145 @@ async def simulate_fills(
else:
# prices are iterated in sorted order so we're done
break
async def handle_order_requests(
client: PaperBoi,
ems_order_stream: tractor.MsgStream,
) -> None:
# order_request: dict
async for request_msg in ems_order_stream:
action = request_msg['action']
if action in {'buy', 'sell'}:
# validate
order = BrokerdOrder(**request_msg)
# call our client api to submit the order
reqid = await client.submit_limit(
oid=order.oid,
symbol=order.symbol,
price=order.price,
action=order.action,
size=order.size,
# XXX: by default 0 tells ``ib_insync`` methods that
# there is no existing order so ask the client to create
# a new one (which it seems to do by allocating an int
# counter - collision prone..)
reqid=order.reqid,
)
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
BrokerdOrderAck(
# ems order request id
oid=order.oid,
# broker specific request id
reqid=reqid,
).dict()
)
elif action == 'cancel':
msg = BrokerdCancel(**request_msg)
await client.submit_cancel(
reqid=msg.reqid
)
else:
log.error(f'Unknown order command: {request_msg}')
@tractor.context
async def trades_dialogue(
ctx: tractor.Context,
broker: str,
symbol: str,
loglevel: str = None,
) -> None:
async with (
data.open_feed(
broker,
[symbol],
loglevel=loglevel,
) as feed,
):
# TODO: load paper positions per broker from .toml config file
# and pass as symbol to position data mapping: ``dict[str, dict]``
# await ctx.started(all_positions)
await ctx.started({})
async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
):
client = PaperBoi(
broker,
ems_stream,
_buys={},
_sells={},
_reqids={},
)
n.start_soon(handle_order_requests, client, ems_stream)
# paper engine simulator clearing task
await simulate_fills(feed.stream, client)
@asynccontextmanager
async def open_paperboi(
broker: str,
symbol: str,
loglevel: str,
) -> Callable:
'''Spawn a paper engine actor and yield through access to
its context.
'''
service_name = f'paperboi.{broker}'
async with (
tractor.find_actor(service_name) as portal,
tractor.open_nursery() as tn,
):
# only spawn if no paperboi already is up
# (we likely don't need more then one proc for basic
# simulated order clearing)
if portal is None:
portal = await tn.start_actor(
service_name,
enable_modules=[__name__]
)
async with portal.open_context(
trades_dialogue,
broker=broker,
symbol=symbol,
loglevel=loglevel,
) as (ctx, first):
try:
yield ctx, first
finally:
# be sure to tear down the paper service on exit
with trio.CancelScope(shield=True):
await portal.cancel_actor()

View File

@ -9,6 +9,7 @@ import tractor
from ..log import get_console_log, get_logger, colorize_json
from ..brokers import get_brokermod, config
from .._daemon import _tractor_kwargs
log = get_logger('cli')
@ -101,8 +102,9 @@ def cli(ctx, brokers, loglevel, tl, configdir):
def services(config, tl, names):
async def list_services():
async with tractor.get_arbiter(
*tractor.current_actor()._arb_addr
*_tractor_kwargs['arbiter_addr']
) as portal:
registry = await portal.run('self', 'get_registry')
json_d = {}
@ -118,6 +120,7 @@ def services(config, tl, names):
list_services,
name='service_query',
loglevel=config['loglevel'] if tl else None,
arbiter_addr=_tractor_kwargs['arbiter_addr'],
)

View File

@ -64,13 +64,13 @@ class NoBsWs:
async def _connect(
self,
tries: int = 10000,
tries: int = 1000,
) -> None:
while True:
try:
await self._stack.aclose()
except (DisconnectionTimeout, RuntimeError):
await trio.sleep(1)
await trio.sleep(0.5)
else:
break
@ -95,7 +95,7 @@ class NoBsWs:
f'{self} connection bail with '
f'{type(err)}...retry attempt {i}'
)
await trio.sleep(1)
await trio.sleep(0.5)
continue
else:
log.exception('ws connection fail...')

View File

@ -217,8 +217,8 @@ async def allocate_persistent_feed(
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
# pass OHLC sample rate in seconds
init_msg[symbol]['sample_rate'] = delay_s
# pass OHLC sample rate in seconds (be sure to use python int type)
init_msg[symbol]['sample_rate'] = int(delay_s)
# yield back control to starting nursery
task_status.started((init_msg, first_quote))

View File

@ -127,10 +127,6 @@ class OrderMode:
"""
line = self.lines.commit_line(uuid)
req_msg = self.book._sent_orders.get(uuid)
if req_msg:
req_msg['ack_time_ns'] = time.time_ns()
return line
def on_fill(
@ -196,8 +192,10 @@ class OrderMode:
def submit_exec(
self,
size: Optional[float] = None,
) -> LevelLine:
"""Send execution order to EMS.
"""Send execution order to EMS return a level line to
represent the order on a chart.
"""
# register the "staged" line under the cursor
@ -226,6 +224,9 @@ class OrderMode:
exec_mode=self._exec_mode,
)
# TODO: update the line once an ack event comes back
# from the EMS!
# make line graphic if order push was
# sucessful
line = self.lines.create_order_line(
@ -266,14 +267,6 @@ class OrderMode:
price=line.value(),
)
# def on_key_press(
# self,
# key:
# mods:
# text: str,
# ) -> None:
# pass
@asynccontextmanager
async def open_order_mode(
@ -320,10 +313,14 @@ async def start_order_mode(
# spawn EMS actor-service
async with (
open_ems(brokername, symbol) as (book, trades_stream),
open_ems(brokername, symbol) as (book, trades_stream, positions),
open_order_mode(symbol, chart, book) as order_mode
):
# update any exising positions
for sym, msg in positions.items():
order_mode.on_position_update(msg)
def get_index(time: float):
# XXX: not sure why the time is so off here
@ -346,16 +343,15 @@ async def start_order_mode(
fmsg = pformat(msg)
log.info(f'Received order msg:\n{fmsg}')
resp = msg['resp']
if resp in (
name = msg['name']
if name in (
'position',
):
# show line label once order is live
order_mode.on_position_update(msg)
continue
# delete the line from view
resp = msg['resp']
oid = msg['oid']
# response to 'action' request (buy/sell)
@ -378,21 +374,21 @@ async def start_order_mode(
order_mode.on_cancel(oid)
elif resp in (
'dark_executed'
'dark_triggered'
):
log.info(f'Dark order triggered for {fmsg}')
# for alerts add a triangle and remove the
# level line
if msg['cmd']['action'] == 'alert':
# should only be one "fill" for an alert
order_mode.on_fill(
oid,
price=msg['trigger_price'],
arrow_index=get_index(time.time())
)
await order_mode.on_exec(oid, msg)
elif resp in (
'alert_triggered'
):
# should only be one "fill" for an alert
# add a triangle and remove the level line
order_mode.on_fill(
oid,
price=msg['trigger_price'],
arrow_index=get_index(time.time())
)
await order_mode.on_exec(oid, msg)
# response to completed 'action' request for buy/sell
elif resp in (
@ -403,12 +399,15 @@ async def start_order_mode(
# each clearing tick is responded individually
elif resp in ('broker_filled',):
action = msg['action']
action = book._sent_orders[oid].action
details = msg['brokerd_msg']
# TODO: some kinda progress system
order_mode.on_fill(
oid,
price=msg['price'],
arrow_index=get_index(msg['broker_time']),
price=details['price'],
pointing='up' if action == 'buy' else 'down',
# TODO: put the actual exchange timestamp
arrow_index=get_index(details['broker_time']),
)