Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet f75057bc64 Fix TWS triggered trades msg packing 2021-07-05 09:53:46 -04:00
Tyler Goodlet d8fd1c0d64 Load provider search engines in tasks instead of exit stack 2021-07-05 09:53:19 -04:00
Tyler Goodlet b306d1573b Feed detach must explicitly unsub throttled streams
If a client attaches to a quotes data feed and requests a throttle rate,
be sure to unsub that side-band memchan + task when it detaches and
especially so on any transport connection error.

Also, use an explicit `tractor.Context.cancel()` on the client feed
block exit since we removed the implicit cancel option from the
`tractor` api.
2021-07-05 09:41:35 -04:00
Tyler Goodlet 77baad1e92 Make json resp log debug level 2021-07-05 09:36:54 -04:00
Tyler Goodlet ce40e46c91 TOSQUASH ems comments 2021-07-01 08:43:01 -04:00
Tyler Goodlet f348cbcd52 Better formalize `pikerd` service semantics
An async exit stack around the new `@tractor.context` is problematic
since a pushed context can't bubble errors unless the exit stack has
been closed. But in that case why do you need the exit stack if you're
going to push it and wait it right away; it seems more correct to use
a nursery and spawn a task in `pikerd` that waits on the both the
target context completion first (thus being able to bubble up any errors
from the remote, and top level service task) and the sub-actor portal.
(Sub)service Daemons are spawned with `.start_actor()` and thus will
block forever until cancelled so, add a way to cancel them explicitly
which we'll need eventually for restarts and dynamic feed management.

The big lesson here is that async exit stacks are not conducive to
spawning and monitoring service tasks, and especially so if
a `@tractor.context` is used since if the `.open_context()` call isn't
exited (only possible by the stack being closed), then there will be no
way for `trio` to cancel the task that pushed that context (since it
can't run a checkpoint while yielded inside the stack) without also
cancelling all other contexts pushed on that stack. Presuming one
`pikerd` task is used to do the original pushing (which it was) then
any error would have to kill all service daemon tasks which obviously
won't work.

I see this mostly as the painz of tinkering out an SC service manager
with `tractor` / `trio` for the first time, so try to go easy on the
process ;P
2021-06-26 16:52:15 -04:00
Tyler Goodlet 1edccf37d9 Support multiple client dialogues active on one brokerd trades dialogue 2021-06-26 16:00:04 -04:00
Tyler Goodlet d6d7c24320 WIP single brokerd dialogue 2021-06-25 00:57:58 -04:00
Tyler Goodlet 2465c8fc78 Pop subscriber streams on connection errors 2021-06-25 00:53:32 -04:00
Tyler Goodlet 998775dfd9 Don't use a context stack for contexts 2021-06-25 00:44:02 -04:00
7 changed files with 471 additions and 278 deletions

View File

@ -19,11 +19,12 @@ Structured, daemon tree service management.
""" """
from typing import Optional, Union, Callable, Any from typing import Optional, Union, Callable, Any
from contextlib import asynccontextmanager, AsyncExitStack from contextlib import asynccontextmanager
from collections import defaultdict from collections import defaultdict
from pydantic import BaseModel from pydantic import BaseModel
import trio import trio
from trio_typing import TaskStatus
import tractor import tractor
from .log import get_logger, get_console_log from .log import get_logger, get_console_log
@ -45,36 +46,79 @@ _root_modules = [
class Services(BaseModel): class Services(BaseModel):
actor_n: tractor._trionics.ActorNursery actor_n: tractor._trionics.ActorNursery
service_n: trio.Nursery service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag debug_mode: bool # tractor sub-actor debug mode flag
ctx_stack: AsyncExitStack service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {}
class Config: class Config:
arbitrary_types_allowed = True arbitrary_types_allowed = True
async def open_remote_ctx( async def start_service_task(
self, self,
name: str,
portal: tractor.Portal, portal: tractor.Portal,
target: Callable, target: Callable,
**kwargs, **kwargs,
) -> tractor.Context: ) -> (trio.CancelScope, tractor.Context):
''' '''
Open a context in a service sub-actor, add to a stack Open a context in a service sub-actor, add to a stack
that gets unwound at ``pikerd`` tearodwn. that gets unwound at ``pikerd`` teardown.
This allows for allocating long-running sub-services in our main This allows for allocating long-running sub-services in our main
daemon and explicitly controlling their lifetimes. daemon and explicitly controlling their lifetimes.
''' '''
ctx, first = await self.ctx_stack.enter_async_context( async def open_context_in_task(
portal.open_context( task_status: TaskStatus[
trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> Any:
with trio.CancelScope() as cs:
async with portal.open_context(
target, target,
**kwargs, **kwargs,
) as (ctx, first):
# unblock once the remote context has started
task_status.started((cs, first))
# wait on any context's return value
ctx_res = await ctx.result()
log.info(
f'`pikerd` service {name} started with value {ctx_res}'
) )
)
return ctx # wait on any error from the sub-actor
# NOTE: this will block indefinitely until cancelled
# either by error from the target context function or
# by being cancelled here by the surroundingn cancel
# scope
return await (portal.result(), ctx_res)
cs, first = await self.service_n.start(open_context_in_task)
# store the cancel scope and portal for later cancellation or
# retstart if needed.
self.service_tasks[name] = (cs, portal)
return cs, first
async def cancel_service(
self,
name: str,
) -> Any:
log.info(f'Cancelling `pikerd` service {name}')
cs, portal = self.service_tasks[name]
cs.cancel()
return await portal.cancel_actor()
_services: Optional[Services] = None _services: Optional[Services] = None
@ -117,19 +161,19 @@ async def open_pikerd(
# spawn other specialized daemons I think? # spawn other specialized daemons I think?
enable_modules=_root_modules, enable_modules=_root_modules,
) as _, ) as _,
tractor.open_nursery() as actor_nursery, tractor.open_nursery() as actor_nursery,
): ):
async with trio.open_nursery() as service_nursery: async with trio.open_nursery() as service_nursery:
# setup service mngr singleton instance # # setup service mngr singleton instance
async with AsyncExitStack() as stack: # async with AsyncExitStack() as stack:
# assign globally for future daemon/task creation # assign globally for future daemon/task creation
_services = Services( _services = Services(
actor_n=actor_nursery, actor_n=actor_nursery,
service_n=service_nursery, service_n=service_nursery,
debug_mode=debug_mode, debug_mode=debug_mode,
ctx_stack=stack,
) )
yield _services yield _services
@ -174,16 +218,20 @@ async def maybe_open_pikerd(
# subtle, we must have the runtime up here or portal lookup will fail # subtle, we must have the runtime up here or portal lookup will fail
async with maybe_open_runtime(loglevel, **kwargs): async with maybe_open_runtime(loglevel, **kwargs):
async with tractor.find_actor(_root_dname) as portal: async with tractor.find_actor(_root_dname) as portal:
# assert portal is not None # assert portal is not None
if portal is not None: if portal is not None:
yield portal yield portal
return return
# presume pikerd role # presume pikerd role since no daemon could be found at
# configured address
async with open_pikerd( async with open_pikerd(
loglevel=loglevel, loglevel=loglevel,
debug_mode=kwargs.get('debug_mode', False), debug_mode=kwargs.get('debug_mode', False),
) as _: ) as _:
# in the case where we're starting up the # in the case where we're starting up the
# tractor-piker runtime stack in **this** process # tractor-piker runtime stack in **this** process
@ -209,7 +257,7 @@ class Brokerd:
async def maybe_spawn_daemon( async def maybe_spawn_daemon(
service_name: str, service_name: str,
spawn_func: Callable, service_task_target: Callable,
spawn_args: dict[str, Any], spawn_args: dict[str, Any],
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
**kwargs, **kwargs,
@ -219,6 +267,13 @@ async def maybe_spawn_daemon(
If no ``service_name`` daemon-actor can be found, If no ``service_name`` daemon-actor can be found,
spawn one in a local subactor and return a portal to it. spawn one in a local subactor and return a portal to it.
If this function is called from a non-pikerd actor, the
spawned service will persist as long as pikerd does or
it is requested to be cancelled.
This can be seen as a service starting api for remote-actor
clients.
""" """
if loglevel: if loglevel:
get_console_log(loglevel) get_console_log(loglevel)
@ -246,13 +301,24 @@ async def maybe_spawn_daemon(
) as pikerd_portal: ) as pikerd_portal:
if pikerd_portal is None: if pikerd_portal is None:
# we are root so spawn brokerd directly in our tree # we are the root and thus are `pikerd`
# the root nursery is accessed through process global state # so spawn the target service directly by calling
await spawn_func(**spawn_args) # the provided target routine.
# XXX: this assumes that the target is well formed and will
# do the right things to setup both a sub-actor **and** call
# the ``_Services`` api from above to start the top level
# service task for that actor.
await service_task_target(**spawn_args)
else: else:
# tell the remote `pikerd` to start the target,
# the target can't return a non-serializable value
# since it is expected that service startingn is
# non-blocking and the target task will persist running
# on `pikerd` after the client requesting it's start
# disconnects.
await pikerd_portal.run( await pikerd_portal.run(
spawn_func, service_task_target,
**spawn_args, **spawn_args,
) )
@ -267,7 +333,7 @@ async def spawn_brokerd(
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
**tractor_kwargs, **tractor_kwargs,
) -> tractor._portal.Portal: ) -> bool:
log.info(f'Spawning {brokername} broker daemon') log.info(f'Spawning {brokername} broker daemon')
@ -280,6 +346,8 @@ async def spawn_brokerd(
global _services global _services
assert _services assert _services
# ask `pikerd` to spawn a new sub-actor and manage it under its
# actor nursery
portal = await _services.actor_n.start_actor( portal = await _services.actor_n.start_actor(
dname, dname,
enable_modules=_data_mods + [brokermod.__name__], enable_modules=_data_mods + [brokermod.__name__],
@ -291,13 +359,13 @@ async def spawn_brokerd(
# non-blocking setup of brokerd service nursery # non-blocking setup of brokerd service nursery
from .data import _setup_persistent_brokerd from .data import _setup_persistent_brokerd
await _services.open_remote_ctx( await _services.start_service_task(
dname,
portal, portal,
_setup_persistent_brokerd, _setup_persistent_brokerd,
brokername=brokername, brokername=brokername,
) )
return True
return dname
@asynccontextmanager @asynccontextmanager
@ -314,7 +382,7 @@ async def maybe_spawn_brokerd(
async with maybe_spawn_daemon( async with maybe_spawn_daemon(
f'brokerd.{brokername}', f'brokerd.{brokername}',
spawn_func=spawn_brokerd, service_task_target=spawn_brokerd,
spawn_args={'brokername': brokername, 'loglevel': loglevel}, spawn_args={'brokername': brokername, 'loglevel': loglevel},
loglevel=loglevel, loglevel=loglevel,
**kwargs, **kwargs,
@ -328,7 +396,7 @@ async def spawn_emsd(
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
**extra_tractor_kwargs **extra_tractor_kwargs
) -> tractor._portal.Portal: ) -> bool:
""" """
Start the clearing engine under ``pikerd``. Start the clearing engine under ``pikerd``.
@ -352,12 +420,12 @@ async def spawn_emsd(
# non-blocking setup of clearing service # non-blocking setup of clearing service
from .clearing._ems import _setup_persistent_emsd from .clearing._ems import _setup_persistent_emsd
await _services.open_remote_ctx( await _services.start_service_task(
'emsd',
portal, portal,
_setup_persistent_emsd, _setup_persistent_emsd,
) )
return True
return 'emsd'
@asynccontextmanager @asynccontextmanager
@ -372,7 +440,7 @@ async def maybe_open_emsd(
async with maybe_spawn_daemon( async with maybe_spawn_daemon(
'emsd', 'emsd',
spawn_func=spawn_emsd, service_task_target=spawn_emsd,
spawn_args={'loglevel': loglevel}, spawn_args={'loglevel': loglevel},
loglevel=loglevel, loglevel=loglevel,
**kwargs, **kwargs,

View File

@ -53,6 +53,6 @@ def resproc(
log.exception(f"Failed to process {resp}:\n{resp.text}") log.exception(f"Failed to process {resp}:\n{resp.text}")
raise BrokerError(resp.text) raise BrokerError(resp.text)
else: else:
log.trace(f"Received json contents:\n{colorize_json(json)}") log.debug(f"Received json contents:\n{colorize_json(json)}")
return json if return_json else resp return json if return_json else resp

View File

@ -1442,14 +1442,14 @@ async def trades_dialogue(
if getattr(msg, 'reqid', 0) < -1: if getattr(msg, 'reqid', 0) < -1:
# it's a trade event generated by TWS usage. # it's a trade event generated by TWS usage.
log.warning(f"TWS triggered trade:\n{pformat(msg)}") log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
msg.reqid = 'tws-' + str(-1 * msg.reqid) msg.reqid = 'tws-' + str(-1 * msg.reqid)
# mark msg as from "external system" # mark msg as from "external system"
# TODO: probably something better then this.. and start # TODO: probably something better then this.. and start
# considering multiplayer/group trades tracking # considering multiplayer/group trades tracking
msg.external = True msg.broker_details['external_src'] = 'tws'
continue continue
# XXX: we always serialize to a dict for msgpack # XXX: we always serialize to a dict for msgpack

View File

@ -27,12 +27,14 @@ from typing import AsyncIterator, Callable, Any
from bidict import bidict from bidict import bidict
from pydantic import BaseModel from pydantic import BaseModel
import trio import trio
from trio_typing import TaskStatus
import tractor import tractor
from .. import data from .. import data
from ..log import get_logger from ..log import get_logger
from ..data._normalize import iterticks from ..data._normalize import iterticks
from ..data.feed import Feed from ..data.feed import Feed
from .._daemon import maybe_spawn_brokerd
from . import _paper_engine as paper from . import _paper_engine as paper
from ._messages import ( from ._messages import (
Status, Order, Status, Order,
@ -82,15 +84,16 @@ def mk_check(
@dataclass @dataclass
class _DarkBook: class _DarkBook:
"""Client-side execution book. '''EMS-trigger execution book.
Contains conditions for executions (aka "orders") which are not Contains conditions for executions (aka "orders" or "triggers")
exposed to brokers and thus the market; i.e. these are privacy which are not exposed to brokers and thus the market; i.e. these are
focussed "client side" orders. privacy focussed "client side" orders which are submitted in real-time
based on specified trigger conditions.
A singleton instance is created per EMS actor (for now). A an instance per `brokerd` is created per EMS actor (for now).
""" '''
broker: str broker: str
# levels which have an executable action (eg. alert, order, signal) # levels which have an executable action (eg. alert, order, signal)
@ -256,17 +259,34 @@ async def clear_dark_triggers(
# print(f'execs scan took: {time.time() - start}') # print(f'execs scan took: {time.time() - start}')
@dataclass
class TradesRelay:
brokerd_dialogue: tractor.MsgStream
positions: dict[str, float]
consumers: int = 0
class _Router(BaseModel): class _Router(BaseModel):
'''Order router which manages per-broker dark books, alerts, '''Order router which manages and tracks per-broker dark book,
and clearing related data feed management. alerts, clearing and related data feed management.
A singleton per ``emsd`` actor.
''' '''
# setup at actor spawn time
nursery: trio.Nursery nursery: trio.Nursery
feeds: dict[tuple[str, str], Any] = {} feeds: dict[tuple[str, str], Any] = {}
# broker to book map
books: dict[str, _DarkBook] = {} books: dict[str, _DarkBook] = {}
# order id to client stream map
clients: set[tractor.MsgStream] = set()
dialogues: dict[str, list[tractor.MsgStream]] = {} dialogues: dict[str, list[tractor.MsgStream]] = {}
relays: dict[str, tuple[dict, tractor.MsgStream]] = {}
# brokername to trades-dialogues streams with ``brokerd`` actors
relays: dict[str, TradesRelay] = {}
class Config: class Config:
arbitrary_types_allowed = True arbitrary_types_allowed = True
@ -280,10 +300,160 @@ class _Router(BaseModel):
return self.books.setdefault(brokername, _DarkBook(brokername)) return self.books.setdefault(brokername, _DarkBook(brokername))
@asynccontextmanager
async def maybe_open_brokerd_trades_dialogue(
self,
feed: Feed,
symbol: str,
dark_book: _DarkBook,
_exec_mode: str,
loglevel: str,
) -> tuple[dict, tractor.MsgStream]:
'''Open and yield ``brokerd`` trades dialogue context-stream if none
already exists.
'''
relay = self.relays.get(feed.mod.name)
if relay is None:
relay = await self.nursery.start(
open_brokerd_trades_dialogue,
self,
feed,
symbol,
_exec_mode,
loglevel,
)
relay.consumers += 1
# TODO: get updated positions here?
assert relay.brokerd_dialogue
try:
yield relay
finally:
# TODO: what exactly needs to be torn down here or
# are we just consumer tracking?
relay.consumers -= 1
_router: _Router = None _router: _Router = None
async def open_brokerd_trades_dialogue(
router: _Router,
feed: Feed,
symbol: str,
_exec_mode: str,
loglevel: str,
task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED,
) -> tuple[dict, tractor.MsgStream]:
'''Open and yield ``brokerd`` trades dialogue context-stream if none
already exists.
'''
trades_endpoint = getattr(feed.mod, 'trades_dialogue', None)
broker = feed.mod.name
# TODO: make a `tractor` bug about this!
# portal = feed._brokerd_portal
# XXX: we must have our own portal + channel otherwise
# when the data feed closes it may result in a half-closed/fucked
# channel that the brokerd side thinks is still open somehow!?
async with maybe_spawn_brokerd(
broker,
loglevel=loglevel,
) as portal:
if trades_endpoint is None or _exec_mode == 'paper':
# for paper mode we need to mock this trades response feed
# so we load bidir stream to a new sub-actor running a
# paper-simulator clearing engine.
# load the paper trading engine
_exec_mode = 'paper'
log.warning(f'Entering paper trading mode for {broker}')
# load the paper trading engine as a subactor of this emsd
# actor to simulate the real IPC load it'll have when also
# pulling data from feeds
open_trades_endpoint = paper.open_paperboi(
broker=broker,
symbol=symbol,
loglevel=loglevel,
)
else:
# open live brokerd trades endpoint
open_trades_endpoint = portal.open_context(
trades_endpoint,
loglevel=loglevel,
)
try:
async with (
open_trades_endpoint as (brokerd_ctx, positions),
brokerd_ctx.open_stream() as brokerd_trades_stream,
):
# XXX: really we only want one stream per `emsd` actor
# to relay global `brokerd` order events unless we're
# doing to expect each backend to relay only orders
# affiliated with a particular ``trades_dialogue()``
# session (seems annoying for implementers). So, here
# we cache the relay task and instead of running multiple
# tasks (which will result in multiples of the same msg being
# relayed for each EMS client) we just register each client
# stream to this single relay loop using _router.dialogues
# begin processing order events from the target brokerd backend
# by receiving order submission response messages,
# normalizing them to EMS messages and relaying back to
# the piker order client set.
relay = TradesRelay(
brokerd_dialogue=brokerd_trades_stream,
positions=positions,
consumers=1
)
_router.relays[broker] = relay
# the ems scan loop may be cancelled by the client but we
# want to keep the ``brokerd`` dialogue up regardless
task_status.started(relay)
await translate_and_relay_brokerd_events(
broker,
brokerd_trades_stream,
_router,
)
# this context should block here indefinitely until
# the ``brokerd`` task either dies or is cancelled
finally:
# parent context must have been closed
# remove from cache so next client will respawn if needed
_router.relays.pop(broker)
@tractor.context @tractor.context
async def _setup_persistent_emsd( async def _setup_persistent_emsd(
@ -298,20 +468,18 @@ async def _setup_persistent_emsd(
_router = _Router(nursery=service_nursery) _router = _Router(nursery=service_nursery)
# TODO: send back the full set of persistent orders/execs persistent # TODO: send back the full set of persistent
# orders/execs?
await ctx.started() await ctx.started()
# we pin this task to keep the feeds manager active until the # allow service tasks to run until cancelled
# parent actor decides to tear it down
await trio.sleep_forever() await trio.sleep_forever()
async def translate_and_relay_brokerd_events( async def translate_and_relay_brokerd_events(
broker: str, broker: str,
# ems_client_order_stream: tractor.MsgStream,
brokerd_trades_stream: tractor.MsgStream, brokerd_trades_stream: tractor.MsgStream,
book: _DarkBook,
router: _Router, router: _Router,
) -> AsyncIterator[dict]: ) -> AsyncIterator[dict]:
@ -334,6 +502,11 @@ async def translate_and_relay_brokerd_events(
{'presubmitted', 'submitted', 'cancelled', 'inactive'} {'presubmitted', 'submitted', 'cancelled', 'inactive'}
''' '''
book = router.get_dark_book(broker)
relay = router.relays[broker]
assert relay.brokerd_dialogue == brokerd_trades_stream
async for brokerd_msg in brokerd_trades_stream: async for brokerd_msg in brokerd_trades_stream:
name = brokerd_msg['name'] name = brokerd_msg['name']
@ -342,13 +515,15 @@ async def translate_and_relay_brokerd_events(
if name == 'position': if name == 'position':
pos_msg = BrokerdPosition(**brokerd_msg).dict()
# keep up to date locally in ``emsd``
relay.positions.setdefault(pos_msg['symbol'], {}).update(pos_msg)
# relay through position msgs immediately by # relay through position msgs immediately by
# broadcasting updates on all client streams # broadcasting updates on all client streams
for oid, ems_client_order_stream in router.dialogues.items(): for client_stream in router.clients:
await client_stream.send(pos_msg)
await ems_client_order_stream.send(
BrokerdPosition(**brokerd_msg).dict()
)
continue continue
@ -425,7 +600,7 @@ async def translate_and_relay_brokerd_events(
resp = None resp = None
broker_details = {} broker_details = {}
client_flow_complete: bool = False # client_flow_complete: bool = False
if name in ( if name in (
'error', 'error',
@ -460,7 +635,7 @@ async def translate_and_relay_brokerd_events(
if msg.status == 'cancelled': if msg.status == 'cancelled':
client_flow_complete = True # client_flow_complete = True
log.info(f'Cancellation for {oid} is complete!') log.info(f'Cancellation for {oid} is complete!')
if msg.status == 'filled': if msg.status == 'filled':
@ -473,7 +648,7 @@ async def translate_and_relay_brokerd_events(
# be sure to pop this stream from our dialogue set # be sure to pop this stream from our dialogue set
# since the order dialogue should be done. # since the order dialogue should be done.
client_flow_complete = True # client_flow_complete = True
log.info(f'Execution for {oid} is complete!') log.info(f'Execution for {oid} is complete!')
# just log it # just log it
@ -503,6 +678,7 @@ async def translate_and_relay_brokerd_events(
# Create and relay response status message # Create and relay response status message
# to requesting EMS client # to requesting EMS client
try:
ems_client_order_stream = router.dialogues[oid] ems_client_order_stream = router.dialogues[oid]
await ems_client_order_stream.send( await ems_client_order_stream.send(
Status( Status(
@ -513,6 +689,9 @@ async def translate_and_relay_brokerd_events(
brokerd_msg=broker_details, brokerd_msg=broker_details,
).dict() ).dict()
) )
except KeyError:
log.error(
f'Received `brokerd` msg for unknown client with oid: {oid}')
# TODO: do we want this to keep things cleaned up? # TODO: do we want this to keep things cleaned up?
# it might require a special status from brokerd to affirm the # it might require a special status from brokerd to affirm the
@ -533,6 +712,8 @@ async def process_client_order_cmds(
) -> None: ) -> None:
client_dialogues = router.dialogues
# cmd: dict # cmd: dict
async for cmd in client_order_stream: async for cmd in client_order_stream:
@ -541,14 +722,16 @@ async def process_client_order_cmds(
action = cmd['action'] action = cmd['action']
oid = cmd['oid'] oid = cmd['oid']
# TODO: make ``tractor.MsgStream`` a frozen type again such that it
# can be stored in sets like the old context was.
# wait, maybe this **is** already working thanks to our parent
# `trio` type?
# register this stream as an active dialogue for this order id # register this stream as an active dialogue for this order id
# such that translated message from the brokerd backend can be # such that translated message from the brokerd backend can be
# routed (relayed) to **just** that client stream (and in theory # routed (relayed) to **just** that client stream (and in theory
# others who are registered for such order affiliated msgs). # others who are registered for such order affiliated msgs).
client_dialogues[oid] = client_order_stream
# TODO: make ``tractor.MsgStream`` a frozen type again such that it
# can be stored in sets like the old context was.
router.dialogues[oid] = client_order_stream
reqid = dark_book._ems2brokerd_ids.inverse.get(oid) reqid = dark_book._ems2brokerd_ids.inverse.get(oid)
live_entry = dark_book._ems_entries.get(oid) live_entry = dark_book._ems_entries.get(oid)
@ -655,7 +838,7 @@ async def process_client_order_cmds(
# flow so that if a cancel comes from the requesting # flow so that if a cancel comes from the requesting
# client, before that ack, when the ack does arrive we # client, before that ack, when the ack does arrive we
# immediately take the reqid from the broker and cancel # immediately take the reqid from the broker and cancel
# that order with them immediately. # that live order asap.
dark_book._ems_entries[oid] = msg dark_book._ems_entries[oid] = msg
# "DARK" triggers # "DARK" triggers
@ -725,102 +908,6 @@ async def process_client_order_cmds(
) )
@asynccontextmanager
async def maybe_open_brokerd_trades_dialogue(
router: _Router,
feed: Feed,
broker: str,
symbol: str,
dark_book: _DarkBook,
_exec_mode: str,
loglevel: str,
) -> tuple[dict, tractor.MsgStream]:
'''Open and yield ``brokerd`` trades dialogue context-stream if none
already exists.
'''
trades_endpoint = getattr(feed.mod, 'trades_dialogue', None)
portal = feed._brokerd_portal
if broker in _router.relays:
positions, brokerd_trades_stream = _router.relays[broker]
# TODO: get updated positions here?
yield positions, brokerd_trades_stream
return
if trades_endpoint is None or _exec_mode == 'paper':
# for paper mode we need to mock this trades response feed
# so we load bidir stream to a new sub-actor running a
# paper-simulator clearing engine.
# load the paper trading engine
_exec_mode = 'paper'
log.warning(f'Entering paper trading mode for {broker}')
# load the paper trading engine as a subactor of this emsd
# actor to simulate the real IPC load it'll have when also
# pulling data from feeds
open_trades_endpoint = paper.open_paperboi(
broker=broker,
symbol=symbol,
loglevel=loglevel,
)
else:
# open live brokerd trades endpoint
open_trades_endpoint = portal.open_context(
trades_endpoint,
loglevel=loglevel,
)
async with (
open_trades_endpoint as (brokerd_ctx, positions),
brokerd_ctx.open_stream() as brokerd_trades_stream,
trio.open_nursery() as n,
):
# XXX: really we only want one stream per `emsd` actor
# to relay global `brokerd` order events unless we're
# doing to expect each backend to relay only orders
# affiliated with a particular ``trades_dialogue()``
# session (seems annoying for implementers). So, here
# we cache the relay task and instead of running multiple
# tasks (which will result in multiples of the same msg being
# relayed for each EMS client) we just register each client
# stream to this single relay loop using _router.dialogues
# begin processing order events from the target brokerd backend
# by receiving order submission response messages,
# normalizing them to EMS messages and relaying back to
# the piker order client set.
n.start_soon(
translate_and_relay_brokerd_events,
broker,
# ems_client_order_stream,
brokerd_trades_stream,
dark_book,
_router,
)
_router.relays[broker] = (positions, brokerd_trades_stream)
try:
yield positions, brokerd_trades_stream
finally:
# remove from cache so next client will respawn if needed
_router.relays.pop(broker)
@tractor.context @tractor.context
async def _emsd_main( async def _emsd_main(
@ -855,7 +942,7 @@ async def _emsd_main(
run (dark order) conditions on inputs and trigger brokerd "live" run (dark order) conditions on inputs and trigger brokerd "live"
order submissions. order submissions.
| |
- ``translate_and_relay_brokerd_events()``: - (maybe) ``translate_and_relay_brokerd_events()``:
accept normalized trades responses from brokerd, process and accept normalized trades responses from brokerd, process and
relay to ems client(s); this is a effectively a "trade event relay to ems client(s); this is a effectively a "trade event
reponse" proxy-broker. reponse" proxy-broker.
@ -905,24 +992,24 @@ async def _emsd_main(
# only open if one isn't already up: we try to keep # only open if one isn't already up: we try to keep
# as few duplicate streams as necessary # as few duplicate streams as necessary
maybe_open_brokerd_trades_dialogue( _router.maybe_open_brokerd_trades_dialogue(
_router,
feed, feed,
broker,
symbol, symbol,
dark_book, dark_book,
_exec_mode, _exec_mode,
loglevel, loglevel,
) as (positions, brokerd_trades_stream), ) as relay,
trio.open_nursery() as n, trio.open_nursery() as n,
): ):
brokerd_stream = relay.brokerd_dialogue # .clone()
# signal to client that we're started # signal to client that we're started
# TODO: we could eventually send back **all** brokerd # TODO: we could eventually send back **all** brokerd
# positions here? # positions here?
await ems_ctx.started(positions) await ems_ctx.started(relay.positions)
# establish 2-way stream with requesting order-client and # establish 2-way stream with requesting order-client and
# begin handling inbound order requests and updates # begin handling inbound order requests and updates
@ -932,7 +1019,8 @@ async def _emsd_main(
n.start_soon( n.start_soon(
clear_dark_triggers, clear_dark_triggers,
brokerd_trades_stream, # relay.brokerd_dialogue,
brokerd_stream,
ems_client_order_stream, ems_client_order_stream,
feed.stream, feed.stream,
@ -942,12 +1030,41 @@ async def _emsd_main(
) )
# start inbound (from attached client) order request processing # start inbound (from attached client) order request processing
try:
_router.clients.add(ems_client_order_stream)
await process_client_order_cmds( await process_client_order_cmds(
ems_client_order_stream, ems_client_order_stream,
brokerd_trades_stream,
# relay.brokerd_dialogue,
brokerd_stream,
symbol, symbol,
feed, feed,
dark_book, dark_book,
_router, _router,
) )
finally:
# remove client from "registry"
_router.clients.remove(ems_client_order_stream)
dialogues = _router.dialogues
for oid, client_stream in dialogues.items():
if client_stream == ems_client_order_stream:
log.warning(
f'client dialogue is being abandoned:\n'
f'{oid} ->\n{client_stream._ctx.chan.uid}'
)
dialogues.pop(oid)
# TODO: for order dialogues left "alive" in
# the ems this is where we should allow some
# system to take over management. Likely we
# want to allow the user to choose what kind
# of policy to use (eg. cancel all orders
# from client, run some algo, etc.).

View File

@ -233,12 +233,13 @@ async def sample_and_broadcast(
for (stream, tick_throttle) in subs: for (stream, tick_throttle) in subs:
try:
if tick_throttle: if tick_throttle:
await stream.send(quote) await stream.send(quote)
else: else:
try:
await stream.send({sym: quote}) await stream.send({sym: quote})
except ( except (
trio.BrokenResourceError, trio.BrokenResourceError,
trio.ClosedResourceError trio.ClosedResourceError
@ -247,13 +248,19 @@ async def sample_and_broadcast(
# if it's done in the fee bus code? # if it's done in the fee bus code?
# so far seems like no since this should all # so far seems like no since this should all
# be single-threaded. # be single-threaded.
log.error(f'{stream._ctx.chan.uid} dropped connection') log.warning(
f'{stream._ctx.chan.uid} dropped '
'`brokerd`-quotes-feed connection'
)
subs.remove((stream, tick_throttle))
async def uniform_rate_send( async def uniform_rate_send(
rate: float, rate: float,
quote_stream: trio.abc.ReceiveChannel, quote_stream: trio.abc.ReceiveChannel,
stream: tractor.MsgStream, stream: tractor.MsgStream,
) -> None: ) -> None:
sleep_period = 1/rate - 0.000616 sleep_period = 1/rate - 0.000616
@ -289,8 +296,14 @@ async def uniform_rate_send(
# TODO: now if only we could sync this to the display # TODO: now if only we could sync this to the display
# rate timing exactly lul # rate timing exactly lul
try:
await stream.send({first_quote['symbol']: first_quote}) await stream.send({first_quote['symbol']: first_quote})
break break
except trio.ClosedResourceError:
# if the feed consumer goes down then drop
# out of this rate limiter
log.warning(f'{stream} closed')
return
end = time.time() end = time.time()
diff = end - start diff = end - start

View File

@ -305,6 +305,11 @@ async def attach_feed_bus(
): ):
if tick_throttle: if tick_throttle:
# open a bg task which receives quotes over a mem chan
# and only pushes them to the target actor-consumer at
# a max ``tick_throttle`` instantaneous rate.
send, recv = trio.open_memory_channel(2**10) send, recv = trio.open_memory_channel(2**10)
n.start_soon( n.start_soon(
uniform_rate_send, uniform_rate_send,
@ -321,7 +326,12 @@ async def attach_feed_bus(
try: try:
await trio.sleep_forever() await trio.sleep_forever()
finally: finally:
log.info(
f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}')
if tick_throttle:
n.cancel_scope.cancel()
bus._subscribers[symbol].remove(sub) bus._subscribers[symbol].remove(sub)
@ -473,11 +483,6 @@ async def open_feed(
ctx.open_stream() as stream, ctx.open_stream() as stream,
): ):
# TODO: can we make this work better with the proposed
# context based bidirectional streaming style api proposed in:
# https://github.com/goodboy/tractor/issues/53
# init_msg = await stream.receive()
# we can only read from shm # we can only read from shm
shm = attach_shm_array( shm = attach_shm_array(
token=init_msg[sym]['shm_token'], token=init_msg[sym]['shm_token'],
@ -520,4 +525,8 @@ async def open_feed(
feed._max_sample_rate = max(ohlc_sample_rates) feed._max_sample_rate = max(ohlc_sample_rates)
try:
yield feed yield feed
finally:
# drop the infinite stream connection
await ctx.cancel()

View File

@ -19,7 +19,6 @@ High level Qt chart widgets.
""" """
import time import time
from contextlib import AsyncExitStack
from typing import Tuple, Dict, Any, Optional from typing import Tuple, Dict, Any, Optional
from types import ModuleType from types import ModuleType
from functools import partial from functools import partial
@ -844,7 +843,7 @@ class ChartPlotWidget(pg.PlotWidget):
# istart=max(lbar, l), iend=min(rbar, r), just_history=True) # istart=max(lbar, l), iend=min(rbar, r), just_history=True)
# bars_len = rbar - lbar # bars_len = rbar - lbar
# log.trace( # log.debug(
# f"\nl: {l}, lbar: {lbar}, rbar: {rbar}, r: {r}\n" # f"\nl: {l}, lbar: {lbar}, rbar: {rbar}, r: {r}\n"
# f"view_len: {view_len}, bars_len: {bars_len}\n" # f"view_len: {view_len}, bars_len: {bars_len}\n"
# f"begin: {begin}, end: {end}, extra: {extra}" # f"begin: {begin}, end: {end}, extra: {extra}"
@ -1474,7 +1473,6 @@ async def display_symbol_data(
) as feed, ) as feed,
trio.open_nursery() as n,
): ):
ohlcv: ShmArray = feed.shm ohlcv: ShmArray = feed.shm
@ -1542,6 +1540,7 @@ async def display_symbol_data(
}, },
}) })
async with trio.open_nursery() as n:
# load initial fsp chain (otherwise known as "indicators") # load initial fsp chain (otherwise known as "indicators")
n.start_soon( n.start_soon(
spawn_fsps, spawn_fsps,
@ -1578,41 +1577,29 @@ async def display_symbol_data(
await start_order_mode(chart, symbol, provider, order_mode_started) await start_order_mode(chart, symbol, provider, order_mode_started)
async def load_providers( async def load_provider_search(
brokernames: list[str], broker: str,
loglevel: str, loglevel: str,
) -> None: ) -> None:
# TODO: seems like our incentive for brokerd caching lelel
backends = {}
async with AsyncExitStack() as stack:
# TODO: spawn these async in nursery.
# load all requested brokerd's at startup and load their
# search engines.
for broker in brokernames:
log.info(f'loading brokerd for {broker}..') log.info(f'loading brokerd for {broker}..')
# spin up broker daemons for each provider
portal = await stack.enter_async_context( async with (
maybe_spawn_brokerd( maybe_spawn_brokerd(
broker, broker,
loglevel=loglevel loglevel=loglevel
) ) as portal,
)
backends[broker] = portal
await stack.enter_async_context(
feed.install_brokerd_search( feed.install_brokerd_search(
portal, portal,
get_brokermod(broker), get_brokermod(broker),
) ),
) ):
# keep search engines up until cancelled # keep search engine stream up until cancelled
await trio.sleep_forever() await trio.sleep_forever()
@ -1653,9 +1640,7 @@ async def _async_main(
sbar = godwidget.window.status_bar sbar = godwidget.window.status_bar
starting_done = sbar.open_status('starting ze sexy chartz') starting_done = sbar.open_status('starting ze sexy chartz')
async with ( async with trio.open_nursery() as root_n:
trio.open_nursery() as root_n,
):
# set root nursery and task stack for spawning other charts/feeds # set root nursery and task stack for spawning other charts/feeds
# that run cached in the bg # that run cached in the bg
@ -1694,7 +1679,8 @@ async def _async_main(
): ):
# load other providers into search **after** # load other providers into search **after**
# the chart's select cache # the chart's select cache
root_n.start_soon(load_providers, brokernames, loglevel) for broker in brokernames:
root_n.start_soon(load_provider_search, broker, loglevel)
await order_mode_ready.wait() await order_mode_ready.wait()