Add draft order actor architecture

basic_alerts
Tyler Goodlet 2021-01-01 12:37:58 -05:00
parent 5fddb581ab
commit bd85214017
2 changed files with 80 additions and 4 deletions

View File

@ -25,6 +25,7 @@ import numpy as np
import pyqtgraph as pg
import tractor
import trio
from trio_typing import TaskStatus
from ._axes import (
DynamicDateAxis,
@ -407,10 +408,6 @@ class ChartPlotWidget(pg.PlotWidget):
self.default_view()
# TODO: stick in config
# use cross-hair for cursor?
# self.setCursor(QtCore.Qt.CrossCursor)
# Assign callback for rescaling y-axis automatically
# based on data contents and ``ViewBox`` state.
self.sigXRangeChanged.connect(self._set_yrange)
@ -821,6 +818,73 @@ class ChartPlotWidget(pg.PlotWidget):
self.scene().leaveEvent(ev)
_to_router: trio.abc.SendChannel = None
_from_ui: trio.abc.ReceiveChannel = None
# TODO: make this a ``tractor.msg.pub``
async def stream_orders():
"""Order streaming task: deliver orders transmitted from UI
to downstream consumers.
This is run in the UI actor (usually the one running Qt).
The UI simply delivers order messages to the above ``_to_router``
send channel (from sync code using ``.send_nowait()``), these values
are pulled from the channel here and send to any consumer(s).
"""
global _from_ui
async for order in _from_ui:
yield order
async def stream_and_route(ui_name):
"""Order router actor entrypoint.
"""
actor = tractor.current_actor()
# new router entry point
async with tractor.wait_for_actor(ui_name) as portal:
async for order in await portal.run(stream_orders):
print(f'order {order} received in {actor.uid}')
# push order back to parent as an "alert"
# (mocking for eg. a "fill")
yield order
async def spawn_router_stream_alerts(
ident: str,
task_status: TaskStatus[str] = trio.TASK_STATUS_IGNORED,
) -> None:
# setup local ui event streaming channels
global _from_ui, _to_router
_to_router, _from_ui = trio.open_memory_channel(100)
actor = tractor.current_actor()
subactor_name = ident + '.router'
async with tractor.open_nursery() as n:
portal = await n.start_actor(
subactor_name,
rpc_module_paths=[__name__],
)
stream = await portal.run(
stream_and_route,
ui_name=actor.name
)
# let parent task continue
task_status.started(subactor_name)
async for alert in stream:
print(f'alert {alert} received in {actor.uid}')
async def _async_main(
sym: str,
brokername: str,
@ -906,6 +970,17 @@ async def _async_main(
async with trio.open_nursery() as n:
router_name = await n.start(
spawn_router_stream_alerts,
sym,
)
# wait for router to come up before setting
# enabling send channel on chart
async with tractor.wait_for_actor(router_name):
global _to_router
linked_charts._to_router = _to_router
# load initial fsp chain (otherwise known as "indicators")
n.start_soon(
spawn_fsps,

View File

@ -150,5 +150,6 @@ def chart(config, symbol, date, rate, test, profile):
tractor_kwargs={
'debug_mode': True,
'loglevel': tractorloglevel,
'rpc_module_paths': ['piker.ui._chart'],
},
)