Add a couple functions for storing and retrieving live json data feed
recordings to disk using a very rudimentary character + newline delimited
format.
Also, split out the pub-sub logic from `stream_quotes()` into a new
func, `fan_out_to_chans()`. Eventually I want to formalize this pattern
into a decorator exposed through `tractor`.
Makes it easy to request all the option contracts for a particular symbol.
Also, let `option_chain()` accept a `date` arg which can be used to only
retrieve quotes for a single expiry date (much faster then getting all
of them).
Every actor now registers (and unregisters) with the arbiter at
startup/teardown. For now the registry is stored in a plain `dict` in
the arbiter's memory. This makes it possible to easily coordinate actors
started as plain Python processes or via `multiprocessing`.
A whole smörgåsbord of changes was required to accomplish this:
- factor handshake steps into a func
- track *every* channel connected to an actor including multiples to the
same remote peer (may want to optimize this later)
- handle `trio.ClosedStreamError` gracefully in the message loop
- add an `open_portal` asynccontextmanager which handles channel
creation, handshaking, and spawning a bg task for msg processing
- add a `start_actor()` for starting in-process actors directly
- add working `get_arbiter()` and `find_actor()` public routines
- `_main` now tries an anonymous channel connect to the stated
arbiter sockaddr and uses that to determine whether to crown itself
Fail gracefully (by "aborting") the same way `trio` does. This avoids
ugly sub-proc tracebacks thrown at the console. Unset the local actor
when `tractor._main` completes. Cancel all tasks for a peer channel on
disconnect.
Drop all channel/connection handling from the core and break up all the
start up steps into compact and useful functions. The main difference is
the daemon now only needs to worry about spawning per broker streaming
tasks and handling symbol list subscription requests.
When an error is raised inside a nursery block (in the local actor)
cancel all spawned children and ensure the error is properly
unsuppressed.
Also,
- change `invoke_cmd` to `send_cmd` and expect the caller to use
`result_from_q` (avoids implicit blocking for responses that might
never arrive)
- `nursery.start()` the channel server block such that we wait for the
underlying listener to spawn before making outbound connections
- cancel the channel server when an actor's main task completes
(given that `outlive_main == False`)
- raise subactor errors directly in the local actors's msg loop
- enforce that `treat_as_gen` async functions respond with a caller id
(`cid`) in each yield packet
Command requests are sent out and responses are handled in a "message
loop" where each command is associated with a "caller id" and multiple
cmds and results are multiplexed on the came inter-actor channel. When
a cmd result arrives it is pushed into a local queue and delivered to the
appropriate calling actor's task. Errors from a remote actor are always shipped
in an "error" packet back to their spawning-parent actor such that any error
in a subactor is always raised directly in the parent. Based on the
first response to a cmd (either a 'return' or 'yield' packet) the caller
side portal will retrieve values by wrapping the local response queue in
either of an async function or generator as appropriate.
- Rename the `Client` to `Channel`
- Add better `__repr__()`
- use laddr, raddr instead of sockaddr, peer
- don't allow re-entrant `Channel.connect()` calls
- Make `Channel` an async iterable
Couple fixes here:
- if no tickers for a watchlist name -> bail
- swallow the symbol data response in the reconnect handler coro
- don't sleep 5 seconds before connecting to subproc daemon...
Resolves#43
When a client loses a connection it will currently need to re-subscribe
for symbols and receive a symbol data summary as a first quote response.
Only run the provided coroutine on reconnect and call the kwarg
`on_reconnect`. The client consuming code is entirely expected at this
point to know how the symbol registration protocol works.
Event if a broker client is already spawned new clients should still
receive a detailed symbol data packet as the first response. Avoid
exposing the new client's queue to the broker (i.e. subscribing it for
quotes) until after first pushing this packet with all bad symbols
filtered out.
Oh boy where to start.
- Handle broken streams in the `StreamQueue` gracefully; terminate the
async generator.
- When a stream queue connection is unwritable discard its subscriptions
inside the quoter task
- If all subscriptions are discarded for a broker then tear down its
quoter task
- Use listener parent nursery for spawning quoter tasks
- Make broker subs data structures global/shared between conn
handler tasks
- Register the `tickers2qs` entry *after* instantiating broker client(s)
(avoids race condition when mulitple client connections are coming
online simultaneously)
- Push smoke quotes to every client not just the first that connects
- Track quoter tasks in a cross-task set
- Handle unsubscriptions more correctly
In order to start working toward a HA distributed
architecture make apps use a `Client` type to talk to daemons.
The `Client` provides fault-tolerance for connection failures such
that the app will continue running until a connection to the original
service can be made or the process is killed. This will make it easier
to simply spawn up new daemon child processes when faults are detected.
Filter out bad symbols by processing an initial batch quote and
pushing to the subscribing client before spawning a quoter task.
This also avoids exposing the quoter task to anything but the
broker module and a `get_quotes()` routine.
Allow client connections to subscribe for quote streams from specific
brokers and spawn broker-client quoter tasks on-demand according
to client connection demands. Support multiple subscribers to a
single daemon process.
Async generators are faster and less code. Handle segmented packets
which can happen during periods of high quote volume. Move per-broker
rate limit logic into daemon task.
Quote queries will hang indefinitely when the network goes down.
Instead poll for network reestablishment such that roaming on
wifi is supported and real-time feeds will resume once the network is
back.
- Add a rate limit cli option
- Allow broker backends to define a max quote query limit
- Add an index ETF list to demonstrate robinhood's real-time prices
- make the % daily change use the previous days close as the reference
price
- color each cell on every change (results in "pulsed" colors on changes)
- tweak some quote fields
- redraw and sort all rows on every quotes update cycle
- error when the QT api is returning None values
Push all ticker quotes to the queue regardless of duplicate
content. That is, don't worry about only pushing new quote changes
(turns out it is useful when coloring a watchlist where multiple
of the same quote may indicate multiple similar trades and we only
want to quickly "pulse" color changes on value changes).
If it is desired to only push new changes, the ``cache`` flag enables
the old behaviour.
Also add `Client.symbols()` for returning symbol data from a sequence of
tickers.
Add `piker quote <tickerA> <tickerB> <tickerC>` command for easily
dumping quote data to the console. With `-df` will dump as a pandas data
frame. Add key filtering to `piker api` calls.
- Extend the qt api to include candles (not working yet), balances, positions.
- Add a `quote()` method to the `Client` for batch ticker quotes and expose
it through a CLI subcommand.
- Make `poll_tickers` push new quotes to a `trio.Queue`
Add a ``poll_tickers`` coro which can be used to "stream" quotes at
a requested rate. Expose through a cli subcommand `piker stream`.
Drop the `pikerd` command for now.
- colorize json response data in logs
- support ``refresh_token`` retrieval from user if the token for some
reason expires while the client is live
- extend api method support for markets, search, symbols, and quotes
- support "proxying" through api calls via an ``api`` coro for one off
client queries (useful for cli testing)
Store tokens in a local config file avoiding any refresh delay
unless necessary when the current access token expires.
Summary:
- move draft main routine into the `brokers` package mod
- start an api wrapper type
- always write the current access tokens to the config on teardown