Compare commits
No commits in common. "f75057bc64f06b617687872122f6c5b9b2ab770a" and "b81c538e85c407746b7c60e8b2ec47b30738cb43" have entirely different histories.
f75057bc64
...
b81c538e85
124
piker/_daemon.py
124
piker/_daemon.py
|
@ -19,12 +19,11 @@ Structured, daemon tree service management.
|
|||
|
||||
"""
|
||||
from typing import Optional, Union, Callable, Any
|
||||
from contextlib import asynccontextmanager
|
||||
from contextlib import asynccontextmanager, AsyncExitStack
|
||||
from collections import defaultdict
|
||||
|
||||
from pydantic import BaseModel
|
||||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
import tractor
|
||||
|
||||
from .log import get_logger, get_console_log
|
||||
|
@ -46,79 +45,36 @@ _root_modules = [
|
|||
|
||||
|
||||
class Services(BaseModel):
|
||||
|
||||
actor_n: tractor._trionics.ActorNursery
|
||||
service_n: trio.Nursery
|
||||
debug_mode: bool # tractor sub-actor debug mode flag
|
||||
service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {}
|
||||
ctx_stack: AsyncExitStack
|
||||
|
||||
class Config:
|
||||
arbitrary_types_allowed = True
|
||||
|
||||
async def start_service_task(
|
||||
async def open_remote_ctx(
|
||||
self,
|
||||
name: str,
|
||||
portal: tractor.Portal,
|
||||
target: Callable,
|
||||
**kwargs,
|
||||
|
||||
) -> (trio.CancelScope, tractor.Context):
|
||||
) -> tractor.Context:
|
||||
'''
|
||||
Open a context in a service sub-actor, add to a stack
|
||||
that gets unwound at ``pikerd`` teardown.
|
||||
that gets unwound at ``pikerd`` tearodwn.
|
||||
|
||||
This allows for allocating long-running sub-services in our main
|
||||
daemon and explicitly controlling their lifetimes.
|
||||
|
||||
'''
|
||||
async def open_context_in_task(
|
||||
task_status: TaskStatus[
|
||||
trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
) -> Any:
|
||||
|
||||
with trio.CancelScope() as cs:
|
||||
|
||||
async with portal.open_context(
|
||||
ctx, first = await self.ctx_stack.enter_async_context(
|
||||
portal.open_context(
|
||||
target,
|
||||
**kwargs,
|
||||
|
||||
) as (ctx, first):
|
||||
|
||||
# unblock once the remote context has started
|
||||
task_status.started((cs, first))
|
||||
|
||||
# wait on any context's return value
|
||||
ctx_res = await ctx.result()
|
||||
log.info(
|
||||
f'`pikerd` service {name} started with value {ctx_res}'
|
||||
)
|
||||
|
||||
# wait on any error from the sub-actor
|
||||
# NOTE: this will block indefinitely until cancelled
|
||||
# either by error from the target context function or
|
||||
# by being cancelled here by the surroundingn cancel
|
||||
# scope
|
||||
return await (portal.result(), ctx_res)
|
||||
|
||||
cs, first = await self.service_n.start(open_context_in_task)
|
||||
|
||||
# store the cancel scope and portal for later cancellation or
|
||||
# retstart if needed.
|
||||
self.service_tasks[name] = (cs, portal)
|
||||
|
||||
return cs, first
|
||||
|
||||
async def cancel_service(
|
||||
self,
|
||||
name: str,
|
||||
|
||||
) -> Any:
|
||||
|
||||
log.info(f'Cancelling `pikerd` service {name}')
|
||||
cs, portal = self.service_tasks[name]
|
||||
cs.cancel()
|
||||
return await portal.cancel_actor()
|
||||
)
|
||||
return ctx
|
||||
|
||||
|
||||
_services: Optional[Services] = None
|
||||
|
@ -161,19 +117,19 @@ async def open_pikerd(
|
|||
# spawn other specialized daemons I think?
|
||||
enable_modules=_root_modules,
|
||||
) as _,
|
||||
|
||||
tractor.open_nursery() as actor_nursery,
|
||||
):
|
||||
async with trio.open_nursery() as service_nursery:
|
||||
|
||||
# # setup service mngr singleton instance
|
||||
# async with AsyncExitStack() as stack:
|
||||
# setup service mngr singleton instance
|
||||
async with AsyncExitStack() as stack:
|
||||
|
||||
# assign globally for future daemon/task creation
|
||||
_services = Services(
|
||||
actor_n=actor_nursery,
|
||||
service_n=service_nursery,
|
||||
debug_mode=debug_mode,
|
||||
ctx_stack=stack,
|
||||
)
|
||||
|
||||
yield _services
|
||||
|
@ -218,20 +174,16 @@ async def maybe_open_pikerd(
|
|||
|
||||
# subtle, we must have the runtime up here or portal lookup will fail
|
||||
async with maybe_open_runtime(loglevel, **kwargs):
|
||||
|
||||
async with tractor.find_actor(_root_dname) as portal:
|
||||
# assert portal is not None
|
||||
if portal is not None:
|
||||
yield portal
|
||||
return
|
||||
|
||||
# presume pikerd role since no daemon could be found at
|
||||
# configured address
|
||||
# presume pikerd role
|
||||
async with open_pikerd(
|
||||
|
||||
loglevel=loglevel,
|
||||
debug_mode=kwargs.get('debug_mode', False),
|
||||
|
||||
) as _:
|
||||
# in the case where we're starting up the
|
||||
# tractor-piker runtime stack in **this** process
|
||||
|
@ -257,7 +209,7 @@ class Brokerd:
|
|||
async def maybe_spawn_daemon(
|
||||
|
||||
service_name: str,
|
||||
service_task_target: Callable,
|
||||
spawn_func: Callable,
|
||||
spawn_args: dict[str, Any],
|
||||
loglevel: Optional[str] = None,
|
||||
**kwargs,
|
||||
|
@ -267,13 +219,6 @@ async def maybe_spawn_daemon(
|
|||
If no ``service_name`` daemon-actor can be found,
|
||||
spawn one in a local subactor and return a portal to it.
|
||||
|
||||
If this function is called from a non-pikerd actor, the
|
||||
spawned service will persist as long as pikerd does or
|
||||
it is requested to be cancelled.
|
||||
|
||||
This can be seen as a service starting api for remote-actor
|
||||
clients.
|
||||
|
||||
"""
|
||||
if loglevel:
|
||||
get_console_log(loglevel)
|
||||
|
@ -301,24 +246,13 @@ async def maybe_spawn_daemon(
|
|||
) as pikerd_portal:
|
||||
|
||||
if pikerd_portal is None:
|
||||
# we are the root and thus are `pikerd`
|
||||
# so spawn the target service directly by calling
|
||||
# the provided target routine.
|
||||
# XXX: this assumes that the target is well formed and will
|
||||
# do the right things to setup both a sub-actor **and** call
|
||||
# the ``_Services`` api from above to start the top level
|
||||
# service task for that actor.
|
||||
await service_task_target(**spawn_args)
|
||||
# we are root so spawn brokerd directly in our tree
|
||||
# the root nursery is accessed through process global state
|
||||
await spawn_func(**spawn_args)
|
||||
|
||||
else:
|
||||
# tell the remote `pikerd` to start the target,
|
||||
# the target can't return a non-serializable value
|
||||
# since it is expected that service startingn is
|
||||
# non-blocking and the target task will persist running
|
||||
# on `pikerd` after the client requesting it's start
|
||||
# disconnects.
|
||||
await pikerd_portal.run(
|
||||
service_task_target,
|
||||
spawn_func,
|
||||
**spawn_args,
|
||||
)
|
||||
|
||||
|
@ -333,7 +267,7 @@ async def spawn_brokerd(
|
|||
loglevel: Optional[str] = None,
|
||||
**tractor_kwargs,
|
||||
|
||||
) -> bool:
|
||||
) -> tractor._portal.Portal:
|
||||
|
||||
log.info(f'Spawning {brokername} broker daemon')
|
||||
|
||||
|
@ -346,8 +280,6 @@ async def spawn_brokerd(
|
|||
global _services
|
||||
assert _services
|
||||
|
||||
# ask `pikerd` to spawn a new sub-actor and manage it under its
|
||||
# actor nursery
|
||||
portal = await _services.actor_n.start_actor(
|
||||
dname,
|
||||
enable_modules=_data_mods + [brokermod.__name__],
|
||||
|
@ -359,13 +291,13 @@ async def spawn_brokerd(
|
|||
# non-blocking setup of brokerd service nursery
|
||||
from .data import _setup_persistent_brokerd
|
||||
|
||||
await _services.start_service_task(
|
||||
dname,
|
||||
await _services.open_remote_ctx(
|
||||
portal,
|
||||
_setup_persistent_brokerd,
|
||||
brokername=brokername,
|
||||
)
|
||||
return True
|
||||
|
||||
return dname
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
|
@ -382,7 +314,7 @@ async def maybe_spawn_brokerd(
|
|||
async with maybe_spawn_daemon(
|
||||
|
||||
f'brokerd.{brokername}',
|
||||
service_task_target=spawn_brokerd,
|
||||
spawn_func=spawn_brokerd,
|
||||
spawn_args={'brokername': brokername, 'loglevel': loglevel},
|
||||
loglevel=loglevel,
|
||||
**kwargs,
|
||||
|
@ -396,7 +328,7 @@ async def spawn_emsd(
|
|||
loglevel: Optional[str] = None,
|
||||
**extra_tractor_kwargs
|
||||
|
||||
) -> bool:
|
||||
) -> tractor._portal.Portal:
|
||||
"""
|
||||
Start the clearing engine under ``pikerd``.
|
||||
|
||||
|
@ -420,12 +352,12 @@ async def spawn_emsd(
|
|||
# non-blocking setup of clearing service
|
||||
from .clearing._ems import _setup_persistent_emsd
|
||||
|
||||
await _services.start_service_task(
|
||||
'emsd',
|
||||
await _services.open_remote_ctx(
|
||||
portal,
|
||||
_setup_persistent_emsd,
|
||||
)
|
||||
return True
|
||||
|
||||
return 'emsd'
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
|
@ -440,7 +372,7 @@ async def maybe_open_emsd(
|
|||
async with maybe_spawn_daemon(
|
||||
|
||||
'emsd',
|
||||
service_task_target=spawn_emsd,
|
||||
spawn_func=spawn_emsd,
|
||||
spawn_args={'loglevel': loglevel},
|
||||
loglevel=loglevel,
|
||||
**kwargs,
|
||||
|
|
|
@ -53,6 +53,6 @@ def resproc(
|
|||
log.exception(f"Failed to process {resp}:\n{resp.text}")
|
||||
raise BrokerError(resp.text)
|
||||
else:
|
||||
log.debug(f"Received json contents:\n{colorize_json(json)}")
|
||||
log.trace(f"Received json contents:\n{colorize_json(json)}")
|
||||
|
||||
return json if return_json else resp
|
||||
|
|
|
@ -1442,14 +1442,14 @@ async def trades_dialogue(
|
|||
if getattr(msg, 'reqid', 0) < -1:
|
||||
|
||||
# it's a trade event generated by TWS usage.
|
||||
log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
|
||||
log.warning(f"TWS triggered trade:\n{pformat(msg)}")
|
||||
|
||||
msg.reqid = 'tws-' + str(-1 * msg.reqid)
|
||||
|
||||
# mark msg as from "external system"
|
||||
# TODO: probably something better then this.. and start
|
||||
# considering multiplayer/group trades tracking
|
||||
msg.broker_details['external_src'] = 'tws'
|
||||
msg.external = True
|
||||
continue
|
||||
|
||||
# XXX: we always serialize to a dict for msgpack
|
||||
|
|
|
@ -27,14 +27,12 @@ from typing import AsyncIterator, Callable, Any
|
|||
from bidict import bidict
|
||||
from pydantic import BaseModel
|
||||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
import tractor
|
||||
|
||||
from .. import data
|
||||
from ..log import get_logger
|
||||
from ..data._normalize import iterticks
|
||||
from ..data.feed import Feed
|
||||
from .._daemon import maybe_spawn_brokerd
|
||||
from . import _paper_engine as paper
|
||||
from ._messages import (
|
||||
Status, Order,
|
||||
|
@ -84,16 +82,15 @@ def mk_check(
|
|||
|
||||
@dataclass
|
||||
class _DarkBook:
|
||||
'''EMS-trigger execution book.
|
||||
"""Client-side execution book.
|
||||
|
||||
Contains conditions for executions (aka "orders" or "triggers")
|
||||
which are not exposed to brokers and thus the market; i.e. these are
|
||||
privacy focussed "client side" orders which are submitted in real-time
|
||||
based on specified trigger conditions.
|
||||
Contains conditions for executions (aka "orders") which are not
|
||||
exposed to brokers and thus the market; i.e. these are privacy
|
||||
focussed "client side" orders.
|
||||
|
||||
A an instance per `brokerd` is created per EMS actor (for now).
|
||||
A singleton instance is created per EMS actor (for now).
|
||||
|
||||
'''
|
||||
"""
|
||||
broker: str
|
||||
|
||||
# levels which have an executable action (eg. alert, order, signal)
|
||||
|
@ -259,34 +256,17 @@ async def clear_dark_triggers(
|
|||
# print(f'execs scan took: {time.time() - start}')
|
||||
|
||||
|
||||
@dataclass
|
||||
class TradesRelay:
|
||||
brokerd_dialogue: tractor.MsgStream
|
||||
positions: dict[str, float]
|
||||
consumers: int = 0
|
||||
|
||||
|
||||
class _Router(BaseModel):
|
||||
'''Order router which manages and tracks per-broker dark book,
|
||||
alerts, clearing and related data feed management.
|
||||
|
||||
A singleton per ``emsd`` actor.
|
||||
'''Order router which manages per-broker dark books, alerts,
|
||||
and clearing related data feed management.
|
||||
|
||||
'''
|
||||
# setup at actor spawn time
|
||||
nursery: trio.Nursery
|
||||
|
||||
feeds: dict[tuple[str, str], Any] = {}
|
||||
|
||||
# broker to book map
|
||||
books: dict[str, _DarkBook] = {}
|
||||
|
||||
# order id to client stream map
|
||||
clients: set[tractor.MsgStream] = set()
|
||||
dialogues: dict[str, list[tractor.MsgStream]] = {}
|
||||
|
||||
# brokername to trades-dialogues streams with ``brokerd`` actors
|
||||
relays: dict[str, TradesRelay] = {}
|
||||
relays: dict[str, tuple[dict, tractor.MsgStream]] = {}
|
||||
|
||||
class Config:
|
||||
arbitrary_types_allowed = True
|
||||
|
@ -300,160 +280,10 @@ class _Router(BaseModel):
|
|||
|
||||
return self.books.setdefault(brokername, _DarkBook(brokername))
|
||||
|
||||
@asynccontextmanager
|
||||
async def maybe_open_brokerd_trades_dialogue(
|
||||
|
||||
self,
|
||||
feed: Feed,
|
||||
symbol: str,
|
||||
dark_book: _DarkBook,
|
||||
_exec_mode: str,
|
||||
loglevel: str,
|
||||
|
||||
) -> tuple[dict, tractor.MsgStream]:
|
||||
'''Open and yield ``brokerd`` trades dialogue context-stream if none
|
||||
already exists.
|
||||
|
||||
'''
|
||||
relay = self.relays.get(feed.mod.name)
|
||||
|
||||
if relay is None:
|
||||
|
||||
relay = await self.nursery.start(
|
||||
open_brokerd_trades_dialogue,
|
||||
self,
|
||||
feed,
|
||||
symbol,
|
||||
_exec_mode,
|
||||
loglevel,
|
||||
)
|
||||
|
||||
relay.consumers += 1
|
||||
|
||||
# TODO: get updated positions here?
|
||||
assert relay.brokerd_dialogue
|
||||
try:
|
||||
yield relay
|
||||
|
||||
finally:
|
||||
|
||||
# TODO: what exactly needs to be torn down here or
|
||||
# are we just consumer tracking?
|
||||
|
||||
relay.consumers -= 1
|
||||
|
||||
|
||||
_router: _Router = None
|
||||
|
||||
|
||||
async def open_brokerd_trades_dialogue(
|
||||
|
||||
router: _Router,
|
||||
feed: Feed,
|
||||
symbol: str,
|
||||
_exec_mode: str,
|
||||
loglevel: str,
|
||||
|
||||
task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
) -> tuple[dict, tractor.MsgStream]:
|
||||
'''Open and yield ``brokerd`` trades dialogue context-stream if none
|
||||
already exists.
|
||||
|
||||
'''
|
||||
trades_endpoint = getattr(feed.mod, 'trades_dialogue', None)
|
||||
|
||||
broker = feed.mod.name
|
||||
|
||||
# TODO: make a `tractor` bug about this!
|
||||
# portal = feed._brokerd_portal
|
||||
|
||||
# XXX: we must have our own portal + channel otherwise
|
||||
# when the data feed closes it may result in a half-closed/fucked
|
||||
# channel that the brokerd side thinks is still open somehow!?
|
||||
async with maybe_spawn_brokerd(
|
||||
|
||||
broker,
|
||||
loglevel=loglevel,
|
||||
|
||||
) as portal:
|
||||
|
||||
if trades_endpoint is None or _exec_mode == 'paper':
|
||||
|
||||
# for paper mode we need to mock this trades response feed
|
||||
# so we load bidir stream to a new sub-actor running a
|
||||
# paper-simulator clearing engine.
|
||||
|
||||
# load the paper trading engine
|
||||
_exec_mode = 'paper'
|
||||
log.warning(f'Entering paper trading mode for {broker}')
|
||||
|
||||
# load the paper trading engine as a subactor of this emsd
|
||||
# actor to simulate the real IPC load it'll have when also
|
||||
# pulling data from feeds
|
||||
open_trades_endpoint = paper.open_paperboi(
|
||||
broker=broker,
|
||||
symbol=symbol,
|
||||
loglevel=loglevel,
|
||||
)
|
||||
|
||||
else:
|
||||
# open live brokerd trades endpoint
|
||||
open_trades_endpoint = portal.open_context(
|
||||
trades_endpoint,
|
||||
loglevel=loglevel,
|
||||
)
|
||||
|
||||
try:
|
||||
async with (
|
||||
|
||||
open_trades_endpoint as (brokerd_ctx, positions),
|
||||
brokerd_ctx.open_stream() as brokerd_trades_stream,
|
||||
|
||||
):
|
||||
# XXX: really we only want one stream per `emsd` actor
|
||||
# to relay global `brokerd` order events unless we're
|
||||
# doing to expect each backend to relay only orders
|
||||
# affiliated with a particular ``trades_dialogue()``
|
||||
# session (seems annoying for implementers). So, here
|
||||
# we cache the relay task and instead of running multiple
|
||||
# tasks (which will result in multiples of the same msg being
|
||||
# relayed for each EMS client) we just register each client
|
||||
# stream to this single relay loop using _router.dialogues
|
||||
|
||||
# begin processing order events from the target brokerd backend
|
||||
# by receiving order submission response messages,
|
||||
# normalizing them to EMS messages and relaying back to
|
||||
# the piker order client set.
|
||||
|
||||
relay = TradesRelay(
|
||||
brokerd_dialogue=brokerd_trades_stream,
|
||||
positions=positions,
|
||||
consumers=1
|
||||
)
|
||||
|
||||
_router.relays[broker] = relay
|
||||
|
||||
# the ems scan loop may be cancelled by the client but we
|
||||
# want to keep the ``brokerd`` dialogue up regardless
|
||||
|
||||
task_status.started(relay)
|
||||
|
||||
await translate_and_relay_brokerd_events(
|
||||
broker,
|
||||
brokerd_trades_stream,
|
||||
_router,
|
||||
)
|
||||
|
||||
# this context should block here indefinitely until
|
||||
# the ``brokerd`` task either dies or is cancelled
|
||||
|
||||
finally:
|
||||
# parent context must have been closed
|
||||
# remove from cache so next client will respawn if needed
|
||||
_router.relays.pop(broker)
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def _setup_persistent_emsd(
|
||||
|
||||
|
@ -468,18 +298,20 @@ async def _setup_persistent_emsd(
|
|||
|
||||
_router = _Router(nursery=service_nursery)
|
||||
|
||||
# TODO: send back the full set of persistent
|
||||
# orders/execs?
|
||||
# TODO: send back the full set of persistent orders/execs persistent
|
||||
await ctx.started()
|
||||
|
||||
# allow service tasks to run until cancelled
|
||||
# we pin this task to keep the feeds manager active until the
|
||||
# parent actor decides to tear it down
|
||||
await trio.sleep_forever()
|
||||
|
||||
|
||||
async def translate_and_relay_brokerd_events(
|
||||
|
||||
broker: str,
|
||||
# ems_client_order_stream: tractor.MsgStream,
|
||||
brokerd_trades_stream: tractor.MsgStream,
|
||||
book: _DarkBook,
|
||||
router: _Router,
|
||||
|
||||
) -> AsyncIterator[dict]:
|
||||
|
@ -502,11 +334,6 @@ async def translate_and_relay_brokerd_events(
|
|||
{'presubmitted', 'submitted', 'cancelled', 'inactive'}
|
||||
|
||||
'''
|
||||
book = router.get_dark_book(broker)
|
||||
relay = router.relays[broker]
|
||||
|
||||
assert relay.brokerd_dialogue == brokerd_trades_stream
|
||||
|
||||
async for brokerd_msg in brokerd_trades_stream:
|
||||
|
||||
name = brokerd_msg['name']
|
||||
|
@ -515,15 +342,13 @@ async def translate_and_relay_brokerd_events(
|
|||
|
||||
if name == 'position':
|
||||
|
||||
pos_msg = BrokerdPosition(**brokerd_msg).dict()
|
||||
|
||||
# keep up to date locally in ``emsd``
|
||||
relay.positions.setdefault(pos_msg['symbol'], {}).update(pos_msg)
|
||||
|
||||
# relay through position msgs immediately by
|
||||
# broadcasting updates on all client streams
|
||||
for client_stream in router.clients:
|
||||
await client_stream.send(pos_msg)
|
||||
for oid, ems_client_order_stream in router.dialogues.items():
|
||||
|
||||
await ems_client_order_stream.send(
|
||||
BrokerdPosition(**brokerd_msg).dict()
|
||||
)
|
||||
|
||||
continue
|
||||
|
||||
|
@ -600,7 +425,7 @@ async def translate_and_relay_brokerd_events(
|
|||
|
||||
resp = None
|
||||
broker_details = {}
|
||||
# client_flow_complete: bool = False
|
||||
client_flow_complete: bool = False
|
||||
|
||||
if name in (
|
||||
'error',
|
||||
|
@ -635,7 +460,7 @@ async def translate_and_relay_brokerd_events(
|
|||
|
||||
if msg.status == 'cancelled':
|
||||
|
||||
# client_flow_complete = True
|
||||
client_flow_complete = True
|
||||
log.info(f'Cancellation for {oid} is complete!')
|
||||
|
||||
if msg.status == 'filled':
|
||||
|
@ -648,7 +473,7 @@ async def translate_and_relay_brokerd_events(
|
|||
|
||||
# be sure to pop this stream from our dialogue set
|
||||
# since the order dialogue should be done.
|
||||
# client_flow_complete = True
|
||||
client_flow_complete = True
|
||||
log.info(f'Execution for {oid} is complete!')
|
||||
|
||||
# just log it
|
||||
|
@ -678,7 +503,6 @@ async def translate_and_relay_brokerd_events(
|
|||
|
||||
# Create and relay response status message
|
||||
# to requesting EMS client
|
||||
try:
|
||||
ems_client_order_stream = router.dialogues[oid]
|
||||
await ems_client_order_stream.send(
|
||||
Status(
|
||||
|
@ -689,9 +513,6 @@ async def translate_and_relay_brokerd_events(
|
|||
brokerd_msg=broker_details,
|
||||
).dict()
|
||||
)
|
||||
except KeyError:
|
||||
log.error(
|
||||
f'Received `brokerd` msg for unknown client with oid: {oid}')
|
||||
|
||||
# TODO: do we want this to keep things cleaned up?
|
||||
# it might require a special status from brokerd to affirm the
|
||||
|
@ -712,8 +533,6 @@ async def process_client_order_cmds(
|
|||
|
||||
) -> None:
|
||||
|
||||
client_dialogues = router.dialogues
|
||||
|
||||
# cmd: dict
|
||||
async for cmd in client_order_stream:
|
||||
|
||||
|
@ -722,16 +541,14 @@ async def process_client_order_cmds(
|
|||
action = cmd['action']
|
||||
oid = cmd['oid']
|
||||
|
||||
# TODO: make ``tractor.MsgStream`` a frozen type again such that it
|
||||
# can be stored in sets like the old context was.
|
||||
# wait, maybe this **is** already working thanks to our parent
|
||||
# `trio` type?
|
||||
|
||||
# register this stream as an active dialogue for this order id
|
||||
# such that translated message from the brokerd backend can be
|
||||
# routed (relayed) to **just** that client stream (and in theory
|
||||
# others who are registered for such order affiliated msgs).
|
||||
client_dialogues[oid] = client_order_stream
|
||||
|
||||
# TODO: make ``tractor.MsgStream`` a frozen type again such that it
|
||||
# can be stored in sets like the old context was.
|
||||
router.dialogues[oid] = client_order_stream
|
||||
|
||||
reqid = dark_book._ems2brokerd_ids.inverse.get(oid)
|
||||
live_entry = dark_book._ems_entries.get(oid)
|
||||
|
@ -838,7 +655,7 @@ async def process_client_order_cmds(
|
|||
# flow so that if a cancel comes from the requesting
|
||||
# client, before that ack, when the ack does arrive we
|
||||
# immediately take the reqid from the broker and cancel
|
||||
# that live order asap.
|
||||
# that order with them immediately.
|
||||
dark_book._ems_entries[oid] = msg
|
||||
|
||||
# "DARK" triggers
|
||||
|
@ -908,6 +725,102 @@ async def process_client_order_cmds(
|
|||
)
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def maybe_open_brokerd_trades_dialogue(
|
||||
|
||||
router: _Router,
|
||||
feed: Feed,
|
||||
broker: str,
|
||||
symbol: str,
|
||||
dark_book: _DarkBook,
|
||||
_exec_mode: str,
|
||||
loglevel: str,
|
||||
|
||||
) -> tuple[dict, tractor.MsgStream]:
|
||||
'''Open and yield ``brokerd`` trades dialogue context-stream if none
|
||||
already exists.
|
||||
|
||||
'''
|
||||
trades_endpoint = getattr(feed.mod, 'trades_dialogue', None)
|
||||
portal = feed._brokerd_portal
|
||||
|
||||
if broker in _router.relays:
|
||||
|
||||
positions, brokerd_trades_stream = _router.relays[broker]
|
||||
|
||||
# TODO: get updated positions here?
|
||||
yield positions, brokerd_trades_stream
|
||||
return
|
||||
|
||||
if trades_endpoint is None or _exec_mode == 'paper':
|
||||
|
||||
# for paper mode we need to mock this trades response feed
|
||||
# so we load bidir stream to a new sub-actor running a
|
||||
# paper-simulator clearing engine.
|
||||
|
||||
# load the paper trading engine
|
||||
_exec_mode = 'paper'
|
||||
log.warning(f'Entering paper trading mode for {broker}')
|
||||
|
||||
# load the paper trading engine as a subactor of this emsd
|
||||
# actor to simulate the real IPC load it'll have when also
|
||||
# pulling data from feeds
|
||||
open_trades_endpoint = paper.open_paperboi(
|
||||
broker=broker,
|
||||
symbol=symbol,
|
||||
loglevel=loglevel,
|
||||
)
|
||||
|
||||
else:
|
||||
# open live brokerd trades endpoint
|
||||
open_trades_endpoint = portal.open_context(
|
||||
trades_endpoint,
|
||||
loglevel=loglevel,
|
||||
)
|
||||
|
||||
async with (
|
||||
|
||||
open_trades_endpoint as (brokerd_ctx, positions),
|
||||
brokerd_ctx.open_stream() as brokerd_trades_stream,
|
||||
trio.open_nursery() as n,
|
||||
|
||||
):
|
||||
# XXX: really we only want one stream per `emsd` actor
|
||||
# to relay global `brokerd` order events unless we're
|
||||
# doing to expect each backend to relay only orders
|
||||
# affiliated with a particular ``trades_dialogue()``
|
||||
# session (seems annoying for implementers). So, here
|
||||
# we cache the relay task and instead of running multiple
|
||||
# tasks (which will result in multiples of the same msg being
|
||||
# relayed for each EMS client) we just register each client
|
||||
# stream to this single relay loop using _router.dialogues
|
||||
|
||||
# begin processing order events from the target brokerd backend
|
||||
# by receiving order submission response messages,
|
||||
# normalizing them to EMS messages and relaying back to
|
||||
# the piker order client set.
|
||||
|
||||
n.start_soon(
|
||||
|
||||
translate_and_relay_brokerd_events,
|
||||
|
||||
broker,
|
||||
# ems_client_order_stream,
|
||||
brokerd_trades_stream,
|
||||
dark_book,
|
||||
_router,
|
||||
)
|
||||
|
||||
_router.relays[broker] = (positions, brokerd_trades_stream)
|
||||
|
||||
try:
|
||||
yield positions, brokerd_trades_stream
|
||||
|
||||
finally:
|
||||
# remove from cache so next client will respawn if needed
|
||||
_router.relays.pop(broker)
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def _emsd_main(
|
||||
|
||||
|
@ -942,7 +855,7 @@ async def _emsd_main(
|
|||
run (dark order) conditions on inputs and trigger brokerd "live"
|
||||
order submissions.
|
||||
|
|
||||
- (maybe) ``translate_and_relay_brokerd_events()``:
|
||||
- ``translate_and_relay_brokerd_events()``:
|
||||
accept normalized trades responses from brokerd, process and
|
||||
relay to ems client(s); this is a effectively a "trade event
|
||||
reponse" proxy-broker.
|
||||
|
@ -992,24 +905,24 @@ async def _emsd_main(
|
|||
|
||||
# only open if one isn't already up: we try to keep
|
||||
# as few duplicate streams as necessary
|
||||
_router.maybe_open_brokerd_trades_dialogue(
|
||||
maybe_open_brokerd_trades_dialogue(
|
||||
_router,
|
||||
feed,
|
||||
broker,
|
||||
symbol,
|
||||
dark_book,
|
||||
_exec_mode,
|
||||
loglevel,
|
||||
|
||||
) as relay,
|
||||
) as (positions, brokerd_trades_stream),
|
||||
|
||||
trio.open_nursery() as n,
|
||||
):
|
||||
|
||||
brokerd_stream = relay.brokerd_dialogue # .clone()
|
||||
|
||||
# signal to client that we're started
|
||||
# TODO: we could eventually send back **all** brokerd
|
||||
# positions here?
|
||||
await ems_ctx.started(relay.positions)
|
||||
await ems_ctx.started(positions)
|
||||
|
||||
# establish 2-way stream with requesting order-client and
|
||||
# begin handling inbound order requests and updates
|
||||
|
@ -1019,8 +932,7 @@ async def _emsd_main(
|
|||
n.start_soon(
|
||||
clear_dark_triggers,
|
||||
|
||||
# relay.brokerd_dialogue,
|
||||
brokerd_stream,
|
||||
brokerd_trades_stream,
|
||||
ems_client_order_stream,
|
||||
feed.stream,
|
||||
|
||||
|
@ -1030,41 +942,12 @@ async def _emsd_main(
|
|||
)
|
||||
|
||||
# start inbound (from attached client) order request processing
|
||||
try:
|
||||
_router.clients.add(ems_client_order_stream)
|
||||
|
||||
await process_client_order_cmds(
|
||||
|
||||
ems_client_order_stream,
|
||||
|
||||
# relay.brokerd_dialogue,
|
||||
brokerd_stream,
|
||||
|
||||
brokerd_trades_stream,
|
||||
symbol,
|
||||
feed,
|
||||
dark_book,
|
||||
_router,
|
||||
)
|
||||
|
||||
finally:
|
||||
# remove client from "registry"
|
||||
_router.clients.remove(ems_client_order_stream)
|
||||
|
||||
dialogues = _router.dialogues
|
||||
|
||||
for oid, client_stream in dialogues.items():
|
||||
|
||||
if client_stream == ems_client_order_stream:
|
||||
|
||||
log.warning(
|
||||
f'client dialogue is being abandoned:\n'
|
||||
f'{oid} ->\n{client_stream._ctx.chan.uid}'
|
||||
)
|
||||
dialogues.pop(oid)
|
||||
|
||||
# TODO: for order dialogues left "alive" in
|
||||
# the ems this is where we should allow some
|
||||
# system to take over management. Likely we
|
||||
# want to allow the user to choose what kind
|
||||
# of policy to use (eg. cancel all orders
|
||||
# from client, run some algo, etc.).
|
||||
|
|
|
@ -233,13 +233,12 @@ async def sample_and_broadcast(
|
|||
|
||||
for (stream, tick_throttle) in subs:
|
||||
|
||||
try:
|
||||
if tick_throttle:
|
||||
await stream.send(quote)
|
||||
|
||||
else:
|
||||
try:
|
||||
await stream.send({sym: quote})
|
||||
|
||||
except (
|
||||
trio.BrokenResourceError,
|
||||
trio.ClosedResourceError
|
||||
|
@ -248,19 +247,13 @@ async def sample_and_broadcast(
|
|||
# if it's done in the fee bus code?
|
||||
# so far seems like no since this should all
|
||||
# be single-threaded.
|
||||
log.warning(
|
||||
f'{stream._ctx.chan.uid} dropped '
|
||||
'`brokerd`-quotes-feed connection'
|
||||
)
|
||||
subs.remove((stream, tick_throttle))
|
||||
log.error(f'{stream._ctx.chan.uid} dropped connection')
|
||||
|
||||
|
||||
async def uniform_rate_send(
|
||||
|
||||
rate: float,
|
||||
quote_stream: trio.abc.ReceiveChannel,
|
||||
stream: tractor.MsgStream,
|
||||
|
||||
) -> None:
|
||||
|
||||
sleep_period = 1/rate - 0.000616
|
||||
|
@ -296,14 +289,8 @@ async def uniform_rate_send(
|
|||
|
||||
# TODO: now if only we could sync this to the display
|
||||
# rate timing exactly lul
|
||||
try:
|
||||
await stream.send({first_quote['symbol']: first_quote})
|
||||
break
|
||||
except trio.ClosedResourceError:
|
||||
# if the feed consumer goes down then drop
|
||||
# out of this rate limiter
|
||||
log.warning(f'{stream} closed')
|
||||
return
|
||||
|
||||
end = time.time()
|
||||
diff = end - start
|
||||
|
|
|
@ -305,11 +305,6 @@ async def attach_feed_bus(
|
|||
):
|
||||
|
||||
if tick_throttle:
|
||||
|
||||
# open a bg task which receives quotes over a mem chan
|
||||
# and only pushes them to the target actor-consumer at
|
||||
# a max ``tick_throttle`` instantaneous rate.
|
||||
|
||||
send, recv = trio.open_memory_channel(2**10)
|
||||
n.start_soon(
|
||||
uniform_rate_send,
|
||||
|
@ -326,12 +321,7 @@ async def attach_feed_bus(
|
|||
|
||||
try:
|
||||
await trio.sleep_forever()
|
||||
|
||||
finally:
|
||||
log.info(
|
||||
f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}')
|
||||
if tick_throttle:
|
||||
n.cancel_scope.cancel()
|
||||
bus._subscribers[symbol].remove(sub)
|
||||
|
||||
|
||||
|
@ -483,6 +473,11 @@ async def open_feed(
|
|||
ctx.open_stream() as stream,
|
||||
):
|
||||
|
||||
# TODO: can we make this work better with the proposed
|
||||
# context based bidirectional streaming style api proposed in:
|
||||
# https://github.com/goodboy/tractor/issues/53
|
||||
# init_msg = await stream.receive()
|
||||
|
||||
# we can only read from shm
|
||||
shm = attach_shm_array(
|
||||
token=init_msg[sym]['shm_token'],
|
||||
|
@ -525,8 +520,4 @@ async def open_feed(
|
|||
|
||||
feed._max_sample_rate = max(ohlc_sample_rates)
|
||||
|
||||
try:
|
||||
yield feed
|
||||
finally:
|
||||
# drop the infinite stream connection
|
||||
await ctx.cancel()
|
||||
|
|
|
@ -19,6 +19,7 @@ High level Qt chart widgets.
|
|||
|
||||
"""
|
||||
import time
|
||||
from contextlib import AsyncExitStack
|
||||
from typing import Tuple, Dict, Any, Optional
|
||||
from types import ModuleType
|
||||
from functools import partial
|
||||
|
@ -843,7 +844,7 @@ class ChartPlotWidget(pg.PlotWidget):
|
|||
# istart=max(lbar, l), iend=min(rbar, r), just_history=True)
|
||||
|
||||
# bars_len = rbar - lbar
|
||||
# log.debug(
|
||||
# log.trace(
|
||||
# f"\nl: {l}, lbar: {lbar}, rbar: {rbar}, r: {r}\n"
|
||||
# f"view_len: {view_len}, bars_len: {bars_len}\n"
|
||||
# f"begin: {begin}, end: {end}, extra: {extra}"
|
||||
|
@ -1473,6 +1474,7 @@ async def display_symbol_data(
|
|||
|
||||
) as feed,
|
||||
|
||||
trio.open_nursery() as n,
|
||||
):
|
||||
|
||||
ohlcv: ShmArray = feed.shm
|
||||
|
@ -1540,7 +1542,6 @@ async def display_symbol_data(
|
|||
},
|
||||
})
|
||||
|
||||
async with trio.open_nursery() as n:
|
||||
# load initial fsp chain (otherwise known as "indicators")
|
||||
n.start_soon(
|
||||
spawn_fsps,
|
||||
|
@ -1577,29 +1578,41 @@ async def display_symbol_data(
|
|||
await start_order_mode(chart, symbol, provider, order_mode_started)
|
||||
|
||||
|
||||
async def load_provider_search(
|
||||
async def load_providers(
|
||||
|
||||
broker: str,
|
||||
brokernames: list[str],
|
||||
loglevel: str,
|
||||
|
||||
) -> None:
|
||||
|
||||
# TODO: seems like our incentive for brokerd caching lelel
|
||||
backends = {}
|
||||
|
||||
async with AsyncExitStack() as stack:
|
||||
# TODO: spawn these async in nursery.
|
||||
# load all requested brokerd's at startup and load their
|
||||
# search engines.
|
||||
for broker in brokernames:
|
||||
|
||||
log.info(f'loading brokerd for {broker}..')
|
||||
|
||||
async with (
|
||||
|
||||
# spin up broker daemons for each provider
|
||||
portal = await stack.enter_async_context(
|
||||
maybe_spawn_brokerd(
|
||||
broker,
|
||||
loglevel=loglevel
|
||||
) as portal,
|
||||
)
|
||||
)
|
||||
|
||||
backends[broker] = portal
|
||||
|
||||
await stack.enter_async_context(
|
||||
feed.install_brokerd_search(
|
||||
portal,
|
||||
get_brokermod(broker),
|
||||
),
|
||||
):
|
||||
)
|
||||
)
|
||||
|
||||
# keep search engine stream up until cancelled
|
||||
# keep search engines up until cancelled
|
||||
await trio.sleep_forever()
|
||||
|
||||
|
||||
|
@ -1640,7 +1653,9 @@ async def _async_main(
|
|||
sbar = godwidget.window.status_bar
|
||||
starting_done = sbar.open_status('starting ze sexy chartz')
|
||||
|
||||
async with trio.open_nursery() as root_n:
|
||||
async with (
|
||||
trio.open_nursery() as root_n,
|
||||
):
|
||||
|
||||
# set root nursery and task stack for spawning other charts/feeds
|
||||
# that run cached in the bg
|
||||
|
@ -1679,8 +1694,7 @@ async def _async_main(
|
|||
):
|
||||
# load other providers into search **after**
|
||||
# the chart's select cache
|
||||
for broker in brokernames:
|
||||
root_n.start_soon(load_provider_search, broker, loglevel)
|
||||
root_n.start_soon(load_providers, brokernames, loglevel)
|
||||
|
||||
await order_mode_ready.wait()
|
||||
|
||||
|
|
Loading…
Reference in New Issue