diff --git a/piker/_ems.py b/piker/_ems.py new file mode 100644 index 00000000..f3650f49 --- /dev/null +++ b/piker/_ems.py @@ -0,0 +1,91 @@ +# piker: trading gear for hackers +# Copyright (C) 2018-present Tyler Goodlet (in stewardship for piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +In suit parlance: "Execution management systems" + +""" +import trio +from trio_typing import TaskStatus +import tractor + + +_to_router: trio.abc.SendChannel = None +_from_ui: trio.abc.ReceiveChannel = None + + +# TODO: make this a ``tractor.msg.pub`` +async def stream_orders(): + """Order streaming task: deliver orders transmitted from UI + to downstream consumers. + + This is run in the UI actor (usually the one running Qt). + The UI simply delivers order messages to the above ``_to_router`` + send channel (from sync code using ``.send_nowait()``), these values + are pulled from the channel here and send to any consumer(s). + + """ + global _from_ui + + async for order in _from_ui: + yield order + + +async def stream_and_route(ui_name): + """Order router (sub)actor entrypoint. + + """ + actor = tractor.current_actor() + + # new router entry point + async with tractor.wait_for_actor(ui_name) as portal: + + async for order in await portal.run(stream_orders): + print(f'order {order} received in {actor.uid}') + + # push order back to parent as an "alert" + # (mocking for eg. a "fill") + yield order + + +async def spawn_router_stream_alerts( + ident: str, + task_status: TaskStatus[str] = trio.TASK_STATUS_IGNORED, +) -> None: + + # setup local ui event streaming channels + global _from_ui, _to_router + _to_router, _from_ui = trio.open_memory_channel(100) + + actor = tractor.current_actor() + subactor_name = ident + '.router' + + async with tractor.open_nursery() as n: + + portal = await n.start_actor( + subactor_name, + rpc_module_paths=[__name__], + ) + stream = await portal.run( + stream_and_route, + ui_name=actor.name + ) + async with tractor.wait_for_actor(subactor_name): + # let parent task continue + task_status.started(_to_router) + + async for alert in stream: + print(f'alert {alert} received in {actor.uid}') diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index 7af09d38..7d65d6c5 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -16,6 +16,7 @@ """ High level Qt chart widgets. + """ from typing import Tuple, Dict, Any, Optional, Callable from functools import partial @@ -25,7 +26,6 @@ import numpy as np import pyqtgraph as pg import tractor import trio -from trio_typing import TaskStatus from ._axes import ( DynamicDateAxis, @@ -59,6 +59,7 @@ from ..log import get_logger from ._exec import run_qtractor, current_screen from ._interaction import ChartView from .. import fsp +from .._ems import spawn_router_stream_alerts, _to_router log = get_logger(__name__) @@ -818,73 +819,6 @@ class ChartPlotWidget(pg.PlotWidget): self.scene().leaveEvent(ev) -_to_router: trio.abc.SendChannel = None -_from_ui: trio.abc.ReceiveChannel = None - - -# TODO: make this a ``tractor.msg.pub`` -async def stream_orders(): - """Order streaming task: deliver orders transmitted from UI - to downstream consumers. - - This is run in the UI actor (usually the one running Qt). - The UI simply delivers order messages to the above ``_to_router`` - send channel (from sync code using ``.send_nowait()``), these values - are pulled from the channel here and send to any consumer(s). - - """ - global _from_ui - - async for order in _from_ui: - yield order - - -async def stream_and_route(ui_name): - """Order router actor entrypoint. - - """ - actor = tractor.current_actor() - - # new router entry point - async with tractor.wait_for_actor(ui_name) as portal: - async for order in await portal.run(stream_orders): - print(f'order {order} received in {actor.uid}') - - # push order back to parent as an "alert" - # (mocking for eg. a "fill") - yield order - - -async def spawn_router_stream_alerts( - ident: str, - task_status: TaskStatus[str] = trio.TASK_STATUS_IGNORED, -) -> None: - - # setup local ui event streaming channels - global _from_ui, _to_router - _to_router, _from_ui = trio.open_memory_channel(100) - - actor = tractor.current_actor() - subactor_name = ident + '.router' - - async with tractor.open_nursery() as n: - - portal = await n.start_actor( - subactor_name, - rpc_module_paths=[__name__], - ) - stream = await portal.run( - stream_and_route, - ui_name=actor.name - ) - - # let parent task continue - task_status.started(subactor_name) - - async for alert in stream: - print(f'alert {alert} received in {actor.uid}') - - async def _async_main( sym: str, brokername: str, @@ -970,16 +904,6 @@ async def _async_main( async with trio.open_nursery() as n: - router_name = await n.start( - spawn_router_stream_alerts, - sym, - ) - - # wait for router to come up before setting - # enabling send channel on chart - async with tractor.wait_for_actor(router_name): - global _to_router - linked_charts._to_router = _to_router # load initial fsp chain (otherwise known as "indicators") n.start_soon( @@ -1001,8 +925,18 @@ async def _async_main( wap_in_history, ) + router_send_chan = await n.start( + spawn_router_stream_alerts, + sym, + ) + + # wait for router to come up before setting + # enabling send channel on chart + linked_charts._to_router = router_send_chan + # wait for a first quote before we start any update tasks quote = await feed.receive() + log.info(f'Received first quote {quote}') n.start_soon( diff --git a/piker/ui/cli.py b/piker/ui/cli.py index 0adeaf5a..d2050bbc 100644 --- a/piker/ui/cli.py +++ b/piker/ui/cli.py @@ -150,6 +150,6 @@ def chart(config, symbol, date, rate, test, profile): tractor_kwargs={ 'debug_mode': True, 'loglevel': tractorloglevel, - 'rpc_module_paths': ['piker.ui._chart'], + 'rpc_module_paths': ['piker._ems'], }, )