Compare commits

...

41 Commits

Author SHA1 Message Date
Tyler Goodlet f75057bc64 Fix TWS triggered trades msg packing 2021-07-05 09:53:46 -04:00
Tyler Goodlet d8fd1c0d64 Load provider search engines in tasks instead of exit stack 2021-07-05 09:53:19 -04:00
Tyler Goodlet b306d1573b Feed detach must explicitly unsub throttled streams
If a client attaches to a quotes data feed and requests a throttle rate,
be sure to unsub that side-band memchan + task when it detaches and
especially so on any transport connection error.

Also, use an explicit `tractor.Context.cancel()` on the client feed
block exit since we removed the implicit cancel option from the
`tractor` api.
2021-07-05 09:41:35 -04:00
Tyler Goodlet 77baad1e92 Make json resp log debug level 2021-07-05 09:36:54 -04:00
Tyler Goodlet ce40e46c91 TOSQUASH ems comments 2021-07-01 08:43:01 -04:00
Tyler Goodlet f348cbcd52 Better formalize `pikerd` service semantics
An async exit stack around the new `@tractor.context` is problematic
since a pushed context can't bubble errors unless the exit stack has
been closed. But in that case why do you need the exit stack if you're
going to push it and wait it right away; it seems more correct to use
a nursery and spawn a task in `pikerd` that waits on the both the
target context completion first (thus being able to bubble up any errors
from the remote, and top level service task) and the sub-actor portal.
(Sub)service Daemons are spawned with `.start_actor()` and thus will
block forever until cancelled so, add a way to cancel them explicitly
which we'll need eventually for restarts and dynamic feed management.

The big lesson here is that async exit stacks are not conducive to
spawning and monitoring service tasks, and especially so if
a `@tractor.context` is used since if the `.open_context()` call isn't
exited (only possible by the stack being closed), then there will be no
way for `trio` to cancel the task that pushed that context (since it
can't run a checkpoint while yielded inside the stack) without also
cancelling all other contexts pushed on that stack. Presuming one
`pikerd` task is used to do the original pushing (which it was) then
any error would have to kill all service daemon tasks which obviously
won't work.

I see this mostly as the painz of tinkering out an SC service manager
with `tractor` / `trio` for the first time, so try to go easy on the
process ;P
2021-06-26 16:52:15 -04:00
Tyler Goodlet 1edccf37d9 Support multiple client dialogues active on one brokerd trades dialogue 2021-06-26 16:00:04 -04:00
Tyler Goodlet d6d7c24320 WIP single brokerd dialogue 2021-06-25 00:57:58 -04:00
Tyler Goodlet 2465c8fc78 Pop subscriber streams on connection errors 2021-06-25 00:53:32 -04:00
Tyler Goodlet 998775dfd9 Don't use a context stack for contexts 2021-06-25 00:44:02 -04:00
Tyler Goodlet b81c538e85 Add more futes, add in order status comments 2021-06-23 10:09:45 -04:00
Tyler Goodlet 3dea1834de Make subplot proportion slightly larger 2021-06-23 10:07:41 -04:00
Tyler Goodlet 95e8d8c3a2 Label doc tweak 2021-06-23 10:06:27 -04:00
Tyler Goodlet 3bf400a1c3 WIP position market offscreen nav 2021-06-23 10:06:05 -04:00
Tyler Goodlet 39e8fb6e1c Only close event send side (facepalm) 2021-06-23 10:04:56 -04:00
Tyler Goodlet 56d13a46c7 Don't forget to pop the brokerd dialogue on teardown.. 2021-06-22 13:19:38 -04:00
Tyler Goodlet 5787488bdb Don't cancel handler nursery, let errors bubble 2021-06-22 10:58:52 -04:00
Tyler Goodlet 3f02c88718 Better live order handling logic 2021-06-22 10:57:08 -04:00
Tyler Goodlet cfd6e3216f Enable contents labels on q for "query" 2021-06-22 08:18:18 -04:00
Tyler Goodlet 54baa7b132 Avoid multiple `brokerd` trades dialogue flows
This solves a bunch of issues to do with `brokerd` order status msgs
getting relayed for each order to **every** correspondingly connected
EMS client. Previously we weren't keeping track of which emsd orders
were associated with which clients so you had backend msgs getting
broadcast to all clients which not only resulted in duplicate (and
sometimes erroneous, due to state tracking) actions taking place in the
UI's order mode, but it's also just duplicate traffic (usually to the
same actor) over multiple logical streams. Instead, only keep up **one**
(cached) stream with the `trades_dialogue()` endpoint such that **all**
emsd orders route over that single connection to the particular
`brokerd` actor.
2021-06-22 07:48:31 -04:00
Tyler Goodlet 803c02bd3e Port all to use new cursor and ohlc refs 2021-06-22 07:17:49 -04:00
Tyler Goodlet d2c3b03513 Move contents labels management to cursor mod
Add a new type/api to manage "contents labels" (labels that sit in
a view and display info about viewed data) since it's mostly used by
the linked charts cursor. Make `LinkedSplits.cursor` the new and only
instance var for the cursor such that charts can look it up from that
common class. Drop the `ChartPlotWidget._ohlc` array, just add
a `'ohlc'` entry to `._arrays`.
2021-06-21 16:45:27 -04:00
Tyler Goodlet 3a041e4f47 Drop global order lines map
Orders in order mode should be chart oriented since there's a mode per
chart. If you want all orders just ask the ems or query all the charts
in a loop.

This fixes cancel-all-orders such that when 'cc' is tapped only the
orders on the *current* chart are cancelled, lel.
2021-06-18 17:13:39 -04:00
Tyler Goodlet 97a55156ed Use group status for symbol loading 2021-06-18 09:38:14 -04:00
Tyler Goodlet 17a40862fd Fix old msg clearing var name 2021-06-18 09:37:55 -04:00
Tyler Goodlet 7d1f9c5102 Add order cancellation and submission statuses
Generalize the methods for cancelling groups of orders (all or those
under cursor) and add new group status support such that statuses for
each cancel or order submission is displayed in the status bar. In the
"cancel-all-orders" case, use the new group status stuff.
2021-06-17 17:00:57 -04:00
Tyler Goodlet 0c1c18bb94 Add an all order lines getter method 2021-06-17 17:00:10 -04:00
Tyler Goodlet fb040339e5 Add "group statuses" support to status bar
Allows for submitting a top level "group status" associated with
a "group key" which eventually resolves once all sub-statuses associated
with that group key (and thus top level status) complete and are also
removed. Also add support for a "final message" for each status such
that once the status clear callback is called a final msg is placed on
the status bar that is then removed when the next status is set.

It's all a questionable bunch of closures/callbacks but it worx.
2021-06-17 16:53:19 -04:00
Tyler Goodlet 095850c3ae Add fast tap key sequence support and order-mode-type statuses 2021-06-17 16:52:54 -04:00
Tyler Goodlet 92f350ab37 Make alerts solid line only 2021-06-16 10:31:00 -04:00
Tyler Goodlet 760323000f Didn't end up needing a task stack 2021-06-16 08:45:11 -04:00
Tyler Goodlet 8dd5bbf4fa Start input handling **after** order mode is up 2021-06-16 08:28:57 -04:00
Tyler Goodlet 29d3ad59dc Don't access unset cursor 2021-06-16 08:28:11 -04:00
Tyler Goodlet 5c93c2b42f Beautifully simplify kb handling code with set ops 2021-06-16 08:27:34 -04:00
Tyler Goodlet 151e427e1f Drop old commented behaviour; see parent class if needed 2021-06-16 05:43:35 -04:00
Tyler Goodlet 68093d55f2 Factor press and release handling into same qtloop 2021-06-16 05:24:04 -04:00
Tyler Goodlet 376aa66a73 Move region selection to editors mod 2021-06-15 19:02:46 -04:00
Tyler Goodlet 367a058342 Move line and arrow editors to new mod 2021-06-15 18:54:28 -04:00
Tyler Goodlet 23a03e3a0a Port cursor and axes to new widget names 2021-06-15 18:25:03 -04:00
Tyler Goodlet 08a21378b9 Convert view box to async input handling
Instead of callbacks for key presses/releases convert our `ChartView`'s
kb input handling to async code using our event relaying-over-mem-chan
system. This is a first step toward a more async driven modal control
UX. Changed a bunch of "chart" component naming as part of this as well,
namely: `ChartSpace` -> `GodWidget` and `LinkedSplitCharts` ->
`LinkedSplits`. Engage the view boxe's async handler code as part of new
symbol data loading in `display_symbol_data()`. More re-orging to come!
2021-06-15 18:19:59 -04:00
Tyler Goodlet 4759e79d3d Extend Qt event relaying
Add an `open_handler()` ctx manager for wholesale handling event sets
with a passed in async func. Better document and implement the event
filtering core including adding support for key "auto repeat" filtering;
it turns out the events delivered when `trio` does its guest-most tick
are not the same (Qt has somehow consumed them or something) so we have
to do certain things (like getting the `.type()`, `.isAutoRepeat()`,
etc.) before shipping over the mem chan. The alt might be to copy the
event objects first but haven't tried it yet. For now just offer
auto-repeat filtering through a flag.
2021-06-15 18:14:24 -04:00
19 changed files with 2051 additions and 1271 deletions

View File

@ -19,11 +19,12 @@ Structured, daemon tree service management.
"""
from typing import Optional, Union, Callable, Any
from contextlib import asynccontextmanager, AsyncExitStack
from contextlib import asynccontextmanager
from collections import defaultdict
from pydantic import BaseModel
import trio
from trio_typing import TaskStatus
import tractor
from .log import get_logger, get_console_log
@ -45,36 +46,79 @@ _root_modules = [
class Services(BaseModel):
actor_n: tractor._trionics.ActorNursery
service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag
ctx_stack: AsyncExitStack
service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {}
class Config:
arbitrary_types_allowed = True
async def open_remote_ctx(
async def start_service_task(
self,
name: str,
portal: tractor.Portal,
target: Callable,
**kwargs,
) -> tractor.Context:
) -> (trio.CancelScope, tractor.Context):
'''
Open a context in a service sub-actor, add to a stack
that gets unwound at ``pikerd`` tearodwn.
that gets unwound at ``pikerd`` teardown.
This allows for allocating long-running sub-services in our main
daemon and explicitly controlling their lifetimes.
'''
ctx, first = await self.ctx_stack.enter_async_context(
portal.open_context(
async def open_context_in_task(
task_status: TaskStatus[
trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> Any:
with trio.CancelScope() as cs:
async with portal.open_context(
target,
**kwargs,
) as (ctx, first):
# unblock once the remote context has started
task_status.started((cs, first))
# wait on any context's return value
ctx_res = await ctx.result()
log.info(
f'`pikerd` service {name} started with value {ctx_res}'
)
)
return ctx
# wait on any error from the sub-actor
# NOTE: this will block indefinitely until cancelled
# either by error from the target context function or
# by being cancelled here by the surroundingn cancel
# scope
return await (portal.result(), ctx_res)
cs, first = await self.service_n.start(open_context_in_task)
# store the cancel scope and portal for later cancellation or
# retstart if needed.
self.service_tasks[name] = (cs, portal)
return cs, first
async def cancel_service(
self,
name: str,
) -> Any:
log.info(f'Cancelling `pikerd` service {name}')
cs, portal = self.service_tasks[name]
cs.cancel()
return await portal.cancel_actor()
_services: Optional[Services] = None
@ -117,19 +161,19 @@ async def open_pikerd(
# spawn other specialized daemons I think?
enable_modules=_root_modules,
) as _,
tractor.open_nursery() as actor_nursery,
):
async with trio.open_nursery() as service_nursery:
# setup service mngr singleton instance
async with AsyncExitStack() as stack:
# # setup service mngr singleton instance
# async with AsyncExitStack() as stack:
# assign globally for future daemon/task creation
_services = Services(
actor_n=actor_nursery,
service_n=service_nursery,
debug_mode=debug_mode,
ctx_stack=stack,
)
yield _services
@ -174,16 +218,20 @@ async def maybe_open_pikerd(
# subtle, we must have the runtime up here or portal lookup will fail
async with maybe_open_runtime(loglevel, **kwargs):
async with tractor.find_actor(_root_dname) as portal:
# assert portal is not None
if portal is not None:
yield portal
return
# presume pikerd role
# presume pikerd role since no daemon could be found at
# configured address
async with open_pikerd(
loglevel=loglevel,
debug_mode=kwargs.get('debug_mode', False),
) as _:
# in the case where we're starting up the
# tractor-piker runtime stack in **this** process
@ -209,7 +257,7 @@ class Brokerd:
async def maybe_spawn_daemon(
service_name: str,
spawn_func: Callable,
service_task_target: Callable,
spawn_args: dict[str, Any],
loglevel: Optional[str] = None,
**kwargs,
@ -219,6 +267,13 @@ async def maybe_spawn_daemon(
If no ``service_name`` daemon-actor can be found,
spawn one in a local subactor and return a portal to it.
If this function is called from a non-pikerd actor, the
spawned service will persist as long as pikerd does or
it is requested to be cancelled.
This can be seen as a service starting api for remote-actor
clients.
"""
if loglevel:
get_console_log(loglevel)
@ -228,7 +283,7 @@ async def maybe_spawn_daemon(
lock = Brokerd.locks[service_name]
await lock.acquire()
# attach to existing brokerd if possible
# attach to existing daemon by name if possible
async with tractor.find_actor(service_name) as portal:
if portal is not None:
lock.release()
@ -246,13 +301,24 @@ async def maybe_spawn_daemon(
) as pikerd_portal:
if pikerd_portal is None:
# we are root so spawn brokerd directly in our tree
# the root nursery is accessed through process global state
await spawn_func(**spawn_args)
# we are the root and thus are `pikerd`
# so spawn the target service directly by calling
# the provided target routine.
# XXX: this assumes that the target is well formed and will
# do the right things to setup both a sub-actor **and** call
# the ``_Services`` api from above to start the top level
# service task for that actor.
await service_task_target(**spawn_args)
else:
# tell the remote `pikerd` to start the target,
# the target can't return a non-serializable value
# since it is expected that service startingn is
# non-blocking and the target task will persist running
# on `pikerd` after the client requesting it's start
# disconnects.
await pikerd_portal.run(
spawn_func,
service_task_target,
**spawn_args,
)
@ -267,7 +333,7 @@ async def spawn_brokerd(
loglevel: Optional[str] = None,
**tractor_kwargs,
) -> tractor._portal.Portal:
) -> bool:
log.info(f'Spawning {brokername} broker daemon')
@ -280,6 +346,8 @@ async def spawn_brokerd(
global _services
assert _services
# ask `pikerd` to spawn a new sub-actor and manage it under its
# actor nursery
portal = await _services.actor_n.start_actor(
dname,
enable_modules=_data_mods + [brokermod.__name__],
@ -291,13 +359,13 @@ async def spawn_brokerd(
# non-blocking setup of brokerd service nursery
from .data import _setup_persistent_brokerd
await _services.open_remote_ctx(
await _services.start_service_task(
dname,
portal,
_setup_persistent_brokerd,
brokername=brokername,
)
return dname
return True
@asynccontextmanager
@ -314,7 +382,7 @@ async def maybe_spawn_brokerd(
async with maybe_spawn_daemon(
f'brokerd.{brokername}',
spawn_func=spawn_brokerd,
service_task_target=spawn_brokerd,
spawn_args={'brokername': brokername, 'loglevel': loglevel},
loglevel=loglevel,
**kwargs,
@ -328,7 +396,7 @@ async def spawn_emsd(
loglevel: Optional[str] = None,
**extra_tractor_kwargs
) -> tractor._portal.Portal:
) -> bool:
"""
Start the clearing engine under ``pikerd``.
@ -352,12 +420,12 @@ async def spawn_emsd(
# non-blocking setup of clearing service
from .clearing._ems import _setup_persistent_emsd
await _services.open_remote_ctx(
await _services.start_service_task(
'emsd',
portal,
_setup_persistent_emsd,
)
return 'emsd'
return True
@asynccontextmanager
@ -372,7 +440,7 @@ async def maybe_open_emsd(
async with maybe_spawn_daemon(
'emsd',
spawn_func=spawn_emsd,
service_task_target=spawn_emsd,
spawn_args={'loglevel': loglevel},
loglevel=loglevel,
**kwargs,

View File

@ -53,6 +53,6 @@ def resproc(
log.exception(f"Failed to process {resp}:\n{resp.text}")
raise BrokerError(resp.text)
else:
log.trace(f"Received json contents:\n{colorize_json(json)}")
log.debug(f"Received json contents:\n{colorize_json(json)}")
return json if return_json else resp

View File

@ -169,6 +169,7 @@ _adhoc_futes_set = {
# equities
'nq.globex',
'mnq.globex',
'es.globex',
'mes.globex',
@ -176,8 +177,20 @@ _adhoc_futes_set = {
'brr.cmecrypto',
'ethusdrr.cmecrypto',
# agriculture
'he.globex', # lean hogs
'le.globex', # live cattle (geezers)
'gf.globex', # feeder cattle (younguns)
# raw
'lb.globex', # random len lumber
# metals
'xauusd.cmdty',
'xauusd.cmdty', # gold spot
'gc.nymex',
'mgc.nymex',
'xagusd.cmdty', # silver spot
}
# exchanges we don't support at the moment due to not knowing
@ -310,7 +323,8 @@ class Client:
unique_sym = f'{con.symbol}.{con.primaryExchange}'
as_dict = asdict(d)
# nested dataclass we probably don't need and that won't IPC serialize
# nested dataclass we probably don't need and that
# won't IPC serialize
as_dict.pop('secIdList')
details[unique_sym] = as_dict
@ -553,7 +567,7 @@ class Client:
else:
item = ('status', obj)
log.info(f'eventkit event -> {eventkit_obj}: {item}')
log.info(f'eventkit event ->\n{pformat(item)}')
try:
to_trio.send_nowait(item)
@ -1311,12 +1325,35 @@ async def trades_dialogue(
n.start_soon(handle_order_requests, ems_stream)
async for event_name, item in ib_trade_events_stream:
print(f' ib sending {item}')
# TODO: templating the ib statuses in comparison with other
# brokers is likely the way to go:
# https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313
# short list:
# - PendingSubmit
# - PendingCancel
# - PreSubmitted (simulated orders)
# - ApiCancelled (cancelled by client before submission
# to routing)
# - Cancelled
# - Filled
# - Inactive (reject or cancelled but not by trader)
# XXX: here's some other sucky cases from the api
# - short-sale but securities haven't been located, in this case we
# should probably keep the order in some kind of weird state or cancel
# it outright?
# status='PendingSubmit', message=''),
# status='Cancelled', message='Error 404,
# reqId 1550: Order held while securities are located.'),
# status='PreSubmitted', message='')],
if event_name == 'status':
# XXX: begin normalization of nonsense ib_insync internal
# object-state tracking representations...
if event_name == 'status':
# unwrap needed data from ib_insync internal types
trade: Trade = item
status: OrderStatus = trade.orderStatus
@ -1327,10 +1364,13 @@ async def trades_dialogue(
reqid=trade.order.orderId,
time_ns=time.time_ns(), # cuz why not
# everyone doin camel case..
status=status.status.lower(), # force lower case
filled=status.filled,
reason=status.whyHeld,
# this seems to not be necessarily up to date in the
# execDetails event.. so we have to send it here I guess?
remaining=status.remaining,
@ -1360,7 +1400,8 @@ async def trades_dialogue(
'contract': asdict(fill.contract),
'execution': asdict(fill.execution),
'commissions': asdict(fill.commissionReport),
'broker_time': execu.time, # supposedly IB server fill time
# supposedly IB server fill time
'broker_time': execu.time,
'name': 'ib',
}
@ -1401,14 +1442,14 @@ async def trades_dialogue(
if getattr(msg, 'reqid', 0) < -1:
# it's a trade event generated by TWS usage.
log.warning(f"TWS triggered trade:\n{pformat(msg)}")
log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
msg.reqid = 'tws-' + str(-1 * msg.reqid)
# mark msg as from "external system"
# TODO: probably something better then this.. and start
# considering multiplayer/group trades tracking
msg.external = True
msg.broker_details['external_src'] = 'tws'
continue
# XXX: we always serialize to a dict for msgpack
@ -1450,6 +1491,12 @@ async def open_symbol_search(
if not pattern or pattern.isspace():
log.warning('empty pattern received, skipping..')
# TODO: *BUG* if nothing is returned here the client
# side will cache a null set result and not showing
# anything to the use on re-searches when this query
# timed out. We probably need a special "timeout" msg
# or something...
# XXX: this unblocks the far end search task which may
# hold up a multi-search nursery block
await stream.send({})
@ -1457,7 +1504,7 @@ async def open_symbol_search(
continue
log.debug(f'searching for {pattern}')
# await tractor.breakpoint()
last = time.time()
results = await _trio_run_client_method(
method='search_stocks',

View File

@ -125,7 +125,9 @@ def get_orders(
if _orders is None:
# setup local ui event streaming channels for request/resp
# streamging with EMS daemon
_orders = OrderBook(*trio.open_memory_channel(1))
_orders = OrderBook(
*trio.open_memory_channel(100),
)
return _orders

View File

@ -18,19 +18,23 @@
In da suit parlances: "Execution management systems"
"""
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from pprint import pformat
import time
from dataclasses import dataclass, field
from typing import AsyncIterator, Callable, Any
from bidict import bidict
from pydantic import BaseModel
import trio
from trio_typing import TaskStatus
import tractor
from .. import data
from ..log import get_logger
from ..data._normalize import iterticks
from ..data.feed import Feed
from .._daemon import maybe_spawn_brokerd
from . import _paper_engine as paper
from ._messages import (
Status, Order,
@ -80,15 +84,16 @@ def mk_check(
@dataclass
class _DarkBook:
"""Client-side execution book.
'''EMS-trigger execution book.
Contains conditions for executions (aka "orders") which are not
exposed to brokers and thus the market; i.e. these are privacy
focussed "client side" orders.
Contains conditions for executions (aka "orders" or "triggers")
which are not exposed to brokers and thus the market; i.e. these are
privacy focussed "client side" orders which are submitted in real-time
based on specified trigger conditions.
A singleton instance is created per EMS actor (for now).
A an instance per `brokerd` is created per EMS actor (for now).
"""
'''
broker: str
# levels which have an executable action (eg. alert, order, signal)
@ -254,30 +259,237 @@ async def clear_dark_triggers(
# print(f'execs scan took: {time.time() - start}')
# TODO: lots of cases still to handle
# XXX: right now this is very very ad-hoc to IB
# - short-sale but securities haven't been located, in this case we
# should probably keep the order in some kind of weird state or cancel
# it outright?
# status='PendingSubmit', message=''),
# status='Cancelled', message='Error 404,
# reqId 1550: Order held while securities are located.'),
# status='PreSubmitted', message='')],
@dataclass
class TradesRelay:
brokerd_dialogue: tractor.MsgStream
positions: dict[str, float]
consumers: int = 0
class _Router(BaseModel):
'''Order router which manages and tracks per-broker dark book,
alerts, clearing and related data feed management.
A singleton per ``emsd`` actor.
'''
# setup at actor spawn time
nursery: trio.Nursery
feeds: dict[tuple[str, str], Any] = {}
# broker to book map
books: dict[str, _DarkBook] = {}
# order id to client stream map
clients: set[tractor.MsgStream] = set()
dialogues: dict[str, list[tractor.MsgStream]] = {}
# brokername to trades-dialogues streams with ``brokerd`` actors
relays: dict[str, TradesRelay] = {}
class Config:
arbitrary_types_allowed = True
underscore_attrs_are_private = False
def get_dark_book(
self,
brokername: str,
) -> _DarkBook:
return self.books.setdefault(brokername, _DarkBook(brokername))
@asynccontextmanager
async def maybe_open_brokerd_trades_dialogue(
self,
feed: Feed,
symbol: str,
dark_book: _DarkBook,
_exec_mode: str,
loglevel: str,
) -> tuple[dict, tractor.MsgStream]:
'''Open and yield ``brokerd`` trades dialogue context-stream if none
already exists.
'''
relay = self.relays.get(feed.mod.name)
if relay is None:
relay = await self.nursery.start(
open_brokerd_trades_dialogue,
self,
feed,
symbol,
_exec_mode,
loglevel,
)
relay.consumers += 1
# TODO: get updated positions here?
assert relay.brokerd_dialogue
try:
yield relay
finally:
# TODO: what exactly needs to be torn down here or
# are we just consumer tracking?
relay.consumers -= 1
_router: _Router = None
async def open_brokerd_trades_dialogue(
router: _Router,
feed: Feed,
symbol: str,
_exec_mode: str,
loglevel: str,
task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED,
) -> tuple[dict, tractor.MsgStream]:
'''Open and yield ``brokerd`` trades dialogue context-stream if none
already exists.
'''
trades_endpoint = getattr(feed.mod, 'trades_dialogue', None)
broker = feed.mod.name
# TODO: make a `tractor` bug about this!
# portal = feed._brokerd_portal
# XXX: we must have our own portal + channel otherwise
# when the data feed closes it may result in a half-closed/fucked
# channel that the brokerd side thinks is still open somehow!?
async with maybe_spawn_brokerd(
broker,
loglevel=loglevel,
) as portal:
if trades_endpoint is None or _exec_mode == 'paper':
# for paper mode we need to mock this trades response feed
# so we load bidir stream to a new sub-actor running a
# paper-simulator clearing engine.
# load the paper trading engine
_exec_mode = 'paper'
log.warning(f'Entering paper trading mode for {broker}')
# load the paper trading engine as a subactor of this emsd
# actor to simulate the real IPC load it'll have when also
# pulling data from feeds
open_trades_endpoint = paper.open_paperboi(
broker=broker,
symbol=symbol,
loglevel=loglevel,
)
else:
# open live brokerd trades endpoint
open_trades_endpoint = portal.open_context(
trades_endpoint,
loglevel=loglevel,
)
try:
async with (
open_trades_endpoint as (brokerd_ctx, positions),
brokerd_ctx.open_stream() as brokerd_trades_stream,
):
# XXX: really we only want one stream per `emsd` actor
# to relay global `brokerd` order events unless we're
# doing to expect each backend to relay only orders
# affiliated with a particular ``trades_dialogue()``
# session (seems annoying for implementers). So, here
# we cache the relay task and instead of running multiple
# tasks (which will result in multiples of the same msg being
# relayed for each EMS client) we just register each client
# stream to this single relay loop using _router.dialogues
# begin processing order events from the target brokerd backend
# by receiving order submission response messages,
# normalizing them to EMS messages and relaying back to
# the piker order client set.
relay = TradesRelay(
brokerd_dialogue=brokerd_trades_stream,
positions=positions,
consumers=1
)
_router.relays[broker] = relay
# the ems scan loop may be cancelled by the client but we
# want to keep the ``brokerd`` dialogue up regardless
task_status.started(relay)
await translate_and_relay_brokerd_events(
broker,
brokerd_trades_stream,
_router,
)
# this context should block here indefinitely until
# the ``brokerd`` task either dies or is cancelled
finally:
# parent context must have been closed
# remove from cache so next client will respawn if needed
_router.relays.pop(broker)
@tractor.context
async def _setup_persistent_emsd(
ctx: tractor.Context,
) -> None:
global _router
# open a root "service nursery" for the ``emsd`` actor
async with trio.open_nursery() as service_nursery:
_router = _Router(nursery=service_nursery)
# TODO: send back the full set of persistent
# orders/execs?
await ctx.started()
# allow service tasks to run until cancelled
await trio.sleep_forever()
async def translate_and_relay_brokerd_events(
broker: str,
ems_client_order_stream: tractor.MsgStream,
brokerd_trades_stream: tractor.MsgStream,
book: _DarkBook,
router: _Router,
) -> AsyncIterator[dict]:
"""Trades update loop - receive updates from broker, convert
to EMS responses, transmit to ordering client(s).
'''Trades update loop - receive updates from ``brokerd`` trades
endpoint, convert to EMS response msgs, transmit **only** to
ordering client(s).
This is where trade confirmations from the broker are processed
and appropriate responses relayed back to the original EMS client
actor. There is a messaging translation layer throughout.
This is where trade confirmations from the broker are processed and
appropriate responses relayed **only** back to the original EMS
client actor. There is a messaging translation layer throughout.
Expected message translation(s):
@ -286,10 +498,15 @@ async def translate_and_relay_brokerd_events(
'status' -> relabel as 'broker_<status>', if complete send 'executed'
'fill' -> 'broker_filled'
Currently accepted status values from IB:
Currently handled status values from IB:
{'presubmitted', 'submitted', 'cancelled', 'inactive'}
"""
'''
book = router.get_dark_book(broker)
relay = router.relays[broker]
assert relay.brokerd_dialogue == brokerd_trades_stream
async for brokerd_msg in brokerd_trades_stream:
name = brokerd_msg['name']
@ -298,10 +515,16 @@ async def translate_and_relay_brokerd_events(
if name == 'position':
# relay through position msgs immediately
await ems_client_order_stream.send(
BrokerdPosition(**brokerd_msg).dict()
)
pos_msg = BrokerdPosition(**brokerd_msg).dict()
# keep up to date locally in ``emsd``
relay.positions.setdefault(pos_msg['symbol'], {}).update(pos_msg)
# relay through position msgs immediately by
# broadcasting updates on all client streams
for client_stream in router.clients:
await client_stream.send(pos_msg)
continue
# Get the broker (order) request id, this **must** be normalized
@ -331,7 +554,7 @@ async def translate_and_relay_brokerd_events(
# may be an order msg specified as "external" to the
# piker ems flow (i.e. generated by some other
# external broker backend client (like tws for ib)
ext = brokerd_msg.get('external')
ext = brokerd_msg['broker_details'].get('external')
if ext:
log.error(f"External trade event {ext}")
@ -377,6 +600,7 @@ async def translate_and_relay_brokerd_events(
resp = None
broker_details = {}
# client_flow_complete: bool = False
if name in (
'error',
@ -407,22 +631,13 @@ async def translate_and_relay_brokerd_events(
elif name in (
'status',
):
# TODO: templating the ib statuses in comparison with other
# brokers is likely the way to go:
# https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313
# short list:
# - PendingSubmit
# - PendingCancel
# - PreSubmitted (simulated orders)
# - ApiCancelled (cancelled by client before submission
# to routing)
# - Cancelled
# - Filled
# - Inactive (reject or cancelled but not by trader)
# everyone doin camel case
msg = BrokerdStatus(**brokerd_msg)
if msg.status == 'cancelled':
# client_flow_complete = True
log.info(f'Cancellation for {oid} is complete!')
if msg.status == 'filled':
# conditional execution is fully complete, no more
@ -431,6 +646,9 @@ async def translate_and_relay_brokerd_events(
resp = 'broker_executed'
# be sure to pop this stream from our dialogue set
# since the order dialogue should be done.
# client_flow_complete = True
log.info(f'Execution for {oid} is complete!')
# just log it
@ -460,6 +678,8 @@ async def translate_and_relay_brokerd_events(
# Create and relay response status message
# to requesting EMS client
try:
ems_client_order_stream = router.dialogues[oid]
await ems_client_order_stream.send(
Status(
oid=oid,
@ -469,6 +689,15 @@ async def translate_and_relay_brokerd_events(
brokerd_msg=broker_details,
).dict()
)
except KeyError:
log.error(
f'Received `brokerd` msg for unknown client with oid: {oid}')
# TODO: do we want this to keep things cleaned up?
# it might require a special status from brokerd to affirm the
# flow is complete?
# if client_flow_complete:
# router.dialogues.pop(oid)
async def process_client_order_cmds(
@ -477,11 +706,14 @@ async def process_client_order_cmds(
brokerd_order_stream: tractor.MsgStream,
symbol: str,
feed: 'Feed', # noqa
feed: Feed, # noqa
dark_book: _DarkBook,
router: _Router,
) -> None:
client_dialogues = router.dialogues
# cmd: dict
async for cmd in client_order_stream:
@ -489,6 +721,18 @@ async def process_client_order_cmds(
action = cmd['action']
oid = cmd['oid']
# TODO: make ``tractor.MsgStream`` a frozen type again such that it
# can be stored in sets like the old context was.
# wait, maybe this **is** already working thanks to our parent
# `trio` type?
# register this stream as an active dialogue for this order id
# such that translated message from the brokerd backend can be
# routed (relayed) to **just** that client stream (and in theory
# others who are registered for such order affiliated msgs).
client_dialogues[oid] = client_order_stream
reqid = dark_book._ems2brokerd_ids.inverse.get(oid)
live_entry = dark_book._ems_entries.get(oid)
@ -498,27 +742,32 @@ async def process_client_order_cmds(
# check for live-broker order
if live_entry:
reqid = live_entry.reqid
msg = BrokerdCancel(
oid=oid,
reqid=reqid or live_entry.reqid,
reqid=reqid,
time_ns=time.time_ns(),
)
# send cancel to brokerd immediately!
log.info("Submitting cancel for live order")
# NOTE: cancel response will be relayed back in messages
# from corresponding broker
if reqid:
# send cancel to brokerd immediately!
log.info("Submitting cancel for live order {reqid}")
await brokerd_order_stream.send(msg.dict())
else:
# this might be a cancel for an order that hasn't been
# acked yet by a brokerd, so register a cancel for when
# the order ack does show up later
# the order ack does show up later such that the brokerd
# order request can be cancelled at that time.
dark_book._ems_entries[oid] = msg
# check for EMS active exec
# dark trigger cancel
else:
try:
# remove from dark book clearing
dark_book.orders[symbol].pop(oid, None)
@ -532,6 +781,8 @@ async def process_client_order_cmds(
time_ns=time.time_ns(),
).dict()
)
# de-register this client dialogue
router.dialogues.pop(oid)
except KeyError:
log.exception(f'No dark order for {symbol}?')
@ -581,17 +832,22 @@ async def process_client_order_cmds(
log.info(f'Sending live order to {broker}:\n{pformat(msg)}')
await brokerd_order_stream.send(msg.dict())
# an immediate response should be brokerd ack with order
# id but we register our request as part of the flow
# an immediate response should be ``BrokerdOrderAck``
# with ems order id from the ``trades_dialogue()``
# endpoint, but we register our request as part of the
# flow so that if a cancel comes from the requesting
# client, before that ack, when the ack does arrive we
# immediately take the reqid from the broker and cancel
# that live order asap.
dark_book._ems_entries[oid] = msg
elif exec_mode in ('dark', 'paper') or (
action in ('alert')
):
# "DARK" triggers
# submit order to local EMS book and scan loop,
# effectively a local clearing engine, which
# scans for conditions and triggers matching executions
elif exec_mode in ('dark', 'paper') or (
action in ('alert')
):
# Auto-gen scanner predicate:
# we automatically figure out what the alert check
# condition should be based on the current first
@ -637,11 +893,11 @@ async def process_client_order_cmds(
percent_away,
abs_diff_away
)
resp = 'dark_submitted'
# alerts have special msgs to distinguish
if action == 'alert':
resp = 'alert_submitted'
else:
resp = 'dark_submitted'
await client_order_stream.send(
Status(
@ -686,7 +942,7 @@ async def _emsd_main(
run (dark order) conditions on inputs and trigger brokerd "live"
order submissions.
|
- ``translate_and_relay_brokerd_events()``:
- (maybe) ``translate_and_relay_brokerd_events()``:
accept normalized trades responses from brokerd, process and
relay to ems client(s); this is a effectively a "trade event
reponse" proxy-broker.
@ -697,6 +953,8 @@ async def _emsd_main(
'''
global _router
assert _router
dark_book = _router.get_dark_book(broker)
# TODO: would be nice if in tractor we can require either a ctx arg,
@ -711,8 +969,6 @@ async def _emsd_main(
# spawn one task per broker feed
async with (
trio.open_nursery() as n,
# TODO: eventually support N-brokers
data.open_feed(
broker,
@ -732,43 +988,28 @@ async def _emsd_main(
book = _router.get_dark_book(broker)
book.lasts[(broker, symbol)] = first_quote[symbol]['last']
trades_endpoint = getattr(feed.mod, 'trades_dialogue', None)
portal = feed._brokerd_portal
if trades_endpoint is None or _exec_mode == 'paper':
# for paper mode we need to mock this trades response feed
# so we load bidir stream to a new sub-actor running a
# paper-simulator clearing engine.
# load the paper trading engine
_exec_mode = 'paper'
log.warning(f'Entering paper trading mode for {broker}')
# load the paper trading engine as a subactor of this emsd
# actor to simulate the real IPC load it'll have when also
# pulling data from feeds
open_trades_endpoint = paper.open_paperboi(
broker=broker,
symbol=symbol,
loglevel=loglevel,
)
else:
# open live brokerd trades endpoint
open_trades_endpoint = portal.open_context(
trades_endpoint,
loglevel=loglevel,
)
async with (
open_trades_endpoint as (brokerd_ctx, positions),
brokerd_ctx.open_stream() as brokerd_trades_stream,
# only open if one isn't already up: we try to keep
# as few duplicate streams as necessary
_router.maybe_open_brokerd_trades_dialogue(
feed,
symbol,
dark_book,
_exec_mode,
loglevel,
) as relay,
trio.open_nursery() as n,
):
brokerd_stream = relay.brokerd_dialogue # .clone()
# signal to client that we're started
# TODO: we could eventually send back **all** brokerd
# positions here?
await ems_ctx.started(positions)
await ems_ctx.started(relay.positions)
# establish 2-way stream with requesting order-client and
# begin handling inbound order requests and updates
@ -778,7 +1019,8 @@ async def _emsd_main(
n.start_soon(
clear_dark_triggers,
brokerd_trades_stream,
# relay.brokerd_dialogue,
brokerd_stream,
ems_client_order_stream,
feed.stream,
@ -787,72 +1029,42 @@ async def _emsd_main(
book
)
# begin processing order events from the target brokerd backend
# by receiving order submission response messages,
# normalizing them to EMS messages and relaying back to
# the piker order client.
n.start_soon(
translate_and_relay_brokerd_events,
broker,
ems_client_order_stream,
brokerd_trades_stream,
dark_book,
)
# start inbound (from attached client) order request processing
try:
_router.clients.add(ems_client_order_stream)
await process_client_order_cmds(
ems_client_order_stream,
brokerd_trades_stream,
# relay.brokerd_dialogue,
brokerd_stream,
symbol,
feed,
dark_book,
_router,
)
finally:
# remove client from "registry"
_router.clients.remove(ems_client_order_stream)
class _Router(BaseModel):
'''Order router which manages per-broker dark books, alerts,
and clearing related data feed management.
dialogues = _router.dialogues
'''
nursery: trio.Nursery
for oid, client_stream in dialogues.items():
feeds: dict[tuple[str, str], Any] = {}
books: dict[str, _DarkBook] = {}
if client_stream == ems_client_order_stream:
class Config:
arbitrary_types_allowed = True
underscore_attrs_are_private = False
log.warning(
f'client dialogue is being abandoned:\n'
f'{oid} ->\n{client_stream._ctx.chan.uid}'
)
dialogues.pop(oid)
def get_dark_book(
self,
brokername: str,
) -> _DarkBook:
return self.books.setdefault(brokername, _DarkBook(brokername))
_router: _Router = None
@tractor.context
async def _setup_persistent_emsd(
ctx: tractor.Context,
) -> None:
global _router
# open a root "service nursery" for the ``emsd`` actor
async with trio.open_nursery() as service_nursery:
_router = _Router(nursery=service_nursery)
# TODO: send back the full set of persistent orders/execs persistent
await ctx.started()
# we pin this task to keep the feeds manager active until the
# parent actor decides to tear it down
await trio.sleep_forever()
# TODO: for order dialogues left "alive" in
# the ems this is where we should allow some
# system to take over management. Likely we
# want to allow the user to choose what kind
# of policy to use (eg. cancel all orders
# from client, run some algo, etc.).

View File

@ -233,12 +233,13 @@ async def sample_and_broadcast(
for (stream, tick_throttle) in subs:
try:
if tick_throttle:
await stream.send(quote)
else:
try:
await stream.send({sym: quote})
except (
trio.BrokenResourceError,
trio.ClosedResourceError
@ -247,13 +248,19 @@ async def sample_and_broadcast(
# if it's done in the fee bus code?
# so far seems like no since this should all
# be single-threaded.
log.error(f'{stream._ctx.chan.uid} dropped connection')
log.warning(
f'{stream._ctx.chan.uid} dropped '
'`brokerd`-quotes-feed connection'
)
subs.remove((stream, tick_throttle))
async def uniform_rate_send(
rate: float,
quote_stream: trio.abc.ReceiveChannel,
stream: tractor.MsgStream,
) -> None:
sleep_period = 1/rate - 0.000616
@ -289,8 +296,14 @@ async def uniform_rate_send(
# TODO: now if only we could sync this to the display
# rate timing exactly lul
try:
await stream.send({first_quote['symbol']: first_quote})
break
except trio.ClosedResourceError:
# if the feed consumer goes down then drop
# out of this rate limiter
log.warning(f'{stream} closed')
return
end = time.time()
diff = end - start

View File

@ -305,6 +305,11 @@ async def attach_feed_bus(
):
if tick_throttle:
# open a bg task which receives quotes over a mem chan
# and only pushes them to the target actor-consumer at
# a max ``tick_throttle`` instantaneous rate.
send, recv = trio.open_memory_channel(2**10)
n.start_soon(
uniform_rate_send,
@ -321,7 +326,12 @@ async def attach_feed_bus(
try:
await trio.sleep_forever()
finally:
log.info(
f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}')
if tick_throttle:
n.cancel_scope.cancel()
bus._subscribers[symbol].remove(sub)
@ -473,11 +483,6 @@ async def open_feed(
ctx.open_stream() as stream,
):
# TODO: can we make this work better with the proposed
# context based bidirectional streaming style api proposed in:
# https://github.com/goodboy/tractor/issues/53
# init_msg = await stream.receive()
# we can only read from shm
shm = attach_shm_array(
token=init_msg[sym]['shm_token'],
@ -520,4 +525,8 @@ async def open_feed(
feed._max_sample_rate = max(ohlc_sample_rates)
try:
yield feed
finally:
# drop the infinite stream connection
await ctx.cancel()

View File

@ -18,7 +18,6 @@
Annotations for ur faces.
"""
from PyQt5 import QtCore, QtGui
from PyQt5.QtGui import QGraphicsPathItem
from pyqtgraph import Point, functions as fn, Color
@ -26,9 +25,11 @@ import numpy as np
def mk_marker(
style,
size: float = 20.0,
use_qgpath: bool = True,
) -> QGraphicsPathItem:
"""Add a marker to be displayed on the line wrapped in a ``QGraphicsPathItem``
ready to be placed using scene coordinates (not view).

View File

@ -38,7 +38,7 @@ class Axis(pg.AxisItem):
"""
def __init__(
self,
linked_charts,
linkedsplits,
typical_max_str: str = '100 000.000',
min_tick: int = 2,
**kwargs
@ -49,7 +49,7 @@ class Axis(pg.AxisItem):
# XXX: pretty sure this makes things slower
# self.setCacheMode(QtGui.QGraphicsItem.DeviceCoordinateCache)
self.linked_charts = linked_charts
self.linkedsplits = linkedsplits
self._min_tick = min_tick
self._dpi_font = _font
@ -132,9 +132,9 @@ class DynamicDateAxis(Axis):
) -> List[str]:
# try:
chart = self.linked_charts.chart
bars = chart._ohlc
shm = self.linked_charts.chart._shm
chart = self.linkedsplits.chart
bars = chart._arrays['ohlc']
shm = self.linkedsplits.chart._shm
first = shm._first.value
bars_len = len(bars)
@ -232,7 +232,6 @@ class AxisLabel(pg.GraphicsObject):
p.setPen(self.fg_color)
p.drawText(self.rect, self.text_flags, self.label_str)
def draw(
self,
p: QtGui.QPainter,
@ -250,9 +249,9 @@ class AxisLabel(pg.GraphicsObject):
# reason; ok by us
p.setOpacity(self.opacity)
# this cause the L1 labels to glitch out if used
# in the subtype and it will leave a small black strip
# with the arrow path if done before the above
# this cause the L1 labels to glitch out if used in the subtype
# and it will leave a small black strip with the arrow path if
# done before the above
p.fillRect(self.rect, self.bg_color)

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,451 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Higher level annotation editors.
"""
from dataclasses import dataclass, field
from typing import Optional
import pyqtgraph as pg
from pyqtgraph import ViewBox, Point, QtCore, QtGui
from pyqtgraph import functions as fn
from PyQt5.QtCore import QPointF
import numpy as np
from ._style import hcolor, _font
from ._graphics._lines import order_line, LevelLine
from ..log import get_logger
log = get_logger(__name__)
@dataclass
class ArrowEditor:
chart: 'ChartPlotWidget' # noqa
_arrows: field(default_factory=dict)
def add(
self,
uid: str,
x: float,
y: float,
color='default',
pointing: Optional[str] = None,
) -> pg.ArrowItem:
"""Add an arrow graphic to view at given (x, y).
"""
angle = {
'up': 90,
'down': -90,
None: 180, # pointing to right (as in an alert)
}[pointing]
# scale arrow sizing to dpi-aware font
size = _font.font.pixelSize() * 0.8
arrow = pg.ArrowItem(
angle=angle,
baseAngle=0,
headLen=size,
headWidth=size/2,
tailLen=None,
pxMode=True,
# coloring
pen=pg.mkPen(hcolor('papas_special')),
brush=pg.mkBrush(hcolor(color)),
)
arrow.setPos(x, y)
self._arrows[uid] = arrow
# render to view
self.chart.plotItem.addItem(arrow)
return arrow
def remove(self, arrow) -> bool:
self.chart.plotItem.removeItem(arrow)
@dataclass
class LineEditor:
'''The great editor of linez.
'''
chart: 'ChartPlotWidget' = None # type: ignore # noqa
_order_lines: dict[str, LevelLine] = field(default_factory=dict)
_active_staged_line: LevelLine = None
def stage_line(
self,
action: str,
color: str = 'alert_yellow',
hl_on_hover: bool = False,
dotted: bool = False,
# fields settings
size: Optional[int] = None,
) -> LevelLine:
"""Stage a line at the current chart's cursor position
and return it.
"""
# chart.setCursor(QtCore.Qt.PointingHandCursor)
cursor = self.chart.linked.cursor
if not cursor:
return None
chart = cursor.active_plot
y = cursor._datum_xy[1]
symbol = chart._lc.symbol
# add a "staged" cursor-tracking line to view
# and cash it in a a var
if self._active_staged_line:
self.unstage_line()
line = order_line(
chart,
level=y,
level_digits=symbol.digits(),
size=size,
size_digits=symbol.lot_digits(),
# just for the stage line to avoid
# flickering while moving the cursor
# around where it might trigger highlight
# then non-highlight depending on sensitivity
always_show_labels=True,
# kwargs
color=color,
# don't highlight the "staging" line
hl_on_hover=hl_on_hover,
dotted=dotted,
exec_type='dark' if dotted else 'live',
action=action,
show_markers=True,
# prevent flickering of marker while moving/tracking cursor
only_show_markers_on_hover=False,
)
self._active_staged_line = line
# hide crosshair y-line and label
cursor.hide_xhair()
# add line to cursor trackers
cursor._trackers.add(line)
return line
def unstage_line(self) -> LevelLine:
"""Inverse of ``.stage_line()``.
"""
# chart = self.chart._cursor.active_plot
# # chart.setCursor(QtCore.Qt.ArrowCursor)
cursor = self.chart.linked.cursor
# delete "staged" cursor tracking line from view
line = self._active_staged_line
if line:
cursor._trackers.remove(line)
line.delete()
self._active_staged_line = None
# show the crosshair y line and label
cursor.show_xhair()
def create_order_line(
self,
uuid: str,
level: float,
chart: 'ChartPlotWidget', # noqa
size: float,
action: str,
) -> LevelLine:
line = self._active_staged_line
if not line:
raise RuntimeError("No line is currently staged!?")
sym = chart._lc.symbol
line = order_line(
chart,
# label fields default values
level=level,
level_digits=sym.digits(),
size=size,
size_digits=sym.lot_digits(),
# LevelLine kwargs
color=line.color,
dotted=line._dotted,
show_markers=True,
only_show_markers_on_hover=True,
action=action,
)
# for now, until submission reponse arrives
line.hide_labels()
# register for later lookup/deletion
self._order_lines[uuid] = line
return line
def commit_line(self, uuid: str) -> LevelLine:
"""Commit a "staged line" to view.
Submits the line graphic under the cursor as a (new) permanent
graphic in view.
"""
try:
line = self._order_lines[uuid]
except KeyError:
log.warning(f'No line for {uuid} could be found?')
return
else:
assert line.oid == uuid
line.show_labels()
# TODO: other flashy things to indicate the order is active
log.debug(f'Level active for level: {line.value()}')
return line
def lines_under_cursor(self) -> list[LevelLine]:
"""Get the line(s) under the cursor position.
"""
# Delete any hoverable under the cursor
return self.chart.linked.cursor._hovered
def all_lines(self) -> tuple[LevelLine]:
return tuple(self._order_lines.values())
def remove_line(
self,
line: LevelLine = None,
uuid: str = None,
) -> LevelLine:
"""Remove a line by refernce or uuid.
If no lines or ids are provided remove all lines under the
cursor position.
"""
if line:
uuid = line.oid
# try to look up line from our registry
line = self._order_lines.pop(uuid, None)
if line:
# if hovered remove from cursor set
cursor = self.chart.linked.cursor
hovered = cursor._hovered
if line in hovered:
hovered.remove(line)
# make sure the xhair doesn't get left off
# just because we never got a un-hover event
cursor.show_xhair()
line.delete()
return line
class SelectRect(QtGui.QGraphicsRectItem):
def __init__(
self,
viewbox: ViewBox,
color: str = 'dad_blue',
) -> None:
super().__init__(0, 0, 1, 1)
# self.rbScaleBox = QtGui.QGraphicsRectItem(0, 0, 1, 1)
self.vb = viewbox
self._chart: 'ChartPlotWidget' = None # noqa
# override selection box color
color = QtGui.QColor(hcolor(color))
self.setPen(fn.mkPen(color, width=1))
color.setAlpha(66)
self.setBrush(fn.mkBrush(color))
self.setZValue(1e9)
self.hide()
self._label = None
label = self._label = QtGui.QLabel()
label.setTextFormat(0) # markdown
label.setFont(_font.font)
label.setMargin(0)
label.setAlignment(
QtCore.Qt.AlignLeft
# | QtCore.Qt.AlignVCenter
)
# proxy is created after containing scene is initialized
self._label_proxy = None
self._abs_top_right = None
# TODO: "swing %" might be handy here (data's max/min # % change)
self._contents = [
'change: {pchng:.2f} %',
'range: {rng:.2f}',
'bars: {nbars}',
'max: {dmx}',
'min: {dmn}',
# 'time: {nbars}m', # TODO: compute this per bar size
'sigma: {std:.2f}',
]
@property
def chart(self) -> 'ChartPlotWidget': # noqa
return self._chart
@chart.setter
def chart(self, chart: 'ChartPlotWidget') -> None: # noqa
self._chart = chart
chart.sigRangeChanged.connect(self.update_on_resize)
palette = self._label.palette()
# TODO: get bg color working
palette.setColor(
self._label.backgroundRole(),
# QtGui.QColor(chart.backgroundBrush()),
QtGui.QColor(hcolor('papas_special')),
)
def update_on_resize(self, vr, r):
"""Re-position measure label on view range change.
"""
if self._abs_top_right:
self._label_proxy.setPos(
self.vb.mapFromView(self._abs_top_right)
)
def mouse_drag_released(
self,
p1: QPointF,
p2: QPointF
) -> None:
"""Called on final button release for mouse drag with start and
end positions.
"""
self.set_pos(p1, p2)
def set_pos(
self,
p1: QPointF,
p2: QPointF
) -> None:
"""Set position of selection rect and accompanying label, move
label to match.
"""
if self._label_proxy is None:
# https://doc.qt.io/qt-5/qgraphicsproxywidget.html
self._label_proxy = self.vb.scene().addWidget(self._label)
start_pos = self.vb.mapToView(p1)
end_pos = self.vb.mapToView(p2)
# map to view coords and update area
r = QtCore.QRectF(start_pos, end_pos)
# old way; don't need right?
# lr = QtCore.QRectF(p1, p2)
# r = self.vb.childGroup.mapRectFromParent(lr)
self.setPos(r.topLeft())
self.resetTransform()
self.scale(r.width(), r.height())
self.show()
y1, y2 = start_pos.y(), end_pos.y()
x1, x2 = start_pos.x(), end_pos.x()
# TODO: heh, could probably use a max-min streamin algo here too
_, xmn = min(y1, y2), min(x1, x2)
ymx, xmx = max(y1, y2), max(x1, x2)
pchng = (y2 - y1) / y1 * 100
rng = abs(y1 - y2)
ixmn, ixmx = round(xmn), round(xmx)
nbars = ixmx - ixmn + 1
data = self._chart._arrays['ohlc'][ixmn:ixmx]
if len(data):
std = data['close'].std()
dmx = data['high'].max()
dmn = data['low'].min()
else:
dmn = dmx = std = np.nan
# update label info
self._label.setText('\n'.join(self._contents).format(
pchng=pchng, rng=rng, nbars=nbars,
std=std, dmx=dmx, dmn=dmn,
))
# print(f'x2, y2: {(x2, y2)}')
# print(f'xmn, ymn: {(xmn, ymx)}')
label_anchor = Point(xmx + 2, ymx)
# XXX: in the drag bottom-right -> top-left case we don't
# want the label to overlay the box.
# if (x2, y2) == (xmn, ymx):
# # could do this too but needs to be added after coords transform
# # label_anchor = Point(x2, y2 + self._label.height())
# label_anchor = Point(xmn, ymn)
self._abs_top_right = label_anchor
self._label_proxy.setPos(self.vb.mapFromView(label_anchor))
# self._label.show()
def clear(self):
"""Clear the selection box from view.
"""
self._label.hide()
self.hide()

View File

@ -19,27 +19,41 @@ Qt event proxying and processing using ``trio`` mem chans.
"""
from contextlib import asynccontextmanager
from typing import Callable
from PyQt5 import QtCore, QtGui
from PyQt5 import QtCore
from PyQt5.QtCore import QEvent
from PyQt5.QtGui import QWidget
import trio
class EventCloner(QtCore.QObject):
"""Clone and forward keyboard events over a trio memory channel
for later async processing.
class EventRelay(QtCore.QObject):
'''
Relay Qt events over a trio memory channel for async processing.
"""
'''
_event_types: set[QEvent] = set()
_send_chan: trio.abc.SendChannel = None
_filter_auto_repeats: bool = True
def eventFilter(
self,
source: QtGui.QWidget,
source: QWidget,
ev: QEvent,
) -> None:
'''
Qt global event filter: return `False` to pass through and `True`
to filter event out.
if ev.type() in self._event_types:
https://doc.qt.io/qt-5/qobject.html#eventFilter
https://doc.qt.io/qtforpython/overviews/eventsandfilters.html#event-filters
'''
etype = ev.type()
# print(f'etype: {etype}')
if etype in self._event_types:
# ev.accept()
# TODO: what's the right way to allow this?
# if ev.isAutoRepeat():
@ -51,41 +65,77 @@ class EventCloner(QtCore.QObject):
# something to do with Qt internals and calling the
# parent handler?
if etype in {QEvent.KeyPress, QEvent.KeyRelease}:
# TODO: is there a global setting for this?
if ev.isAutoRepeat() and self._filter_auto_repeats:
ev.ignore()
return True
key = ev.key()
mods = ev.modifiers()
txt = ev.text()
# run async processing
self._send_chan.send_nowait((ev, key, mods, txt))
# NOTE: the event object instance coming out
# the other side is mutated since Qt resumes event
# processing **before** running a ``trio`` guest mode
# tick, thus special handling or copying must be done.
# never intercept the event
# send elements to async handler
self._send_chan.send_nowait((ev, etype, key, mods, txt))
else:
# send event to async handler
self._send_chan.send_nowait(ev)
# **do not** filter out this event
# and instead forward to the source widget
return False
# filter out this event
# https://doc.qt.io/qt-5/qobject.html#installEventFilter
return False
@asynccontextmanager
async def open_key_stream(
async def open_event_stream(
source_widget: QtGui.QWidget,
source_widget: QWidget,
event_types: set[QEvent] = {QEvent.KeyPress},
# TODO: should we offer some kinda option for toggling releases?
# would it require a channel per event type?
# QEvent.KeyRelease,
filter_auto_repeats: bool = True,
) -> trio.abc.ReceiveChannel:
# 1 to force eager sending
send, recv = trio.open_memory_channel(16)
kc = EventCloner()
kc = EventRelay()
kc._send_chan = send
kc._event_types = event_types
kc._filter_auto_repeats = filter_auto_repeats
source_widget.installEventFilter(kc)
try:
async with send:
yield recv
finally:
await send.aclose()
source_widget.removeEventFilter(kc)
@asynccontextmanager
async def open_handler(
source_widget: QWidget,
event_types: set[QEvent],
async_handler: Callable[[QWidget, trio.abc.ReceiveChannel], None],
**kwargs,
) -> None:
async with (
trio.open_nursery() as n,
open_event_stream(source_widget, event_types, **kwargs) as event_recv_stream,
):
n.start_soon(async_handler, source_widget, event_recv_stream)
yield

View File

@ -18,8 +18,8 @@
Mouse interaction graphics
"""
import math
from typing import Optional, Tuple, Set, Dict
from functools import partial
from typing import Optional, Callable
import inspect
import numpy as np
@ -30,7 +30,6 @@ from PyQt5.QtCore import QPointF, QRectF
from .._style import (
_xaxis_at,
hcolor,
_font,
_font_small,
)
from .._axes import YAxisLabel, XAxisLabel
@ -98,7 +97,7 @@ class LineDot(pg.CurvePoint):
(x, y) = self.curve().getData()
index = self.property('index')
# first = self._plot._ohlc[0]['index']
# first = self._plot._arrays['ohlc'][0]['index']
# first = x[0]
# i = index - first
i = index - x[0]
@ -133,11 +132,15 @@ class ContentsLabel(pg.LabelItem):
}
def __init__(
self,
chart: 'ChartPlotWidget', # noqa
# chart: 'ChartPlotWidget', # noqa
view: pg.ViewBox,
anchor_at: str = ('top', 'right'),
justify_text: str = 'left',
font_size: Optional[int] = None,
) -> None:
font_size = font_size or _font_small.px_size
@ -148,9 +151,10 @@ class ContentsLabel(pg.LabelItem):
)
# anchor to viewbox
self.setParentItem(chart._vb)
chart.scene().addItem(self)
self.chart = chart
self.setParentItem(view)
self.vb = view
view.scene().addItem(self)
v, h = anchor_at
index = (self._corner_anchors[h], self._corner_anchors[v])
@ -163,10 +167,12 @@ class ContentsLabel(pg.LabelItem):
self.anchor(itemPos=index, parentPos=index, offset=margins)
def update_from_ohlc(
self,
name: str,
index: int,
array: np.ndarray,
) -> None:
# this being "html" is the dumbest shit :eyeroll:
first = array[0]['index']
@ -188,25 +194,111 @@ class ContentsLabel(pg.LabelItem):
)
def update_from_value(
self,
name: str,
index: int,
array: np.ndarray,
) -> None:
first = array[0]['index']
if index < array[-1]['index'] and index > first:
data = array[index - first][name]
self.setText(f"{name}: {data:.2f}")
class ContentsLabels:
'''Collection of labels that span a ``LinkedSplits`` set of chart plots
and can be updated from the underlying data from an x-index value sent
as input from a cursor or other query mechanism.
'''
def __init__(
self,
linkedsplits: 'LinkedSplits', # type: ignore # noqa
) -> None:
self.linkedsplits = linkedsplits
self._labels: list[(
'CharPlotWidget', # type: ignore # noqa
str,
ContentsLabel,
Callable
)] = []
def update_labels(
self,
index: int,
# array_name: str,
) -> None:
# for name, (label, update) in self._labels.items():
for chart, name, label, update in self._labels:
if not (index >= 0 and index < chart._arrays['ohlc'][-1]['index']):
# out of range
continue
array = chart._arrays[name]
# call provided update func with data point
try:
label.show()
update(index, array)
except IndexError:
log.exception(f"Failed to update label: {name}")
def hide(self) -> None:
for chart, name, label, update in self._labels:
label.hide()
def add_label(
self,
chart: 'ChartPlotWidget', # type: ignore # noqa
name: str,
anchor_at: tuple[str, str] = ('top', 'left'),
update_func: Callable = ContentsLabel.update_from_value,
) -> ContentsLabel:
label = ContentsLabel(
view=chart._vb,
anchor_at=anchor_at,
)
self._labels.append(
(chart, name, label, partial(update_func, label, name))
)
# label.hide()
return label
class Cursor(pg.GraphicsObject):
def __init__(
self,
linkedsplitcharts: 'LinkedSplitCharts', # noqa
linkedsplits: 'LinkedSplits', # noqa
digits: int = 0
) -> None:
super().__init__()
self.linked = linkedsplits
self.graphics: dict[str, pg.GraphicsObject] = {}
self.plots: List['PlotChartWidget'] = [] # type: ignore # noqa
self.active_plot = None
self.digits: int = digits
self._datum_xy: tuple[int, float] = (0, 0)
self._hovered: set[pg.GraphicsObject] = set()
self._trackers: set[pg.GraphicsObject] = set()
# XXX: not sure why these are instance variables?
# It's not like we can change them on the fly..?
self.pen = pg.mkPen(
@ -217,19 +309,10 @@ class Cursor(pg.GraphicsObject):
color=hcolor('davies'),
style=QtCore.Qt.DashLine,
)
self.lsc = linkedsplitcharts
self.graphics: Dict[str, pg.GraphicsObject] = {}
self.plots: List['PlotChartWidget'] = [] # type: ignore # noqa
self.active_plot = None
self.digits: int = digits
self._datum_xy: Tuple[int, float] = (0, 0)
self._hovered: Set[pg.GraphicsObject] = set()
self._trackers: Set[pg.GraphicsObject] = set()
# value used for rounding y-axis discreet tick steps
# computing once, up front, here cuz why not
self._y_incr_mult = 1 / self.lsc._symbol.tick_size
self._y_incr_mult = 1 / self.linked._symbol.tick_size
# line width in view coordinates
self._lw = self.pixelWidth() * self.lines_pen.width()
@ -239,6 +322,26 @@ class Cursor(pg.GraphicsObject):
self._y_label_update: bool = True
self.contents_labels = ContentsLabels(self.linked)
self._in_query_mode: bool = False
@property
def in_query_mode(self) -> bool:
return self._in_query_mode
@in_query_mode.setter
def in_query_mode(self, value: bool) -> None:
if self._in_query_mode and not value:
# edge trigger "off" hide all labels
self.contents_labels.hide()
elif not self._in_query_mode and value:
# edge trigger "on" hide all labels
self.contents_labels.update_labels(self._datum_xy[0])
self._in_query_mode = value
def add_hovered(
self,
item: pg.GraphicsObject,
@ -320,7 +423,7 @@ class Cursor(pg.GraphicsObject):
# the current sample under the mouse
cursor = LineDot(
curve,
index=plot._ohlc[-1]['index'],
index=plot._arrays['ohlc'][-1]['index'],
plot=plot
)
plot.addItem(cursor)
@ -344,7 +447,7 @@ class Cursor(pg.GraphicsObject):
def mouseMoved(
self,
evt: 'Tuple[QMouseEvent]', # noqa
evt: 'tuple[QMouseEvent]', # noqa
) -> None: # noqa
"""Update horizonal and vertical lines when mouse moves inside
either the main chart or any indicator subplot.
@ -392,10 +495,16 @@ class Cursor(pg.GraphicsObject):
item.on_tracked_source(ix, iy)
if ix != last_ix:
if self.in_query_mode:
# show contents labels on all linked charts and update
# with cursor movement
self.contents_labels.update_labels(ix)
for plot, opts in self.graphics.items():
# update the chart's "contents" label
plot.update_contents_labels(ix)
# plot.update_contents_labels(ix)
# move the vertical line to the current "center of bar"
opts['vl'].setX(ix + line_offset)

View File

@ -259,10 +259,10 @@ class LevelLine(pg.InfiniteLine):
detailed control and start end signalling.
"""
chart = self._chart
cursor = self._chart.linked.cursor
# hide y-crosshair
chart._cursor.hide_xhair()
cursor.hide_xhair()
# highlight
self.currentPen = self.hoverPen
@ -308,7 +308,7 @@ class LevelLine(pg.InfiniteLine):
# This is the final position in the drag
if ev.isFinish():
# show y-crosshair again
chart._cursor.show_xhair()
cursor.show_xhair()
def delete(self) -> None:
"""Remove this line from containing chart/view/scene.
@ -326,7 +326,7 @@ class LevelLine(pg.InfiniteLine):
# remove from chart/cursor states
chart = self._chart
cur = chart._cursor
cur = chart.linked.cursor
if self in cur._hovered:
cur._hovered.remove(self)
@ -352,6 +352,21 @@ class LevelLine(pg.InfiniteLine):
return up_to_l1_sc
def marker_right_points(self) -> (float, float, float):
chart = self._chart
l1_len = chart._max_l1_line_len
ryaxis = chart.getAxis('right')
r_axis_x = ryaxis.pos().x()
up_to_l1_sc = r_axis_x - l1_len
size = self._default_mkr_size
marker_right = up_to_l1_sc - (1.375 * 2*size)
line_end = marker_right - (6/16 * size)
return line_end, marker_right, r_axis_x
def paint(
self,
p: QtGui.QPainter,
@ -366,26 +381,14 @@ class LevelLine(pg.InfiniteLine):
# these are in viewbox coords
vb_left, vb_right = self._endPoints
chart = self._chart
l1_len = chart._max_l1_line_len
ryaxis = chart.getAxis('right')
r_axis_x = ryaxis.pos().x()
up_to_l1_sc = r_axis_x - l1_len
vb = self.getViewBox()
size = self._default_mkr_size
marker_right = up_to_l1_sc - (1.375 * 2*size)
line_end = marker_right - (6/16 * size)
line_end, marker_right, r_axis_x = self.marker_right_points()
if self.show_markers and self.markers:
size = self.markers[0][2]
p.setPen(self.pen)
size = qgo_draw_markers(
qgo_draw_markers(
self.markers,
self.pen.color(),
p,
@ -436,11 +439,11 @@ class LevelLine(pg.InfiniteLine):
def add_marker(
self,
path: QtGui.QGraphicsPathItem,
) -> None:
# chart = self._chart
vb = self.getViewBox()
vb.scene().addItem(path)
# add path to scene
self.getViewBox().scene().addItem(path)
self._marker = path
@ -457,8 +460,7 @@ class LevelLine(pg.InfiniteLine):
"""Mouse hover callback.
"""
chart = self._chart
cur = chart._cursor
cur = self._chart.linked.cursor
# hovered
if (not ev.isExit()) and ev.acceptDrags(QtCore.Qt.LeftButton):
@ -648,7 +650,10 @@ def order_line(
# use ``QPathGraphicsItem``s to draw markers in scene coords
# instead of the old way that was doing the same but by
# resetting the graphics item transform intermittently
# XXX: this is our new approach but seems slower?
# line.add_marker(mk_marker(marker_style, marker_size))
assert not line.markers
# the old way which is still somehow faster?
@ -659,7 +664,10 @@ def order_line(
marker_size,
use_qgpath=False,
)
# manually append for later ``.pain()`` drawing
# manually append for later ``InfiniteLine.paint()`` drawing
# XXX: this was manually tested as faster then using the
# QGraphicsItem around a painter path.. probably needs further
# testing to figure out why tf that's true.
line.markers.append((path, 0, marker_size))
orient_v = 'top' if action == 'sell' else 'bottom'
@ -753,10 +761,34 @@ def position_line(
vr = vb.state['viewRange']
ymn, ymx = vr[1]
level = line.value()
path = line._marker
# provide "nav hub" like indicator for where
# the position is on the y-dimension
# print(path._height)
# print(vb.shape())
# print(vb.boundingRect())
# print(vb.height())
_, marker_right, _ = line.marker_right_points()
if level > ymx: # pin to top of view
path.setPos(
QPointF(
marker_right,
2 + path._height,
)
)
elif level < ymn: # pin to bottom of view
path.setPos(
QPointF(
marker_right,
vb.height() - 16 + path._height,
)
)
if level > ymx or level < ymn:
line._marker.hide()
else:
# pp line is viewable so show marker
line._marker.show()
vb.sigYRangeChanged.connect(update_pp_nav)
@ -787,6 +819,11 @@ def position_line(
style = '>|'
arrow_path = mk_marker(style, size=arrow_size)
# monkey-cache height for sizing on pp nav-hub
arrow_path._height = arrow_path.boundingRect().height()
# XXX: uses new marker drawing approach
line.add_marker(arrow_path)
line.set_level(level)

View File

@ -18,447 +18,209 @@
Chart view box primitives
"""
from dataclasses import dataclass, field
from typing import Optional, Dict
from contextlib import asynccontextmanager
import time
from typing import Optional, Callable
import pyqtgraph as pg
from PyQt5.QtCore import QPointF
from pyqtgraph import ViewBox, Point, QtCore, QtGui
from PyQt5.QtCore import Qt, QEvent
from pyqtgraph import ViewBox, Point, QtCore
from pyqtgraph import functions as fn
import numpy as np
import trio
from ..log import get_logger
from ._style import _min_points_to_show, hcolor, _font
from ._graphics._lines import order_line, LevelLine
from ._style import _min_points_to_show
from ._editors import SelectRect
from ._window import main_window
log = get_logger(__name__)
class SelectRect(QtGui.QGraphicsRectItem):
async def handle_viewmode_inputs(
view: 'ChartView',
recv_chan: trio.abc.ReceiveChannel,
def __init__(
self,
viewbox: ViewBox,
color: str = 'dad_blue',
) -> None:
super().__init__(0, 0, 1, 1)
# self.rbScaleBox = QtGui.QGraphicsRectItem(0, 0, 1, 1)
self.vb = viewbox
self._chart: 'ChartPlotWidget' = None # noqa
mode = view.mode
# override selection box color
color = QtGui.QColor(hcolor(color))
self.setPen(fn.mkPen(color, width=1))
color.setAlpha(66)
self.setBrush(fn.mkBrush(color))
self.setZValue(1e9)
self.hide()
self._label = None
# track edge triggered keys
# (https://en.wikipedia.org/wiki/Interrupt#Triggering_methods)
pressed: set[str] = set()
label = self._label = QtGui.QLabel()
label.setTextFormat(0) # markdown
label.setFont(_font.font)
label.setMargin(0)
label.setAlignment(
QtCore.Qt.AlignLeft
# | QtCore.Qt.AlignVCenter
)
last = time.time()
trigger_mode: str
action: str
# proxy is created after containing scene is initialized
self._label_proxy = None
self._abs_top_right = None
# for quick key sequence-combo pattern matching
# we have a min_tap period and these should not
# ever be auto-repeats since we filter those at the
# event filter level prior to the above mem chan.
min_tap = 1/6
fast_key_seq: list[str] = []
fast_taps: dict[str, Callable] = {
'cc': mode.cancel_all_orders,
}
# TODO: "swing %" might be handy here (data's max/min # % change)
self._contents = [
'change: {pchng:.2f} %',
'range: {rng:.2f}',
'bars: {nbars}',
'max: {dmx}',
'min: {dmn}',
# 'time: {nbars}m', # TODO: compute this per bar size
'sigma: {std:.2f}',
]
async for event, etype, key, mods, text in recv_chan:
log.debug(f'key: {key}, mods: {mods}, text: {text}')
now = time.time()
period = now - last
@property
def chart(self) -> 'ChartPlotWidget': # noqa
return self._chart
# reset mods
ctrl: bool = False
shift: bool = False
@chart.setter
def chart(self, chart: 'ChartPlotWidget') -> None: # noqa
self._chart = chart
chart.sigRangeChanged.connect(self.update_on_resize)
palette = self._label.palette()
# press branch
if etype in {QEvent.KeyPress}:
# TODO: get bg color working
palette.setColor(
self._label.backgroundRole(),
# QtGui.QColor(chart.backgroundBrush()),
QtGui.QColor(hcolor('papas_special')),
)
pressed.add(key)
def update_on_resize(self, vr, r):
"""Re-position measure label on view range change.
if (
# clear any old values not part of a "fast" tap sequence:
# presumes the period since last tap is longer then our
# min_tap period
fast_key_seq and period >= min_tap or
"""
if self._abs_top_right:
self._label_proxy.setPos(
self.vb.mapFromView(self._abs_top_right)
)
# don't support more then 2 key sequences for now
len(fast_key_seq) > 2
):
fast_key_seq.clear()
def mouse_drag_released(
self,
p1: QPointF,
p2: QPointF
) -> None:
"""Called on final button release for mouse drag with start and
end positions.
# capture key to fast tap sequence if we either
# have no previous keys or we do and the min_tap period is
# met
if (
not fast_key_seq or
period <= min_tap and fast_key_seq
):
fast_key_seq.append(text)
log.debug(f'fast keys seqs {fast_key_seq}')
"""
self.set_pos(p1, p2)
# mods run through
if mods == Qt.ShiftModifier:
shift = True
def set_pos(
self,
p1: QPointF,
p2: QPointF
) -> None:
"""Set position of selection rect and accompanying label, move
label to match.
if mods == Qt.ControlModifier:
ctrl = True
"""
if self._label_proxy is None:
# https://doc.qt.io/qt-5/qgraphicsproxywidget.html
self._label_proxy = self.vb.scene().addWidget(self._label)
# SEARCH MODE #
# ctlr-<space>/<l> for "lookup", "search" -> open search tree
if (
ctrl and key in {
Qt.Key_L,
Qt.Key_Space,
}
):
view._chart._lc.godwidget.search.focus()
start_pos = self.vb.mapToView(p1)
end_pos = self.vb.mapToView(p2)
# esc and ctrl-c
if key == Qt.Key_Escape or (ctrl and key == Qt.Key_C):
# ctrl-c as cancel
# https://forum.qt.io/topic/532/how-to-catch-ctrl-c-on-a-widget/9
view.select_box.clear()
# map to view coords and update area
r = QtCore.QRectF(start_pos, end_pos)
# cancel order or clear graphics
if key == Qt.Key_C or key == Qt.Key_Delete:
# old way; don't need right?
# lr = QtCore.QRectF(p1, p2)
# r = self.vb.childGroup.mapRectFromParent(lr)
mode.cancel_orders_under_cursor()
self.setPos(r.topLeft())
self.resetTransform()
self.scale(r.width(), r.height())
self.show()
# View modes
if key == Qt.Key_R:
y1, y2 = start_pos.y(), end_pos.y()
x1, x2 = start_pos.x(), end_pos.x()
# edge triggered default view activation
view.chart.default_view()
# TODO: heh, could probably use a max-min streamin algo here too
_, xmn = min(y1, y2), min(x1, x2)
ymx, xmx = max(y1, y2), max(x1, x2)
if len(fast_key_seq) > 1:
# begin matches against sequences
func: Callable = fast_taps.get(''.join(fast_key_seq))
if func:
func()
fast_key_seq.clear()
pchng = (y2 - y1) / y1 * 100
rng = abs(y1 - y2)
# release branch
elif etype in {QEvent.KeyRelease}:
ixmn, ixmx = round(xmn), round(xmx)
nbars = ixmx - ixmn + 1
if key in pressed:
pressed.remove(key)
data = self._chart._ohlc[ixmn:ixmx]
# QUERY MODE #
if {Qt.Key_Q}.intersection(pressed):
view.linkedsplits.cursor.in_query_mode = True
if len(data):
std = data['close'].std()
dmx = data['high'].max()
dmn = data['low'].min()
else:
dmn = dmx = std = np.nan
view.linkedsplits.cursor.in_query_mode = False
# update label info
self._label.setText('\n'.join(self._contents).format(
pchng=pchng, rng=rng, nbars=nbars,
std=std, dmx=dmx, dmn=dmn,
))
# SELECTION MODE #
# print(f'x2, y2: {(x2, y2)}')
# print(f'xmn, ymn: {(xmn, ymx)}')
label_anchor = Point(xmx + 2, ymx)
# XXX: in the drag bottom-right -> top-left case we don't
# want the label to overlay the box.
# if (x2, y2) == (xmn, ymx):
# # could do this too but needs to be added after coords transform
# # label_anchor = Point(x2, y2 + self._label.height())
# label_anchor = Point(xmn, ymn)
self._abs_top_right = label_anchor
self._label_proxy.setPos(self.vb.mapFromView(label_anchor))
# self._label.show()
def clear(self):
"""Clear the selection box from view.
"""
self._label.hide()
self.hide()
# global store of order-lines graphics
# keyed by uuid4 strs - used to sync draw
# order lines **after** the order is 100%
# active in emsd
_order_lines: Dict[str, LevelLine] = {}
@dataclass
class LineEditor:
"""The great editor of linez..
"""
view: 'ChartView'
_order_lines: field(default_factory=_order_lines)
chart: 'ChartPlotWidget' = None # type: ignore # noqa
_active_staged_line: LevelLine = None
_stage_line: LevelLine = None
def stage_line(
self,
action: str,
color: str = 'alert_yellow',
hl_on_hover: bool = False,
dotted: bool = False,
# fields settings
size: Optional[int] = None,
) -> LevelLine:
"""Stage a line at the current chart's cursor position
and return it.
"""
# chart.setCursor(QtCore.Qt.PointingHandCursor)
chart = self.chart._cursor.active_plot
cursor = chart._cursor
y = chart._cursor._datum_xy[1]
symbol = chart._lc.symbol
# line = self._stage_line
# if not line:
# add a "staged" cursor-tracking line to view
# and cash it in a a var
if self._active_staged_line:
self.unstage_line()
line = order_line(
chart,
level=y,
level_digits=symbol.digits(),
size=size,
size_digits=symbol.lot_digits(),
# just for the stage line to avoid
# flickering while moving the cursor
# around where it might trigger highlight
# then non-highlight depending on sensitivity
always_show_labels=True,
# kwargs
color=color,
# don't highlight the "staging" line
hl_on_hover=hl_on_hover,
dotted=dotted,
exec_type='dark' if dotted else 'live',
action=action,
show_markers=True,
# prevent flickering of marker while moving/tracking cursor
only_show_markers_on_hover=False,
)
self._active_staged_line = line
# hide crosshair y-line and label
cursor.hide_xhair()
# add line to cursor trackers
cursor._trackers.add(line)
return line
def unstage_line(self) -> LevelLine:
"""Inverse of ``.stage_line()``.
"""
# chart = self.chart._cursor.active_plot
# # chart.setCursor(QtCore.Qt.ArrowCursor)
cursor = self.chart._cursor
# delete "staged" cursor tracking line from view
line = self._active_staged_line
if line:
cursor._trackers.remove(line)
line.delete()
self._active_staged_line = None
# show the crosshair y line and label
cursor.show_xhair()
def create_order_line(
self,
uuid: str,
level: float,
chart: 'ChartPlotWidget', # noqa
size: float,
action: str,
) -> LevelLine:
line = self._active_staged_line
if not line:
raise RuntimeError("No line is currently staged!?")
sym = chart._lc.symbol
line = order_line(
chart,
# label fields default values
level=level,
level_digits=sym.digits(),
size=size,
size_digits=sym.lot_digits(),
# LevelLine kwargs
color=line.color,
dotted=line._dotted,
show_markers=True,
only_show_markers_on_hover=True,
action=action,
)
# for now, until submission reponse arrives
line.hide_labels()
# register for later lookup/deletion
self._order_lines[uuid] = line
return line
def commit_line(self, uuid: str) -> LevelLine:
"""Commit a "staged line" to view.
Submits the line graphic under the cursor as a (new) permanent
graphic in view.
"""
try:
line = self._order_lines[uuid]
except KeyError:
log.warning(f'No line for {uuid} could be found?')
return
if shift:
if view.state['mouseMode'] == ViewBox.PanMode:
view.setMouseMode(ViewBox.RectMode)
else:
assert line.oid == uuid
line.show_labels()
view.setMouseMode(ViewBox.PanMode)
# TODO: other flashy things to indicate the order is active
# ORDER MODE #
# live vs. dark trigger + an action {buy, sell, alert}
log.debug(f'Level active for level: {line.value()}')
order_keys_pressed = {
Qt.Key_A,
Qt.Key_F,
Qt.Key_D
}.intersection(pressed)
return line
if order_keys_pressed:
if (
# 's' for "submit" to activate "live" order
Qt.Key_S in pressed or
ctrl
):
trigger_mode: str = 'live'
def lines_under_cursor(self):
"""Get the line(s) under the cursor position.
else:
trigger_mode: str = 'dark'
"""
# Delete any hoverable under the cursor
return self.chart._cursor._hovered
# order mode trigger "actions"
if Qt.Key_D in pressed: # for "damp eet"
action = 'sell'
def remove_line(
self,
line: LevelLine = None,
uuid: str = None,
) -> LevelLine:
"""Remove a line by refernce or uuid.
elif Qt.Key_F in pressed: # for "fillz eet"
action = 'buy'
If no lines or ids are provided remove all lines under the
cursor position.
elif Qt.Key_A in pressed:
action = 'alert'
trigger_mode = 'live'
"""
if line:
uuid = line.oid
view.order_mode = True
# try to look up line from our registry
line = self._order_lines.pop(uuid, None)
if line:
# XXX: order matters here for line style!
view.mode._exec_mode = trigger_mode
view.mode.set_exec(action)
# if hovered remove from cursor set
hovered = self.chart._cursor._hovered
if line in hovered:
hovered.remove(line)
prefix = trigger_mode + '-' if action != 'alert' else ''
view._chart.window().mode_label.setText(
f'mode: {prefix}{action}')
# make sure the xhair doesn't get left off
# just because we never got a un-hover event
self.chart._cursor.show_xhair()
else: # none active
# if none are pressed, remove "staged" level
# line under cursor position
view.mode.lines.unstage_line()
line.delete()
return line
if view.hasFocus():
# update mode label
view._chart.window().mode_label.setText('mode: view')
view.order_mode = False
@dataclass
class ArrowEditor:
chart: 'ChartPlotWidget' # noqa
_arrows: field(default_factory=dict)
def add(
self,
uid: str,
x: float,
y: float,
color='default',
pointing: Optional[str] = None,
) -> pg.ArrowItem:
"""Add an arrow graphic to view at given (x, y).
"""
angle = {
'up': 90,
'down': -90,
None: 180, # pointing to right (as in an alert)
}[pointing]
# scale arrow sizing to dpi-aware font
size = _font.font.pixelSize() * 0.8
arrow = pg.ArrowItem(
angle=angle,
baseAngle=0,
headLen=size,
headWidth=size/2,
tailLen=None,
pxMode=True,
# coloring
pen=pg.mkPen(hcolor('papas_special')),
brush=pg.mkBrush(hcolor(color)),
)
arrow.setPos(x, y)
self._arrows[uid] = arrow
# render to view
self.chart.plotItem.addItem(arrow)
return arrow
def remove(self, arrow) -> bool:
self.chart.plotItem.removeItem(arrow)
last = time.time()
class ChartView(ViewBox):
"""Price chart view box with interaction behaviors you'd expect from
'''
Price chart view box with interaction behaviors you'd expect from
any interactive platform:
- zoom on mouse scroll that auto fits y-axis
@ -466,31 +228,48 @@ class ChartView(ViewBox):
- zoom on x to most recent in view datum
- zoom on right-click-n-drag to cursor position
"""
'''
mode_name: str = 'mode: view'
def __init__(
self,
name: str,
parent: pg.PlotItem = None,
**kwargs,
):
super().__init__(parent=parent, **kwargs)
# disable vertical scrolling
self.setMouseEnabled(x=True, y=False)
self.linked_charts = None
self.select_box = SelectRect(self)
self.addItem(self.select_box, ignoreBounds=True)
self.linkedsplits = None
self._chart: 'ChartPlotWidget' = None # noqa
self.mode = None
# add our selection box annotator
self.select_box = SelectRect(self)
self.addItem(self.select_box, ignoreBounds=True)
# kb ctrls processing
self._key_buffer = []
self._key_active: bool = False
self.name = name
self.mode = None
self.order_mode: bool = False
self.setFocusPolicy(QtCore.Qt.StrongFocus)
@asynccontextmanager
async def open_async_input_handler(
self,
) -> 'ChartView':
from . import _event
async with _event.open_handler(
self,
event_types={QEvent.KeyPress, QEvent.KeyRelease},
async_handler=handle_viewmode_inputs,
):
yield self
@property
def chart(self) -> 'ChartPlotWidget': # type: ignore # noqa
return self._chart
@ -501,21 +280,21 @@ class ChartView(ViewBox):
self.select_box.chart = chart
def wheelEvent(self, ev, axis=None):
"""Override "center-point" location for scrolling.
'''Override "center-point" location for scrolling.
This is an override of the ``ViewBox`` method simply changing
the center of the zoom to be the y-axis.
TODO: PR a method into ``pyqtgraph`` to make this configurable
"""
'''
if axis in (0, 1):
mask = [False, False]
mask[axis] = self.state['mouseEnabled'][axis]
else:
mask = self.state['mouseEnabled'][:]
chart = self.linked_charts.chart
chart = self.linkedsplits.chart
# don't zoom more then the min points setting
l, lbar, rbar, r = chart.bars_range()
@ -525,7 +304,7 @@ class ChartView(ViewBox):
log.debug("Max zoom bruh...")
return
if ev.delta() < 0 and vl >= len(chart._ohlc) + 666:
if ev.delta() < 0 and vl >= len(chart._arrays['ohlc']) + 666:
log.debug("Min zoom bruh...")
return
@ -573,7 +352,6 @@ class ChartView(ViewBox):
end_of_l1,
key=lambda p: p.x()
)
# breakpoint()
# focal = pg.Point(last_bar.x() + end_of_l1)
self._resetTarget()
@ -693,131 +471,16 @@ class ChartView(ViewBox):
elif button == QtCore.Qt.LeftButton:
# when in order mode, submit execution
if self._key_active:
if self.order_mode:
ev.accept()
self.mode.submit_exec()
def keyReleaseEvent(self, ev: QtCore.QEvent):
"""
Key release to normally to trigger release of input mode
def keyReleaseEvent(self, event: QtCore.QEvent) -> None:
'''This routine is rerouted to an async handler.
'''
pass
"""
# TODO: is there a global setting for this?
if ev.isAutoRepeat():
ev.ignore()
return
ev.accept()
# text = ev.text()
key = ev.key()
mods = ev.modifiers()
if key == QtCore.Qt.Key_Shift:
# if self.state['mouseMode'] == ViewBox.RectMode:
self.setMouseMode(ViewBox.PanMode)
# ctlalt = False
# if (QtCore.Qt.AltModifier | QtCore.Qt.ControlModifier) == mods:
# ctlalt = True
# if self.state['mouseMode'] == ViewBox.RectMode:
# if key == QtCore.Qt.Key_Space:
if mods == QtCore.Qt.ControlModifier or key == QtCore.Qt.Key_Control:
self.mode._exec_mode = 'dark'
if key in {QtCore.Qt.Key_A, QtCore.Qt.Key_F, QtCore.Qt.Key_D}:
# remove "staged" level line under cursor position
self.mode.lines.unstage_line()
self._key_active = False
def keyPressEvent(self, ev: QtCore.QEvent) -> None:
"""
This routine should capture key presses in the current view box.
"""
# TODO: is there a global setting for this?
if ev.isAutoRepeat():
ev.ignore()
return
ev.accept()
text = ev.text()
key = ev.key()
mods = ev.modifiers()
print(f'text: {text}, key: {key}')
if mods == QtCore.Qt.ShiftModifier:
if self.state['mouseMode'] == ViewBox.PanMode:
self.setMouseMode(ViewBox.RectMode)
# ctrl
ctrl = False
if mods == QtCore.Qt.ControlModifier:
ctrl = True
self.mode._exec_mode = 'live'
self._key_active = True
# ctrl + alt
# ctlalt = False
# if (QtCore.Qt.AltModifier | QtCore.Qt.ControlModifier) == mods:
# ctlalt = True
# ctlr-<space>/<l> for "lookup", "search" -> open search tree
if ctrl and key in {
QtCore.Qt.Key_L,
QtCore.Qt.Key_Space,
}:
search = self._chart._lc.chart_space.search
search.focus()
# esc
if key == QtCore.Qt.Key_Escape or (ctrl and key == QtCore.Qt.Key_C):
# ctrl-c as cancel
# https://forum.qt.io/topic/532/how-to-catch-ctrl-c-on-a-widget/9
self.select_box.clear()
# cancel order or clear graphics
if key == QtCore.Qt.Key_C or key == QtCore.Qt.Key_Delete:
# delete any lines under the cursor
mode = self.mode
for line in mode.lines.lines_under_cursor():
mode.book.cancel(uuid=line.oid)
self._key_buffer.append(text)
# View modes
if key == QtCore.Qt.Key_R:
self.chart.default_view()
# Order modes: stage orders at the current cursor level
elif key == QtCore.Qt.Key_D: # for "damp eet"
self.mode.set_exec('sell')
elif key == QtCore.Qt.Key_F: # for "fillz eet"
self.mode.set_exec('buy')
elif key == QtCore.Qt.Key_A:
self.mode.set_exec('alert')
# XXX: Leaving this for light reference purposes, there
# seems to be some work to at least gawk at for history mgmt.
# Key presses are used only when mouse mode is RectMode
# The following events are implemented:
# ctrl-A : zooms out to the default "full" view of the plot
# ctrl-+ : moves forward in the zooming stack (if it exists)
# ctrl-- : moves backward in the zooming stack (if it exists)
# self.scaleHistory(-1)
# elif ev.text() in ['+', '=']:
# self.scaleHistory(1)
# elif ev.key() == QtCore.Qt.Key_Backspace:
# self.scaleHistory(len(self.axHistory))
else:
# maybe propagate to parent widget
ev.ignore()
self._key_active = False
def keyPressEvent(self, event: QtCore.QEvent) -> None:
'''This routine is rerouted to an async handler.
'''
pass

View File

@ -89,11 +89,16 @@ def right_axis(
class Label:
"""
A plain ol' "scene label" using an underlying ``QGraphicsTextItem``.
After hacking for many days on multiple "label" systems inside
``pyqtgraph`` yet again we're left writing our own since it seems
all of those are over complicated, ad-hoc, pieces of garbage that
can't accomplish the simplest things, such as pinning to the left
hand side of a view box.
all of those are over complicated, ad-hoc, transform-mangling,
messes which can't accomplish the simplest things via their inputs
(such as pinning to the left hand side of a view box).
Here we do the simple thing where the label uses callables to figure
out the (x, y) coordinate "pin point": nice and simple.
This type is another effort (see our graphics) to start making
small, re-usable label components that can actually be used to build
@ -104,6 +109,7 @@ class Label:
self,
view: pg.ViewBox,
fmt_str: str,
color: str = 'bracket',
x_offset: float = 0,

View File

@ -447,7 +447,7 @@ class SearchBar(QtWidgets.QLineEdit):
self.view: CompleterView = view
self.dpi_font = font
self.chart_app = parent_chart
self.godwidget = parent_chart
# size it as we specify
# https://doc.qt.io/qt-5/qsizepolicy.html#Policy-enum
@ -496,12 +496,12 @@ class SearchWidget(QtGui.QWidget):
def __init__(
self,
chart_space: 'ChartSpace', # type: ignore # noqa
godwidget: 'GodWidget', # type: ignore # noqa
columns: List[str] = ['src', 'symbol'],
parent=None,
) -> None:
super().__init__(parent or chart_space)
super().__init__(parent or godwidget)
# size it as we specify
self.setSizePolicy(
@ -509,7 +509,7 @@ class SearchWidget(QtGui.QWidget):
QtWidgets.QSizePolicy.Fixed,
)
self.chart_app = chart_space
self.godwidget = godwidget
self.vbox = QtGui.QVBoxLayout(self)
self.vbox.setContentsMargins(0, 0, 0, 0)
@ -540,7 +540,7 @@ class SearchWidget(QtGui.QWidget):
)
self.bar = SearchBar(
parent=self,
parent_chart=chart_space,
parent_chart=godwidget,
view=self.view,
)
self.bar_hbox.addWidget(self.bar)
@ -557,7 +557,7 @@ class SearchWidget(QtGui.QWidget):
# fill cache list if nothing existing
self.view.set_section_entries(
'cache',
list(reversed(self.chart_app._chart_cache)),
list(reversed(self.godwidget._chart_cache)),
clear_all=True,
)
@ -611,7 +611,7 @@ class SearchWidget(QtGui.QWidget):
return None
provider, symbol = value
chart = self.chart_app
chart = self.godwidget
log.info(f'Requesting symbol: {symbol}.{provider}')
@ -632,7 +632,7 @@ class SearchWidget(QtGui.QWidget):
# Re-order the symbol cache on the chart to display in
# LIFO order. this is normally only done internally by
# the chart on new symbols being loaded into memory
chart.set_chart_symbol(fqsn, chart.linkedcharts)
chart.set_chart_symbol(fqsn, chart.linkedsplits)
self.view.set_section_entries(
'cache',
@ -650,6 +650,7 @@ _search_enabled: bool = False
async def pack_matches(
view: CompleterView,
has_results: dict[str, set[str]],
matches: dict[(str, str), [str]],
@ -823,7 +824,7 @@ async def fill_results(
async def handle_keyboard_input(
search: SearchWidget,
searchbar: SearchBar,
recv_chan: trio.abc.ReceiveChannel,
) -> None:
@ -831,8 +832,9 @@ async def handle_keyboard_input(
global _search_active, _search_enabled
# startup
chart = search.chart_app
bar = search.bar
bar = searchbar
search = searchbar.parent()
chart = search.godwidget
view = bar.view
view.set_font_size(bar.dpi_font.px_size)
@ -851,7 +853,7 @@ async def handle_keyboard_input(
)
)
async for event, key, mods, txt in recv_chan:
async for event, etype, key, mods, txt in recv_chan:
log.debug(f'key: {key}, mods: {mods}, txt: {txt}')
@ -889,7 +891,7 @@ async def handle_keyboard_input(
# kill the search and focus back on main chart
if chart:
chart.linkedcharts.focus()
chart.linkedsplits.focus()
continue

View File

@ -21,7 +21,8 @@ Qt main window singletons and stuff.
import os
import signal
import time
from typing import Callable
from typing import Callable, Optional, Union
import uuid
from pyqtgraph import QtGui
from PyQt5 import QtCore
@ -42,25 +43,101 @@ class MultiStatus:
def __init__(self, bar, statuses) -> None:
self.bar = bar
self.statuses = statuses
self._to_clear: set = set()
self._status_groups: dict[str, (set, Callable)] = {}
def open_status(
self,
msg: str,
) -> Callable[..., None]:
final_msg: Optional[str] = None,
clear_on_next: bool = False,
group_key: Optional[Union[bool, str]] = False,
) -> Union[Callable[..., None], str]:
'''Add a status to the status bar and return a close callback which
when called will remove the status ``msg``.
'''
for old_msg in self._to_clear:
try:
self.statuses.remove(old_msg)
except ValueError:
pass
self.statuses.append(msg)
def remove_msg() -> None:
try:
self.statuses.remove(msg)
self.render()
except ValueError:
pass
self.render()
return remove_msg
if final_msg is not None:
self.statuses.append(final_msg)
self.render()
self._to_clear.add(final_msg)
ret = remove_msg
# create a "status group" such that new `.open_status()`
# calls can be made passing in the returned group key.
# once all clear callbacks have been called from all statuses
# in the group the final status msg to be removed will be the one
# the one provided when `group_key=True`, this way you can
# create a long living status that completes once all
# sub-statuses have finished.
if group_key is True:
if clear_on_next:
ValueError("Can't create group status and clear it on next?")
# generate a key for a new "status group"
new_group_key = str(uuid.uuid4())
def pop_group_and_clear():
subs, final_clear = self._status_groups.pop(new_group_key)
assert not subs
return remove_msg()
self._status_groups[new_group_key] = (set(), pop_group_and_clear)
ret = new_group_key
elif group_key:
def pop_from_group_and_maybe_clear_group():
# remove the message for this sub-status
remove_msg()
# check to see if all other substatuses have cleared
group_tup = self._status_groups.get(group_key)
if group_tup:
subs, group_clear = group_tup
try:
subs.remove(msg)
except KeyError:
raise KeyError(f'no msg {msg} for group {group_key}!?')
if not subs:
group_clear()
self._status_groups[group_key][0].add(msg)
ret = pop_from_group_and_maybe_clear_group
self.render()
if clear_on_next:
self._to_clear.add(msg)
return ret
def render(self) -> None:
'''Display all open statuses to bar.
'''
if self.statuses:
self.bar.showMessage(f'{" ".join(self.statuses)}')
else:

View File

@ -26,11 +26,12 @@ from typing import Optional, Dict, Callable, Any
import uuid
import pyqtgraph as pg
import trio
from pydantic import BaseModel
import trio
from ._graphics._lines import LevelLine, position_line
from ._interaction import LineEditor, ArrowEditor, _order_lines
from ._editors import LineEditor, ArrowEditor
from ._window import MultiStatus, main_window
from ..clearing._client import open_ems, OrderBook
from ..data._source import Symbol
from ..log import get_logger
@ -48,18 +49,31 @@ class Position(BaseModel):
@dataclass
class OrderMode:
"""Major mode for placing orders on a chart view.
'''Major mode for placing orders on a chart view.
This is the default mode that pairs with "follow mode"
(when wathing the rt price update at the current time step)
and allows entering orders using the ``a, d, f`` keys and
cancelling moused-over orders with the ``c`` key.
and allows entering orders using mouse and keyboard.
This object is chart oriented, so there is an instance per
chart / view currently.
"""
Current manual:
a -> alert
s/ctrl -> submission type modifier {on: live, off: dark}
f (fill) -> buy limit order
d (dump) -> sell limit order
c (cancel) -> cancel order under cursor
cc -> cancel all submitted orders on chart
mouse click and drag -> modify current order under cursor
'''
chart: 'ChartPlotWidget' # type: ignore # noqa
book: OrderBook
lines: LineEditor
arrows: ArrowEditor
status_bar: MultiStatus
name: str = 'order'
_colors = {
'alert': 'alert_yellow',
'buy': 'buy_green',
@ -71,7 +85,8 @@ class OrderMode:
_position: Dict[str, Any] = field(default_factory=dict)
_position_line: dict = None
key_map: Dict[str, Callable] = field(default_factory=dict)
_pending_submissions: dict[str, (LevelLine, Callable)] = field(
default_factory=dict)
def on_position_update(
self,
@ -108,12 +123,18 @@ class OrderMode:
"""Set execution mode.
"""
# not initialized yet
if not self.chart.linked.cursor:
return
self._action = action
self.lines.stage_line(
color=self._colors[action],
# hl_on_hover=True if self._exec_mode == 'live' else False,
dotted=True if self._exec_mode == 'dark' else False,
dotted=True if (
self._exec_mode == 'dark' and action != 'alert'
) else False,
size=size or self._size,
action=action,
)
@ -127,6 +148,13 @@ class OrderMode:
"""
line = self.lines.commit_line(uuid)
pending = self._pending_submissions.get(uuid)
if pending:
order_line, func = pending
assert order_line is line
func()
return line
def on_fill(
@ -182,8 +210,12 @@ class OrderMode:
if msg is not None:
self.lines.remove_line(uuid=uuid)
self.chart._cursor.show_xhair()
self.chart.linked.cursor.show_xhair()
pending = self._pending_submissions.pop(uuid, None)
if pending:
order_line, func = pending
func()
else:
log.warning(
f'Received cancel for unsubmitted order {pformat(msg)}'
@ -206,8 +238,9 @@ class OrderMode:
size = size or self._size
chart = self.chart._cursor.active_plot
y = chart._cursor._datum_xy[1]
cursor = self.chart.linked.cursor
chart = cursor.active_plot
y = cursor._datum_xy[1]
symbol = self.chart._lc._symbol
@ -238,17 +271,70 @@ class OrderMode:
)
line.oid = uid
# enter submission which will be popped once a response
# from the EMS is received to move the order to a different# status
self._pending_submissions[uid] = (
line,
self.status_bar.open_status(
f'submitting {self._exec_mode}-{action}',
final_msg=f'submitted {self._exec_mode}-{action}',
clear_on_next=True,
)
)
# hook up mouse drag handlers
line._on_drag_start = self.order_line_modify_start
line._on_drag_end = self.order_line_modify_complete
return line
def cancel_order_under_cursor(self) -> None:
for line in self.lines.lines_under_cursor():
self.book.cancel(uuid=line.oid)
def cancel_orders_under_cursor(self) -> list[str]:
return self.cancel_orders_from_lines(
self.lines.lines_under_cursor()
)
def cancel_all_orders(self) -> list[str]:
'''Cancel all orders for the current chart.
'''
return self.cancel_orders_from_lines(
self.lines.all_lines()
)
def cancel_orders_from_lines(
self,
lines: list[LevelLine],
) -> list[str]:
ids: list = []
if lines:
key = self.status_bar.open_status(
f'cancelling {len(lines)} orders',
final_msg=f'cancelled {len(lines)} orders',
group_key=True
)
# cancel all active orders and triggers
for line in lines:
oid = getattr(line, 'oid', None)
if oid:
self._pending_submissions[oid] = (
line,
self.status_bar.open_status(
f'cancelling order {oid[:6]}',
group_key=key,
),
)
ids.append(oid)
self.book.cancel(uuid=oid)
return ids
# order-line modify handlers
def order_line_modify_start(
self,
line: LevelLine,
@ -274,13 +360,14 @@ async def open_order_mode(
chart: pg.PlotWidget,
book: OrderBook,
):
status_bar: MultiStatus = main_window().status_bar
view = chart._vb
lines = LineEditor(view=view, chart=chart, _order_lines=_order_lines)
lines = LineEditor(chart=chart)
arrows = ArrowEditor(chart, {})
log.info("Opening order mode")
mode = OrderMode(chart, book, lines, arrows)
mode = OrderMode(chart, book, lines, arrows, status_bar)
view.mode = mode
asset_type = symbol.type_key
@ -306,10 +393,13 @@ async def open_order_mode(
async def start_order_mode(
chart: 'ChartPlotWidget', # noqa
symbol: Symbol,
brokername: str,
started: trio.Event,
) -> None:
'''Activate chart-trader order mode loop:
- connect to emsd
@ -317,12 +407,16 @@ async def start_order_mode(
- begin order handling loop
'''
done = chart.window().status_bar.open_status('Starting order mode...')
done = chart.window().status_bar.open_status('starting order mode..')
# spawn EMS actor-service
async with (
open_ems(brokername, symbol) as (book, trades_stream, positions),
open_order_mode(symbol, chart, book) as order_mode
open_order_mode(symbol, chart, book) as order_mode,
# # start async input handling for chart's view
# # await godwidget._task_stack.enter_async_context(
# chart._vb.open_async_input_handler(),
):
# update any exising positions
@ -345,6 +439,13 @@ async def start_order_mode(
# Begin order-response streaming
done()
# start async input handling for chart's view
async with chart._vb.open_async_input_handler():
# signal to top level symbol loading task we're ready
# to handle input since the ems connection is ready
started.set()
# this is where we receive **back** messages
# about executions **from** the EMS actor
async for msg in trades_stream: