Compare commits

..

No commits in common. "f75057bc64f06b617687872122f6c5b9b2ab770a" and "b81c538e85c407746b7c60e8b2ec47b30738cb43" have entirely different histories.

7 changed files with 278 additions and 471 deletions

View File

@ -19,12 +19,11 @@ Structured, daemon tree service management.
""" """
from typing import Optional, Union, Callable, Any from typing import Optional, Union, Callable, Any
from contextlib import asynccontextmanager from contextlib import asynccontextmanager, AsyncExitStack
from collections import defaultdict from collections import defaultdict
from pydantic import BaseModel from pydantic import BaseModel
import trio import trio
from trio_typing import TaskStatus
import tractor import tractor
from .log import get_logger, get_console_log from .log import get_logger, get_console_log
@ -46,79 +45,36 @@ _root_modules = [
class Services(BaseModel): class Services(BaseModel):
actor_n: tractor._trionics.ActorNursery actor_n: tractor._trionics.ActorNursery
service_n: trio.Nursery service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag debug_mode: bool # tractor sub-actor debug mode flag
service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {} ctx_stack: AsyncExitStack
class Config: class Config:
arbitrary_types_allowed = True arbitrary_types_allowed = True
async def start_service_task( async def open_remote_ctx(
self, self,
name: str,
portal: tractor.Portal, portal: tractor.Portal,
target: Callable, target: Callable,
**kwargs, **kwargs,
) -> (trio.CancelScope, tractor.Context): ) -> tractor.Context:
''' '''
Open a context in a service sub-actor, add to a stack Open a context in a service sub-actor, add to a stack
that gets unwound at ``pikerd`` teardown. that gets unwound at ``pikerd`` tearodwn.
This allows for allocating long-running sub-services in our main This allows for allocating long-running sub-services in our main
daemon and explicitly controlling their lifetimes. daemon and explicitly controlling their lifetimes.
''' '''
async def open_context_in_task( ctx, first = await self.ctx_stack.enter_async_context(
task_status: TaskStatus[ portal.open_context(
trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> Any:
with trio.CancelScope() as cs:
async with portal.open_context(
target, target,
**kwargs, **kwargs,
) as (ctx, first):
# unblock once the remote context has started
task_status.started((cs, first))
# wait on any context's return value
ctx_res = await ctx.result()
log.info(
f'`pikerd` service {name} started with value {ctx_res}'
) )
)
# wait on any error from the sub-actor return ctx
# NOTE: this will block indefinitely until cancelled
# either by error from the target context function or
# by being cancelled here by the surroundingn cancel
# scope
return await (portal.result(), ctx_res)
cs, first = await self.service_n.start(open_context_in_task)
# store the cancel scope and portal for later cancellation or
# retstart if needed.
self.service_tasks[name] = (cs, portal)
return cs, first
async def cancel_service(
self,
name: str,
) -> Any:
log.info(f'Cancelling `pikerd` service {name}')
cs, portal = self.service_tasks[name]
cs.cancel()
return await portal.cancel_actor()
_services: Optional[Services] = None _services: Optional[Services] = None
@ -161,19 +117,19 @@ async def open_pikerd(
# spawn other specialized daemons I think? # spawn other specialized daemons I think?
enable_modules=_root_modules, enable_modules=_root_modules,
) as _, ) as _,
tractor.open_nursery() as actor_nursery, tractor.open_nursery() as actor_nursery,
): ):
async with trio.open_nursery() as service_nursery: async with trio.open_nursery() as service_nursery:
# # setup service mngr singleton instance # setup service mngr singleton instance
# async with AsyncExitStack() as stack: async with AsyncExitStack() as stack:
# assign globally for future daemon/task creation # assign globally for future daemon/task creation
_services = Services( _services = Services(
actor_n=actor_nursery, actor_n=actor_nursery,
service_n=service_nursery, service_n=service_nursery,
debug_mode=debug_mode, debug_mode=debug_mode,
ctx_stack=stack,
) )
yield _services yield _services
@ -218,20 +174,16 @@ async def maybe_open_pikerd(
# subtle, we must have the runtime up here or portal lookup will fail # subtle, we must have the runtime up here or portal lookup will fail
async with maybe_open_runtime(loglevel, **kwargs): async with maybe_open_runtime(loglevel, **kwargs):
async with tractor.find_actor(_root_dname) as portal: async with tractor.find_actor(_root_dname) as portal:
# assert portal is not None # assert portal is not None
if portal is not None: if portal is not None:
yield portal yield portal
return return
# presume pikerd role since no daemon could be found at # presume pikerd role
# configured address
async with open_pikerd( async with open_pikerd(
loglevel=loglevel, loglevel=loglevel,
debug_mode=kwargs.get('debug_mode', False), debug_mode=kwargs.get('debug_mode', False),
) as _: ) as _:
# in the case where we're starting up the # in the case where we're starting up the
# tractor-piker runtime stack in **this** process # tractor-piker runtime stack in **this** process
@ -257,7 +209,7 @@ class Brokerd:
async def maybe_spawn_daemon( async def maybe_spawn_daemon(
service_name: str, service_name: str,
service_task_target: Callable, spawn_func: Callable,
spawn_args: dict[str, Any], spawn_args: dict[str, Any],
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
**kwargs, **kwargs,
@ -267,13 +219,6 @@ async def maybe_spawn_daemon(
If no ``service_name`` daemon-actor can be found, If no ``service_name`` daemon-actor can be found,
spawn one in a local subactor and return a portal to it. spawn one in a local subactor and return a portal to it.
If this function is called from a non-pikerd actor, the
spawned service will persist as long as pikerd does or
it is requested to be cancelled.
This can be seen as a service starting api for remote-actor
clients.
""" """
if loglevel: if loglevel:
get_console_log(loglevel) get_console_log(loglevel)
@ -301,24 +246,13 @@ async def maybe_spawn_daemon(
) as pikerd_portal: ) as pikerd_portal:
if pikerd_portal is None: if pikerd_portal is None:
# we are the root and thus are `pikerd` # we are root so spawn brokerd directly in our tree
# so spawn the target service directly by calling # the root nursery is accessed through process global state
# the provided target routine. await spawn_func(**spawn_args)
# XXX: this assumes that the target is well formed and will
# do the right things to setup both a sub-actor **and** call
# the ``_Services`` api from above to start the top level
# service task for that actor.
await service_task_target(**spawn_args)
else: else:
# tell the remote `pikerd` to start the target,
# the target can't return a non-serializable value
# since it is expected that service startingn is
# non-blocking and the target task will persist running
# on `pikerd` after the client requesting it's start
# disconnects.
await pikerd_portal.run( await pikerd_portal.run(
service_task_target, spawn_func,
**spawn_args, **spawn_args,
) )
@ -333,7 +267,7 @@ async def spawn_brokerd(
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
**tractor_kwargs, **tractor_kwargs,
) -> bool: ) -> tractor._portal.Portal:
log.info(f'Spawning {brokername} broker daemon') log.info(f'Spawning {brokername} broker daemon')
@ -346,8 +280,6 @@ async def spawn_brokerd(
global _services global _services
assert _services assert _services
# ask `pikerd` to spawn a new sub-actor and manage it under its
# actor nursery
portal = await _services.actor_n.start_actor( portal = await _services.actor_n.start_actor(
dname, dname,
enable_modules=_data_mods + [brokermod.__name__], enable_modules=_data_mods + [brokermod.__name__],
@ -359,13 +291,13 @@ async def spawn_brokerd(
# non-blocking setup of brokerd service nursery # non-blocking setup of brokerd service nursery
from .data import _setup_persistent_brokerd from .data import _setup_persistent_brokerd
await _services.start_service_task( await _services.open_remote_ctx(
dname,
portal, portal,
_setup_persistent_brokerd, _setup_persistent_brokerd,
brokername=brokername, brokername=brokername,
) )
return True
return dname
@asynccontextmanager @asynccontextmanager
@ -382,7 +314,7 @@ async def maybe_spawn_brokerd(
async with maybe_spawn_daemon( async with maybe_spawn_daemon(
f'brokerd.{brokername}', f'brokerd.{brokername}',
service_task_target=spawn_brokerd, spawn_func=spawn_brokerd,
spawn_args={'brokername': brokername, 'loglevel': loglevel}, spawn_args={'brokername': brokername, 'loglevel': loglevel},
loglevel=loglevel, loglevel=loglevel,
**kwargs, **kwargs,
@ -396,7 +328,7 @@ async def spawn_emsd(
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
**extra_tractor_kwargs **extra_tractor_kwargs
) -> bool: ) -> tractor._portal.Portal:
""" """
Start the clearing engine under ``pikerd``. Start the clearing engine under ``pikerd``.
@ -420,12 +352,12 @@ async def spawn_emsd(
# non-blocking setup of clearing service # non-blocking setup of clearing service
from .clearing._ems import _setup_persistent_emsd from .clearing._ems import _setup_persistent_emsd
await _services.start_service_task( await _services.open_remote_ctx(
'emsd',
portal, portal,
_setup_persistent_emsd, _setup_persistent_emsd,
) )
return True
return 'emsd'
@asynccontextmanager @asynccontextmanager
@ -440,7 +372,7 @@ async def maybe_open_emsd(
async with maybe_spawn_daemon( async with maybe_spawn_daemon(
'emsd', 'emsd',
service_task_target=spawn_emsd, spawn_func=spawn_emsd,
spawn_args={'loglevel': loglevel}, spawn_args={'loglevel': loglevel},
loglevel=loglevel, loglevel=loglevel,
**kwargs, **kwargs,

View File

@ -53,6 +53,6 @@ def resproc(
log.exception(f"Failed to process {resp}:\n{resp.text}") log.exception(f"Failed to process {resp}:\n{resp.text}")
raise BrokerError(resp.text) raise BrokerError(resp.text)
else: else:
log.debug(f"Received json contents:\n{colorize_json(json)}") log.trace(f"Received json contents:\n{colorize_json(json)}")
return json if return_json else resp return json if return_json else resp

View File

@ -1442,14 +1442,14 @@ async def trades_dialogue(
if getattr(msg, 'reqid', 0) < -1: if getattr(msg, 'reqid', 0) < -1:
# it's a trade event generated by TWS usage. # it's a trade event generated by TWS usage.
log.info(f"TWS triggered trade\n{pformat(msg.dict())}") log.warning(f"TWS triggered trade:\n{pformat(msg)}")
msg.reqid = 'tws-' + str(-1 * msg.reqid) msg.reqid = 'tws-' + str(-1 * msg.reqid)
# mark msg as from "external system" # mark msg as from "external system"
# TODO: probably something better then this.. and start # TODO: probably something better then this.. and start
# considering multiplayer/group trades tracking # considering multiplayer/group trades tracking
msg.broker_details['external_src'] = 'tws' msg.external = True
continue continue
# XXX: we always serialize to a dict for msgpack # XXX: we always serialize to a dict for msgpack

View File

@ -27,14 +27,12 @@ from typing import AsyncIterator, Callable, Any
from bidict import bidict from bidict import bidict
from pydantic import BaseModel from pydantic import BaseModel
import trio import trio
from trio_typing import TaskStatus
import tractor import tractor
from .. import data from .. import data
from ..log import get_logger from ..log import get_logger
from ..data._normalize import iterticks from ..data._normalize import iterticks
from ..data.feed import Feed from ..data.feed import Feed
from .._daemon import maybe_spawn_brokerd
from . import _paper_engine as paper from . import _paper_engine as paper
from ._messages import ( from ._messages import (
Status, Order, Status, Order,
@ -84,16 +82,15 @@ def mk_check(
@dataclass @dataclass
class _DarkBook: class _DarkBook:
'''EMS-trigger execution book. """Client-side execution book.
Contains conditions for executions (aka "orders" or "triggers") Contains conditions for executions (aka "orders") which are not
which are not exposed to brokers and thus the market; i.e. these are exposed to brokers and thus the market; i.e. these are privacy
privacy focussed "client side" orders which are submitted in real-time focussed "client side" orders.
based on specified trigger conditions.
A an instance per `brokerd` is created per EMS actor (for now). A singleton instance is created per EMS actor (for now).
''' """
broker: str broker: str
# levels which have an executable action (eg. alert, order, signal) # levels which have an executable action (eg. alert, order, signal)
@ -259,34 +256,17 @@ async def clear_dark_triggers(
# print(f'execs scan took: {time.time() - start}') # print(f'execs scan took: {time.time() - start}')
@dataclass
class TradesRelay:
brokerd_dialogue: tractor.MsgStream
positions: dict[str, float]
consumers: int = 0
class _Router(BaseModel): class _Router(BaseModel):
'''Order router which manages and tracks per-broker dark book, '''Order router which manages per-broker dark books, alerts,
alerts, clearing and related data feed management. and clearing related data feed management.
A singleton per ``emsd`` actor.
''' '''
# setup at actor spawn time
nursery: trio.Nursery nursery: trio.Nursery
feeds: dict[tuple[str, str], Any] = {} feeds: dict[tuple[str, str], Any] = {}
# broker to book map
books: dict[str, _DarkBook] = {} books: dict[str, _DarkBook] = {}
# order id to client stream map
clients: set[tractor.MsgStream] = set()
dialogues: dict[str, list[tractor.MsgStream]] = {} dialogues: dict[str, list[tractor.MsgStream]] = {}
relays: dict[str, tuple[dict, tractor.MsgStream]] = {}
# brokername to trades-dialogues streams with ``brokerd`` actors
relays: dict[str, TradesRelay] = {}
class Config: class Config:
arbitrary_types_allowed = True arbitrary_types_allowed = True
@ -300,160 +280,10 @@ class _Router(BaseModel):
return self.books.setdefault(brokername, _DarkBook(brokername)) return self.books.setdefault(brokername, _DarkBook(brokername))
@asynccontextmanager
async def maybe_open_brokerd_trades_dialogue(
self,
feed: Feed,
symbol: str,
dark_book: _DarkBook,
_exec_mode: str,
loglevel: str,
) -> tuple[dict, tractor.MsgStream]:
'''Open and yield ``brokerd`` trades dialogue context-stream if none
already exists.
'''
relay = self.relays.get(feed.mod.name)
if relay is None:
relay = await self.nursery.start(
open_brokerd_trades_dialogue,
self,
feed,
symbol,
_exec_mode,
loglevel,
)
relay.consumers += 1
# TODO: get updated positions here?
assert relay.brokerd_dialogue
try:
yield relay
finally:
# TODO: what exactly needs to be torn down here or
# are we just consumer tracking?
relay.consumers -= 1
_router: _Router = None _router: _Router = None
async def open_brokerd_trades_dialogue(
router: _Router,
feed: Feed,
symbol: str,
_exec_mode: str,
loglevel: str,
task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED,
) -> tuple[dict, tractor.MsgStream]:
'''Open and yield ``brokerd`` trades dialogue context-stream if none
already exists.
'''
trades_endpoint = getattr(feed.mod, 'trades_dialogue', None)
broker = feed.mod.name
# TODO: make a `tractor` bug about this!
# portal = feed._brokerd_portal
# XXX: we must have our own portal + channel otherwise
# when the data feed closes it may result in a half-closed/fucked
# channel that the brokerd side thinks is still open somehow!?
async with maybe_spawn_brokerd(
broker,
loglevel=loglevel,
) as portal:
if trades_endpoint is None or _exec_mode == 'paper':
# for paper mode we need to mock this trades response feed
# so we load bidir stream to a new sub-actor running a
# paper-simulator clearing engine.
# load the paper trading engine
_exec_mode = 'paper'
log.warning(f'Entering paper trading mode for {broker}')
# load the paper trading engine as a subactor of this emsd
# actor to simulate the real IPC load it'll have when also
# pulling data from feeds
open_trades_endpoint = paper.open_paperboi(
broker=broker,
symbol=symbol,
loglevel=loglevel,
)
else:
# open live brokerd trades endpoint
open_trades_endpoint = portal.open_context(
trades_endpoint,
loglevel=loglevel,
)
try:
async with (
open_trades_endpoint as (brokerd_ctx, positions),
brokerd_ctx.open_stream() as brokerd_trades_stream,
):
# XXX: really we only want one stream per `emsd` actor
# to relay global `brokerd` order events unless we're
# doing to expect each backend to relay only orders
# affiliated with a particular ``trades_dialogue()``
# session (seems annoying for implementers). So, here
# we cache the relay task and instead of running multiple
# tasks (which will result in multiples of the same msg being
# relayed for each EMS client) we just register each client
# stream to this single relay loop using _router.dialogues
# begin processing order events from the target brokerd backend
# by receiving order submission response messages,
# normalizing them to EMS messages and relaying back to
# the piker order client set.
relay = TradesRelay(
brokerd_dialogue=brokerd_trades_stream,
positions=positions,
consumers=1
)
_router.relays[broker] = relay
# the ems scan loop may be cancelled by the client but we
# want to keep the ``brokerd`` dialogue up regardless
task_status.started(relay)
await translate_and_relay_brokerd_events(
broker,
brokerd_trades_stream,
_router,
)
# this context should block here indefinitely until
# the ``brokerd`` task either dies or is cancelled
finally:
# parent context must have been closed
# remove from cache so next client will respawn if needed
_router.relays.pop(broker)
@tractor.context @tractor.context
async def _setup_persistent_emsd( async def _setup_persistent_emsd(
@ -468,18 +298,20 @@ async def _setup_persistent_emsd(
_router = _Router(nursery=service_nursery) _router = _Router(nursery=service_nursery)
# TODO: send back the full set of persistent # TODO: send back the full set of persistent orders/execs persistent
# orders/execs?
await ctx.started() await ctx.started()
# allow service tasks to run until cancelled # we pin this task to keep the feeds manager active until the
# parent actor decides to tear it down
await trio.sleep_forever() await trio.sleep_forever()
async def translate_and_relay_brokerd_events( async def translate_and_relay_brokerd_events(
broker: str, broker: str,
# ems_client_order_stream: tractor.MsgStream,
brokerd_trades_stream: tractor.MsgStream, brokerd_trades_stream: tractor.MsgStream,
book: _DarkBook,
router: _Router, router: _Router,
) -> AsyncIterator[dict]: ) -> AsyncIterator[dict]:
@ -502,11 +334,6 @@ async def translate_and_relay_brokerd_events(
{'presubmitted', 'submitted', 'cancelled', 'inactive'} {'presubmitted', 'submitted', 'cancelled', 'inactive'}
''' '''
book = router.get_dark_book(broker)
relay = router.relays[broker]
assert relay.brokerd_dialogue == brokerd_trades_stream
async for brokerd_msg in brokerd_trades_stream: async for brokerd_msg in brokerd_trades_stream:
name = brokerd_msg['name'] name = brokerd_msg['name']
@ -515,15 +342,13 @@ async def translate_and_relay_brokerd_events(
if name == 'position': if name == 'position':
pos_msg = BrokerdPosition(**brokerd_msg).dict()
# keep up to date locally in ``emsd``
relay.positions.setdefault(pos_msg['symbol'], {}).update(pos_msg)
# relay through position msgs immediately by # relay through position msgs immediately by
# broadcasting updates on all client streams # broadcasting updates on all client streams
for client_stream in router.clients: for oid, ems_client_order_stream in router.dialogues.items():
await client_stream.send(pos_msg)
await ems_client_order_stream.send(
BrokerdPosition(**brokerd_msg).dict()
)
continue continue
@ -600,7 +425,7 @@ async def translate_and_relay_brokerd_events(
resp = None resp = None
broker_details = {} broker_details = {}
# client_flow_complete: bool = False client_flow_complete: bool = False
if name in ( if name in (
'error', 'error',
@ -635,7 +460,7 @@ async def translate_and_relay_brokerd_events(
if msg.status == 'cancelled': if msg.status == 'cancelled':
# client_flow_complete = True client_flow_complete = True
log.info(f'Cancellation for {oid} is complete!') log.info(f'Cancellation for {oid} is complete!')
if msg.status == 'filled': if msg.status == 'filled':
@ -648,7 +473,7 @@ async def translate_and_relay_brokerd_events(
# be sure to pop this stream from our dialogue set # be sure to pop this stream from our dialogue set
# since the order dialogue should be done. # since the order dialogue should be done.
# client_flow_complete = True client_flow_complete = True
log.info(f'Execution for {oid} is complete!') log.info(f'Execution for {oid} is complete!')
# just log it # just log it
@ -678,7 +503,6 @@ async def translate_and_relay_brokerd_events(
# Create and relay response status message # Create and relay response status message
# to requesting EMS client # to requesting EMS client
try:
ems_client_order_stream = router.dialogues[oid] ems_client_order_stream = router.dialogues[oid]
await ems_client_order_stream.send( await ems_client_order_stream.send(
Status( Status(
@ -689,9 +513,6 @@ async def translate_and_relay_brokerd_events(
brokerd_msg=broker_details, brokerd_msg=broker_details,
).dict() ).dict()
) )
except KeyError:
log.error(
f'Received `brokerd` msg for unknown client with oid: {oid}')
# TODO: do we want this to keep things cleaned up? # TODO: do we want this to keep things cleaned up?
# it might require a special status from brokerd to affirm the # it might require a special status from brokerd to affirm the
@ -712,8 +533,6 @@ async def process_client_order_cmds(
) -> None: ) -> None:
client_dialogues = router.dialogues
# cmd: dict # cmd: dict
async for cmd in client_order_stream: async for cmd in client_order_stream:
@ -722,16 +541,14 @@ async def process_client_order_cmds(
action = cmd['action'] action = cmd['action']
oid = cmd['oid'] oid = cmd['oid']
# TODO: make ``tractor.MsgStream`` a frozen type again such that it
# can be stored in sets like the old context was.
# wait, maybe this **is** already working thanks to our parent
# `trio` type?
# register this stream as an active dialogue for this order id # register this stream as an active dialogue for this order id
# such that translated message from the brokerd backend can be # such that translated message from the brokerd backend can be
# routed (relayed) to **just** that client stream (and in theory # routed (relayed) to **just** that client stream (and in theory
# others who are registered for such order affiliated msgs). # others who are registered for such order affiliated msgs).
client_dialogues[oid] = client_order_stream
# TODO: make ``tractor.MsgStream`` a frozen type again such that it
# can be stored in sets like the old context was.
router.dialogues[oid] = client_order_stream
reqid = dark_book._ems2brokerd_ids.inverse.get(oid) reqid = dark_book._ems2brokerd_ids.inverse.get(oid)
live_entry = dark_book._ems_entries.get(oid) live_entry = dark_book._ems_entries.get(oid)
@ -838,7 +655,7 @@ async def process_client_order_cmds(
# flow so that if a cancel comes from the requesting # flow so that if a cancel comes from the requesting
# client, before that ack, when the ack does arrive we # client, before that ack, when the ack does arrive we
# immediately take the reqid from the broker and cancel # immediately take the reqid from the broker and cancel
# that live order asap. # that order with them immediately.
dark_book._ems_entries[oid] = msg dark_book._ems_entries[oid] = msg
# "DARK" triggers # "DARK" triggers
@ -908,6 +725,102 @@ async def process_client_order_cmds(
) )
@asynccontextmanager
async def maybe_open_brokerd_trades_dialogue(
router: _Router,
feed: Feed,
broker: str,
symbol: str,
dark_book: _DarkBook,
_exec_mode: str,
loglevel: str,
) -> tuple[dict, tractor.MsgStream]:
'''Open and yield ``brokerd`` trades dialogue context-stream if none
already exists.
'''
trades_endpoint = getattr(feed.mod, 'trades_dialogue', None)
portal = feed._brokerd_portal
if broker in _router.relays:
positions, brokerd_trades_stream = _router.relays[broker]
# TODO: get updated positions here?
yield positions, brokerd_trades_stream
return
if trades_endpoint is None or _exec_mode == 'paper':
# for paper mode we need to mock this trades response feed
# so we load bidir stream to a new sub-actor running a
# paper-simulator clearing engine.
# load the paper trading engine
_exec_mode = 'paper'
log.warning(f'Entering paper trading mode for {broker}')
# load the paper trading engine as a subactor of this emsd
# actor to simulate the real IPC load it'll have when also
# pulling data from feeds
open_trades_endpoint = paper.open_paperboi(
broker=broker,
symbol=symbol,
loglevel=loglevel,
)
else:
# open live brokerd trades endpoint
open_trades_endpoint = portal.open_context(
trades_endpoint,
loglevel=loglevel,
)
async with (
open_trades_endpoint as (brokerd_ctx, positions),
brokerd_ctx.open_stream() as brokerd_trades_stream,
trio.open_nursery() as n,
):
# XXX: really we only want one stream per `emsd` actor
# to relay global `brokerd` order events unless we're
# doing to expect each backend to relay only orders
# affiliated with a particular ``trades_dialogue()``
# session (seems annoying for implementers). So, here
# we cache the relay task and instead of running multiple
# tasks (which will result in multiples of the same msg being
# relayed for each EMS client) we just register each client
# stream to this single relay loop using _router.dialogues
# begin processing order events from the target brokerd backend
# by receiving order submission response messages,
# normalizing them to EMS messages and relaying back to
# the piker order client set.
n.start_soon(
translate_and_relay_brokerd_events,
broker,
# ems_client_order_stream,
brokerd_trades_stream,
dark_book,
_router,
)
_router.relays[broker] = (positions, brokerd_trades_stream)
try:
yield positions, brokerd_trades_stream
finally:
# remove from cache so next client will respawn if needed
_router.relays.pop(broker)
@tractor.context @tractor.context
async def _emsd_main( async def _emsd_main(
@ -942,7 +855,7 @@ async def _emsd_main(
run (dark order) conditions on inputs and trigger brokerd "live" run (dark order) conditions on inputs and trigger brokerd "live"
order submissions. order submissions.
| |
- (maybe) ``translate_and_relay_brokerd_events()``: - ``translate_and_relay_brokerd_events()``:
accept normalized trades responses from brokerd, process and accept normalized trades responses from brokerd, process and
relay to ems client(s); this is a effectively a "trade event relay to ems client(s); this is a effectively a "trade event
reponse" proxy-broker. reponse" proxy-broker.
@ -992,24 +905,24 @@ async def _emsd_main(
# only open if one isn't already up: we try to keep # only open if one isn't already up: we try to keep
# as few duplicate streams as necessary # as few duplicate streams as necessary
_router.maybe_open_brokerd_trades_dialogue( maybe_open_brokerd_trades_dialogue(
_router,
feed, feed,
broker,
symbol, symbol,
dark_book, dark_book,
_exec_mode, _exec_mode,
loglevel, loglevel,
) as relay, ) as (positions, brokerd_trades_stream),
trio.open_nursery() as n, trio.open_nursery() as n,
): ):
brokerd_stream = relay.brokerd_dialogue # .clone()
# signal to client that we're started # signal to client that we're started
# TODO: we could eventually send back **all** brokerd # TODO: we could eventually send back **all** brokerd
# positions here? # positions here?
await ems_ctx.started(relay.positions) await ems_ctx.started(positions)
# establish 2-way stream with requesting order-client and # establish 2-way stream with requesting order-client and
# begin handling inbound order requests and updates # begin handling inbound order requests and updates
@ -1019,8 +932,7 @@ async def _emsd_main(
n.start_soon( n.start_soon(
clear_dark_triggers, clear_dark_triggers,
# relay.brokerd_dialogue, brokerd_trades_stream,
brokerd_stream,
ems_client_order_stream, ems_client_order_stream,
feed.stream, feed.stream,
@ -1030,41 +942,12 @@ async def _emsd_main(
) )
# start inbound (from attached client) order request processing # start inbound (from attached client) order request processing
try:
_router.clients.add(ems_client_order_stream)
await process_client_order_cmds( await process_client_order_cmds(
ems_client_order_stream, ems_client_order_stream,
brokerd_trades_stream,
# relay.brokerd_dialogue,
brokerd_stream,
symbol, symbol,
feed, feed,
dark_book, dark_book,
_router, _router,
) )
finally:
# remove client from "registry"
_router.clients.remove(ems_client_order_stream)
dialogues = _router.dialogues
for oid, client_stream in dialogues.items():
if client_stream == ems_client_order_stream:
log.warning(
f'client dialogue is being abandoned:\n'
f'{oid} ->\n{client_stream._ctx.chan.uid}'
)
dialogues.pop(oid)
# TODO: for order dialogues left "alive" in
# the ems this is where we should allow some
# system to take over management. Likely we
# want to allow the user to choose what kind
# of policy to use (eg. cancel all orders
# from client, run some algo, etc.).

View File

@ -233,13 +233,12 @@ async def sample_and_broadcast(
for (stream, tick_throttle) in subs: for (stream, tick_throttle) in subs:
try:
if tick_throttle: if tick_throttle:
await stream.send(quote) await stream.send(quote)
else: else:
try:
await stream.send({sym: quote}) await stream.send({sym: quote})
except ( except (
trio.BrokenResourceError, trio.BrokenResourceError,
trio.ClosedResourceError trio.ClosedResourceError
@ -248,19 +247,13 @@ async def sample_and_broadcast(
# if it's done in the fee bus code? # if it's done in the fee bus code?
# so far seems like no since this should all # so far seems like no since this should all
# be single-threaded. # be single-threaded.
log.warning( log.error(f'{stream._ctx.chan.uid} dropped connection')
f'{stream._ctx.chan.uid} dropped '
'`brokerd`-quotes-feed connection'
)
subs.remove((stream, tick_throttle))
async def uniform_rate_send( async def uniform_rate_send(
rate: float, rate: float,
quote_stream: trio.abc.ReceiveChannel, quote_stream: trio.abc.ReceiveChannel,
stream: tractor.MsgStream, stream: tractor.MsgStream,
) -> None: ) -> None:
sleep_period = 1/rate - 0.000616 sleep_period = 1/rate - 0.000616
@ -296,14 +289,8 @@ async def uniform_rate_send(
# TODO: now if only we could sync this to the display # TODO: now if only we could sync this to the display
# rate timing exactly lul # rate timing exactly lul
try:
await stream.send({first_quote['symbol']: first_quote}) await stream.send({first_quote['symbol']: first_quote})
break break
except trio.ClosedResourceError:
# if the feed consumer goes down then drop
# out of this rate limiter
log.warning(f'{stream} closed')
return
end = time.time() end = time.time()
diff = end - start diff = end - start

View File

@ -305,11 +305,6 @@ async def attach_feed_bus(
): ):
if tick_throttle: if tick_throttle:
# open a bg task which receives quotes over a mem chan
# and only pushes them to the target actor-consumer at
# a max ``tick_throttle`` instantaneous rate.
send, recv = trio.open_memory_channel(2**10) send, recv = trio.open_memory_channel(2**10)
n.start_soon( n.start_soon(
uniform_rate_send, uniform_rate_send,
@ -326,12 +321,7 @@ async def attach_feed_bus(
try: try:
await trio.sleep_forever() await trio.sleep_forever()
finally: finally:
log.info(
f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}')
if tick_throttle:
n.cancel_scope.cancel()
bus._subscribers[symbol].remove(sub) bus._subscribers[symbol].remove(sub)
@ -483,6 +473,11 @@ async def open_feed(
ctx.open_stream() as stream, ctx.open_stream() as stream,
): ):
# TODO: can we make this work better with the proposed
# context based bidirectional streaming style api proposed in:
# https://github.com/goodboy/tractor/issues/53
# init_msg = await stream.receive()
# we can only read from shm # we can only read from shm
shm = attach_shm_array( shm = attach_shm_array(
token=init_msg[sym]['shm_token'], token=init_msg[sym]['shm_token'],
@ -525,8 +520,4 @@ async def open_feed(
feed._max_sample_rate = max(ohlc_sample_rates) feed._max_sample_rate = max(ohlc_sample_rates)
try:
yield feed yield feed
finally:
# drop the infinite stream connection
await ctx.cancel()

View File

@ -19,6 +19,7 @@ High level Qt chart widgets.
""" """
import time import time
from contextlib import AsyncExitStack
from typing import Tuple, Dict, Any, Optional from typing import Tuple, Dict, Any, Optional
from types import ModuleType from types import ModuleType
from functools import partial from functools import partial
@ -843,7 +844,7 @@ class ChartPlotWidget(pg.PlotWidget):
# istart=max(lbar, l), iend=min(rbar, r), just_history=True) # istart=max(lbar, l), iend=min(rbar, r), just_history=True)
# bars_len = rbar - lbar # bars_len = rbar - lbar
# log.debug( # log.trace(
# f"\nl: {l}, lbar: {lbar}, rbar: {rbar}, r: {r}\n" # f"\nl: {l}, lbar: {lbar}, rbar: {rbar}, r: {r}\n"
# f"view_len: {view_len}, bars_len: {bars_len}\n" # f"view_len: {view_len}, bars_len: {bars_len}\n"
# f"begin: {begin}, end: {end}, extra: {extra}" # f"begin: {begin}, end: {end}, extra: {extra}"
@ -1473,6 +1474,7 @@ async def display_symbol_data(
) as feed, ) as feed,
trio.open_nursery() as n,
): ):
ohlcv: ShmArray = feed.shm ohlcv: ShmArray = feed.shm
@ -1540,7 +1542,6 @@ async def display_symbol_data(
}, },
}) })
async with trio.open_nursery() as n:
# load initial fsp chain (otherwise known as "indicators") # load initial fsp chain (otherwise known as "indicators")
n.start_soon( n.start_soon(
spawn_fsps, spawn_fsps,
@ -1577,29 +1578,41 @@ async def display_symbol_data(
await start_order_mode(chart, symbol, provider, order_mode_started) await start_order_mode(chart, symbol, provider, order_mode_started)
async def load_provider_search( async def load_providers(
broker: str, brokernames: list[str],
loglevel: str, loglevel: str,
) -> None: ) -> None:
# TODO: seems like our incentive for brokerd caching lelel
backends = {}
async with AsyncExitStack() as stack:
# TODO: spawn these async in nursery.
# load all requested brokerd's at startup and load their
# search engines.
for broker in brokernames:
log.info(f'loading brokerd for {broker}..') log.info(f'loading brokerd for {broker}..')
# spin up broker daemons for each provider
async with ( portal = await stack.enter_async_context(
maybe_spawn_brokerd( maybe_spawn_brokerd(
broker, broker,
loglevel=loglevel loglevel=loglevel
) as portal, )
)
backends[broker] = portal
await stack.enter_async_context(
feed.install_brokerd_search( feed.install_brokerd_search(
portal, portal,
get_brokermod(broker), get_brokermod(broker),
), )
): )
# keep search engine stream up until cancelled # keep search engines up until cancelled
await trio.sleep_forever() await trio.sleep_forever()
@ -1640,7 +1653,9 @@ async def _async_main(
sbar = godwidget.window.status_bar sbar = godwidget.window.status_bar
starting_done = sbar.open_status('starting ze sexy chartz') starting_done = sbar.open_status('starting ze sexy chartz')
async with trio.open_nursery() as root_n: async with (
trio.open_nursery() as root_n,
):
# set root nursery and task stack for spawning other charts/feeds # set root nursery and task stack for spawning other charts/feeds
# that run cached in the bg # that run cached in the bg
@ -1679,8 +1694,7 @@ async def _async_main(
): ):
# load other providers into search **after** # load other providers into search **after**
# the chart's select cache # the chart's select cache
for broker in brokernames: root_n.start_soon(load_providers, brokernames, loglevel)
root_n.start_soon(load_provider_search, broker, loglevel)
await order_mode_ready.wait() await order_mode_ready.wait()