Compare commits

...

46 Commits

Author SHA1 Message Date
Tyler Goodlet db34999553 Bump `brokers.toml`, update ib and deribit sections
For `[ib]` adjust content to match changes to the
`dockering/ib/README.rst` and for `[deribit]` toss in the WIP options
related params for anyone who wants to play around with @nt's work.
2026-01-06 23:44:33 -05:00
Tyler Goodlet a63c7b13e2 Bump ib-container docs and compose file
Add necessary details for the `brokers.toml`, cleanup and link to the
new GH container repo in the `docker-compose.yml`.
2026-01-06 23:44:33 -05:00
Tyler Goodlet 3b18233573 Bump various `.brokers.core` doc string content/style 2026-01-06 23:44:33 -05:00
Tyler Goodlet b525fa35fe ib: multiline stylings, typing, timeout report 2026-01-06 23:44:33 -05:00
Tyler Goodlet 731166072d Woops, fix to read `.api_port` ref from the `Client.ib.client`.. 2026-01-06 23:44:33 -05:00
Tyler Goodlet 4cdef4d101 Support per-`ib.vnc_addrs` vnc passwords
Such that the `brokers.toml` can contain any of the following
<port> = dict|tuple styles,

```toml
    [ib.vnc_addrs]
    4002 = {host = 'localhost', port = 5900, pw = 'doggy'}  # host, port, pw
    4002 = {host = 'localhost', port = 5900}  # host, port, pw
    4002 = ['localhost', 5900]  # host, port, pw
```

With the first line demonstrating a vnc-server password (as normally set
via a `.env` file in the `dockering/ib/` subdir) with the `pw =` field.
This obviously removes the hardcoded `'doggy'` password from prior.

Impl details in `.brokers.ib._util`:
- pass the `ib.api.Client` down into `vnc_click_hack()` doing all config
 reading within and removing host, port unpacking in the callingn
 `data_reset_hack()`.
- also pass the client `try_xdo_manual()` and comment (with plans to
  remove) the recently added localhost-only fallback section since
  we now have a fully working py vnc client again with `pyvnc` B)
- in `vnc_click_hack()` match for all the possible config line styles
  and,
  * pass any `pw` field to `pyvncVNCConfig`,
  * continue matching host, port without password,
  * fallthrough to raising a val-err when neither ^ match.
2026-01-06 23:44:33 -05:00
Tyler Goodlet fd725743a1 ib: bump `docker/ib/README.rst`
For the new github image, a high-level look at its basic
features/usage/docs and prosing around our expected default usage with
the `piker.brokers.ib` backend.
2026-01-06 23:44:33 -05:00
Tyler Goodlet 64ebfd226f ib.feed: better no-bars error-log message format 2026-01-06 23:44:33 -05:00
Tyler Goodlet e20aa4a2a6 Set `.bs_mktid` on all IB position-msg emissions.. 2026-01-06 23:44:33 -05:00
Tyler Goodlet 007e7e343b Switch to `pyvnc` for IB reset hackz
It actually works for vncAuth(2) (thank god!) which the previous
`asyncvnc` **did not**, and seems to be mostly based on the work
from the `asyncvnc` author anyway (so all my past efforts don't seem to
have been in vain XD).

NOTE, the below deats ended up being factored in earlier into the
`pyproject.toml` alongside nix(os) support needed for testing and
landing this history. As the such, the comments are the originals but
the changes are not.

Deats,
- switch to `pyvnc` async API (using `asyncio` again obvi) in
  `.ib._util._vnc_click_hack()`.
- add `pyvnc` as src installed dep from GH.
- drop `asyncvnc` as dep.

Other,
- update `pytest` version range to avoid weird auto-load plugin exposed
  by `xonsh`?
- add a `tool.pytest.ini_options` to project file with vars to,
  - disable that^ `xonsh` plug using `addopts = '-p no:xonsh'`.
  - set a `testpaths` to avoid running anything but that subdir.
  - try out the `'progress'` style console output (does it work?).
2026-01-06 23:44:33 -05:00
Tyler Goodlet 0817b2ab4b Convert remaining `.to_asyncio.open_channel_from()` to `chan` fn-sig usage 2026-01-06 23:44:33 -05:00
Tyler Goodlet 43e505e88f `ib.feed`: finally solve `push()` exc propagation
Such that if/when the `push()` ticker callback (closure) errors
internally, we actually eventually bubble the error out-and-up from the
`asyncio.Task` and from there out the `.to_asyncio.open_channel_from()` to
the parent `trio.Task`..

It ended up being much more subtle to solve then i would have liked
thanks to,

- whatever `Ticker.updateEvent.connect()` does behind the scenes in
  terms of (clearly) swallowing with only log reporting any exc raised
  in the registered callback (in our case `push()`),

- `asyncio.Task.set_excepion()` never working and instead needing to
  resort to `Task.cancel()`, catching `CancelledError` and re-raising
  the stashed `maybe_exc` from `push()` when set..

Further this ports `.to_asyncio.open_channel_from()` usage to use
the new `chan: tractor.to_asyncio.LinkedTaskChannel` fn-sig API, namely
for `_setup_quote_stream()` task. Requires the latest `tractor` updates
to the inter-eventloop-chan iface providing a `.set_nowait()` and
`.get()` for the `asyncio`-side.

Impl deats within `_setup_quote_stream()`,
- implement `push()` error-bubbling by adding a `maybe_exc` which can be
  set by that callback itself or by its registering task; when set it is
  both,
  * reported on by the `teardown()` cb,
  * re-raised by the terminated (via `.cancel()`) `asyncio.Task` after
    woken from its sleep, aka "cancelled" (since that's apparently one
    of the only options.. see big rant further todo comments).
- add explicit error-tolerance-tuning via a `handler_tries: int` counter
  and `tries_before_raise: int` limit such that we only bubble
  a `push()` raised exc once enough tries have consecutively failed.
- as mentioned, use the new `chan` fn-sig support and thus the new
  method API for `asyncio` -> `trio` comms.
- a big TODO XXX around the need to use a better sys for terminating
  `asyncio.Task`s whether it's by delegating to some `.to_asyncio`
  internals after a factor-out OR by potentially going full bore `anyio`
  throughout `.to_asyncio`'s impl in general..
- mk `teardown()` use appropriate `log.<level>()`s based on outcome.

Surroundingly,
- add a ton of doc-strings to mod fns previously missing them.
- improved / added-new comments to `wait_on_data_reset()` internals and
  anything changed per ^above.

NOTE, resolved conflicts on `piker/brokers/ib/feed.py` due to
`brokers_refinery` commit:

d809c797 `.brokers.ib.feed`: better `tractor.to_asyncio` typing and var naming throughout!
2026-01-06 23:44:33 -05:00
Tyler Goodlet b31ca5c3c6 `ib`: various type-annot, multiline styling and todos updates 2026-01-06 23:44:33 -05:00
Tyler Goodlet 5342f163fb ib: add venue-hours checking
Such that we can avoid other (pretty unreliable) "alternative" checks to
determine whether a real-time quote should be waited on or (when venue
is closed) we should just signal that historical backfilling can
commence immediately.

This has been a todo for a very long time and it turned out to be much
easier to accomplish than anticipated..

Deats,
- add a new `is_current_time_in_range()` dt range checker to predicate
  whether an input range contains `datetime.now(start_dt.tzinfo)`.
- in `.ib.feed.stream_quotes()` add a `venue_is_open: bool` which uses
  all of the new ^^ to determine whether to branch for the
  short-circuit-and-do-history-now-case or the std real-time-quotes
  should-be-awaited-since-venue-is-open, case; drop all the old hacks
  trying to workaround not figuring that venue state stuff..

Other,
- also add a gpt5 composed parser to `._util` for the
  `ib_insync.ContractDetails.tradingHours: str` for before i realized
  there was a `.tradingSessions` property XD
- in `.ib_feed`,
  * add various EG-collapsings per recent tractor/trio updates.
  * better logging / exc-handling around ticker quote pushes.
  * stop clearing `Ticker.ticks` each quote iteration; not sure if this
    is needed/correct tho?
  * add masked `Ticker.ticks` poll loop that logs.
- fix some `str.format()` usage in `._util.try_xdo_manual()`

NOTE, resolved conflicts on `piker/brokers/ib/feed.py` due to
rebasing onto up stream `brokers_refinery` commit,

d809c797 `.brokers.ib.feed`: better `tractor.to_asyncio` typing and var naming throughout
2026-01-06 23:44:33 -05:00
Tyler Goodlet 09059ce956 ib: never relay "Warning:" errors to EMS..
You'd think they could be bothered to make either a "log" or "warning"
msg type instead of a `type='error'`.. but alas, this attempts to detect
all such "warning"-errors and never proxy them to the clearing engine
thus avoiding the cancellation of any associated (by `reqid`)
pre-existing orders (control dialogs).

Also update all surrounding log messages to a more multiline style.
2026-01-06 23:44:33 -05:00
Tyler Goodlet 8283a1d482 ib: jig `.data_reset_hack()` with vnc-client failover
Since apparently porting to the new docker container enforces using
a vnc password and `asyncvnc` seems to have a bug/mis-config whenever
i've tried a pw over a wg tunnel..?

Soo, this tries out the old `i3ipc`-win-focus + `xdo` click hack when
the above fails.

Deats,
- add a mod-level `try_xdo_manual()` to wrap calling
  `i3ipc_xdotool_manual_click_hack()` with an oserr handler, ensure we
  don't bother trying if `i3ipc` import fails beforehand tho.
- call ^ from both the orig case block and the failover from the
  vnc-client case.
- factor the `+no_setup_msg: str` out to mod level and expect it to be
  `.format()`-ed.
- refresh todo around `asyncvnc` pw ish..
- add a new `i3ipc_fin_wins_titled()` window-title scanner which
  predicates input `titles` and delivers any matches alongside the orig
  focused win at call time.
- tweak `i3ipc_xdotool_manual_click_hack()` to call ^ and remove prior
  unfactored window scanning logic.
2026-01-06 23:44:33 -05:00
Tyler Goodlet 9be8ca6097 binance: add `AggTrade.nq: float`: "normal quantity" field.. 2026-01-06 23:43:44 -05:00
Tyler Goodlet bda8154d55 binance: handle new `TRADIFI_PERPETUAL`.. 2026-01-06 23:43:44 -05:00
Tyler Goodlet fd4dca9963 binance: add `Pair.opoAllowed` field
Handle new API field per 2025-12-02 update.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-06 23:43:44 -05:00
Tyler Goodlet 3c024206d4 binance: set `Pair.pegInstructionsAllowed = False`
Lol, a cheeky unforeseen bug due to TOML's lack of a null type and
thinking i can render an `Optional` field on a `msgspec.Struct`
(defaulted to `None`) the `binance.symcache.toml` cache file..

I didn't catch this when i first updated to the 3.1 API in f7caa75228
because i never did a cache-files flush.. lesson learned and we **really
need tests for this**!!
2026-01-06 23:43:44 -05:00
Tyler Goodlet 4e9394f24b Add fix for binance API 3.1 rollout..
See https://developers.binance.com/docs/binance-spot-api-docs#2025-08-26
2026-01-06 23:43:44 -05:00
Tyler Goodlet cc0da23687 kraken: add crash-handling around `Pair()` init
Since it can otherwise be difficult to debug due to nursery cancellation
(we need that taskman yo!).
2026-01-06 23:43:44 -05:00
Tyler Goodlet c6998431ea kraken: `Pair.costmin` is now optional?
Some pairs don't seem to define it but it's not listed as deprecated on
official API page (new one now linked in type def's doc string).
2026-01-06 23:43:44 -05:00
Tyler Goodlet af39a8d0a7 binance: add new `permissionSets` to base `Pair` 2026-01-06 23:43:44 -05:00
Tyler Goodlet 85834b41eb Update `binance` spot pairs with `amendAllowed`
As per API updates,
https://developers.binance.com/docs/binance-spot-api-docs
https://developers.binance.com/docs/binance-spot-api-docs/faqs/order_amend_keep_priority

I also slightly tweaked the filed mismatch exception note to include the
`repr(pair_type)` so the dev can know which pair types should be
changed.
2026-01-06 23:43:44 -05:00
Tyler Goodlet 04be48e2d2 `.kraken`: add masked pauses for order req debug
Such that the next time i inevitably must debug the some order-request
error status or precision discrepancy, i have the mkt-symbol branch
ready to go. Also, switch to `'action': 'buy'|'sell' as action,` style
`case` matching instead of the post-`if` predicate style.
2026-01-06 23:43:44 -05:00
Tyler Goodlet b6d8ddae94 `.questrade`: link in ws-API issue! 2026-01-06 23:43:44 -05:00
Tyler Goodlet 925a12bd81 `.kraken.broker`: need to `await verify_balances()` .. 2026-01-06 23:43:44 -05:00
Tyler Goodlet 13b7dfe1d0 `.brokers.ib.feed`: better `tractor.to_asyncio` typing and var naming throughout! 2026-01-06 23:43:44 -05:00
Tyler Goodlet 19609b3214 `.brokers.cli`: module type and todo for `--pdb` flag to NOT src from sub-cmd 2026-01-06 23:43:44 -05:00
Tyler Goodlet 51541b46be Type loaded backend modules 2026-01-06 23:43:44 -05:00
Gud Boi f218cf450e Merge pull request 'port_to_latest_tractor'
#45 from port_to_latest_tractor into main
Reviewed-on: https://www.pikers.dev/pikers/piker/pulls/45
2026-01-07 04:43:27 +00:00
Tyler Goodlet c77aca1f90 Flip (back) `pikerd` to use TCP by default
It'll break all non-linux OS-platforms atm and bc it should only be set
to a "non-std transport" through the config anyways.

Yeah yeah, we're slowly appealing to the frickin masses..
2026-01-06 23:34:32 -05:00
Tyler Goodlet 3adbabcba6 Use `pytest` plugin now exposed by `tractor` 2026-01-06 22:27:58 -05:00
Tyler Goodlet 2b17b99964 `.ui._search`: collapse EGs as needed, use `tn` naming. 2026-01-06 22:27:58 -05:00
Tyler Goodlet f3767e4269 Port `.data._web_bs` stuff to strict-EGs
Using `tractor.trionics.collapse_eg()` as needed and doing
some renames, in similar style as elsewhere:
- `pcs` -> `rent_cs`,
- `n` -> `tn` for nursery handles,

Also,
- tweak the `._reconnect_forever()` while loop to use the
  (also) `trio`-internal
  `mc_state: trio._channel.MemoryChannelState = snd._state` instead
  of `snd._close` to poll for open send/receive consumer task counts
  since,
    1. it seems more reliable then using the `snd._closed`,
    2. there's no other way to access the info.. afaik?

- handle `ConnectionRejected` explicitly alongside handshake-errs as
  a retry case.
- add a base-exc handler which `.exception()` reports the reconnect
  attempt failure explicitly.
- drop some lingering `Optional` usage.
2026-01-06 22:27:58 -05:00
Tyler Goodlet c065ff6b86 Port `.cli` & `.service` to latest `tractor` registry APIs
Namely changes for the `registry_addrs: list`, enable_transports: list`
and related `tractor._addr` primitive requirements.

Other updates include,
- passing `maybe_enable_greenback=True`,
- additional exc logging around `pikerd` syncing/booting,
- changing to newer `Context.wait_for_result()`,
- dropping (unnecessary?) `maybe_open_crash_handler()` around `pikerd` ep.
2026-01-06 22:27:58 -05:00
Tyler Goodlet 5dc0ecc802 binance; unmask around send-chan @acm usage 2026-01-06 22:27:58 -05:00
Tyler Goodlet ff81e57e73 Spurious first-draft of EG collapsing
Topically, throughout various (seemingly) console-UX-affecting or benign
spots in the code base; nothing that required more intervention beyond
things superficial. A few spots also include `trio.Nursery` ref renames
(always to something with a `tn` in it) and log-level reductions to
quiet (benign) console noise oriented around issues meant to be solved
long..

Note there's still a couple spots i left with the loose-ify flag because
i haven't fully tested them without using the latest version of
`tractor.trionics.collapse_eg()`, but more then likely they should flip
over fine.
2026-01-06 22:27:58 -05:00
Tyler Goodlet ef748c7599 Use `.trionics.collapse_eg()` in `.deribit.api`
Commit this change separate from the (original) broader set applied to
the entire code base since the `.deribit.api` mod contained changes from
upstream max-pain work (from our very own @nt) which caused a noticeable
conflict and intros un-required changes from his work to re-enable
`deribit` support.

Note the original commit, "69eac7bb Spurious first-draft of EG
collapsing", applied similar changes through the rest of the code base.
AGAIN, this mod's change is only being broken out to minimize upstream
change conflicts due to updates to the `deribit` backend done earlier in
time-history.
2026-01-06 22:27:58 -05:00
Tyler Goodlet 3f6853a437 Try running daemons on UDS tpt
The root daemon, pikerd, needs to be adjusted to use diff default
registry addrs to also utilize non-TCP, but for now this gets us started
testing; so far so good B)
2026-01-06 22:27:58 -05:00
Tyler Goodlet 0bd8cd1882 Adjust feed status fields/display-pane to new actor-ID
That is to use the new `tractor.msg.types.Aid` struct to pull the
`brokerd` info from the `tractor.Channel.aid: Aid` attr as well as more
generally handling the new `Channel.raddr.proto_key: str` and no longer
assuming a TCP IPC transport; this per the recent `tractor.ipc`
subsys which adds multi-IPC-transports!

Downstream tweaks to match,
- use an "opt-in" field set to display in the `brokerd` info pane in
  `.ui._feedstatus.mk_feed_label()`.
 |_ also add some todos and drop some seemingly unneeded form sizing
    calcs?
- tweak `.ui._label` to allow not using markdown, though ended up not
  doing that since it looked too plain..
2026-01-06 22:27:58 -05:00
Tyler Goodlet 28db478da1 Adjust to `trio`'s strict eg nurseries throughout!
Using `tractor.trionics.collapse_eg()` as needed to avoid, at the least,
crash-worthy (in debug-mode REPL-ing terms) nested cancellation egs that
exhibit on SIGINT/ctl-c of each "app" (chart & daemon).

Also a bit of renaming of all `trio.Nursery`s to `tn`, the new "task
nursery" shorthand-var-name being used in all our other `tractor`
related projects.
2026-01-06 22:27:58 -05:00
Tyler Goodlet d36575cd0d Port to newer `tractor.get_registry()` 2026-01-06 22:27:58 -05:00
Tyler Goodlet 9a2b43495d Update legacy type to `tractor.MsgStream` 2026-01-06 22:27:58 -05:00
Gud Boi 8a17a75ba2 Merge pull request 'decimal_prices_thru_ems
Yeah, just suck it up and do `Order.price: Decimal` for now..'

(#44) from decimal_prices_thru_ems into main
Reviewed-on: https://www.pikers.dev/pikers/piker/pulls/44
2026-01-07 03:25:27 +00:00
44 changed files with 1455 additions and 608 deletions

View File

@ -1,6 +1,5 @@
################
# ---- CEXY ----
################
[binance]
accounts.paper = 'paper'
@ -13,28 +12,41 @@ accounts.spot = 'spot'
spot.use_testnet = false
spot.api_key = ''
spot.api_secret = ''
# ------ binance ------
[deribit]
# std assets
key_id = ''
key_secret = ''
# options
accounts.option = 'option'
option.use_testnet = false
option.key_id = ''
option.key_secret = ''
# aux logging from `cryptofeed`
option.log.filename = 'cryptofeed.log'
option.log.level = 'DEBUG'
option.log.disabled = true
# ------ deribit ------
[kraken]
key_descr = ''
api_key = ''
secret = ''
# ------ kraken ------
[kucoin]
key_id = ''
key_secret = ''
key_passphrase = ''
# ------ kucoin ------
################
# -- BROKERZ ---
################
[questrade]
refresh_token = ''
access_token = ''
@ -42,44 +54,55 @@ api_server = 'https://api06.iq.questrade.com/'
expires_in = 1800
token_type = 'Bearer'
expires_at = 1616095326.355846
# ------ questrade ------
[ib]
# define the (set of) host-port socketaddrs that
# brokerd.ib will scan to connect to an API endpoint
# (ib-gw or ib-tws listening instances)
hosts = [
'127.0.0.1',
]
# XXX: the order in which ports will be scanned
# (by the `brokerd` daemon-actor)
# is determined # by the line order here.
# TODO: when we eventually spawn gateways in our
# container, we can just dynamically allocate these
# using IBC.
ports = [
4002, # gw
7497, # tws
]
# XXX: for a paper account the flex web query service
# is not supported so you have to manually download
# and XML report and put it in a location that can be
# accessed by the ``brokerd.ib`` backend code for parsing.
flex_token = ''
flex_trades_query_id = '' # live account
# when clients are being scanned this determines
# which clients are preferred to be used for data
# feeds based on the order of account names, if
# detected as active on an API client.
# When API endpoints are being scanned durin startup, the order
# of user-defined-account "names" (as defined below) here
# determines which py-client connection is given priority to be
# used for data-feed-requests by according to whichever client
# connected to an API endpoing which reported the equivalent
# account number for that name.
prefer_data_account = [
'paper',
'margin',
'ira',
]
# For long-term trades txn (transaction) history
# processing (i.e your txn ledger with IB) you can
# (automatically for live accounts) query the FLEX
# report system for past history.
#
# (For paper accounts the web query service
# is not supported so you have to manually download
# an XML report and put it in a location that can be
# accessed by our `brokerd.ib` backend code for parsing).
#
flex_token = ''
flex_trades_query_id = '' # live account
# define "aliases" (names) for each account number
# such that the names can be reffed and logged throughout
# `piker.accounting` subsys and more easily
# referred to by the user.
#
# These keys will be the set exposed through the order-mode
# account-selection UI so that numbers are never shown.
[ib.accounts]
# the order in which accounts will be selectable
# in the order mode UI (if found via clients during
# API-app scanning)when a new symbol is loaded.
paper = 'XX0000000'
margin = 'X0000000'
ira = 'X0000000'
paper = 'DU0000000' # <- literal account #
margin = 'U0000000'
ira = 'U0000000'
# ------ ib ------

View File

@ -1,30 +1,138 @@
running ``ib`` gateway in ``docker``
------------------------------------
We have a config based on the (now defunct)
image from "waytrade":
We have a config based on a well maintained community
image from `@gnzsnz`:
https://github.com/waytrade/ib-gateway-docker
https://github.com/gnzsnz/ib-gateway-docker
To startup this image with our custom settings
simply run the command::
To startup this image simply run the command::
docker compose up
And you should have the following socket-available services:
(For further usage^ see the official `docker-compose`_ docs)
- ``x11vnc1@127.0.0.1:3003``
- ``ib-gw@127.0.0.1:4002``
You can attach to the container via a VNC client
without password auth.
And you should have the following socket-available services by
default:
SECURITY STUFF!?!?!
-------------------
Though "``ib``" claims they host filter connections outside
localhost (aka ``127.0.0.1``) it's probably better if you filter
the socket at the OS level using a stateless firewall rule::
- ``x11vnc1 @ 127.0.0.1:5900``
- ``ib-gw @ 127.0.0.1:4002``
You can now attach to the container via a VNC client with password-auth;
here is an example using ``vncclient`` on ``linux``::
vncviewer localhost:5900
now enter the pw (password) you set via an (see second code blob)
`.env file`_ or pw-file according to the `credentials section`_.
If you want to change away from their default config see the example
`docker-compose.yml`-config issue and config-section of the readme,
- https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#configuration
- https://github.com/gnzsnz/ib-gateway-docker/discussions/103
.. _.env file: https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#how-to-use-it
.. _docker-compose: https://docs.docker.com/compose/
.. _credentials section: https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#credentials
Connecting to the API from `piker`
---------------------------------
In order to expose the container's API endpoint to the
`brokerd/datad/ib` actor, we need to add a section to the user's
`brokers.toml` config (note the below is similar to the repo-shipped
template file),
.. code:: toml
[ib]
# define the (set of) host-port socketaddrs that
# brokerd.ib will scan to connect to an API endpoint
# (ib-gw or ib-tws listening instances)
hosts = [
'127.0.0.1',
]
ports = [
4002, # gw
7497, # tws
]
# When API endpoints are being scanned durin startup, the order
# of user-defined-account "names" (as defined below) here
# determines which py-client connection is given priority to be
# used for data-feed-requests by according to whichever client
# connected to an API endpoing which reported the equivalent
# account number for that name.
prefer_data_account = [
'paper',
'margin',
'ira',
]
# define "aliases" (names) for each account number
# such that the names can be reffed and logged throughout
# `piker.accounting` subsys and more easily
# referred to by the user.
#
# These keys will be the set exposed through the order-mode
# account-selection UI so that numbers are never shown.
[ib.accounts]
paper = 'XX0000000'
margin = 'X0000000'
ira = 'X0000000'
the broker daemon can also connect to the container's VNC server for
added functionalies including,
- viewing the API endpoint program's GUI for manual interventions,
- workarounds for historical data throttling using hotkey hacks,
Add a further section to `brokers.toml` which maps each API-ep's
port to a table of VNC server connection info like,
.. code:: toml
[ib.vnc_addrs]
4002 = {host = 'localhost', port = 5900, pw = 'doggy'}
The `pw = 'doggy'` here ^ should the same value as the particular
container instances `.env` file setting (when it was run),
.. code:: ini
VNC_SERVER_PASSWORD='doggy'
IF you also want to run ``TWS``
-------------------------------
You can also run it containerized,
https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#using-tws
SECURITY stuff (advanced, only if you're paranoid)
--------------------------------------------------
First and foremost if doing a "distributed" container setup where you
run the ``ib-gw`` docker container and your connecting API client
(likely ``ib_async`` from python) on **different hosts** be sure to
read the `security considerations`_ section!
And for a further (somewhat paranoid) perspective from
a long-time-ago serious devops eng..
Though "``ib``" claims they filter remote host connections outside
``localhost`` (aka ``127.0.0.1`` on ipv4) it's prolly justified if
you'd like to filter the socket at the *OS level* using a stateless
firewall rule::
ip rule add not unicast iif lo to 0.0.0.0/0 dport 4002
We will soon have this baked into our own custom image but for
now you'll have to do it urself dawgy.
We will soon have this either baked into our own custom derivative
image (or patched into the current upstream one after further testin)
but for now you'll have to do it urself, diggity dawg.
.. _security considerations: https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#security-considerations

View File

@ -1,10 +1,15 @@
# rework from the original @
# https://github.com/waytrade/ib-gateway-docker/blob/master/docker-compose.yml
version: "3.5"
# a community maintained IB API container!
#
# https://github.com/gnzsnz/ib-gateway-docker
#
# For piker we (currently) include some minor deviations
# for some config files in the `volumes` section.
#
# See full configuration settings @
# - https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#configuration
# - https://github.com/gnzsnz/ib-gateway-docker/discussions/103
services:
ib_gw_paper:
# apparently java is a mega cukc:
@ -50,16 +55,22 @@ services:
target: /root/scripts/run_x11_vnc.sh
read_only: true
# NOTE:to fill these out, define an `.env` file in the same dir as
# this compose file which looks something like:
# TWS_USERID='myuser'
# TWS_PASSWORD='guest'
# NOTE: an alt method to fill these out is to
# define an `.env` file in the same dir as
# this compose file.
environment:
TWS_USERID: ${TWS_USERID}
# TWS_USERID: 'myuser'
TWS_PASSWORD: ${TWS_PASSWORD}
TRADING_MODE: 'paper'
VNC_SERVER_PASSWORD: 'doggy'
VNC_SERVER_PORT: '3003'
# TWS_PASSWORD: 'guest'
TRADING_MODE: ${TRADING_MODE}
# TRADING_MODE: 'paper'
VNC_SERVER_PASSWORD: ${VNC_SERVER_PASSWORD}
# VNC_SERVER_PASSWORD: 'doggy'
# TODO, see if we can get this supported like it
# was on the old `waytrade` image?
# VNC_SERVER_PORT: '3003'
# ports:
# - target: 4002
@ -76,6 +87,9 @@ services:
# - "127.0.0.1:4002:4002"
# - "127.0.0.1:5900:5900"
# TODO, a masked but working example of dual paper + live
# ib-gw instances running in a single app run!
#
# ib_gw_live:
# image: waytrade/ib-gateway:1012.2i
# restart: no

View File

@ -121,6 +121,7 @@ async def bot_main():
# tick_throttle=10,
) as feed,
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
):
assert accounts

View File

@ -365,7 +365,11 @@ class Position(Struct):
# added: bool = False
tid: str = t.tid
if tid in self._events:
log.warning(f'{t} is already added?!')
log.debug(
f'Txn is already added?\n'
f'\n'
f'{t}\n'
)
# return added
# TODO: apparently this IS possible with a dict but not
@ -731,7 +735,7 @@ class Account(Struct):
else:
# TODO: we reallly need a diff set of
# loglevels/colors per subsys.
log.warning(
log.debug(
f'Recent position for {fqme} was closed!'
)

View File

@ -98,13 +98,14 @@ async def open_cached_client(
If one has not been setup do it and cache it.
'''
brokermod = get_brokermod(brokername)
brokermod: ModuleType = get_brokermod(brokername)
# TODO: make abstract or `typing.Protocol`
# client: Client
async with maybe_open_context(
acm_func=brokermod.get_client,
kwargs=kwargs,
) as (cache_hit, client):
if cache_hit:
log.runtime(f'Reusing existing {client}')

View File

@ -96,7 +96,10 @@ async def _setup_persistent_brokerd(
# - `open_symbol_search()`
# NOTE: see ep invocation details inside `.data.feed`.
try:
async with trio.open_nursery() as service_nursery:
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as service_nursery
):
bus: _FeedsBus = feed.get_feed_bus(
brokername,
service_nursery,

View File

@ -374,9 +374,14 @@ class Client:
pair: Pair = pair_type(**item)
except Exception as e:
e.add_note(
"\nDon't panic, prolly stupid binance changed their symbology schema again..\n"
'Check out their API docs here:\n\n'
'https://binance-docs.github.io/apidocs/spot/en/#exchange-information'
f'\n'
f'New or removed field we need to codify!\n'
f'pair-type: {pair_type!r}\n'
f'\n'
f"Don't panic, prolly stupid binance changed their symbology schema again..\n"
f'Check out their API docs here:\n'
f'\n'
f'https://binance-docs.github.io/apidocs/spot/en/#exchange-information\n'
)
raise
pair_table[pair.symbol.upper()] = pair

View File

@ -440,6 +440,7 @@ async def open_trade_dialog(
# - ledger: TransactionLedger
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
ctx.open_stream() as ems_stream,
):

View File

@ -94,13 +94,15 @@ class L1(Struct):
# validation type
# https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams/Aggregate-Trade-Streams#response-example
class AggTrade(Struct, frozen=True):
e: str # Event type
E: int # Event time
s: str # Symbol
a: int # Aggregate trade ID
p: float # Price
q: float # Quantity
q: float # Quantity with all the market trades
nq: float # Normal quantity without the trades involving RPI orders
f: int # First trade ID
l: int # noqa Last trade ID
T: int # Trade time
@ -448,7 +450,6 @@ async def subscribe(
async def stream_quotes(
send_chan: trio.abc.SendChannel,
symbols: list[str],
feed_is_live: trio.Event,
@ -460,6 +461,7 @@ async def stream_quotes(
) -> None:
async with (
tractor.trionics.maybe_raise_from_masking_exc(),
send_chan as send_chan,
open_cached_client('binance') as client,
):

View File

@ -97,6 +97,16 @@ class Pair(Struct, frozen=True, kw_only=True):
baseAsset: str
baseAssetPrecision: int
permissionSets: list[list[str]]
# https://developers.binance.com/docs/binance-spot-api-docs#2025-08-26
# will become non-optional 2025-08-28?
# https://developers.binance.com/docs/binance-spot-api-docs#future-changes
pegInstructionsAllowed: bool = False
# https://developers.binance.com/docs/binance-spot-api-docs#2025-12-02
opoAllowed: bool = False
filters: dict[
str,
str | int | float,
@ -142,7 +152,11 @@ class SpotPair(Pair, frozen=True):
defaultSelfTradePreventionMode: str
allowedSelfTradePreventionModes: list[str]
permissions: list[str]
permissionSets: list[list[str]]
# can the paint botz creat liq gaps even easier on this asset?
# Bp
# https://developers.binance.com/docs/binance-spot-api-docs/faqs/order_amend_keep_priority
amendAllowed: bool
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.binance:SpotPair'
@ -209,7 +223,10 @@ class FutesPair(Pair):
assert pair == self.pair # sanity
return f'{expiry}'
case 'PERPETUAL':
case (
'PERPETUAL'
| 'TRADIFI_PERPETUAL'
):
return 'PERP'
case '':
@ -238,7 +255,10 @@ class FutesPair(Pair):
margin: str = self.marginAsset
match ctype:
case 'PERPETUAL':
case (
'PERPETUAL'
| 'TRADIFI_PERPETUAL'
):
return f'{margin}M'
case (

View File

@ -471,11 +471,15 @@ def search(
'''
# global opts
brokermods = list(config['brokermods'].values())
brokermods: list[ModuleType] = list(config['brokermods'].values())
# TODO: this is coming from the `search --pdb` NOT from
# the `piker --pdb` XD ..
# -[ ] pull from the parent click ctx's values..dumdum
# assert pdb
# define tractor entrypoint
async def main(func):
async with maybe_open_pikerd(
loglevel=config['loglevel'],
debug_mode=pdb,

View File

@ -22,7 +22,9 @@ routines should be primitive data types where possible.
"""
import inspect
from types import ModuleType
from typing import List, Dict, Any, Optional
from typing import (
Any,
)
import trio
@ -34,8 +36,10 @@ from ..accounting import MktPair
async def api(brokername: str, methname: str, **kwargs) -> dict:
"""Make (proxy through) a broker API call by name and return its result.
"""
'''
Make (proxy through) a broker API call by name and return its result.
'''
brokermod = get_brokermod(brokername)
async with brokermod.get_client() as client:
meth = getattr(client, methname, None)
@ -62,10 +66,14 @@ async def api(brokername: str, methname: str, **kwargs) -> dict:
async def stocks_quote(
brokermod: ModuleType,
tickers: List[str]
) -> Dict[str, Dict[str, Any]]:
"""Return quotes dict for ``tickers``.
"""
tickers: list[str]
) -> dict[str, dict[str, Any]]:
'''
Return a `dict` of snapshot quotes for the provided input
`tickers`: a `list` of fqmes.
'''
async with brokermod.get_client() as client:
return await client.quote(tickers)
@ -74,13 +82,15 @@ async def stocks_quote(
async def option_chain(
brokermod: ModuleType,
symbol: str,
date: Optional[str] = None,
) -> Dict[str, Dict[str, Dict[str, Any]]]:
"""Return option chain for ``symbol`` for ``date``.
date: str|None = None,
) -> dict[str, dict[str, dict[str, Any]]]:
'''
Return option chain for ``symbol`` for ``date``.
By default all expiries are returned. If ``date`` is provided
then contract quotes for that single expiry are returned.
"""
'''
async with brokermod.get_client() as client:
if date:
id = int((await client.tickers2ids([symbol]))[symbol])
@ -98,7 +108,7 @@ async def option_chain(
# async def contracts(
# brokermod: ModuleType,
# symbol: str,
# ) -> Dict[str, Dict[str, Dict[str, Any]]]:
# ) -> dict[str, dict[str, dict[str, Any]]]:
# """Return option contracts (all expiries) for ``symbol``.
# """
# async with brokermod.get_client() as client:
@ -110,15 +120,24 @@ async def bars(
brokermod: ModuleType,
symbol: str,
**kwargs,
) -> Dict[str, Dict[str, Dict[str, Any]]]:
"""Return option contracts (all expiries) for ``symbol``.
"""
) -> dict[str, dict[str, dict[str, Any]]]:
'''
Return option contracts (all expiries) for ``symbol``.
'''
async with brokermod.get_client() as client:
return await client.bars(symbol, **kwargs)
async def search_w_brokerd(name: str, pattern: str) -> dict:
async def search_w_brokerd(
name: str,
pattern: str,
) -> dict:
# TODO: WHY NOT WORK!?!
# when we `step` through the next block?
# import tractor
# await tractor.pause()
async with open_cached_client(name) as client:
# TODO: support multiple asset type concurrent searches.
@ -130,12 +149,12 @@ async def symbol_search(
pattern: str,
**kwargs,
) -> Dict[str, Dict[str, Dict[str, Any]]]:
) -> dict[str, dict[str, dict[str, Any]]]:
'''
Return symbol info from broker.
'''
results = []
results: list[str] = []
async def search_backend(
brokermod: ModuleType
@ -143,6 +162,13 @@ async def symbol_search(
brokername: str = mod.name
# TODO: figure this the FUCK OUT
# -> ok so obvi in the root actor any async task that's
# spawned outside the main tractor-root-actor task needs to
# call this..
# await tractor.devx._debug.maybe_init_greenback()
# tractor.pause_from_sync()
async with maybe_spawn_brokerd(
mod.name,
infect_asyncio=getattr(
@ -162,7 +188,6 @@ async def symbol_search(
))
async with trio.open_nursery() as n:
for mod in brokermods:
n.start_soon(search_backend, mod.name)
@ -172,11 +197,13 @@ async def symbol_search(
async def mkt_info(
brokermod: ModuleType,
fqme: str,
**kwargs,
) -> MktPair:
'''
Return MktPair info from broker including src and dst assets.
Return the `piker.accounting.MktPair` info struct from a given
backend broker tradable src/dst asset pair.
'''
async with open_cached_client(brokermod.name) as client:

View File

@ -31,7 +31,7 @@ from typing import (
Callable,
)
import pendulum
from pendulum import now
import trio
from trio_typing import TaskStatus
from rapidfuzz import process as fuzzy
@ -39,6 +39,7 @@ import numpy as np
from tractor.trionics import (
broadcast_receiver,
maybe_open_context
collapse_eg,
)
from tractor import to_asyncio
# XXX WOOPS XD
@ -432,6 +433,7 @@ async def get_client(
) -> Client:
async with (
collapse_eg(),
trio.open_nursery() as n,
open_jsonrpc_session(
_testnet_ws_url, dtype=JSONRPCResult) as json_rpc

View File

@ -20,6 +20,11 @@ runnable script-programs.
'''
from __future__ import annotations
from datetime import ( # noqa
datetime,
date,
tzinfo as TzInfo,
)
from functools import partial
from typing import (
Literal,
@ -33,7 +38,7 @@ from piker.brokers._util import get_logger
if TYPE_CHECKING:
from .api import Client
from ib_insync import IB
import i3ipc
log = get_logger('piker.brokers.ib')
@ -48,8 +53,39 @@ _reset_tech: Literal[
] = 'vnc'
no_setup_msg:str = (
'No data reset hack test setup for {vnc_sockaddr}!\n'
'See config setup tips @\n'
'https://github.com/pikers/piker/tree/master/piker/brokers/ib'
)
def try_xdo_manual(
client: Client,
):
'''
Do the "manual" `xdo`-based screen switch + click
combo since apparently the `asyncvnc` client ain't workin..
Note this is only meant as a backup method for Xorg users,
ideally you can use a real vnc client and the `vnc_click_hack()`
impl!
'''
global _reset_tech
try:
i3ipc_xdotool_manual_click_hack()
_reset_tech = 'i3ipc_xdotool'
return True
except OSError:
vnc_sockaddr: str = client.conf.vnc_addrs
log.exception(
no_setup_msg.format(vnc_sockaddr=vnc_sockaddr)
)
return False
async def data_reset_hack(
# vnc_host: str,
client: Client,
reset_type: Literal['data', 'connection'],
@ -81,65 +117,60 @@ async def data_reset_hack(
that need to be wrangle.
'''
ib_client: IB = client.ib
# look up any user defined vnc socket address mapped from
# a particular API socket port.
api_port: str = str(ib_client.client.port)
vnc_host: str
vnc_port: int
vnc_sockaddr: tuple[str] | None = client.conf.get('vnc_addrs')
no_setup_msg:str = (
f'No data reset hack test setup for {vnc_sockaddr}!\n'
'See config setup tips @\n'
'https://github.com/pikers/piker/tree/master/piker/brokers/ib'
)
if not vnc_sockaddr:
vnc_addrs: tuple[str]|None = client.conf.get('vnc_addrs')
if not vnc_addrs:
log.warning(
no_setup_msg
no_setup_msg.format(vnc_sockaddr=client.conf)
+
'REQUIRES A `vnc_addrs: array` ENTRY'
)
vnc_host, vnc_port = vnc_sockaddr.get(
api_port,
('localhost', 3003)
)
global _reset_tech
match _reset_tech:
case 'vnc':
try:
await tractor.to_asyncio.run_task(
partial(
vnc_click_hack,
host=vnc_host,
port=vnc_port,
client=client,
)
)
except OSError:
if vnc_host != 'localhost':
log.warning(no_setup_msg)
return False
except (
OSError, # no VNC server avail..
PermissionError, # asyncvnc pw fail..
):
try:
import i3ipc # noqa (since a deps dynamic check)
except ModuleNotFoundError:
log.warning(no_setup_msg)
log.warning(
no_setup_msg.format(vnc_sockaddr=client.conf)
)
return False
try:
i3ipc_xdotool_manual_click_hack()
_reset_tech = 'i3ipc_xdotool'
return True
except OSError:
log.exception(no_setup_msg)
return False
# XXX, Xorg only workaround..
# TODO? remove now that we have `pyvnc`?
# if vnc_host not in {
# 'localhost',
# '127.0.0.1',
# }:
# focussed, matches = i3ipc_fin_wins_titled()
# if not matches:
# log.warning(
# no_setup_msg.format(vnc_sockaddr=vnc_sockaddr)
# )
# return False
# else:
# try_xdo_manual(vnc_sockaddr)
# localhost but no vnc-client or it borked..
else:
try_xdo_manual(client)
case 'i3ipc_xdotool':
i3ipc_xdotool_manual_click_hack()
try_xdo_manual(client)
# i3ipc_xdotool_manual_click_hack()
case _ as tech:
raise RuntimeError(f'{tech} is not supported for reset tech!?')
@ -149,21 +180,66 @@ async def data_reset_hack(
async def vnc_click_hack(
host: str,
port: int,
reset_type: str = 'data'
client: Client,
reset_type: str = 'data',
pw: str|None = None,
) -> None:
'''
Reset the data or network connection for the VNC attached
ib gateway using magic combos.
ib-gateway using a (magic) keybinding combo.
A vnc-server password can be set either by an input `pw` param or
set in the client's config with the latter loaded from the user's
`brokers.toml` in a vnc-addrs-port-mapping section,
.. code:: toml
[ib.vnc_addrs]
4002 = {host = 'localhost', port = 5900, pw = 'doggy'}
'''
api_port: str = str(client.ib.client.port)
conf: dict = client.conf
vnc_addrs: dict[int, tuple] = conf.get('vnc_addrs')
if not vnc_addrs:
return None
addr_entry: dict|tuple = vnc_addrs.get(
api_port,
('localhost', 5900) # a typical default
)
if pw is None:
match addr_entry:
case (
host,
port,
):
pass
case {
'host': host,
'port': port,
'pw': pw
}:
pass
case _:
raise ValueError(
f'Invalid `ib.vnc_addrs` entry ?\n'
f'{addr_entry!r}\n'
)
try:
import asyncvnc
from pyvnc import (
AsyncVNCClient,
VNCConfig,
Point,
MOUSE_BUTTON_LEFT,
)
except ModuleNotFoundError:
log.warning(
"In order to leverage `piker`'s built-in data reset hacks, install "
"the `asyncvnc` project: https://github.com/barneygale/asyncvnc"
"the `pyvnc` project: https://github.com/regulad/pyvnc.git"
)
return
@ -174,24 +250,79 @@ async def vnc_click_hack(
'connection': 'r'
}[reset_type]
async with asyncvnc.connect(
host,
port=port,
# TODO: doesn't work see:
# https://github.com/barneygale/asyncvnc/issues/7
# password='ibcansmbz',
) as client:
# move to middle of screen
# 640x1800
client.mouse.move(
x=500,
y=500,
with tractor.devx.open_crash_handler():
client = await AsyncVNCClient.connect(
VNCConfig(
host=host,
port=port,
password=pw,
)
)
client.mouse.click()
client.keyboard.press('Ctrl', 'Alt', key) # keys are stacked
async with client:
# move to middle of screen
# 640x1800
await client.move(
Point(
500,
500,
)
)
# ensure the ib-gw window is active
await client.click(MOUSE_BUTTON_LEFT)
# send the hotkeys combo B)
await client.press('Ctrl', 'Alt', key) # keys are stacked
def i3ipc_fin_wins_titled(
titles: list[str] = [
'Interactive Brokers', # tws running in i3
'IB Gateway', # gw running in i3
# 'IB', # gw running in i3 (newer version?)
# !TODO, remote vnc instance
# -[ ] something in title (or other Con-props) that indicates
# this is explicitly for ibrk sw?
# |_[ ] !can use modden spawn eventually!
'TigerVNC',
# 'vncviewer', # the terminal..
],
) -> tuple[
i3ipc.Con, # orig focussed win
list[tuple[str, i3ipc.Con]], # matching wins by title
]:
'''
Attempt to find a local-DE window titled with an entry in
`titles`.
If found deliver the current focussed window and all matching
`i3ipc.Con`s in a list.
'''
import i3ipc
ipc = i3ipc.Connection()
# TODO: might be worth offering some kinda api for grabbing
# the window id from the pid?
# https://stackoverflow.com/a/2250879
tree = ipc.get_tree()
focussed: i3ipc.Con = tree.find_focused()
matches: list[i3ipc.Con] = []
for name in titles:
results = tree.find_titled(name)
print(f'results for {name}: {results}')
if results:
con = results[0]
matches.append((
name,
con,
))
return (
focussed,
matches,
)
def i3ipc_xdotool_manual_click_hack() -> None:
@ -199,65 +330,46 @@ def i3ipc_xdotool_manual_click_hack() -> None:
Do the data reset hack but expecting a local X-window using `xdotool`.
'''
import i3ipc
i3 = i3ipc.Connection()
# TODO: might be worth offering some kinda api for grabbing
# the window id from the pid?
# https://stackoverflow.com/a/2250879
t = i3.get_tree()
orig_win_id = t.find_focused().window
# for tws
win_names: list[str] = [
'Interactive Brokers', # tws running in i3
'IB Gateway', # gw running in i3
# 'IB', # gw running in i3 (newer version?)
]
focussed, matches = i3ipc_fin_wins_titled()
orig_win_id = focussed.window
try:
for name in win_names:
results = t.find_titled(name)
print(f'results for {name}: {results}')
if results:
con = results[0]
print(f'Resetting data feed for {name}')
win_id = str(con.window)
w, h = con.rect.width, con.rect.height
for name, con in matches:
print(f'Resetting data feed for {name}')
win_id = str(con.window)
w, h = con.rect.width, con.rect.height
# TODO: seems to be a few libs for python but not sure
# if they support all the sub commands we need, order of
# most recent commit history:
# https://github.com/rr-/pyxdotool
# https://github.com/ShaneHutter/pyxdotool
# https://github.com/cphyc/pyxdotool
# TODO: seems to be a few libs for python but not sure
# if they support all the sub commands we need, order of
# most recent commit history:
# https://github.com/rr-/pyxdotool
# https://github.com/ShaneHutter/pyxdotool
# https://github.com/cphyc/pyxdotool
# TODO: only run the reconnect (2nd) kc on a detected
# disconnect?
for key_combo, timeout in [
# only required if we need a connection reset.
# ('ctrl+alt+r', 12),
# data feed reset.
('ctrl+alt+f', 6)
]:
subprocess.call([
'xdotool',
'windowactivate', '--sync', win_id,
# TODO: only run the reconnect (2nd) kc on a detected
# disconnect?
for key_combo, timeout in [
# only required if we need a connection reset.
# ('ctrl+alt+r', 12),
# data feed reset.
('ctrl+alt+f', 6)
]:
subprocess.call([
'xdotool',
'windowactivate', '--sync', win_id,
# move mouse to bottom left of window (where
# there should be nothing to click).
'mousemove_relative', '--sync', str(w-4), str(h-4),
# move mouse to bottom left of window (where
# there should be nothing to click).
'mousemove_relative', '--sync', str(w-4), str(h-4),
# NOTE: we may need to stick a `--retry 3` in here..
'click', '--window', win_id,
'--repeat', '3', '1',
# NOTE: we may need to stick a `--retry 3` in here..
'click', '--window', win_id,
'--repeat', '3', '1',
# hackzorzes
'key', key_combo,
],
timeout=timeout,
)
# hackzorzes
'key', key_combo,
],
timeout=timeout,
)
# re-activate and focus original window
subprocess.call([
@ -267,3 +379,99 @@ def i3ipc_xdotool_manual_click_hack() -> None:
])
except subprocess.TimeoutExpired:
log.exception('xdotool timed out?')
def is_current_time_in_range(
start_dt: datetime,
end_dt: datetime,
) -> bool:
'''
Check if current time is within the datetime range.
Use any/the-same timezone as provided by `start_dt.tzinfo` value
in the range.
'''
now: datetime = datetime.now(start_dt.tzinfo)
return start_dt <= now <= end_dt
# TODO, put this into `._util` and call it from here!
#
# NOTE, this was generated by @guille from a gpt5 prompt
# and was originally thot to be needed before learning about
# `ib_insync.contract.ContractDetails._parseSessions()` and
# it's downstream meths..
#
# This is still likely useful to keep for now to parse the
# `.tradingHours: str` value manually if we ever decide
# to move off `ib_async` and implement our own `trio`/`anyio`
# based version Bp
#
# >attempt to parse the retarted ib "time stampy thing" they
# >do for "venue hours" with this.. written by
# >gpt5-"thinking",
#
def parse_trading_hours(
spec: str,
tz: TzInfo|None = None
) -> dict[
date,
tuple[datetime, datetime]
]|None:
'''
Parse venue hours like:
'YYYYMMDD:HHMM-YYYYMMDD:HHMM;YYYYMMDD:CLOSED;...'
Returns `dict[date] = (open_dt, close_dt)` or `None` if
closed.
'''
if (
not isinstance(spec, str)
or
not spec
):
raise ValueError('spec must be a non-empty string')
out: dict[
date,
tuple[datetime, datetime]
]|None = {}
for part in (p.strip() for p in spec.split(';') if p.strip()):
if part.endswith(':CLOSED'):
day_s, _ = part.split(':', 1)
d = datetime.strptime(day_s, '%Y%m%d').date()
out[d] = None
continue
try:
start_s, end_s = part.split('-', 1)
start_dt = datetime.strptime(start_s, '%Y%m%d:%H%M')
end_dt = datetime.strptime(end_s, '%Y%m%d:%H%M')
except ValueError as exc:
raise ValueError(f'invalid segment: {part}') from exc
if tz is not None:
start_dt = start_dt.replace(tzinfo=tz)
end_dt = end_dt.replace(tzinfo=tz)
out[start_dt.date()] = (start_dt, end_dt)
return out
# ORIG desired usage,
#
# TODO, for non-drunk tomorrow,
# - call above fn and check that `output[today] is not None`
# trading_hrs: dict = parse_trading_hours(
# details.tradingHours
# )
# liq_hrs: dict = parse_trading_hours(
# details.liquidHours
# )

View File

@ -48,6 +48,7 @@ from bidict import bidict
import trio
import tractor
from tractor import to_asyncio
from tractor import trionics
from pendulum import (
from_timestamp,
DateTime,
@ -333,15 +334,15 @@ class Client:
fqme: str,
# EST in ISO 8601 format is required... below is EPOCH
start_dt: datetime | str = "1970-01-01T00:00:00.000000-05:00",
end_dt: datetime | str = "",
start_dt: datetime|str = "1970-01-01T00:00:00.000000-05:00",
end_dt: datetime|str = "",
# ohlc sample period in seconds
sample_period_s: int = 1,
# optional "duration of time" equal to the
# length of the returned history frame.
duration: str | None = None,
duration: str|None = None,
**kwargs,
@ -715,8 +716,8 @@ class Client:
async def find_contracts(
self,
pattern: str | None = None,
contract: Contract | None = None,
pattern: str|None = None,
contract: Contract|None = None,
qualify: bool = True,
err_on_qualify: bool = True,
@ -861,7 +862,7 @@ class Client:
self,
fqme: str,
) -> datetime | None:
) -> datetime|None:
'''
Return the first datetime stamp for `fqme` or `None`
on request failure.
@ -917,7 +918,7 @@ class Client:
tries: int = 100,
raise_on_timeout: bool = False,
) -> Ticker | None:
) -> Ticker|None:
'''
Return a single (snap) quote for symbol.
@ -929,7 +930,7 @@ class Client:
ready: ticker.TickerUpdateEvent = ticker.updateEvent
# ensure a last price gets filled in before we deliver quote
timeouterr: Exception | None = None
timeouterr: Exception|None = None
warnset: bool = False
for _ in range(tries):
@ -943,6 +944,7 @@ class Client:
)
if tkr:
break
except TimeoutError as err:
timeouterr = err
await asyncio.sleep(0.01)
@ -951,7 +953,9 @@ class Client:
else:
if not warnset:
log.warning(
f'Quote req timed out..maybe venue is closed?\n'
f'Quote req timed out..\n'
f'Maybe the venue is closed?\n'
f'\n'
f'{asdict(contract)}'
)
warnset = True
@ -963,9 +967,11 @@ class Client:
)
break
else:
if timeouterr and raise_on_timeout:
import pdbp
pdbp.set_trace()
if (
timeouterr
and
raise_on_timeout
):
raise timeouterr
if not warnset:
@ -1362,23 +1368,20 @@ async def load_aio_clients(
async def load_clients_for_trio(
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
chan: tractor.to_asyncio.LinkedTaskChannel,
) -> None:
'''
Pure async mngr proxy to ``load_aio_clients()``.
This is a bootstrap entrypoing to call from
a ``tractor.to_asyncio.open_channel_from()``.
This is a bootstrap entrypoint to call from
a `tractor.to_asyncio.open_channel_from()`.
'''
async with load_aio_clients(
disconnect_on_exit=False,
) as accts2clients:
to_trio.send_nowait(accts2clients)
chan.started_nowait(accts2clients)
# TODO: maybe a sync event to wait on instead?
await asyncio.sleep(float('inf'))
@ -1391,7 +1394,10 @@ async def open_client_proxies() -> tuple[
async with (
tractor.trionics.maybe_open_context(
acm_func=tractor.to_asyncio.open_channel_from,
kwargs={'target': load_clients_for_trio},
kwargs={
'target': load_clients_for_trio,
# ^XXX, kwarg to `open_channel_from()`
},
# lock around current actor task access
# TODO: maybe this should be the default in tractor?
@ -1501,7 +1507,7 @@ class MethodProxy:
self,
pattern: str,
) -> dict[str, Any] | trio.Event:
) -> dict[str, Any]|trio.Event:
ev = self.event_table.get(pattern)
@ -1522,23 +1528,22 @@ class MethodProxy:
async def open_aio_client_method_relay(
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
chan: tractor.to_asyncio.LinkedTaskChannel,
client: Client,
event_consumers: dict[str, trio.Event],
) -> None:
# sync with `open_client_proxy()` caller
to_trio.send_nowait(client)
chan.started_nowait(client)
# TODO: separate channel for error handling?
client.inline_errors(to_trio)
client.inline_errors(chan)
# relay all method requests to ``asyncio``-side client and deliver
# back results
while not to_trio._closed:
msg: tuple[str, dict] | dict | None = await from_trio.get()
while not chan._to_trio._closed: # <- TODO, better check like `._web_bs`?
msg: tuple[str, dict]|dict|None = await chan.get()
match msg:
case None: # termination sentinel
log.info('asyncio `Client` method-proxy SHUTDOWN!')
@ -1551,7 +1556,7 @@ async def open_aio_client_method_relay(
try:
resp = await meth(**kwargs)
# echo the msg back
to_trio.send_nowait({'result': resp})
chan.send_nowait({'result': resp})
except (
RequestError,
@ -1559,10 +1564,10 @@ async def open_aio_client_method_relay(
# TODO: relay all errors to trio?
# BaseException,
) as err:
to_trio.send_nowait({'exception': err})
chan.send_nowait({'exception': err})
case {'error': content}:
to_trio.send_nowait({'exception': content})
chan.send_nowait({'exception': content})
case _:
raise ValueError(f'Unhandled msg {msg}')
@ -1584,7 +1589,8 @@ async def open_client_proxy(
event_consumers=event_table,
) as (first, chan),
trio.open_nursery() as relay_n,
trionics.collapse_eg(), # loose-ify
trio.open_nursery() as relay_tn,
):
assert isinstance(first, Client)
@ -1624,7 +1630,7 @@ async def open_client_proxy(
continue
relay_n.start_soon(relay_events)
relay_tn.start_soon(relay_events)
yield proxy

View File

@ -34,6 +34,7 @@ import trio
from trio_typing import TaskStatus
import tractor
from tractor.to_asyncio import LinkedTaskChannel
from tractor import trionics
from ib_insync.contract import (
Contract,
)
@ -116,7 +117,11 @@ def pack_position(
symbol=fqme,
currency=con.currency,
size=float(pos.position),
avg_price=float(pos.avgCost) / float(con.multiplier or 1.0),
avg_price=(
float(pos.avgCost)
/
float(con.multiplier or 1.0)
),
),
)
@ -357,6 +362,10 @@ async def update_and_audit_pos_msg(
size=ibpos.position,
avg_price=pikerpos.ppu,
# XXX ensures matching even if multiple venue-names
# in `.bs_fqme`, likely from txn records..
bs_mktid=mkt.bs_mktid,
)
ibfmtmsg: str = pformat(ibpos._asdict())
@ -407,7 +416,7 @@ async def update_and_audit_pos_msg(
# TODO: make this a "propaganda" log level?
if ibpos.avgCost != msg.avg_price:
log.warning(
log.debug(
f'IB "FIFO" avg price for {msg.symbol} is DIFF:\n'
f'ib: {ibfmtmsg}\n'
'---------------------------\n'
@ -425,7 +434,8 @@ async def aggr_open_orders(
) -> None:
'''
Collect all open orders from client and fill in `order_msgs: list`.
Collect all open orders from client and fill in `order_msgs:
list`.
'''
trades: list[Trade] = client.ib.openTrades()
@ -546,7 +556,10 @@ async def open_trade_dialog(
),
# TODO: do this as part of `open_account()`!?
open_symcache('ib', only_from_memcache=True) as symcache,
open_symcache(
'ib',
only_from_memcache=True,
) as symcache,
):
# Open a trade ledgers stack for appending trade records over
# multiple accounts.
@ -554,8 +567,10 @@ async def open_trade_dialog(
ledgers: dict[str, TransactionLedger] = {}
tables: dict[str, Account] = {}
order_msgs: list[Status] = []
conf = get_config()
accounts_def_inv: bidict[str, str] = bidict(conf['accounts']).inverse
conf: dict = get_config()
accounts_def_inv: bidict[str, str] = bidict(
conf['accounts']
).inverse
with (
ExitStack() as lstack,
@ -705,7 +720,11 @@ async def open_trade_dialog(
# client-account and build out position msgs to deliver to
# EMS.
for acctid, acnt in tables.items():
active_pps, closed_pps = acnt.dump_active()
active_pps: dict[str, Position]
(
active_pps,
closed_pps,
) = acnt.dump_active()
for pps in [active_pps, closed_pps]:
piker_pps: list[Position] = list(pps.values())
@ -721,6 +740,7 @@ async def open_trade_dialog(
)
if ibpos:
bs_mktid: str = str(ibpos.contract.conId)
msg = await update_and_audit_pos_msg(
acctid,
pikerpos,
@ -738,7 +758,7 @@ async def open_trade_dialog(
f'UNEXPECTED POSITION says IB => {msg.symbol}\n'
'Maybe they LIQUIDATED YOU or your ledger is wrong?\n'
)
log.error(logmsg)
log.debug(logmsg)
await ctx.started((
all_positions,
@ -747,21 +767,22 @@ async def open_trade_dialog(
async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
trionics.collapse_eg(),
trio.open_nursery() as tn,
):
# relay existing open orders to ems
for msg in order_msgs:
await ems_stream.send(msg)
for client in set(aioclients.values()):
trade_event_stream: LinkedTaskChannel = await n.start(
trade_event_stream: LinkedTaskChannel = await tn.start(
open_trade_event_stream,
client,
)
# start order request handler **before** local trades
# event loop
n.start_soon(
tn.start_soon(
handle_order_requests,
ems_stream,
accounts_def,
@ -769,7 +790,7 @@ async def open_trade_dialog(
)
# allocate event relay tasks for each client connection
n.start_soon(
tn.start_soon(
deliver_trade_events,
trade_event_stream,
@ -1241,32 +1262,47 @@ async def deliver_trade_events(
# never relay errors for non-broker related issues
# https://interactivebrokers.github.io/tws-api/message_codes.html
code: int = err['error_code']
if code in {
200, # uhh
reason: str = err['reason']
reqid: str = str(err['reqid'])
# "Warning:" msg codes,
# https://interactivebrokers.github.io/tws-api/message_codes.html#warning_codes
# - 2109: 'Outside Regular Trading Hours'
if 'Warning:' in reason:
log.warning(
f'Order-API-warning: {code!r}\n'
f'reqid: {reqid!r}\n'
f'\n'
f'{pformat(err)}\n'
# ^TODO? should we just print the `reason`
# not the full `err`-dict?
)
continue
# XXX known special (ignore) cases
elif code in {
200, # uhh.. ni idea
# hist pacing / connectivity
162,
165,
# WARNING codes:
# https://interactivebrokers.github.io/tws-api/message_codes.html#warning_codes
# Attribute 'Outside Regular Trading Hours' is
# " 'ignored based on the order type and
# destination. PlaceOrder is now ' 'being
# processed.',
2109,
# XXX: lol this isn't even documented..
# 'No market data during competing live session'
1669,
}:
log.error(
f'Order-API-error which is non-cancel-causing ?!\n'
f'\n'
f'{pformat(err)}\n'
)
continue
reqid: str = str(err['reqid'])
reason: str = err['reason']
if err['reqid'] == -1:
log.error(f'TWS external order error:\n{pformat(err)}')
log.error(
f'TWS external order error ??\n'
f'{pformat(err)}\n'
)
flow: dict = dict(
flows.get(reqid)

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# Copyright (C) 2018-forever Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@ -13,10 +13,12 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Data feed endpoints pre-wrapped and ready for use with ``tractor``/``trio``.
"""
'''
Data feed endpoints pre-wrapped and ready for use with `tractor`/`trio`
via "infected-asyncio-mode".
'''
from __future__ import annotations
import asyncio
from contextlib import (
@ -26,7 +28,6 @@ from dataclasses import asdict
from datetime import datetime
from functools import partial
from pprint import pformat
from math import isnan
import time
from typing import (
Any,
@ -40,7 +41,6 @@ import numpy as np
from pendulum import (
now,
from_timestamp,
# DateTime,
Duration,
duration as mk_duration,
)
@ -69,7 +69,10 @@ from .api import (
Contract,
RequestError,
)
from ._util import data_reset_hack
from ._util import (
data_reset_hack,
is_current_time_in_range,
)
from .symbols import get_mkt_info
if TYPE_CHECKING:
@ -184,7 +187,8 @@ async def open_history_client(
if (
start_dt
and start_dt.timestamp() == 0
and
start_dt.timestamp() == 0
):
await tractor.pause()
@ -203,14 +207,16 @@ async def open_history_client(
):
count += 1
mean += latency / count
print(
log.debug(
f'HISTORY FRAME QUERY LATENCY: {latency}\n'
f'mean: {mean}'
)
# could be trying to retreive bars over weekend
if out is None:
log.error(f"Can't grab bars starting at {end_dt}!?!?")
log.error(
f"No bars starting at {end_dt!r} !?!?"
)
if (
end_dt
and head_dt
@ -285,8 +291,9 @@ _pacing: str = (
async def wait_on_data_reset(
proxy: MethodProxy,
reset_type: str = 'data',
timeout: float = 16, # float('inf'),
timeout: float = 16,
task_status: TaskStatus[
tuple[
@ -295,29 +302,47 @@ async def wait_on_data_reset(
]
] = trio.TASK_STATUS_IGNORED,
) -> bool:
'''
Wait on a (global-ish) "data-farm" event to be emitted
by the IB api server.
# TODO: we might have to put a task lock around this
# method..
hist_ev = proxy.status_event(
Allows syncing to reconnect event-messages emitted on the API
console, such as:
- 'HMDS data farm connection is OK:ushmds'
- 'Market data farm is connecting:usfuture'
- 'Market data farm connection is OK:usfuture'
Deliver a `(cs, done: Event)` pair to the caller to support it
waiting or cancelling the associated "data-reset-request";
normally a manual data-reset-req is expected to be the cause and
thus trigger such events (such as our click-hack-magic from
`.ib._util`).
'''
# ?TODO, do we need a task-lock around this method?
#
# register for an API "status event" wrapped for `trio`-sync.
hist_ev: trio.Event = proxy.status_event(
'HMDS data farm connection is OK:ushmds'
)
# TODO: other event messages we might want to try and
# wait for but i wasn't able to get any of this
# reliable..
#
# ^TODO: other event-messages we might want to support waiting-for
# but i wasn't able to get reliable..
#
# reconnect_start = proxy.status_event(
# 'Market data farm is connecting:usfuture'
# )
# live_ev = proxy.status_event(
# 'Market data farm connection is OK:usfuture'
# )
# try to wait on the reset event(s) to arrive, a timeout
# will trigger a retry up to 6 times (for now).
client: Client = proxy._aio_ns
done = trio.Event()
with trio.move_on_after(timeout) as cs:
task_status.started((cs, done))
log.warning(
@ -396,8 +421,9 @@ async def get_bars(
bool, # timed out hint
]:
'''
Retrieve historical data from a ``trio``-side task using
a ``MethoProxy``.
Request-n-retrieve historical data frames from a `trio.Task`
using a `MethoProxy` to query the `asyncio`-side's
`.ib.api.Client` methods.
'''
global _data_resetter_task, _failed_resets
@ -587,7 +613,7 @@ async def get_bars(
data_cs.cancel()
# spawn new data reset task
data_cs, reset_done = await nurse.start(
data_cs, reset_done = await tn.start(
partial(
wait_on_data_reset,
proxy,
@ -607,11 +633,14 @@ async def get_bars(
# such that simultaneous symbol queries don't try data resettingn
# too fast..
unset_resetter: bool = False
async with trio.open_nursery() as nurse:
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn
):
# start history request that we allow
# to run indefinitely until a result is acquired
nurse.start_soon(query)
tn.start_soon(query)
# start history reset loop which waits up to the timeout
# for a result before triggering a data feed reset.
@ -631,7 +660,7 @@ async def get_bars(
unset_resetter: bool = True
# spawn new data reset task
data_cs, reset_done = await nurse.start(
data_cs, reset_done = await tn.start(
partial(
wait_on_data_reset,
proxy,
@ -653,14 +682,12 @@ async def get_bars(
)
# per-actor cache of inter-eventloop-chans
_quote_streams: dict[str, trio.abc.ReceiveStream] = {}
async def _setup_quote_stream(
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
chan: tractor.to_asyncio.LinkedTaskChannel,
symbol: str,
opts: tuple[int] = (
'375', # RT trade volume (excludes utrades)
@ -678,10 +705,13 @@ async def _setup_quote_stream(
) -> trio.abc.ReceiveChannel:
'''
Stream a ticker using the std L1 api.
Stream L1 quotes via the `Ticker.updateEvent.connect(push)`
callback API by registering a `push` callback which simply
`chan.send_nowait()`s quote msgs back to the calling
parent-`trio.Task`-side.
This task is ``asyncio``-side and must be called from
``tractor.to_asyncio.open_channel_from()``.
NOTE, that this task-fn is run on the `asyncio.Task`-side ONLY
and is thus run via `tractor.to_asyncio.open_channel_from()`.
'''
global _quote_streams
@ -689,37 +719,79 @@ async def _setup_quote_stream(
async with load_aio_clients(
disconnect_on_exit=False,
) as accts2clients:
caccount_name, client = get_preferred_data_client(accts2clients)
contract = contract or (await client.find_contract(symbol))
to_trio.send_nowait(contract) # cuz why not
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
# NOTE: it's batch-wise and slow af but I guess could
# be good for backchecking? Seems to be every 5s maybe?
# XXX since this is an `asyncio.Task`, we must use
# tractor.pause_from_sync()
caccount_name, client = get_preferred_data_client(accts2clients)
contract = (
contract
or
(await client.find_contract(symbol))
)
chan.started_nowait(contract) # cuz why not
ticker: Ticker = client.ib.reqMktData(
contract,
','.join(opts),
)
maybe_exc: BaseException|None = None
handler_tries: int = 0
aio_task: asyncio.Task = asyncio.current_task()
# ?TODO? this API is batch-wise and quite slow-af but,
# - seems to be 5s updates?
# - maybe we could use it for backchecking?
#
# ticker: Ticker = client.ib.reqTickByTickData(
# contract, 'Last',
# )
# # define a simple queue push routine that streams quote packets
# # to trio over the ``to_trio`` memory channel.
# to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore
# define a very naive queue-pushing callback that relays
# quote-packets directly the calling (parent) `trio.Task`.
# Ensure on teardown we cancel the feed via their cancel API.
#
def teardown():
'''
Disconnect our `push`-er callback and cancel the data-feed
for `contract`.
'''
nonlocal maybe_exc
ticker.updateEvent.disconnect(push)
log.error(f"Disconnected stream for `{symbol}`")
report: str = f'Disconnected mkt-data for {symbol!r} due to '
if maybe_exc is not None:
report += (
'error,\n'
f'{maybe_exc!r}\n'
)
log.error(report)
else:
report += (
'cancellation.\n'
)
log.cancel(report)
client.ib.cancelMktData(contract)
# decouple broadcast mem chan
_quote_streams.pop(symbol, None)
def push(t: Ticker) -> None:
"""
Push quotes to trio task.
def push(
t: Ticker,
tries_before_raise: int = 6,
) -> None:
'''
Push quotes verbatim to parent-side `trio.Task`.
"""
# log.debug(t)
'''
nonlocal maybe_exc, handler_tries
# log.debug(f'new IB quote: {t}\n')
try:
to_trio.send_nowait(t)
chan.send_nowait(t)
# XXX TODO XXX replicate in `tractor` tests
# as per `CancelledError`-handler notes below!
# assert 0
except (
trio.BrokenResourceError,
@ -734,35 +806,107 @@ async def _setup_quote_stream(
# resulting in tracebacks spammed to console..
# Manually do the dereg ourselves.
teardown()
except trio.WouldBlock:
# log.warning(
# f'channel is blocking symbol feed for {symbol}?'
# f'\n{to_trio.statistics}'
# )
pass
# except trio.WouldBlock:
# # for slow debugging purposes to avoid clobbering prompt
# # with log msgs
# pass
# for slow debugging purposes to avoid clobbering prompt
# with log msgs
except trio.WouldBlock:
log.exception(
f'Asyncio->Trio `chan.send_nowait()` blocked !?\n'
f'\n'
f'{chan._to_trio.statistics()}\n'
)
# ?TODO, handle re-connection attempts?
except BaseException as _berr:
berr = _berr
if handler_tries >= tries_before_raise:
# breakpoint()
maybe_exc = _berr
# task.set_exception(berr)
aio_task.cancel(msg=berr.args)
raise berr
else:
handler_tries += 1
log.exception(
f'Failed to push ticker quote !?\n'
f'handler_tries={handler_tries!r}\n'
f'ticker: {t!r}\n'
f'\n'
f'{chan._to_trio.statistics()}\n'
f'\n'
f'CAUSE: {berr}\n'
)
ticker.updateEvent.connect(push)
try:
await asyncio.sleep(float('inf'))
finally:
teardown()
# return from_aio
# XXX, for debug.. TODO? can we rm again?
#
# tractor.pause_from_sync()
# while True:
# await asyncio.sleep(1.6)
# if ticker.ticks:
# log.debug(
# f'ticker.ticks = \n'
# f'{ticker.ticks}\n'
# )
# else:
# log.warning(
# 'UHH no ticker.ticks ??'
# )
# XXX TODO XXX !?!?
# apparently **without this handler** and the subsequent
# re-raising of `maybe_exc from _taskc` cancelling the
# `aio_task` from the `push()`-callback will cause a very
# strange chain of exc raising that breaks alll sorts of
# downstream callers, tasks and remote-actor tasks!?
#
# -[ ] we need some lowlevel reproducting tests to replicate
# those worst-case scenarios in `tractor` core!!
# -[ ] likely we should factor-out the `tractor.to_asyncio`
# attempts at workarounds in `.translate_aio_errors()`
# for failed `asyncio.Task.set_exception()` to either
# call `aio_task.cancel()` and/or
# `aio_task._fut_waiter.set_exception()` to a re-useable
# toolset in something like a `.to_asyncio._utils`??
#
except asyncio.CancelledError as _taskc:
if maybe_exc is not None:
raise maybe_exc from _taskc
raise _taskc
except BaseException as _berr:
# stash any crash cause for reporting in `teardown()`
maybe_exc = _berr
raise _berr
finally:
# always disconnect our `push()` and cancel the
# ib-"mkt-data-feed".
teardown()
@acm
async def open_aio_quote_stream(
symbol: str,
contract: Contract | None = None,
contract: Contract|None = None,
) -> trio.abc.ReceiveStream:
) -> (
trio.abc.Channel| # iface
tractor.to_asyncio.LinkedTaskChannel # actually
):
'''
Open a real-time `Ticker` quote stream from an `asyncio.Task`
spawned via `tractor.to_asyncio.open_channel_from()`, deliver the
inter-event-loop channel to the `trio.Task` caller and cache it
globally for re-use.
'''
from tractor.trionics import broadcast_receiver
global _quote_streams
@ -778,6 +922,7 @@ async def open_aio_quote_stream(
yield from_aio
return
from_aio: tractor.to_asyncio.LinkedTaskChannel
async with tractor.to_asyncio.open_channel_from(
_setup_quote_stream,
symbol=symbol,
@ -787,6 +932,10 @@ async def open_aio_quote_stream(
assert contract
# TODO? de-reg on teardown of last consumer task?
# -> why aren't we using `.trionics.maybe_open_context()`
# here again?? (we are in `open_client_proxies()` tho?)
#
# cache feed for later consumers
_quote_streams[symbol] = from_aio
@ -801,7 +950,12 @@ def normalize(
calc_price: bool = False
) -> dict:
'''
Translate `ib_async`'s `Ticker.ticks` values to a `piker`
normalized `dict` form for transmit to downstream `.data` layer
consumers.
'''
# check for special contract types
con = ticker.contract
fqme, calc_price = con2fqme(con)
@ -820,7 +974,7 @@ def normalize(
tbt = ticker.tickByTicks
if tbt:
print(f'tickbyticks:\n {ticker.tickByTicks}')
log.info(f'tickbyticks:\n {ticker.tickByTicks}')
ticker.ticks = new_ticks
@ -856,27 +1010,39 @@ def normalize(
return data
# ?TODO? feels like this task-fn could be factored to reduce some
# indentation levels?
# -[ ] the reconnect while loop on ib-gw "data farm connection.."s
# -[ ] everything embedded under the `async with aclosing(stream):`
# as the "meat" of the quote delivery once the connection is
# stable.
#
async def stream_quotes(
send_chan: trio.abc.SendChannel,
symbols: list[str],
feed_is_live: trio.Event,
loglevel: str = None,
# TODO? we need to hook into the `ib_async` logger like
# we can with i3ipc from modden!
# loglevel: str|None = None,
# startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Stream symbol quotes.
Stream `symbols[0]` quotes back via `send_chan`.
This is a ``trio`` callable routine meant to be invoked
once the brokerd is up.
The `feed_is_live: Event` is set to signal the caller that it can
begin processing msgs from the mem-chan.
'''
# TODO: support multiple subscriptions
sym = symbols[0]
log.info(f'request for real-time quotes: {sym}')
sym: str = symbols[0]
log.info(
f'request for real-time quotes\n'
f'sym: {sym!r}\n'
)
init_msgs: list[FeedInit] = []
@ -885,34 +1051,52 @@ async def stream_quotes(
details: ibis.ContractDetails
async with (
open_data_client() as proxy,
# trio.open_nursery() as tn,
):
mkt, details = await get_mkt_info(
sym,
proxy=proxy, # passed to avoid implicit client load
)
# is venue active rn?
venue_is_open: bool = any(
is_current_time_in_range(
start_dt=sesh.start,
end_dt=sesh.end,
)
for sesh in details.tradingSessions()
)
init_msg = FeedInit(mkt_info=mkt)
# NOTE, tell sampler (via config) to skip vlm summing for dst
# assets which provide no vlm data..
if mkt.dst.atype in {
'fiat',
'index',
'commodity',
}:
# tell sampler config that it shouldn't do vlm summing.
init_msg.shm_write_opts['sum_tick_vlm'] = False
init_msg.shm_write_opts['has_vlm'] = False
init_msgs.append(init_msg)
con: Contract = details.contract
first_ticker: Ticker | None = None
with trio.move_on_after(1):
first_ticker: Ticker|None = None
timeout: float = 1.6
with trio.move_on_after(timeout) as quote_cs:
first_ticker: Ticker = await proxy.get_quote(
contract=con,
raise_on_timeout=False,
)
# XXX should never happen with this ep right?
# but if so then, more then likely mkt is closed?
if quote_cs.cancelled_caught:
log.warning(
f'First quote req timed out after {timeout!r}s'
)
if first_ticker:
first_quote: dict = normalize(first_ticker)
@ -924,28 +1108,27 @@ async def stream_quotes(
f'{pformat(first_quote)}\n'
)
# NOTE: it might be outside regular trading hours for
# assets with "standard venue operating hours" so we
# only "pretend the feed is live" when the dst asset
# type is NOT within the NON-NORMAL-venue set: aka not
# commodities, forex or crypto currencies which CAN
# always return a NaN on a snap quote request during
# normal venue hours. In the case of a closed venue
# (equitiies, futes, bonds etc.) we at least try to
# grab the OHLC history.
if (
first_ticker
and
isnan(first_ticker.last)
# SO, if the last quote price value is NaN we ONLY
# "pretend to do" `feed_is_live.set()` if it's a known
# dst asset venue with a lot of closed operating hours.
and mkt.dst.atype not in {
'commodity',
'fiat',
'crypto',
}
):
# XXX NOTE: whenever we're "outside regular trading hours"
# (only relevant for assets coming from the "legacy markets"
# space) so we basically (from an API/runtime-operational
# perspective) "pretend the feed is live" even if it's
# actually closed.
#
# IOW, we signal to the effective caller (task) that the live
# feed is "already up" but really we're just indicating that
# the OHLCV history can start being loaded immediately by the
# `piker.data`/`.tsp` layers.
#
# XXX, deats: the "pretend we're live" is just done by
# a `feed_is_live.set()` even though nothing is actually live
# Bp
if not venue_is_open:
log.warning(
f'Venue is closed, unable to establish real-time feed.\n'
f'mkt: {mkt!r}\n'
f'\n'
f'first_ticker: {first_ticker}\n'
)
task_status.started((
init_msgs,
first_quote,
@ -956,10 +1139,12 @@ async def stream_quotes(
feed_is_live.set()
# block and let data history backfill code run.
# XXX obvi given the venue is closed, we never expect feed
# to come up; a taskc should be the only way to
# terminate this task.
await trio.sleep_forever()
return # we never expect feed to come up?
# TODO: we should instead spawn a task that waits on a feed
# ?TODO, we could instead spawn a task that waits on a feed
# to start and let it wait indefinitely..instead of this
# hard coded stuff.
# async def wait_for_first_quote():
@ -981,23 +1166,27 @@ async def stream_quotes(
'Rxed init quote:\n'
f'{pformat(first_quote)}'
)
cs: trio.CancelScope | None = None
cs: trio.CancelScope|None = None
startup: bool = True
iter_quotes: trio.abc.Channel
while (
startup
or cs.cancel_called
or
cs.cancel_called
):
with trio.CancelScope() as cs:
async with (
trio.open_nursery() as nurse,
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
open_aio_quote_stream(
symbol=sym,
contract=con,
) as stream,
) as iter_quotes,
):
# ?TODO? can we rm this - particularly for `ib_async`?
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash)
first_ticker.ticks = []
# first_ticker.ticks = []
# only on first entry at feed boot up
if startup:
@ -1011,8 +1200,8 @@ async def stream_quotes(
# data feed event.
async def reset_on_feed():
# TODO: this seems to be surpressed from the
# traceback in ``tractor``?
# ??TODO? this seems to be surpressed from the
# traceback in `tractor`?
# assert 0
rt_ev = proxy.status_event(
@ -1021,9 +1210,9 @@ async def stream_quotes(
await rt_ev.wait()
cs.cancel() # cancel called should now be set
nurse.start_soon(reset_on_feed)
tn.start_soon(reset_on_feed)
async with aclosing(stream):
async with aclosing(iter_quotes):
# if syminfo.get('no_vlm', False):
if not init_msg.shm_write_opts['has_vlm']:
@ -1038,25 +1227,27 @@ async def stream_quotes(
# wait for real volume on feed (trading might be
# closed)
while True:
ticker = await stream.receive()
ticker = await iter_quotes.receive()
# for a real volume contract we rait for
# the first "real" trade to take place
if (
# not calc_price
# and not ticker.rtTime
not ticker.rtTime
False
# not ticker.rtTime
):
# spin consuming tickers until we
# get a real market datum
log.debug(f"New unsent ticker: {ticker}")
continue
else:
log.debug("Received first volume tick")
# ugh, clear ticks since we've
# consumed them (ahem, ib_insync is
# truly stateful trash)
ticker.ticks = []
# ticker.ticks = []
# XXX: this works because we don't use
# ``aclosing()`` above?
@ -1066,15 +1257,24 @@ async def stream_quotes(
log.debug(f"First ticker received {quote}")
# tell data-layer spawner-caller that live
# quotes are now streaming.
# quotes are now active desptie not having
# necessarily received a first vlm/clearing
# tick.
ticker = await iter_quotes.receive()
feed_is_live.set()
fqme: str = quote['fqme']
await send_chan.send({fqme: quote})
# last = time.time()
async for ticker in stream:
async for ticker in iter_quotes:
quote = normalize(ticker)
fqme = quote['fqme']
fqme: str = quote['fqme']
log.debug(
f'Sending quote\n'
f'{quote}'
)
await send_chan.send({fqme: quote})
# ugh, clear ticks since we've consumed them
ticker.ticks = []
# ticker.ticks = []
# last = time.time()

View File

@ -34,6 +34,7 @@ import urllib.parse
import hashlib
import hmac
import base64
import tractor
import trio
from piker import config
@ -372,8 +373,7 @@ class Client:
# 1658347714, 'status': 'Success'}]}
if xfers:
import tractor
await tractor.pp()
await tractor.pause()
trans: dict[str, Transaction] = {}
for entry in xfers:
@ -501,7 +501,8 @@ class Client:
for xkey, data in resp['result'].items():
# NOTE: always cache in pairs tables for faster lookup
pair = Pair(xname=xkey, **data)
with tractor.devx.maybe_open_crash_handler(): # as bxerr:
pair = Pair(xname=xkey, **data)
# register the above `Pair` structs for all
# key-sets/monikers: a set of 4 (frickin) tables

View File

@ -175,9 +175,8 @@ async def handle_order_requests(
case {
'account': 'kraken.spot' as account,
'action': action,
} if action in {'buy', 'sell'}:
'action': 'buy'|'sell',
}:
# validate
order = BrokerdOrder(**msg)
@ -262,6 +261,12 @@ async def handle_order_requests(
} | extra
log.info(f'Submitting WS order request:\n{pformat(req)}')
# NOTE HOWTO, debug order requests
#
# if 'XRP' in pair:
# await tractor.pause()
await ws.send_msg(req)
# placehold for sanity checking in relay loop
@ -544,7 +549,7 @@ async def open_trade_dialog(
# to be reloaded.
balances: dict[str, float] = await client.get_balances()
verify_balances(
await verify_balances(
acnt,
src_fiat,
balances,
@ -1085,6 +1090,8 @@ async def handle_order_updates(
f'Failed to {action} order {reqid}:\n'
f'{errmsg}'
)
# if tractor._state.debug_mode():
# await tractor.pause()
symbol: str = 'N/A'
if chain := apiflows.get(reqid):

View File

@ -21,7 +21,6 @@ Symbology defs and search.
from decimal import Decimal
import tractor
from rapidfuzz import process as fuzzy
from piker._cacheables import (
async_lifo_cache,
@ -41,8 +40,13 @@ from piker.accounting._mktinfo import (
)
# https://www.kraken.com/features/api#get-tradable-pairs
class Pair(Struct):
'''
A tradable asset pair as schema-defined by,
https://docs.kraken.com/api/docs/rest-api/get-tradable-asset-pairs
'''
xname: str # idiotic bs_mktid equiv i guess?
altname: str # alternate pair name
wsname: str # WebSocket pair name (if available)
@ -53,7 +57,6 @@ class Pair(Struct):
lot: str # volume lot size
cost_decimals: int
costmin: float
pair_decimals: int # scaling decimal places for pair
lot_decimals: int # scaling decimal places for volume
@ -79,6 +82,7 @@ class Pair(Struct):
tick_size: float # min price step size
status: str
costmin: str|None = None # XXX, only some mktpairs?
short_position_limit: float = 0
long_position_limit: float = float('inf')

View File

@ -37,6 +37,12 @@ import tractor
from async_generator import asynccontextmanager
import numpy as np
import wrapt
# TODO, port to `httpx`/`trio-websocket` whenver i get back to
# writing a proper ws-api streamer for this backend (since the data
# feeds are free now) as per GH feat-req:
# https://github.com/pikers/piker/issues/509
#
import asks
from ..calc import humanize, percent_change

View File

@ -25,7 +25,10 @@ from typing import TYPE_CHECKING
import trio
import tractor
from tractor.trionics import broadcast_receiver
from tractor.trionics import (
broadcast_receiver,
collapse_eg,
)
from ._util import (
log, # sub-sys logger
@ -285,8 +288,11 @@ async def open_ems(
client._ems_stream = trades_stream
# start sync code order msg delivery task
async with trio.open_nursery() as n:
n.start_soon(
async with (
collapse_eg(),
trio.open_nursery() as tn,
):
tn.start_soon(
relay_orders_from_sync_code,
client,
fqme,
@ -302,4 +308,4 @@ async def open_ems(
)
# stop the sync-msg-relay task on exit.
n.cancel_scope.cancel()
tn.cancel_scope.cancel()

View File

@ -42,6 +42,7 @@ from bidict import bidict
import trio
from trio_typing import TaskStatus
import tractor
from tractor import trionics
from ._util import (
log, # sub-sys logger
@ -161,7 +162,7 @@ async def clear_dark_triggers(
router: Router,
brokerd_orders_stream: tractor.MsgStream,
quote_stream: tractor.ReceiveMsgStream, # noqa
quote_stream: tractor.MsgStream,
broker: str,
fqme: str,
@ -177,6 +178,7 @@ async def clear_dark_triggers(
'''
# XXX: optimize this for speed!
# TODO:
# - port to the new ringbuf stuff in `tractor.ipc`!
# - numba all this!
# - this stream may eventually contain multiple symbols
quote_stream._raise_on_lag = False
@ -499,7 +501,7 @@ class Router(Struct):
'''
# setup at actor spawn time
nursery: trio.Nursery
_tn: trio.Nursery
# broker to book map
books: dict[str, DarkBook] = {}
@ -669,7 +671,7 @@ class Router(Struct):
# dark book clearing loop, also lives with parent
# daemon to allow dark order clearing while no
# client is connected.
self.nursery.start_soon(
self._tn.start_soon(
clear_dark_triggers,
self,
relay.brokerd_stream,
@ -692,7 +694,7 @@ class Router(Struct):
# spawn a ``brokerd`` order control dialog stream
# that syncs lifetime with the parent `emsd` daemon.
self.nursery.start_soon(
self._tn.start_soon(
translate_and_relay_brokerd_events,
broker,
relay.brokerd_stream,
@ -766,10 +768,12 @@ async def _setup_persistent_emsd(
global _router
# open a root "service nursery" for the ``emsd`` actor
async with trio.open_nursery() as service_nursery:
_router = Router(nursery=service_nursery)
# open a root "service task-nursery" for the `emsd`-actor
async with (
trionics.collapse_eg(),
trio.open_nursery() as tn
):
_router = Router(_tn=tn)
# TODO: send back the full set of persistent
# orders/execs?
@ -1518,7 +1522,7 @@ async def maybe_open_trade_relays(
loglevel: str = 'info',
):
fqme, relay, feed, client_ready = await _router.nursery.start(
fqme, relay, feed, client_ready = await _router._tn.start(
_router.open_trade_relays,
fqme,
exec_mode,

View File

@ -134,67 +134,65 @@ def pikerd(
Spawn the piker broker-daemon.
'''
from tractor.devx import maybe_open_crash_handler
with maybe_open_crash_handler(pdb=pdb):
log = get_console_log(loglevel, name='cli')
# from tractor.devx import maybe_open_crash_handler
# with maybe_open_crash_handler(pdb=False):
log = get_console_log(loglevel, name='cli')
if pdb:
log.warning((
"\n"
"!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n"
"When a `piker` daemon crashes it will block the "
"task-thread until resumed from console!\n"
"\n"
if pdb:
log.warning((
"\n"
"!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n"
"When a `piker` daemon crashes it will block the "
"task-thread until resumed from console!\n"
"\n"
))
# service-actor registry endpoint socket-address set
regaddrs: list[tuple[str, int]] = []
conf, _ = config.load(
conf_name='conf',
)
network: dict = conf.get('network')
if (
network is None
and not maddr
):
regaddrs = [(
_default_registry_host,
_default_registry_port,
)]
else:
eps: dict = load_trans_eps(
network,
maddr,
)
for layers in eps['pikerd']:
regaddrs.append((
layers['ipv4']['addr'],
layers['tcp']['port'],
))
# service-actor registry endpoint socket-address set
regaddrs: list[tuple[str, int]] = []
from .. import service
conf, _ = config.load(
conf_name='conf',
)
network: dict = conf.get('network')
if (
network is None
and not maddr
async def main():
service_mngr: service.Services
async with (
service.open_pikerd(
registry_addrs=regaddrs,
loglevel=loglevel,
debug_mode=pdb,
# enable_transports=['uds'],
enable_transports=['tcp'],
) as service_mngr,
):
regaddrs = [(
_default_registry_host,
_default_registry_port,
)]
assert service_mngr
# ?TODO? spawn all other sub-actor daemons according to
# multiaddress endpoint spec defined by user config
await trio.sleep_forever()
else:
eps: dict = load_trans_eps(
network,
maddr,
)
for layers in eps['pikerd']:
regaddrs.append((
layers['ipv4']['addr'],
layers['tcp']['port'],
))
from .. import service
async def main():
service_mngr: service.Services
async with (
service.open_pikerd(
registry_addrs=regaddrs,
loglevel=loglevel,
debug_mode=pdb,
) as service_mngr, # normally delivers a ``Services`` handle
# AsyncExitStack() as stack,
):
assert service_mngr
# ?TODO? spawn all other sub-actor daemons according to
# multiaddress endpoint spec defined by user config
await trio.sleep_forever()
trio.run(main)
trio.run(main)
@click.group(context_settings=config._context_defaults)
@ -309,6 +307,10 @@ def services(config, tl, ports):
if not ports:
ports = [_default_registry_port]
addr = tractor._addr.wrap_address(
addr=(host, ports[0])
)
async def list_services():
nonlocal host
async with (
@ -316,16 +318,18 @@ def services(config, tl, ports):
name='service_query',
loglevel=config['loglevel'] if tl else None,
),
tractor.get_arbiter(
host=host,
port=ports[0]
tractor.get_registry(
addr=addr,
) as portal
):
registry = await portal.run_from_ns('self', 'get_registry')
registry = await portal.run_from_ns(
'self',
'get_registry',
)
json_d = {}
for key, socket in registry.items():
host, port = socket
json_d[key] = f'{host}:{port}'
json_d[key] = f'{socket}'
click.echo(f"{colorize_json(json_d)}")
trio.run(list_services)

View File

@ -27,7 +27,6 @@ from functools import partial
from types import ModuleType
from typing import (
Any,
Optional,
Callable,
AsyncContextManager,
AsyncGenerator,
@ -35,6 +34,7 @@ from typing import (
)
import json
import tractor
import trio
from trio_typing import TaskStatus
from trio_websocket import (
@ -167,7 +167,7 @@ async def _reconnect_forever(
async def proxy_msgs(
ws: WebSocketConnection,
pcs: trio.CancelScope, # parent cancel scope
rent_cs: trio.CancelScope, # parent cancel scope
):
'''
Receive (under `timeout` deadline) all msgs from from underlying
@ -192,7 +192,7 @@ async def _reconnect_forever(
f'{url} connection bail with:'
)
await trio.sleep(0.5)
pcs.cancel()
rent_cs.cancel()
# go back to reonnect loop in parent task
return
@ -204,7 +204,7 @@ async def _reconnect_forever(
f'{src_mod}\n'
'WS feed seems down and slow af.. reconnecting\n'
)
pcs.cancel()
rent_cs.cancel()
# go back to reonnect loop in parent task
return
@ -228,7 +228,12 @@ async def _reconnect_forever(
nobsws._connected = trio.Event()
task_status.started()
while not snd._closed:
mc_state: trio._channel.MemoryChannelState = snd._state
while (
mc_state.open_receive_channels > 0
and
mc_state.open_send_channels > 0
):
log.info(
f'{src_mod}\n'
f'{url} trying (RE)CONNECT'
@ -237,10 +242,11 @@ async def _reconnect_forever(
ws: WebSocketConnection
try:
async with (
trio.open_nursery() as n,
open_websocket_url(url) as ws,
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
):
cs = nobsws._cs = n.cancel_scope
cs = nobsws._cs = tn.cancel_scope
nobsws._ws = ws
log.info(
f'{src_mod}\n'
@ -248,7 +254,7 @@ async def _reconnect_forever(
)
# begin relay loop to forward msgs
n.start_soon(
tn.start_soon(
proxy_msgs,
ws,
cs,
@ -262,7 +268,7 @@ async def _reconnect_forever(
# TODO: should we return an explicit sub-cs
# from this fixture task?
await n.start(
await tn.start(
open_fixture,
fixture,
nobsws,
@ -272,11 +278,23 @@ async def _reconnect_forever(
# to let tasks run **inside** the ws open block above.
nobsws._connected.set()
await trio.sleep_forever()
except HandshakeError:
except (
HandshakeError,
ConnectionRejected,
):
log.exception('Retrying connection')
await trio.sleep(0.5) # throttle
# ws & nursery block ends
except BaseException as _berr:
berr = _berr
log.exception(
'Reconnect-attempt failed ??\n'
)
await trio.sleep(0.2) # throttle
raise berr
#|_ws & nursery block ends
nobsws._connected = trio.Event()
if cs.cancelled_caught:
log.cancel(
@ -324,21 +342,25 @@ async def open_autorecon_ws(
connetivity errors, or some user defined recv timeout.
You can provide a ``fixture`` async-context-manager which will be
entered/exitted around each connection reset; eg. for (re)requesting
subscriptions without requiring streaming setup code to rerun.
entered/exitted around each connection reset; eg. for
(re)requesting subscriptions without requiring streaming setup
code to rerun.
'''
snd: trio.MemorySendChannel
rcv: trio.MemoryReceiveChannel
snd, rcv = trio.open_memory_channel(616)
async with trio.open_nursery() as n:
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn
):
nobsws = NoBsWs(
url,
rcv,
msg_recv_timeout=msg_recv_timeout,
)
await n.start(
await tn.start(
partial(
_reconnect_forever,
url,
@ -351,11 +373,10 @@ async def open_autorecon_ws(
await nobsws._connected.wait()
assert nobsws._cs
assert nobsws.connected()
try:
yield nobsws
finally:
n.cancel_scope.cancel()
tn.cancel_scope.cancel()
'''
@ -368,8 +389,8 @@ of msgs over a `NoBsWs`.
class JSONRPCResult(Struct):
id: int
jsonrpc: str = '2.0'
result: Optional[dict] = None
error: Optional[dict] = None
result: dict|None = None
error: dict|None = None
@acm

View File

@ -39,6 +39,7 @@ from typing import (
AsyncContextManager,
Awaitable,
Sequence,
TYPE_CHECKING,
)
import trio
@ -75,6 +76,10 @@ from ._sampling import (
uniform_rate_send,
)
if TYPE_CHECKING:
from tractor._addr import Address
from tractor.msg.types import Aid
class Sub(Struct, frozen=True):
'''
@ -725,7 +730,10 @@ class Feed(Struct):
async for msg in stream:
await tx.send(msg)
async with trio.open_nursery() as nurse:
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as nurse
):
# spawn a relay task for each stream so that they all
# multiplex to a common channel.
for brokername in mods:
@ -899,19 +907,19 @@ async def open_feed(
feed.portals[brokermod] = portal
# fill out "status info" that the UI can show
host, port = portal.channel.raddr
if host == '127.0.0.1':
host = 'localhost'
chan: tractor.Channel = portal.chan
raddr: Address = chan.raddr
aid: Aid = chan.aid
# TAG_feed_status_update
feed.status.update({
'actor_name': portal.channel.uid[0],
'host': host,
'port': port,
'actor_id': aid,
'actor_short_id': f'{aid.name}@{aid.pid}',
'ipc': chan.raddr.proto_key,
'ipc_addr': raddr,
'hist_shm': 'NA',
'rt_shm': 'NA',
'throttle_rate': tick_throttle,
'throttle_hz': tick_throttle,
})
# feed.status.update(init_msg.pop('status', {}))
# (allocate and) connect to any feed bus for this broker
bus_ctxs.append(

View File

@ -498,6 +498,7 @@ async def cascade(
func_name: str = func.__name__
async with (
tractor.trionics.collapse_eg(), # avoid multi-taskc tb in console
trio.open_nursery() as tn,
):
# TODO: might be better to just make a "restart" method where

View File

@ -107,17 +107,22 @@ async def open_piker_runtime(
async with (
tractor.open_root_actor(
# passed through to ``open_root_actor``
# passed through to `open_root_actor`
registry_addrs=registry_addrs,
name=name,
start_method=start_method,
loglevel=loglevel,
debug_mode=debug_mode,
start_method=start_method,
# XXX NOTE MEMBER DAT der's a perf hit yo!!
# https://greenback.readthedocs.io/en/latest/principle.html#performance
maybe_enable_greenback=True,
# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
enable_modules=enable_modules,
hide_tb=False,
**tractor_kwargs,
) as actor,
@ -200,7 +205,8 @@ async def open_pikerd(
reg_addrs,
),
tractor.open_nursery() as actor_nursery,
trio.open_nursery() as service_nursery,
tractor.trionics.collapse_eg(),
trio.open_nursery() as service_tn,
):
for addr in reg_addrs:
if addr not in root_actor.accept_addrs:
@ -211,7 +217,7 @@ async def open_pikerd(
# assign globally for future daemon/task creation
Services.actor_n = actor_nursery
Services.service_n = service_nursery
Services.service_n = service_tn
Services.debug_mode = debug_mode
try:
@ -221,7 +227,7 @@ async def open_pikerd(
# TODO: is this more clever/efficient?
# if 'samplerd' in Services.service_tasks:
# await Services.cancel_service('samplerd')
service_nursery.cancel_scope.cancel()
service_tn.cancel_scope.cancel()
# TODO: do we even need this?
@ -256,7 +262,10 @@ async def maybe_open_pikerd(
loglevel: str | None = None,
**kwargs,
) -> tractor._portal.Portal | ClassVar[Services]:
) -> (
tractor._portal.Portal
|ClassVar[Services]
):
'''
If no ``pikerd`` daemon-root-actor can be found start it and
yield up (we should probably figure out returning a portal to self
@ -281,10 +290,11 @@ async def maybe_open_pikerd(
registry_addrs: list[tuple[str, int]] = (
registry_addrs
or [_default_reg_addr]
or
[_default_reg_addr]
)
pikerd_portal: tractor.Portal | None
pikerd_portal: tractor.Portal|None
async with (
open_piker_runtime(
name=query_name,

View File

@ -28,6 +28,7 @@ from contextlib import (
)
import tractor
from trio.lowlevel import current_task
from ._util import (
log, # sub-sys logger
@ -70,69 +71,84 @@ async def maybe_spawn_daemon(
lock = Services.locks[service_name]
await lock.acquire()
async with find_service(
service_name,
registry_addrs=[('127.0.0.1', 6116)],
) as portal:
if portal is not None:
lock.release()
yield portal
return
try:
async with find_service(
service_name,
registry_addrs=[('127.0.0.1', 6116)],
) as portal:
if portal is not None:
lock.release()
yield portal
return
log.warning(
f"Couldn't find any existing {service_name}\n"
'Attempting to spawn new daemon-service..'
)
log.warning(
f"Couldn't find any existing {service_name}\n"
'Attempting to spawn new daemon-service..'
)
# ask root ``pikerd`` daemon to spawn the daemon we need if
# pikerd is not live we now become the root of the
# process tree
async with maybe_open_pikerd(
loglevel=loglevel,
**pikerd_kwargs,
# ask root ``pikerd`` daemon to spawn the daemon we need if
# pikerd is not live we now become the root of the
# process tree
async with maybe_open_pikerd(
loglevel=loglevel,
**pikerd_kwargs,
) as pikerd_portal:
) as pikerd_portal:
# we are the root and thus are `pikerd`
# so spawn the target service directly by calling
# the provided target routine.
# XXX: this assumes that the target is well formed and will
# do the right things to setup both a sub-actor **and** call
# the ``_Services`` api from above to start the top level
# service task for that actor.
started: bool
if pikerd_portal is None:
started = await service_task_target(
loglevel=loglevel,
**spawn_args,
# we are the root and thus are `pikerd`
# so spawn the target service directly by calling
# the provided target routine.
# XXX: this assumes that the target is well formed and will
# do the right things to setup both a sub-actor **and** call
# the ``_Services`` api from above to start the top level
# service task for that actor.
started: bool
if pikerd_portal is None:
started = await service_task_target(
loglevel=loglevel,
**spawn_args,
)
else:
# request a remote `pikerd` (service manager) to start the
# target daemon-task, the target can't return
# a non-serializable value since it is expected that service
# starting is non-blocking and the target task will persist
# running "under" or "within" the `pikerd` actor tree after
# the questing client disconnects. in other words this
# spawns a persistent daemon actor that continues to live
# for the lifespan of whatever the service manager inside
# `pikerd` says it should.
started = await pikerd_portal.run(
service_task_target,
loglevel=loglevel,
**spawn_args,
)
if started:
log.info(f'Service {service_name} started!')
# block until we can discover (by IPC connection) to the newly
# spawned daemon-actor and then deliver the portal to the
# caller.
async with tractor.wait_for_actor(service_name) as portal:
lock.release()
yield portal
await portal.cancel_actor()
except BaseException as _err:
err = _err
if (
lock.locked()
and
lock.statistics().owner is current_task()
):
log.exception(
f'Releasing stale lock after crash..?'
f'{err!r}\n'
)
else:
# request a remote `pikerd` (service manager) to start the
# target daemon-task, the target can't return
# a non-serializable value since it is expected that service
# starting is non-blocking and the target task will persist
# running "under" or "within" the `pikerd` actor tree after
# the questing client disconnects. in other words this
# spawns a persistent daemon actor that continues to live
# for the lifespan of whatever the service manager inside
# `pikerd` says it should.
started = await pikerd_portal.run(
service_task_target,
loglevel=loglevel,
**spawn_args,
)
if started:
log.info(f'Service {service_name} started!')
# block until we can discover (by IPC connection) to the newly
# spawned daemon-actor and then deliver the portal to the
# caller.
async with tractor.wait_for_actor(service_name) as portal:
lock.release()
yield portal
await portal.cancel_actor()
raise err
async def spawn_emsd(

View File

@ -109,7 +109,7 @@ class Services:
# wait on any context's return value
# and any final portal result from the
# sub-actor.
ctx_res: Any = await ctx.result()
ctx_res: Any = await ctx.wait_for_result()
# NOTE: blocks indefinitely until cancelled
# either by error from the target context

View File

@ -101,13 +101,15 @@ async def open_registry(
if (
not tractor.is_root_process()
and not Registry.addrs
and
not Registry.addrs
):
Registry.addrs.extend(actor.reg_addrs)
if (
ensure_exists
and not Registry.addrs
and
not Registry.addrs
):
raise RuntimeError(
f"`{uid}` registry should already exist but doesn't?"
@ -146,7 +148,7 @@ async def find_service(
| list[Portal]
| None
):
# try:
reg_addrs: list[tuple[str, int]]
async with open_registry(
addrs=(
@ -157,22 +159,39 @@ async def find_service(
or Registry.addrs
),
) as reg_addrs:
log.info(f'Scanning for service `{service_name}`')
maybe_portals: list[Portal] | Portal | None
log.info(
f'Scanning for service {service_name!r}'
)
# attach to existing daemon by name if possible
maybe_portals: list[Portal]|Portal|None
async with tractor.find_actor(
service_name,
registry_addrs=reg_addrs,
only_first=first_only, # if set only returns single ref
) as maybe_portals:
if not maybe_portals:
# log.info(
print(
f'Could NOT find service {service_name!r} -> {maybe_portals!r}'
)
yield None
return
# log.info(
print(
f'Found service {service_name!r} -> {maybe_portals}'
)
yield maybe_portals
# except BaseException as _berr:
# berr = _berr
# log.exception(
# 'tractor.find_actor() failed with,\n'
# )
# raise berr
async def check_for_service(
service_name: str,

View File

@ -963,7 +963,10 @@ async def tsdb_backfill(
# concurrently load the provider's most-recent-frame AND any
# pre-existing tsdb history already saved in `piker` storage.
dt_eps: list[DateTime, DateTime] = []
async with trio.open_nursery() as tn:
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn
):
tn.start_soon(
push_latest_frame,
dt_eps,
@ -1012,9 +1015,16 @@ async def tsdb_backfill(
int,
Duration,
]|None = config.get('frame_types', None)
if def_frame_durs:
def_frame_size: Duration = def_frame_durs[timeframe]
assert def_frame_size == calced_frame_size
if def_frame_size != calced_frame_size:
log.warning(
f'Expected frame size {def_frame_size}\n'
f'Rxed frame {calced_frame_size}\n'
)
# await tractor.pause()
else:
# use what we calced from first frame above.
def_frame_size = calced_frame_size
@ -1043,7 +1053,9 @@ async def tsdb_backfill(
# if there is a gap to backfill from the first
# history frame until the last datum loaded from the tsdb
# continue that now in the background
async with trio.open_nursery() as tn:
async with trio.open_nursery(
strict_exception_groups=False,
) as tn:
bf_done = await tn.start(
partial(
@ -1308,6 +1320,7 @@ async def manage_history(
# sampling period) data set since normally differently
# sampled timeseries can be loaded / process independently
# ;)
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
):
log.info(

View File

@ -21,6 +21,7 @@ Main app startup and run.
from functools import partial
from types import ModuleType
import tractor
import trio
from piker.ui.qt import (
@ -116,6 +117,7 @@ async def _async_main(
needed_brokermods[brokername] = brokers[brokername]
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as root_n,
):
# set root nursery and task stack for spawning other charts/feeds

View File

@ -33,7 +33,6 @@ import trio
from piker.ui.qt import (
QtCore,
QtWidgets,
Qt,
QLineF,
QFrame,

View File

@ -1445,7 +1445,10 @@ async def display_symbol_data(
# for pause/resume on mouse interaction
rt_chart.feed = feed
async with trio.open_nursery() as ln:
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as ln,
):
# if available load volume related built-in display(s)
vlm_charts: dict[
str,

View File

@ -22,7 +22,10 @@ from contextlib import asynccontextmanager as acm
from typing import Callable
import trio
from tractor.trionics import gather_contexts
from tractor.trionics import (
gather_contexts,
collapse_eg,
)
from piker.ui.qt import (
QtCore,
@ -207,7 +210,10 @@ async def open_signal_handler(
async for args in recv:
await async_handler(*args)
async with trio.open_nursery() as tn:
async with (
collapse_eg(),
trio.open_nursery() as tn
):
tn.start_soon(proxy_to_handler)
async with send:
yield
@ -242,6 +248,7 @@ async def open_handlers(
widget: QWidget
streams: list[trio.abc.ReceiveChannel]
async with (
collapse_eg(),
trio.open_nursery() as tn,
gather_contexts([
open_event_stream(

View File

@ -18,10 +18,11 @@
Feed status and controls widget(s) for embedding in a UI-pane.
"""
from __future__ import annotations
from textwrap import dedent
from typing import TYPE_CHECKING
from typing import (
Any,
TYPE_CHECKING,
)
# from PyQt5.QtCore import Qt
@ -49,35 +50,55 @@ def mk_feed_label(
a feed control protocol.
'''
status = feed.status
status: dict[str, Any] = feed.status
assert status
msg = dedent("""
actor: **{actor_name}**\n
|_ @**{host}:{port}**\n
""")
# SO tips on ws/nls,
# https://stackoverflow.com/a/15721400
ws: str = '&nbsp;'
# nl: str = '<br>' # dun work?
actor_info_repr: str = (
f')> **{status["actor_short_id"]}**\n'
'\n' # bc md?
)
for key, val in status.items():
if key in ('host', 'port', 'actor_name'):
continue
msg += f'\n|_ {key}: **{{{key}}}**\n'
# fields to select *IN* for display
# (see `.data.feed.open_feed()` status
# update -> TAG_feed_status_update)
for key in [
'ipc',
'hist_shm',
'rt_shm',
'throttle_hz',
]:
# NOTE, the 2nd key is filled via `.format()` updates.
actor_info_repr += (
f'\n' # bc md?
f'{ws}|_{key}: **{{{key}}}**\n'
)
# ^TODO? formatting and content..
# -[ ] showing which fqme is "forward" on the
# chart/fsp/order-mode?
# '|_ flows: **{symbols}**\n'
#
# -[x] why isn't the indent working?
# => markdown, now solved..
feed_label = FormatLabel(
fmt_str=msg,
# |_ streams: **{symbols}**\n
fmt_str=actor_info_repr,
font=_font.font,
font_size=_font_small.px_size,
font_color='default_lightest',
)
# ?TODO, remove this?
# form.vbox.setAlignment(feed_label, Qt.AlignBottom)
# form.vbox.setAlignment(Qt.AlignBottom)
_ = chart.height() - (
form.height() +
form.fill_bar.height()
# feed_label.height()
)
# _ = chart.height() - (
# form.height() +
# form.fill_bar.height()
# # feed_label.height()
# )
feed_label.format(**feed.status)
return feed_label

View File

@ -600,6 +600,7 @@ async def open_fsp_admin(
kwargs=kwargs,
) as (cache_hit, cluster_map),
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
):
if cache_hit:
@ -613,6 +614,8 @@ async def open_fsp_admin(
)
try:
yield admin
# ??TODO, does this *need* to be inside a finally?
finally:
# terminate all tasks via signals
for key, entry in admin._registry.items():

View File

@ -285,18 +285,20 @@ class FormatLabel(QLabel):
font_size: int,
font_color: str,
use_md: bool = True,
parent=None,
) -> None:
super().__init__(parent)
# by default set the format string verbatim and expect user to
# call ``.format()`` later (presumably they'll notice the
# by default set the format string verbatim and expect user
# to call ``.format()`` later (presumably they'll notice the
# unformatted content if ``fmt_str`` isn't meant to be
# unformatted).
self.fmt_str = fmt_str
self.setText(fmt_str)
# self.setText(fmt_str) # ?TODO, why here?
self.setStyleSheet(
f"""QLabel {{
@ -306,9 +308,10 @@ class FormatLabel(QLabel):
"""
)
self.setFont(_font.font)
self.setTextFormat(
Qt.TextFormat.MarkdownText
)
if use_md:
self.setTextFormat(
Qt.TextFormat.MarkdownText
)
self.setMargin(0)
self.setSizePolicy(
@ -316,7 +319,10 @@ class FormatLabel(QLabel):
size_policy.Expanding,
)
self.setAlignment(
Qt.AlignVCenter | Qt.AlignLeft
Qt.AlignLeft
|
Qt.AlignBottom
# Qt.AlignVCenter
)
self.setText(self.fmt_str)

View File

@ -15,7 +15,8 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
qompleterz: embeddable search and complete using trio, Qt and rapidfuzz.
qompleterz: embeddable search and complete using trio, Qt and
rapidfuzz.
"""
@ -46,6 +47,7 @@ import time
from pprint import pformat
from rapidfuzz import process as fuzzy
import tractor
import trio
from trio_typing import TaskStatus
@ -53,7 +55,7 @@ from piker.ui.qt import (
size_policy,
align_flag,
Qt,
QtCore,
# QtCore,
QtWidgets,
QModelIndex,
QItemSelectionModel,
@ -920,7 +922,10 @@ async def fill_results(
# issue multi-provider fan-out search request and place
# "searching.." statuses on outstanding results providers
async with trio.open_nursery() as n:
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn
):
for provider, (search, pause) in (
_searcher_cache.copy().items()
@ -944,7 +949,7 @@ async def fill_results(
status_field='-> searchin..',
)
await n.start(
await tn.start(
pack_matches,
view,
has_results,
@ -1004,12 +1009,14 @@ async def handle_keyboard_input(
view.set_font_size(searchbar.dpi_font.px_size)
send, recv = trio.open_memory_channel(616)
async with trio.open_nursery() as n:
async with (
tractor.trionics.collapse_eg(), # needed?
trio.open_nursery() as tn
):
# start a background multi-searcher task which receives
# patterns relayed from this keyboard input handler and
# async updates the completer view's results.
n.start_soon(
tn.start_soon(
partial(
fill_results,
searchw,

View File

@ -792,6 +792,7 @@ async def open_order_mode(
brokerd_accounts,
ems_dialog_msgs,
),
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
):

View File

@ -15,6 +15,12 @@ from piker.service import (
from piker.log import get_console_log
# include `tractor`'s built-in fixtures!
pytest_plugins: tuple[str] = (
"tractor._testing.pytest",
)
def pytest_addoption(parser):
parser.addoption("--ll", action="store", dest='loglevel',
default=None, help="logging level to set when testing")

View File

@ -142,7 +142,12 @@ async def test_concurrent_tokens_refresh(us_symbols, loglevel):
# async with tractor.open_nursery() as n:
# await n.run_in_actor('other', intermittently_refresh_tokens)
async with trio.open_nursery() as n:
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery(
# strict_exception_groups=False,
) as n
):
quoter = await qt.stock_quoter(client, us_symbols)
@ -383,7 +388,9 @@ async def test_quote_streaming(tmx_symbols, loglevel, stream_what):
else:
symbols = [tmx_symbols]
async with trio.open_nursery() as n:
async with trio.open_nursery(
strict_exception_groups=False,
) as n:
for syms, func in zip(symbols, stream_what):
n.start_soon(func, feed, syms)