This is a first attempt at a financial signal processing subsystem which
utilizes async generators for streaming frames of numpy array data
between actors. In this initial attempt the focus is on processing price
data and relaying it to the chart app for real-time display. So far this
seems to work (with decent latency) but much more work is likely needed
around improving the data model for even better latency and less data
duplication.
Surprisingly (or not?) a lot of simplifications to the charting code
came out of this in terms of conducting graphics updates in streaming
tasks instead of hiding them inside the obfuscated mess that is the
Qt-style-inheritance-OO-90s-trash. The goal from here on wards will be
to enforce strict semantics around reading and writing of data such that
state is kept outside "object trees" as much as possible and streaming
function semantics guide our flow model. Unsurprisingly, this reduction
in "instance state" is happening wherever we use `trio` ;)
A little summary on the technical changes:
- not going to explain the fsp system yet; it's too nascent and
probably going to get some heavy editing.
- drop any "update" methods from the `LinkedCharts` type since each
sub-chart will have it's own update task and thus a separate update
loop; further individual graphics (per chart) may eventually require
this same design.
- delete `ChartView`; moved into separate mod.
- add "stream from fsp" task to start our foray into real-time actor
processed numpy streaming.
Wait for a first actual real-time quote before starting graphics update
tasks. Use the new normalized tick format brokers are expected to emit
as a `quotes['ticks']` list. Auto detect time frame from historical
bars.
Add `ChartPlotWidget.add_plot()` to add sub charts for indicators which
can be updated independently. Clean up rt bar update code and drop some
legacy ohlc loading cruft.
Stop with all this "main chart" special treatment.
Manage all lines in the same way across all referenced plots.
Add `CrossHair.add_plot()` for adding new plots dynamically.
Just, smh.
There's really nothing coupling it to the graphics class (which frankly
also seems like it doesn't need to be a class.. Qt).
Add support to `.update_from_array()` for diffing with the input array
and creating additional bar-lines where necessary. Note, there are still
issues with the "correctness" here in terms of bucketing open/close
values in the time frame / bar range. Also, this jamming of each bar's 3
lines into a homogeneous array seems like it could be better done with
struct arrays and avoid all this "index + 3" stuff.
Flat bars have a rendering issue we work around by hacking values in `QLineF`
but we have to revert those on any last bar that is being updated in
real-time. Comment out candle implementations for now; we can get back
to it if/when the tinas unite. Oh, and make bars have a little space
between them.
Don't allow zooming to less then a min number of data points. Allow
panning "outside" the data set (i.e. moving one of the sequence "ends"
to the middle of the view. Start adding logging.
For whatever reason if the `QLineF` high/low values are the same a weird
little rectangle is drawn (my guess is a `float` precision error of some
sort). Instead, if they're the same just use one of the values.
Also, store local vars to avoid so many lookups.
`pg.PlotCurveItem.setData()` is normally used for real-time updates to
curves and takes in a whole new array of data to graphics.
It makes sense to stick with this interface especially if
the current datum graphic will originally be drawn from tick quotes and
later filled in when bars data is available (eg. IB has this option in
TWS charts for volume). Additionally, having a data feed api where the push
process/task can write to shared memory and the UI task(s) can read from
that space is ideal. It allows for indicator and algo calculations to be
run in parallel (via actors) with initial price draw instructions
such that plotting of downstream metrics can be "pipelined" into the
chart UI's render loop. This essentially makes the chart UI async
programmable from multiple remote processes (or at least that's the
goal).
Some details:
- Only store a single ref to the source array data on the
`LinkedSplitCharts`. There should only be one reference since the main
relation is **that** x-time aligned sequence.
- Add `LinkedSplitCharts.update_from_quote()` which takes in a quote
dict and updates the OHLC array from it's contents.
- Add `ChartPlotWidget.update_from_array()` method to trigger graphics
updates per chart with consideration for overlay curves.
This makes a OHLC graphics "sequence" update very similar (actually API
compatible) with `pg.PlotCurveItem.setData()`. The difference here is
that only latest OHLC datum is used to update the charts last bar.
This was a mess before with a weird loop using the parent split charts
to update all "indicators". Instead just have each plot do its own
yrange updates since the signals are being handled just fine per plot.
Handle both the OHLC and plane line chart cases with a hacky `try:,
except IndexError:` for now.
Oh, and move the main entry point for the chart app to the relevant
module. I added some WIP bar update code for the moment.
Speed up the lines array creation using proper slice assignment.
This gives another 10% speedup to the historical price rendering.
Drop ``_tina_mode`` support for now since we're not testing it.
Previously graphics were loaded and rendered implicitly during the
import and creation of certain objects. Remove all this and instead
expect client code to pass in the OHLC sequence to plot. Speed up
the bars graphics rendering by simplifying to a single iteration of
the input array; gives about a 2x speedup.
Move chart resize code into our ``ViewBox`` subtype (a ``ChartView``)
in an effort to start organizing interaction behaviour closer to the
appropriate underlying objects. Add some docs for all this and do some
renaming.
Modify the default ``ViewBox`` scroll to zoom behaviour such that
whatever right-most point is visible is used as the "center" for
zooming. Add a "traditional" cross-hair cursor.
- Move out equity plotting to new module.
- Make axis margins and fonts look good on i3.
- Adjust axis labels colors to gray.
- Start commenting a lot of the code after figuring out what it all does
when cross referencing with ``pyqtgraph``.
- Add option to move date axis to middle.
Hand select necessary components to get real-time charting with
`pyqtgraph` from the `Quantdom` projects:
https://github.com/constverum/Quantdom
We've offered to collaborate with the author but have received no
response and the project has not been updated in over a year.
Given this, we are moving forward with taking the required components to
make further improvements upon especially since the `pyqtgraph` project
is now being actively maintained again.
If the author comes back we will be more then happy to contribute
modified components upstream:
https://github.com/constverum/Quantdom/issues/18
Relates to #80
Since the new FSP system will require time aligned data amongst actors,
it makes sense to share broker data feeds as much as possible on a local
system. There doesn't seem to be downside to this approach either since
if not fanning-out in our code, the broker (server) has to do it anyway
(and who knows how junk their implementation is) though with more
clients, sockets etc. in memory on our end. It also preps the code for
introducing a more "serious" pub-sub systems like zeromq/nanomessage.
Start a draft normalization format for (sampled) tick data.
Ideally we move toward the dense tick format (DFT) enforced by
techtonicDB, but for now let's just get a dict of something simple
going: `{'type': 'trade', 'price': <price}` kind of thing. This
gets us started being able to real-time chart from all data feed
back-ends. Oh, and hack in support for XAUUSD..and get subactor
logging workin.
Add a `Client.find_contract()` which internally takes
a <symbol>.<exchange> str as input and uses `IB.qualifyContractsAsync()`
internally to try and validate the most likely contract. Make the module
script call this using `asyncio.run()` for console testing.
Infected `asyncio` support is being added to `tractor` in
goodboy/tractor#121 so delegate to all that new machinery.
Start building out an "actor-aware" api which takes care of all the
`trio`-`asyncio` interaction for data streaming and request handling.
Add a little (shudder) method proxy system which can be used to invoke
client methods from another actor. Start on a streaming api in
preparation for real-time charting.
Start working towards meeting the backend client api.
Infect `asyncio` using `trio`'s new guest mode and demonstrate
real-time ticker streaming to console.
Since the new FSP system will require time aligned data amongst actors,
it makes sense to share broker data feeds as much as possible on a local
system. There doesn't seem to be downside to this approach either since
if not fanning-out in our code, the broker (server) has to do it anyway
(and who knows how junk their implementation is) though with more
clients, sockets etc. in memory on our end. It also preps the code for
introducing a more "serious" pub-sub systems like zeromq/nanomessage.
Wrap the sync client in an async interface in anticipation of an actual
async client. This starts support for the `open_fee()`/`stream_quotes()`
api though the tick normalization isn't correct yet.
This is something I've been meaning to try for a while and will likely
make writing tick data to a db more straight forward (filling in NaN
values is more matter of fact) plus it should minimize bandwidth usage.
Note, it'll require stream consumers to be considerate of non-full
quotes arriving and thus using the first "full" quote message to fill
out dynamically formatted systems or displays.
For easy testing of questrade historical data from cli.
Re-org the common cli components into a new package to avoid having all
commands defined in a top-level module.
There's some expected limitations with the number of sticks allowed in
a single query (they say 2k but I've been able to pull 20k). Also note
without a paid data sub there's a 15m delay on 1m sticks (we'll hack
around that shortly, don't worry).
Gets us better throughput when polling multiple endpoints (eg. option
and stock quotes simultaneously) since slower round trip request won't
block faster ones when using multiple connections.
This required some copy-paste of code from @matham's branch:
https://github.com/kivy/kivy/pull/5241
namely, the stuff in the `utils_async.py` module. I've added all that as
a standalone file for now.
Update the pipfile to use `kivy`'s master branch (since there seems to
be some lingering cython issues in the current release wheels).
- stop displaying search bar widget on <ctrl-c>
- if there's existing search bar content highlight it automatically
to allow user to start typing new content right away
- when activated allow search bar to insert its own set of keybinding
controls; restore prior bindings on exit
Look up the broker module and set up the loglevel locally.
Ask the arbiter for a portal to the data daemon since we can't
pass one to a subactor by reference.
Fixes to `tractor` that resolve issues with async generators being
non-task safe make the need for the mutex lock in
`DataFeed.open_stream()` unnecessary. Also, don't bother pushing empty
quotes from the publisher; avoids hitting the network when possible.
Questrade's API is half baked and can't handle concurrency.
It allows multiple concurrent requests to most endpoints *except*
for the auth endpoint used to refresh tokens:
https://www.questrade.com/api/documentation/security
I've gone through extensive dialogue with their API team and despite
making what I think are very good arguments for doing the request
serialization on the server side, they decided that I should instead
do the "locking" on the client side. Frankly it doesn't seem like they
have that competent an engineering department as it took me a long time
to explain the issue even though it's rather trivial and probably not
that hard to fix; maybe it's better this way.
This adds a few things to ensure more reliable token refreshes on
expiry:
- add a `@refresh_token_on_err` decorator which can be used on `_API`
methods that should refresh tokens on failure
- decorate most endpoints with this *except* for the auth ep
- add locking logic for the troublesome scenario as follows:
* every time a request is sent out set a "request in progress" event
variable that can be used to determine when no requests are currently
outstanding
* every time the auth end point is hit in order to refresh tokens set
an event that locks out other tasks from making requests
* only allow hitting the auth endpoint when there are no "requests in
progress" using the first event
* mutex all auth endpoint requests; there can only be one outstanding
- don't hit the accounts endpoint at client startup; we want to
eventually support keys from multiple accounts and you can disable
account info per key and just share the market data function
Adjust feed locking around internal manager `yields` to make this work.
Also, change quote publisher to deliver a list of quotes for each
retrieved batch. This was actually broken for option streaming since
each quote was being overwritten due to a common `key` value for all
expiries. Asjust the `packetizer` function accordingly to work for
both options and stocks.
The pub-sub data feed system was factored into `tractor` as an
experimental api / subsystem. Move to using that which greatly
simplifies the data feed architecture.
Start working toward a more general (on-demand) pub-sub system which
can be brought into ``tractor``. Right now this just means making
the code in the `fan_out_to_ctxs()` less specific but, eventually
I think this function should be coupled with a decorator and shipped
as a standard "message pattern".
Additionally,
- try out making `BrokerFeed` a `@dataclass`
- strip out all the `trio.Event` / uneeded nursery / extra task crap
from `start_quote_stream()`
This allows for using a monitor to select the current option chain
symbol!
The deats:
- start a bg task which streams the monitor selected symbol
- dynamically repopulate expiry buttons on a newly published symbol
- move static widget creation into a chain method to avoid multiple
quotes requests at startup
- rename a bunch of methods
If quotes are pushed using the adjusted contract symbol (i.e. with
trailing '-1' suffix) the subscriber won't receive them under the
normal symbol. The logic was wrong for determining whether to add
a suffix (was failing for any symbol with an exchange suffix)
which was causing normal data feed subscriptions to fail to match
in every case.
I did some testing of the `optionsIds` parameter to the option quote
endpoint and found that it limits you to 100 symbols so it's not
practical for real-time "all-strike"" chain updating; we have to stick
to filters for now. The only real downside of this is that it seems
multiple filters across multiple symbols is quite latent. I need to
toy with it more to be sure it's not something slow on the client side.
Oh, and store option contract to ids in a `dict` for now as we may want
to try the `optionsIds` thing again down the road as I coordinate with
the QT tech team.
This is an optimization to improve performance when the UI is fed real
time data. Instead of resorting all rows on every quote update, only
re-render when the sort key appears in the quote data, and further, only
resort rows which are changed using bisection based widget insertion to
avoid having `kivy` re-add widgets (and thus re-render graphics) more
often than absolutely necessary.
There's still a ton to polish (and some bugs to fix) but this is a first
working draft of a real-time option chain!
Insights and todos:
- `kivy` widgets need to be cached and reused (eg. rows, cells, etc.)
for speed since it seems creating new ones constantly is quite taxing
on the CPU
- the chain will tear down and re-setup the option data feed stream each
time a different contract expiry button set is clicked
- there's still some weird bug with row highlighting where it seems rows
added from a new expiry set (which weren't previously rendered) aren't
being highlighted reliably
`Row`:
- `no_cell`: support a list of keys for which no cells will be created
- allow passing in a `cell_type` at instantiation
`TickerTable`:
- keep track of rendered rows via a private `_rendered` set
- don't create rows inside `append_row()` expect caller to do it
- never render already rendered widgets in `render_rows()`
Miscellaneous:
- generalize `update_quotes()` to not be tied to specific quote fields
and allow passing in a quote `formatter()` func
- don't bother creating a nursery block until necessary in main
- more commenting
Add some extra fields to each quote that QT should already be
providing (instead of hiding them in the symbol and request contract
info); namely, the expiry and contact type (i.e. put or call).
Define the base set of fields to be displayed in an option chain
UI and add a quote formatter.
Copy out `kivy.clock.triggered` from version 1.10.1 since it isn't yet
available in the `trio`/async branch and use it to throttle the callback
rate. Use a `collections.deque` to LIFO iterate widgets each call
using the heuristic that it's more likely the mouse is still within the
currently highlighted (or it's adjacent neighbors) widget as opposed
to some far away widget (the case when the mouse is moved very
drastically across the window).
Thanks yet again to @tshirtman for suggesting this.
Instead of defining a `on_mouse_pos()` on every widget simply
register and track each widget and loop through them all once (or as much
as is necessary) in a single callback. The assumption here is that we
get a performance boost by looping widgets instead of having `kivy` loop
and call back each widget thus avoiding costly python function calls.
Well that was a doozy; had to rejig pretty much all of it.
The deats:
- Track broker components in a new `DataFeed` namedtuple
- port to new list based batch quotes (not dicts any more)
- lock access to cached broker-client / data-feed instantiation
- respawn tasks that fail due to the network
So much changed to get this working for both stocks and options:
- Index contracts by a new `ContractsKey` named tuple
- Move to pushing lists of quotes instead of dicts since option
subscriptions are often not identified by their "symbol" key and
this makes it difficult at fan out time to know how a quote should
be indexed and delivered. Instead add a special `key` entry to each
quote dict which is the quote's subscription key.
Instead of all this adding/removing of canvas instructions nonsense
simple add a static "highlighted" rectangle to each row and make its
size very small when there's no mouse over.
Mad props to @tshirtman for showing me the light :D
It's still a bit of a shit show, and I've left a lot of commented tweaks
that need to be further played with, but I think this is a much
better look for what I'm considering to be one of the main "entry point"
apps for `piker`. To get any more serious fine tuning the way I want
I may have to talk to some kivy experts as I'm having some headaches
with button borders, padding, and the header row height..
Some of the new changes include:
- port to the new `brokers.data` module
- much darker theme with a stronger terminal vibe
- last trade price and volume amount flash on each trade
- fixed the symbol search bar to be a static height; before it was
getting squashed oddly when using stacked windows
- make all the cells transparent (for now) such that I can just use
a row color (relates to cell padding/spacing - can't seem to ditch it)
- start adding type annotations
Add `contracts` and `optsquote` commands for querying option contracts
info and market quotes respectively. Add a `record` command for
streaming real-time data feed quotes to disk. Port `monitor` to the
new `piker.brokers.data` module. Forward loglevel flags through to
`tractor` for relevant commands.
Add a couple functions for storing and retrieving live json data feed
recordings to disk using a very rudimentary character + newline delimited
format.
Also, split out the pub-sub logic from `stream_quotes()` into a new
func, `fan_out_to_chans()`. Eventually I want to formalize this pattern
into a decorator exposed through `tractor`.
Makes it easy to request all the option contracts for a particular symbol.
Also, let `option_chain()` accept a `date` arg which can be used to only
retrieve quotes for a single expiry date (much faster then getting all
of them).
Every actor now registers (and unregisters) with the arbiter at
startup/teardown. For now the registry is stored in a plain `dict` in
the arbiter's memory. This makes it possible to easily coordinate actors
started as plain Python processes or via `multiprocessing`.
A whole smörgåsbord of changes was required to accomplish this:
- factor handshake steps into a func
- track *every* channel connected to an actor including multiples to the
same remote peer (may want to optimize this later)
- handle `trio.ClosedStreamError` gracefully in the message loop
- add an `open_portal` asynccontextmanager which handles channel
creation, handshaking, and spawning a bg task for msg processing
- add a `start_actor()` for starting in-process actors directly
- add working `get_arbiter()` and `find_actor()` public routines
- `_main` now tries an anonymous channel connect to the stated
arbiter sockaddr and uses that to determine whether to crown itself
Fail gracefully (by "aborting") the same way `trio` does. This avoids
ugly sub-proc tracebacks thrown at the console. Unset the local actor
when `tractor._main` completes. Cancel all tasks for a peer channel on
disconnect.
Drop all channel/connection handling from the core and break up all the
start up steps into compact and useful functions. The main difference is
the daemon now only needs to worry about spawning per broker streaming
tasks and handling symbol list subscription requests.
When an error is raised inside a nursery block (in the local actor)
cancel all spawned children and ensure the error is properly
unsuppressed.
Also,
- change `invoke_cmd` to `send_cmd` and expect the caller to use
`result_from_q` (avoids implicit blocking for responses that might
never arrive)
- `nursery.start()` the channel server block such that we wait for the
underlying listener to spawn before making outbound connections
- cancel the channel server when an actor's main task completes
(given that `outlive_main == False`)
- raise subactor errors directly in the local actors's msg loop
- enforce that `treat_as_gen` async functions respond with a caller id
(`cid`) in each yield packet
Command requests are sent out and responses are handled in a "message
loop" where each command is associated with a "caller id" and multiple
cmds and results are multiplexed on the came inter-actor channel. When
a cmd result arrives it is pushed into a local queue and delivered to the
appropriate calling actor's task. Errors from a remote actor are always shipped
in an "error" packet back to their spawning-parent actor such that any error
in a subactor is always raised directly in the parent. Based on the
first response to a cmd (either a 'return' or 'yield' packet) the caller
side portal will retrieve values by wrapping the local response queue in
either of an async function or generator as appropriate.
- Rename the `Client` to `Channel`
- Add better `__repr__()`
- use laddr, raddr instead of sockaddr, peer
- don't allow re-entrant `Channel.connect()` calls
- Make `Channel` an async iterable
Couple fixes here:
- if no tickers for a watchlist name -> bail
- swallow the symbol data response in the reconnect handler coro
- don't sleep 5 seconds before connecting to subproc daemon...
Resolves#43
When a client loses a connection it will currently need to re-subscribe
for symbols and receive a symbol data summary as a first quote response.
Only run the provided coroutine on reconnect and call the kwarg
`on_reconnect`. The client consuming code is entirely expected at this
point to know how the symbol registration protocol works.
Event if a broker client is already spawned new clients should still
receive a detailed symbol data packet as the first response. Avoid
exposing the new client's queue to the broker (i.e. subscribing it for
quotes) until after first pushing this packet with all bad symbols
filtered out.
Oh boy where to start.
- Handle broken streams in the `StreamQueue` gracefully; terminate the
async generator.
- When a stream queue connection is unwritable discard its subscriptions
inside the quoter task
- If all subscriptions are discarded for a broker then tear down its
quoter task
- Use listener parent nursery for spawning quoter tasks
- Make broker subs data structures global/shared between conn
handler tasks
- Register the `tickers2qs` entry *after* instantiating broker client(s)
(avoids race condition when mulitple client connections are coming
online simultaneously)
- Push smoke quotes to every client not just the first that connects
- Track quoter tasks in a cross-task set
- Handle unsubscriptions more correctly
In order to start working toward a HA distributed
architecture make apps use a `Client` type to talk to daemons.
The `Client` provides fault-tolerance for connection failures such
that the app will continue running until a connection to the original
service can be made or the process is killed. This will make it easier
to simply spawn up new daemon child processes when faults are detected.
Filter out bad symbols by processing an initial batch quote and
pushing to the subscribing client before spawning a quoter task.
This also avoids exposing the quoter task to anything but the
broker module and a `get_quotes()` routine.
Allow client connections to subscribe for quote streams from specific
brokers and spawn broker-client quoter tasks on-demand according
to client connection demands. Support multiple subscribers to a
single daemon process.
Async generators are faster and less code. Handle segmented packets
which can happen during periods of high quote volume. Move per-broker
rate limit logic into daemon task.
Quote queries will hang indefinitely when the network goes down.
Instead poll for network reestablishment such that roaming on
wifi is supported and real-time feeds will resume once the network is
back.
- Add a rate limit cli option
- Allow broker backends to define a max quote query limit
- Add an index ETF list to demonstrate robinhood's real-time prices
- make the % daily change use the previous days close as the reference
price
- color each cell on every change (results in "pulsed" colors on changes)
- tweak some quote fields
- redraw and sort all rows on every quotes update cycle
- error when the QT api is returning None values
Push all ticker quotes to the queue regardless of duplicate
content. That is, don't worry about only pushing new quote changes
(turns out it is useful when coloring a watchlist where multiple
of the same quote may indicate multiple similar trades and we only
want to quickly "pulse" color changes on value changes).
If it is desired to only push new changes, the ``cache`` flag enables
the old behaviour.
Also add `Client.symbols()` for returning symbol data from a sequence of
tickers.
Add `piker quote <tickerA> <tickerB> <tickerC>` command for easily
dumping quote data to the console. With `-df` will dump as a pandas data
frame. Add key filtering to `piker api` calls.
- Extend the qt api to include candles (not working yet), balances, positions.
- Add a `quote()` method to the `Client` for batch ticker quotes and expose
it through a CLI subcommand.
- Make `poll_tickers` push new quotes to a `trio.Queue`
Add a ``poll_tickers`` coro which can be used to "stream" quotes at
a requested rate. Expose through a cli subcommand `piker stream`.
Drop the `pikerd` command for now.
- colorize json response data in logs
- support ``refresh_token`` retrieval from user if the token for some
reason expires while the client is live
- extend api method support for markets, search, symbols, and quotes
- support "proxying" through api calls via an ``api`` coro for one off
client queries (useful for cli testing)
Store tokens in a local config file avoiding any refresh delay
unless necessary when the current access token expires.
Summary:
- move draft main routine into the `brokers` package mod
- start an api wrapper type
- always write the current access tokens to the config on teardown