Compare commits


29 Commits

Author SHA1 Message Date
Tyler Goodlet 70f952a083 Bump various `.brokers.core` doc string content/style 2026-01-06 14:03:15 -05:00
Tyler Goodlet ae7cd8df7c ib: multiline stylings, typing, timeout report 2026-01-06 13:53:53 -05:00
Tyler Goodlet b994dd85af Woops, fix to read `.api_port` ref from the `Client.ib.client`.. 2026-01-06 13:53:53 -05:00
Tyler Goodlet 7bea7518a5 Support per-`ib.vnc_addrs` vnc passwords
Such that the `brokers.toml` can contain any of the following
<port> = dict|tuple styles,

```toml
    [ib.vnc_addrs]
    4002 = {host = 'localhost', port = 5900, pw = 'doggy'}  # host, port, pw
    4002 = {host = 'localhost', port = 5900}  # host, port
    4002 = ['localhost', 5900]  # host, port
```

With the first line demonstrating a vnc-server password (as normally set
via a `.env` file in the `dockering/ib/` subdir) with the `pw =` field.
This obviously removes the hardcoded `'doggy'` password from prior.

Impl details in `.brokers.ib._util`:
- pass the `ib.api.Client` down into `vnc_click_hack()` doing all config
 reading within and removing host, port unpacking in the calling
 `data_reset_hack()`.
- also pass the client to `try_xdo_manual()` and comment (with plans to
  remove) the recently added localhost-only fallback section since
  we now have a fully working py vnc client again with `pyvnc` B)
- in `vnc_click_hack()` match for all the possible config line styles
  (see the sketch below) and,
  * pass any `pw` field to `pyvnc.VNCConfig`,
  * continue matching host, port without password,
  * fallthrough to raising a val-err when neither ^ match.
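A minimal sketch of that config-line matching (the `parse_vnc_addr()`
helper name is hypothetical, the real logic lives inline in
`vnc_click_hack()`):

```python
# hypothetical helper showing the 3 accepted `ib.vnc_addrs` entry
# styles; the actual impl matches inline in `vnc_click_hack()`.
def parse_vnc_addr(entry: dict|list) -> tuple[str, int, str|None]:
    match entry:
        # dict style with a vnc-server `pw` set
        case {'host': str(host), 'port': int(port), 'pw': str(pw)}:
            return host, port, pw
        # dict style sans password
        case {'host': str(host), 'port': int(port)}:
            return host, port, None
        # bare `[host, port]` list style
        case [str(host), int(port)]:
            return host, port, None
        case _:
            raise ValueError(
                f'Invalid `ib.vnc_addrs` entry: {entry!r}'
            )
```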
2026-01-06 13:53:53 -05:00
Tyler Goodlet a591ae998d binance: add `AggTrade.nq: float`: "normal quantity" field.. 2026-01-06 13:48:57 -05:00
Tyler Goodlet 3f16156e4d binance: handle new `TRADIFI_PERPETUAL`.. 2026-01-06 13:48:57 -05:00
Tyler Goodlet dee622bfc7 binance: add `Pair.opoAllowed` field
Handle new API field per 2025-12-02 update.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-06 13:48:57 -05:00
Tyler Goodlet 7cb65490c6 ib: bump `docker/ib/README.rst`
For the new github image: a high-level look at its basic
features/usage/docs and prosing around our expected default usage with
the `piker.brokers.ib` backend.
2026-01-06 13:48:57 -05:00
Tyler Goodlet 2d62ee681c ib.feed: better no-bars error-log message format 2026-01-06 13:48:57 -05:00
Tyler Goodlet 5e44e3e6d6 binance: set `Pair.pegInstructionsAllowed = False`
Lol, a cheeky unforeseen bug due to TOML's lack of a null type and
thinking i can render an `Optional` field on a `msgspec.Struct`
(defaulted to `None`) to the `binance.symcache.toml` cache file..

I didn't catch this when i first updated to the 3.1 API in f7caa75228
because i never did a cache-files flush.. lesson learned and we **really
need tests for this**!!
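A minimal repro sketch of the bug (assuming a `tomlkit`-style writer
for the symcache file):

```python
import msgspec
import tomlkit

class Pair(msgspec.Struct, frozen=True):
    symbol: str
    # TOML has no null type so this can't be rendered into
    # `binance.symcache.toml` while left as `None`!
    pegInstructionsAllowed: bool|None = None

doc: dict = msgspec.to_builtins(Pair(symbol='BTCUSDT'))
tomlkit.dumps(doc)  # boom, `None` is not TOML-serializable..
```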
2026-01-06 13:48:57 -05:00
Tyler Goodlet 44f58657c3 Set `.bs_mktid` on all IB position-msg emissions.. 2026-01-06 13:48:57 -05:00
Tyler Goodlet 6d6afbde8f Switch to `pyvnc` for IB reset hackz
It actually works for vncAuth(2) (thank god!) which the previous
`asyncvnc` **did not**, and seems to be mostly based on the work
from the `asyncvnc` author anyway (so all my past efforts don't seem to
have been in vain XD).

NOTE, the below deats ended up being factored in earlier into the
`pyproject.toml` alongside nix(os) support needed for testing and
landing this history. As such, the comments are the originals but
the changes are not.

Deats,
- switch to `pyvnc` async API (using `asyncio` again obvi) in
  `.ib._util._vnc_click_hack()`.
- add `pyvnc` as src installed dep from GH.
- drop `asyncvnc` as dep.

Other,
- update `pytest` version range to avoid weird auto-load plugin exposed
  by `xonsh`?
- add a `tool.pytest.ini_options` to the project file (roughly as
  sketched below) with vars to,
  - disable that^ `xonsh` plug using `addopts = '-p no:xonsh'`.
  - set a `testpaths` to avoid running anything but that subdir.
  - try out the `'progress'` style console output (does it work?).
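Roughly the following section (exact `testpaths` value assumed):

```toml
[tool.pytest.ini_options]
addopts = '-p no:xonsh'  # disable the auto-loaded `xonsh` plugin
testpaths = ['tests']  # assumed subdir name
console_output_style = 'progress'
```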
2026-01-06 13:48:57 -05:00
Tyler Goodlet 6501f04ace Convert remaining `.to_asyncio.open_channel_from()` to `chan` fn-sig usage 2026-01-06 13:48:57 -05:00
Tyler Goodlet eff651ea28 `ib.feed`: finally solve `push()` exc propagation
Such that if/when the `push()` ticker callback (closure) errors
internally, we actually eventually bubble the error out-and-up from the
`asyncio.Task` and from there out of the `.to_asyncio.open_channel_from()` to
the parent `trio.Task`..

It ended up being much more subtle to solve than i would have liked
thanks to,

- whatever `Ticker.updateEvent.connect()` does behind the scenes in
  terms of (clearly) swallowing, with only log reporting, any exc raised
  in the registered callback (in our case `push()`),

- `asyncio.Task.set_exception()` never working and instead needing to
  resort to `Task.cancel()`, catching `CancelledError` and re-raising
  the stashed `maybe_exc` from `push()` when set..

Further this ports `.to_asyncio.open_channel_from()` usage to use
the new `chan: tractor.to_asyncio.LinkedTaskChannel` fn-sig API, namely
for `_setup_quote_stream()` task. Requires the latest `tractor` updates
to the inter-eventloop-chan iface providing a `.set_nowait()` and
`.get()` for the `asyncio`-side.

Impl deats within `_setup_quote_stream()`,
- implement `push()` error-bubbling by adding a `maybe_exc` which can be
  set by that callback itself or by its registering task; when set it is
  both,
  * reported on by the `teardown()` cb,
  * re-raised by the terminated (via `.cancel()`) `asyncio.Task` after
    being woken from its sleep, aka "cancelled" (since that's apparently
    one of the only options.. see the big rant in further todo comments
    and the sketch below).
- add explicit error-tolerance-tuning via a `handler_tries: int` counter
  and `tries_before_raise: int` limit such that we only bubble
  a `push()` raised exc once enough tries have consecutively failed.
- as mentioned, use the new `chan` fn-sig support and thus the new
  method API for `asyncio` -> `trio` comms.
- a big TODO XXX around the need to use a better sys for terminating
  `asyncio.Task`s whether it's by delegating to some `.to_asyncio`
  internals after a factor-out OR by potentially going full bore `anyio`
  throughout `.to_asyncio`'s impl in general..
- mk `teardown()` use appropriate `log.<level>()`s based on outcome.
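In pattern form, the bubbling trick is roughly the following
(a simplified sketch, not the actual `_setup_quote_stream()` code):

```python
import asyncio

async def quote_pump() -> None:
    maybe_exc: BaseException|None = None
    task: asyncio.Task = asyncio.current_task()

    def push(ticker) -> None:
        nonlocal maybe_exc
        try:
            ...  # normal per-tick quote relaying
        except BaseException as exc:
            # the event-cb machinery swallows raises (only logging
            # them) so stash the error and cancel ourselves to wake
            # the parked task below.
            maybe_exc = exc
            task.cancel()

    # ticker.updateEvent.connect(push)
    try:
        await asyncio.sleep(float('inf'))  # park until cancelled
    except asyncio.CancelledError:
        if maybe_exc is not None:
            # bubbles out of `open_channel_from()` to the trio parent
            raise maybe_exc
        raise
```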

Surroundingly,
- add a ton of doc-strings to mod fns previously missing them.
- improved / added-new comments to `wait_on_data_reset()` internals and
  anything changed per ^above.

NOTE, resolved conflicts on `piker/brokers/ib/feed.py` due to
`brokers_refinery` commit:

d809c797 `.brokers.ib.feed`: better `tractor.to_asyncio` typing and var naming throughout!
2026-01-06 13:48:57 -05:00
Tyler Goodlet 4c902f6cd7 `ib`: various type-annot, multiline styling and todos updates 2026-01-06 13:48:57 -05:00
Tyler Goodlet 8902142b20 ib: add venue-hours checking
Such that we can avoid other (pretty unreliable) "alternative" checks to
determine whether a real-time quote should be waited on or (when venue
is closed) we should just signal that historical backfilling can
commence immediately.

This has been a todo for a very long time and it turned out to be much
easier to accomplish than anticipated..

Deats,
- add a new `is_current_time_in_range()` dt range checker (sketched
  below) to predicate whether an input range contains
  `datetime.now(start_dt.tzinfo)`.
- in `.ib.feed.stream_quotes()` add a `venue_is_open: bool` which uses
  all of the new ^^ to determine whether to branch for the
  short-circuit-and-do-history-now case or the std real-time-quotes
  should-be-awaited-since-venue-is-open case; drop all the old hacks
  that tried to work around not knowing that venue state..
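A minimal sketch of such a range predicate (signature assumed from the
description above):

```python
from datetime import datetime

def is_current_time_in_range(
    start_dt: datetime,
    end_dt: datetime,
) -> bool:
    # compare "now" in the session's own tz, taken from the start time
    now: datetime = datetime.now(start_dt.tzinfo)
    return start_dt <= now <= end_dt
```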

Other,
- also add a gpt5-composed parser to `._util` for the
  `ib_insync.ContractDetails.tradingHours: str` from before i realized
  there was a `.tradingSessions` property XD
- in `.ib_feed`,
  * add various EG-collapsings per recent tractor/trio updates.
  * better logging / exc-handling around ticker quote pushes.
  * stop clearing `Ticker.ticks` each quote iteration; not sure if this
    is needed/correct tho?
  * add masked `Ticker.ticks` poll loop that logs.
- fix some `str.format()` usage in `._util.try_xdo_manual()`

NOTE, resolved conflicts on `piker/brokers/ib/feed.py` due to
rebasing onto upstream `brokers_refinery` commit,

d809c797 `.brokers.ib.feed`: better `tractor.to_asyncio` typing and var naming throughout
2026-01-06 13:48:57 -05:00
Tyler Goodlet 875dacabc2 ib: never relay "Warning:" errors to EMS..
You'd think they could be bothered to make either a "log" or "warning"
msg type instead of a `type='error'`.. but alas, this attempts to detect
all such "warning"-errors and never proxy them to the clearing engine
thus avoiding the cancellation of any associated (by `reqid`)
pre-existing orders (control dialogs).

Also update all surrounding log messages to a more multiline style.
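The detection presumably reduces to a prefix check along these lines
(helper name hypothetical):

```python
def is_ib_warning(err_msg: str) -> bool:
    # IB ships warnings as `type='error'` msgs; filter by prefix so
    # they're never proxied to the clearing engine.
    return err_msg.lstrip().startswith('Warning:')
```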
2026-01-06 13:48:57 -05:00
Tyler Goodlet 153e413c79 ib: jig `.data_reset_hack()` with vnc-client failover
Since apparently porting to the new docker container enforces using
a vnc password and `asyncvnc` seems to have a bug/mis-config whenever
i've tried a pw over a wg tunnel..?

Soo, this tries out the old `i3ipc`-win-focus + `xdo` click hack when
the above fails.

Deats,
- add a mod-level `try_xdo_manual()` to wrap calling
  `i3ipc_xdotool_manual_click_hack()` with an oserr handler, ensure we
  don't bother trying if `i3ipc` import fails beforehand tho.
- call ^ from both the orig case block and the failover from the
  vnc-client case.
- factor the `no_setup_msg: str` out to mod level and expect it to be
  `.format()`-ed.
- refresh todo around `asyncvnc` pw ish..
- add a new `i3ipc_fin_wins_titled()` window-title scanner which
  predicates input `titles` and delivers any matches alongside the orig
  focused win at call time.
- tweak `i3ipc_xdotool_manual_click_hack()` to call ^ and remove prior
  unfactored window scanning logic.
2026-01-06 13:48:57 -05:00
Tyler Goodlet 2d2c20d72a Add fix for binance API 3.1 rollout..
See https://developers.binance.com/docs/binance-spot-api-docs#2025-08-26
2026-01-06 13:48:57 -05:00
Tyler Goodlet d1ecbd8142 kraken: add crash-handling around `Pair()` init
Since it can otherwise be difficult to debug due to nursery cancellation
(we need that taskman yo!).
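Presumably wrapping the init with the crash-handler cm seen elsewhere
in this changeset, something like:

```python
from tractor.devx import maybe_open_crash_handler

# NOTE, `Pair`/`entry` are stand-ins for the kraken pair-struct and
# its raw API record.
with maybe_open_crash_handler(pdb=True):
    pair = Pair(**entry)
```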
2026-01-06 13:48:57 -05:00
Tyler Goodlet 10a6f666b9 kraken: `Pair.costmin` is now optional?
Some pairs don't seem to define it but it's not listed as deprecated on
the official API page (new one now linked in the type def's doc string).
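i.e. presumably now typed along the lines of,

```python
from msgspec import Struct

class Pair(Struct):
    costmin: float|None = None  # some pairs simply omit it..
```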
2026-01-06 13:48:57 -05:00
Tyler Goodlet dfdd8a42ec binance: add new `permissionSets` to base `Pair` 2026-01-06 13:48:57 -05:00
Tyler Goodlet a578f7a21f Update `binance` spot pairs with `amendAllowed`
As per API updates,
https://developers.binance.com/docs/binance-spot-api-docs
https://developers.binance.com/docs/binance-spot-api-docs/faqs/order_amend_keep_priority

I also slightly tweaked the field-mismatch exception note to include the
`repr(pair_type)` so the dev can know which pair types should be
changed.
2026-01-06 13:48:57 -05:00
Tyler Goodlet 9e7ad816d4 `.kraken`: add masked pauses for order req debug
Such that the next time i inevitably must debug some order-request
error status or precision discrepancy, i have the mkt-symbol branch
ready to go. Also, switch to `'action': 'buy'|'sell' as action,` style
`case` matching instead of the post-`if` predicate style.
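i.e. roughly this style (vs. the prior post-`if` predicate form),

```python
def route_order_cmd(cmd: dict) -> None:  # hypothetical wrapper fn
    match cmd:
        # bind the action only for the order-req cases we handle
        case {'action': 'buy' | 'sell' as action}:
            ...  # submit-limit path
        case {'action': 'cancel'}:
            ...  # cancellation path
```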
2026-01-06 13:48:57 -05:00
Tyler Goodlet a0ba295b50 `.questrade`: link in ws-API issue! 2026-01-06 13:48:57 -05:00
Tyler Goodlet 743dfc3646 `.kraken.broker`: need to `await verify_balances()` .. 2026-01-06 13:48:57 -05:00
Tyler Goodlet a230ca6dce `.brokers.ib.feed`: better `tractor.to_asyncio` typing and var naming throughout! 2026-01-06 13:48:57 -05:00
Tyler Goodlet bc3b0379d0 `.brokers.cli`: module type and todo for `--pdb` flag to NOT src from sub-cmd 2026-01-06 13:48:57 -05:00
Tyler Goodlet a1a0b9c47b Type loaded backend modules 2026-01-06 13:48:57 -05:00
54 changed files with 627 additions and 1190 deletions

View File

@ -95,15 +95,12 @@ bc why install with `python` when you can faster with `rust` ::
include all GUIs (ex. for charting):: include all GUIs (ex. for charting)::
uv sync --group uis uv sync --extra uis
AND with **all** our normal hacking tools:: AND with all our hacking tools and WIP integrations::
uv sync --dev uv sync --dev --all-extras
AND if you want to try WIP integrations::
uv sync --all-groups
Ensure you can run the root-daemon:: Ensure you can run the root-daemon::

View File

@ -1,5 +1,6 @@
################
# ---- CEXY ---- # ---- CEXY ----
################
[binance] [binance]
accounts.paper = 'paper' accounts.paper = 'paper'
@ -12,41 +13,28 @@ accounts.spot = 'spot'
spot.use_testnet = false spot.use_testnet = false
spot.api_key = '' spot.api_key = ''
spot.api_secret = '' spot.api_secret = ''
# ------ binance ------
[deribit] [deribit]
# std assets
key_id = '' key_id = ''
key_secret = '' key_secret = ''
# options
accounts.option = 'option'
option.use_testnet = false
option.key_id = ''
option.key_secret = ''
# aux logging from `cryptofeed`
option.log.filename = 'cryptofeed.log'
option.log.level = 'DEBUG'
option.log.disabled = true
# ------ deribit ------
[kraken] [kraken]
key_descr = '' key_descr = ''
api_key = '' api_key = ''
secret = '' secret = ''
# ------ kraken ------
[kucoin] [kucoin]
key_id = '' key_id = ''
key_secret = '' key_secret = ''
key_passphrase = '' key_passphrase = ''
# ------ kucoin ------
################
# -- BROKERZ --- # -- BROKERZ ---
################
[questrade] [questrade]
refresh_token = '' refresh_token = ''
access_token = '' access_token = ''
@ -54,55 +42,44 @@ api_server = 'https://api06.iq.questrade.com/'
expires_in = 1800 expires_in = 1800
token_type = 'Bearer' token_type = 'Bearer'
expires_at = 1616095326.355846 expires_at = 1616095326.355846
# ------ questrade ------
[ib] [ib]
# define the (set of) host-port socketaddrs that
# brokerd.ib will scan to connect to an API endpoint
# (ib-gw or ib-tws listening instances)
hosts = [ hosts = [
'127.0.0.1', '127.0.0.1',
] ]
# XXX: the order in which ports will be scanned
# (by the `brokerd` daemon-actor)
# is determined # by the line order here.
# TODO: when we eventually spawn gateways in our
# container, we can just dynamically allocate these
# using IBC.
ports = [ ports = [
4002, # gw 4002, # gw
7497, # tws 7497, # tws
] ]
# When API endpoints are being scanned durin startup, the order # XXX: for a paper account the flex web query service
# of user-defined-account "names" (as defined below) here # is not supported so you have to manually download
# determines which py-client connection is given priority to be # and XML report and put it in a location that can be
# used for data-feed-requests by according to whichever client # accessed by the ``brokerd.ib`` backend code for parsing.
# connected to an API endpoing which reported the equivalent flex_token = ''
# account number for that name. flex_trades_query_id = '' # live account
# when clients are being scanned this determines
# which clients are preferred to be used for data
# feeds based on the order of account names, if
# detected as active on an API client.
prefer_data_account = [ prefer_data_account = [
'paper', 'paper',
'margin', 'margin',
'ira', 'ira',
] ]
# For long-term trades txn (transaction) history
# processing (i.e your txn ledger with IB) you can
# (automatically for live accounts) query the FLEX
# report system for past history.
#
# (For paper accounts the web query service
# is not supported so you have to manually download
# an XML report and put it in a location that can be
# accessed by our `brokerd.ib` backend code for parsing).
#
flex_token = ''
flex_trades_query_id = '' # live account
# define "aliases" (names) for each account number
# such that the names can be reffed and logged throughout
# `piker.accounting` subsys and more easily
# referred to by the user.
#
# These keys will be the set exposed through the order-mode
# account-selection UI so that numbers are never shown.
[ib.accounts] [ib.accounts]
paper = 'DU0000000' # <- literal account # # the order in which accounts will be selectable
margin = 'U0000000' # in the order mode UI (if found via clients during
ira = 'U0000000' # API-app scanning)when a new symbol is loaded.
# ------ ib ------ paper = 'XX0000000'
margin = 'X0000000'
ira = 'X0000000'

View File

@ -1,9 +1,7 @@
[network] [network]
pikerd = [ tsdb.backend = 'marketstore'
'/ipv4/127.0.0.1/tcp/6116', # std localhost daemon-actor tree tsdb.host = 'localhost'
# '/uds/6116', # TODO std uds socket file tsdb.grpc_port = 5995
]
[ui] [ui]
# set custom font + size which will scale entire UI # set custom font + size which will scale entire UI

View File

@ -24,8 +24,9 @@ here is an example using ``vncclient`` on ``linux``::
vncviewer localhost:5900 vncviewer localhost:5900
now enter the pw (password) you set via an (see second code blob)
`.env file`_ or pw-file according to the `credentials section`_. now enter the pw you set via an (see second code blob) `.env file`_
or pw-file according to the `credentials section`_.
If you want to change away from their default config see the example If you want to change away from their default config see the example
`docker-compose.yml`-config issue and config-section of the readme, `docker-compose.yml`-config issue and config-section of the readme,
@ -38,74 +39,6 @@ If you want to change away from their default config see the example
.. _credentials section: https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#credentials .. _credentials section: https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#credentials
Connecting to the API from `piker`
---------------------------------
In order to expose the container's API endpoint to the
`brokerd/datad/ib` actor, we need to add a section to the user's
`brokers.toml` config (note the below is similar to the repo-shipped
template file),
.. code:: toml
[ib]
# define the (set of) host-port socketaddrs that
# brokerd.ib will scan to connect to an API endpoint
# (ib-gw or ib-tws listening instances)
hosts = [
'127.0.0.1',
]
ports = [
4002, # gw
7497, # tws
]
# When API endpoints are being scanned durin startup, the order
# of user-defined-account "names" (as defined below) here
# determines which py-client connection is given priority to be
# used for data-feed-requests by according to whichever client
# connected to an API endpoing which reported the equivalent
# account number for that name.
prefer_data_account = [
'paper',
'margin',
'ira',
]
# define "aliases" (names) for each account number
# such that the names can be reffed and logged throughout
# `piker.accounting` subsys and more easily
# referred to by the user.
#
# These keys will be the set exposed through the order-mode
# account-selection UI so that numbers are never shown.
[ib.accounts]
paper = 'XX0000000'
margin = 'X0000000'
ira = 'X0000000'
the broker daemon can also connect to the container's VNC server for
added functionalies including,
- viewing the API endpoint program's GUI for manual interventions,
- workarounds for historical data throttling using hotkey hacks,
Add a further section to `brokers.toml` which maps each API-ep's
port to a table of VNC server connection info like,
.. code:: toml
[ib.vnc_addrs]
4002 = {host = 'localhost', port = 5900, pw = 'doggy'}
The `pw = 'doggy'` here ^ should the same value as the particular
container instances `.env` file setting (when it was run),
.. code:: ini
VNC_SERVER_PASSWORD='doggy'
IF you also want to run ``TWS`` IF you also want to run ``TWS``
------------------------------- -------------------------------
You can also run it containerized, You can also run it containerized,

View File

@ -1,15 +1,10 @@
# a community maintained IB API container! # rework from the original @
# # https://github.com/waytrade/ib-gateway-docker/blob/master/docker-compose.yml
# https://github.com/gnzsnz/ib-gateway-docker version: "3.5"
#
# For piker we (currently) include some minor deviations
# for some config files in the `volumes` section.
#
# See full configuration settings @
# - https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#configuration
# - https://github.com/gnzsnz/ib-gateway-docker/discussions/103
services: services:
ib_gw_paper: ib_gw_paper:
# apparently java is a mega cukc: # apparently java is a mega cukc:
@ -55,22 +50,16 @@ services:
target: /root/scripts/run_x11_vnc.sh target: /root/scripts/run_x11_vnc.sh
read_only: true read_only: true
# NOTE: an alt method to fill these out is to # NOTE:to fill these out, define an `.env` file in the same dir as
# define an `.env` file in the same dir as # this compose file which looks something like:
# this compose file. # TWS_USERID='myuser'
# TWS_PASSWORD='guest'
environment: environment:
TWS_USERID: ${TWS_USERID} TWS_USERID: ${TWS_USERID}
# TWS_USERID: 'myuser'
TWS_PASSWORD: ${TWS_PASSWORD} TWS_PASSWORD: ${TWS_PASSWORD}
# TWS_PASSWORD: 'guest' TRADING_MODE: 'paper'
TRADING_MODE: ${TRADING_MODE} VNC_SERVER_PASSWORD: 'doggy'
# TRADING_MODE: 'paper' VNC_SERVER_PORT: '3003'
VNC_SERVER_PASSWORD: ${VNC_SERVER_PASSWORD}
# VNC_SERVER_PASSWORD: 'doggy'
# TODO, see if we can get this supported like it
# was on the old `waytrade` image?
# VNC_SERVER_PORT: '3003'
# ports: # ports:
# - target: 4002 # - target: 4002
@ -87,9 +76,6 @@ services:
# - "127.0.0.1:4002:4002" # - "127.0.0.1:4002:4002"
# - "127.0.0.1:5900:5900" # - "127.0.0.1:5900:5900"
# TODO, a masked but working example of dual paper + live
# ib-gw instances running in a single app run!
#
# ib_gw_live: # ib_gw_live:
# image: waytrade/ib-gateway:1012.2i # image: waytrade/ib-gateway:1012.2i
# restart: no # restart: no

View File

@ -121,7 +121,6 @@ async def bot_main():
# tick_throttle=10, # tick_throttle=10,
) as feed, ) as feed,
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn, trio.open_nursery() as tn,
): ):
assert accounts assert accounts

View File

@ -33,6 +33,7 @@ from ._pos import (
Account, Account,
load_account, load_account,
load_account_from_ledger, load_account_from_ledger,
open_pps,
open_account, open_account,
Position, Position,
) )
@ -67,6 +68,7 @@ __all__ = [
'load_account_from_ledger', 'load_account_from_ledger',
'mk_allocator', 'mk_allocator',
'open_account', 'open_account',
'open_pps',
'open_trade_ledger', 'open_trade_ledger',
'unpack_fqme', 'unpack_fqme',
'DerivTypes', 'DerivTypes',

View File

@ -356,20 +356,17 @@ class Position(Struct):
) -> bool: ) -> bool:
''' '''
Update clearing table by calculating the rolling ppu and Update clearing table by calculating the rolling ppu and
(accumulative) size in both the clears entry and local attrs (accumulative) size in both the clears entry and local
state. attrs state.
Inserts are always done in datetime sorted order. Inserts are always done in datetime sorted order.
''' '''
# added: bool = False
tid: str = t.tid tid: str = t.tid
if tid in self._events: if tid in self._events:
log.debug( log.warning(f'{t} is already added?!')
f'Txn is already added?\n' # return added
f'\n'
f'{t}\n'
)
return False
# TODO: apparently this IS possible with a dict but not # TODO: apparently this IS possible with a dict but not
# common and probably not that beneficial unless we're also # common and probably not that beneficial unless we're also
@ -450,12 +447,6 @@ class Position(Struct):
# def suggest_split(self) -> float: # def suggest_split(self) -> float:
# ... # ...
# ?TODO, for sending rendered state over the wire?
# def summary(self) -> PositionSummary:
# do minimal conversion to a subset of fields
# currently defined in `.clearing._messages.BrokerdPosition`
class Account(Struct): class Account(Struct):
''' '''
@ -499,9 +490,9 @@ class Account(Struct):
def update_from_ledger( def update_from_ledger(
self, self,
ledger: TransactionLedger|dict[str, Transaction], ledger: TransactionLedger | dict[str, Transaction],
cost_scalar: float = 2, cost_scalar: float = 2,
symcache: SymbologyCache|None = None, symcache: SymbologyCache | None = None,
_mktmap_table: dict[str, MktPair] | None = None, _mktmap_table: dict[str, MktPair] | None = None,
@ -740,7 +731,7 @@ class Account(Struct):
else: else:
# TODO: we reallly need a diff set of # TODO: we reallly need a diff set of
# loglevels/colors per subsys. # loglevels/colors per subsys.
log.debug( log.warning(
f'Recent position for {fqme} was closed!' f'Recent position for {fqme} was closed!'
) )
@ -754,7 +745,7 @@ class Account(Struct):
# XXX WTF: if we use a tomlkit.Integer here we get this # XXX WTF: if we use a tomlkit.Integer here we get this
# super weird --1 thing going on for cumsize!?1! # super weird --1 thing going on for cumsize!?1!
# NOTE: the fix was to always float() the size value loaded # NOTE: the fix was to always float() the size value loaded
# in open_account() below! # in open_pps() below!
config.write( config.write(
config=self.conf, config=self.conf,
path=self.conf_path, path=self.conf_path,
@ -938,6 +929,7 @@ def open_account(
clears_table['dt'] = dt clears_table['dt'] = dt
trans.append(Transaction( trans.append(Transaction(
fqme=bs_mktid, fqme=bs_mktid,
# sym=mkt,
bs_mktid=bs_mktid, bs_mktid=bs_mktid,
tid=tid, tid=tid,
# XXX: not sure why sometimes these are loaded as # XXX: not sure why sometimes these are loaded as
@ -960,22 +952,11 @@ def open_account(
): ):
expiry: pendulum.DateTime = pendulum.parse(expiry) expiry: pendulum.DateTime = pendulum.parse(expiry)
# !XXX, should never be duplicates over pp = pp_objs[bs_mktid] = Position(
# a backend-(broker)-system's unique market-IDs! mkt,
if pos := pp_objs.get(bs_mktid): split_ratio=split_ratio,
if mkt != pos.mkt: bs_mktid=bs_mktid,
log.warning( )
f'Duplicated position but diff `MktPair.fqme` ??\n'
f'bs_mktid: {bs_mktid!r}\n'
f'pos.mkt: {pos.mkt}\n'
f'mkt: {mkt}\n'
)
else:
pos = pp_objs[bs_mktid] = Position(
mkt,
split_ratio=split_ratio,
bs_mktid=bs_mktid,
)
# XXX: super critical, we need to be sure to include # XXX: super critical, we need to be sure to include
# all pps.toml clears to avoid reusing clears that were # all pps.toml clears to avoid reusing clears that were
@ -983,13 +964,8 @@ def open_account(
# state, since today's records may have already been # state, since today's records may have already been
# processed! # processed!
for t in trans: for t in trans:
added: bool = pos.add_clear(t) pp.add_clear(t)
if not added:
log.warning(
f'Txn already recorded in pp ??\n'
f'\n'
f'{t}\n'
)
try: try:
yield acnt yield acnt
finally: finally:
@ -997,6 +973,20 @@ def open_account(
acnt.write_config() acnt.write_config()
# TODO: drop the old name and THIS!
@cm
def open_pps(
*args,
**kwargs,
) -> Generator[Account, None, None]:
log.warning(
'`open_pps()` is now deprecated!\n'
'Please use `with open_account() as cnt:`'
)
with open_account(*args, **kwargs) as acnt:
yield acnt
def load_account_from_ledger( def load_account_from_ledger(
brokername: str, brokername: str,

View File

@ -22,9 +22,7 @@ you know when you're losing money (if possible) XD
from __future__ import annotations from __future__ import annotations
from collections.abc import ValuesView from collections.abc import ValuesView
from contextlib import contextmanager as cm from contextlib import contextmanager as cm
from functools import partial
from math import copysign from math import copysign
from pprint import pformat
from typing import ( from typing import (
Any, Any,
Callable, Callable,
@ -32,7 +30,6 @@ from typing import (
TYPE_CHECKING, TYPE_CHECKING,
) )
from tractor.devx import maybe_open_crash_handler
import polars as pl import polars as pl
from pendulum import ( from pendulum import (
DateTime, DateTime,
@ -40,16 +37,12 @@ from pendulum import (
parse, parse,
) )
from ..log import get_logger
if TYPE_CHECKING: if TYPE_CHECKING:
from ._ledger import ( from ._ledger import (
Transaction, Transaction,
TransactionLedger, TransactionLedger,
) )
log = get_logger(__name__)
def ppu( def ppu(
clears: Iterator[Transaction], clears: Iterator[Transaction],
@ -245,9 +238,6 @@ def iter_by_dt(
def dyn_parse_to_dt( def dyn_parse_to_dt(
tx: tuple[str, dict[str, Any]] | Transaction, tx: tuple[str, dict[str, Any]] | Transaction,
debug: bool = False,
_invalid: list|None = None,
) -> DateTime: ) -> DateTime:
# handle `.items()` inputs # handle `.items()` inputs
@ -260,90 +250,52 @@ def iter_by_dt(
# get best parser for this record.. # get best parser for this record..
for k in parsers: for k in parsers:
if ( if (
(v := getattr(tx, k, None)) isdict and k in tx
or or
( getattr(tx, k, None)
isdict
and
(v := tx.get(k))
)
): ):
v = (
tx[k] if isdict
else tx.dt
)
assert v is not None, (
f'No valid value for `{k}`!?'
)
# only call parser on the value if not None from # only call parser on the value if not None from
# the `parsers` table above (when NOT using # the `parsers` table above (when NOT using
# `.get()`), otherwise pass through the value and # `.get()`), otherwise pass through the value and
# sort on it directly # sort on it directly
if ( if (
not isinstance(v, DateTime) not isinstance(v, DateTime)
and and (parser := parsers.get(k))
(parser := parsers.get(k))
): ):
ret = parser(v) return parser(v)
else: else:
ret = v return v
return ret
else:
log.debug(
f'Parser-field not found in txn\n'
f'\n'
f'parser-field: {k!r}\n'
f'txn: {tx!r}\n'
f'\n'
f'Trying next..\n'
)
continue
# XXX: we should never really get here bc it means some kinda
# bad txn-record (field) data..
#
# -> set the `debug_mode = True` if you want to trace such
# cases from REPL ;)
else: else:
# TODO: move to top?
from piker.log import get_logger
log = get_logger(__name__)
# XXX: we should really never get here.. # XXX: we should really never get here..
# only if a ledger record has no expected sort(able) # only if a ledger record has no expected sort(able)
# field will we likely hit this.. like with ze IB. # field will we likely hit this.. like with ze IB.
# if no sortable field just deliver epoch? # if no sortable field just deliver epoch?
log.warning( log.warning(
'No (time) sortable field for TXN:\n' 'No (time) sortable field for TXN:\n'
f'{tx!r}\n' f'{tx}\n'
) )
report: str = ( return from_timestamp(0)
f'No supported time-field found in txn !?\n' # breakpoint()
f'\n'
f'supported-time-fields: {parsers!r}\n'
f'\n'
f'txn: {tx!r}\n'
)
if debug:
with maybe_open_crash_handler(
pdb=debug,
raise_on_exit=False,
):
raise ValueError(report)
else:
log.error(report)
if _invalid is not None:
_invalid.append(tx)
return from_timestamp(0.)
entry: tuple[str, dict]|Transaction entry: tuple[str, dict] | Transaction
invalid: list = []
for entry in sorted( for entry in sorted(
records, records,
key=key or partial( key=key or dyn_parse_to_dt,
dyn_parse_to_dt,
_invalid=invalid,
),
): ):
if entry in invalid:
log.warning(
f'Ignoring txn w invalid timestamp ??\n'
f'{pformat(entry)}\n'
)
continue
# NOTE the type sig above; either pairs or txns B) # NOTE the type sig above; either pairs or txns B)
yield entry yield entry
@ -406,7 +358,6 @@ def open_ledger_dfs(
acctname: str, acctname: str,
ledger: TransactionLedger | None = None, ledger: TransactionLedger | None = None,
debug_mode: bool = False,
**kwargs, **kwargs,
@ -421,10 +372,8 @@ def open_ledger_dfs(
can update the ledger on exit. can update the ledger on exit.
''' '''
with maybe_open_crash_handler( from piker.toolz import open_crash_handler
pdb=debug_mode, with open_crash_handler():
# raise_on_exit=False,
):
if not ledger: if not ledger:
import time import time
from ._ledger import open_trade_ledger from ._ledger import open_trade_ledger
@ -516,7 +465,7 @@ def ledger_to_dfs(
df = dfs[key] = ldf.with_columns([ df = dfs[key] = ldf.with_columns([
pl.cum_sum('size').alias('cumsize'), pl.cumsum('size').alias('cumsize'),
# amount of source asset "sent" (via buy txns in # amount of source asset "sent" (via buy txns in
# the market) to acquire the dst asset, PER txn. # the market) to acquire the dst asset, PER txn.
@ -531,7 +480,7 @@ def ledger_to_dfs(
]).with_columns([ ]).with_columns([
# rolling balance in src asset units # rolling balance in src asset units
(pl.col('dst_bot').cum_sum() * -1).alias('src_balance'), (pl.col('dst_bot').cumsum() * -1).alias('src_balance'),
# "position operation type" in terms of increasing the # "position operation type" in terms of increasing the
# amount in the dst asset (entering) or decreasing the # amount in the dst asset (entering) or decreasing the
@ -673,7 +622,7 @@ def ledger_to_dfs(
# cost that was included in the least-recently # cost that was included in the least-recently
# entered txn that is still part of the current CSi # entered txn that is still part of the current CSi
# set. # set.
# => we look up the cost-per-unit cum_sum and apply # => we look up the cost-per-unit cumsum and apply
# if over the current txn size (by multiplication) # if over the current txn size (by multiplication)
# and then reverse that previusly applied cost on # and then reverse that previusly applied cost on
# the txn_cost for this record. # the txn_cost for this record.

View File

@ -96,10 +96,7 @@ async def _setup_persistent_brokerd(
# - `open_symbol_search()` # - `open_symbol_search()`
# NOTE: see ep invocation details inside `.data.feed`. # NOTE: see ep invocation details inside `.data.feed`.
try: try:
async with ( async with trio.open_nursery() as service_nursery:
tractor.trionics.collapse_eg(),
trio.open_nursery() as service_nursery
):
bus: _FeedsBus = feed.get_feed_bus( bus: _FeedsBus = feed.get_feed_bus(
brokername, brokername,
service_nursery, service_nursery,

View File

@ -440,7 +440,6 @@ async def open_trade_dialog(
# - ledger: TransactionLedger # - ledger: TransactionLedger
async with ( async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn, trio.open_nursery() as tn,
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,
): ):

View File

@ -102,13 +102,12 @@ class AggTrade(Struct, frozen=True):
a: int # Aggregate trade ID a: int # Aggregate trade ID
p: float # Price p: float # Price
q: float # Quantity with all the market trades q: float # Quantity with all the market trades
nq: float # Normal quantity without the trades involving RPI orders
f: int # First trade ID f: int # First trade ID
l: int # noqa Last trade ID l: int # noqa Last trade ID
T: int # Trade time T: int # Trade time
m: bool # Is the buyer the market maker? m: bool # Is the buyer the market maker?
M: bool|None = None # Ignore M: bool | None = None # Ignore
nq: float|None = None # Normal quantity without the trades involving RPI orders
# ^XXX https://developers.binance.com/docs/derivatives/change-log#2025-12-29
async def stream_messages( async def stream_messages(
@ -451,6 +450,7 @@ async def subscribe(
async def stream_quotes( async def stream_quotes(
send_chan: trio.abc.SendChannel, send_chan: trio.abc.SendChannel,
symbols: list[str], symbols: list[str],
feed_is_live: trio.Event, feed_is_live: trio.Event,
@ -462,7 +462,6 @@ async def stream_quotes(
) -> None: ) -> None:
async with ( async with (
tractor.trionics.maybe_raise_from_masking_exc(),
send_chan as send_chan, send_chan as send_chan,
open_cached_client('binance') as client, open_cached_client('binance') as client,
): ):

View File

@ -31,7 +31,7 @@ from typing import (
Callable, Callable,
) )
from pendulum import now import pendulum
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
from rapidfuzz import process as fuzzy from rapidfuzz import process as fuzzy
@ -39,7 +39,6 @@ import numpy as np
from tractor.trionics import ( from tractor.trionics import (
broadcast_receiver, broadcast_receiver,
maybe_open_context maybe_open_context
collapse_eg,
) )
from tractor import to_asyncio from tractor import to_asyncio
# XXX WOOPS XD # XXX WOOPS XD
@ -433,7 +432,6 @@ async def get_client(
) -> Client: ) -> Client:
async with ( async with (
collapse_eg(),
trio.open_nursery() as n, trio.open_nursery() as n,
open_jsonrpc_session( open_jsonrpc_session(
_testnet_ws_url, dtype=JSONRPCResult) as json_rpc _testnet_ws_url, dtype=JSONRPCResult) as json_rpc

View File

@ -48,7 +48,6 @@ from bidict import bidict
import trio import trio
import tractor import tractor
from tractor import to_asyncio from tractor import to_asyncio
from tractor import trionics
from pendulum import ( from pendulum import (
from_timestamp, from_timestamp,
DateTime, DateTime,
@ -1373,8 +1372,8 @@ async def load_clients_for_trio(
''' '''
Pure async mngr proxy to ``load_aio_clients()``. Pure async mngr proxy to ``load_aio_clients()``.
This is a bootstrap entrypoint to call from This is a bootstrap entrypoing to call from
a `tractor.to_asyncio.open_channel_from()`. a ``tractor.to_asyncio.open_channel_from()``.
''' '''
async with load_aio_clients( async with load_aio_clients(
@ -1394,10 +1393,7 @@ async def open_client_proxies() -> tuple[
async with ( async with (
tractor.trionics.maybe_open_context( tractor.trionics.maybe_open_context(
acm_func=tractor.to_asyncio.open_channel_from, acm_func=tractor.to_asyncio.open_channel_from,
kwargs={ kwargs={'target': load_clients_for_trio},
'target': load_clients_for_trio,
# ^XXX, kwarg to `open_channel_from()`
},
# lock around current actor task access # lock around current actor task access
# TODO: maybe this should be the default in tractor? # TODO: maybe this should be the default in tractor?
@ -1589,8 +1585,7 @@ async def open_client_proxy(
event_consumers=event_table, event_consumers=event_table,
) as (first, chan), ) as (first, chan),
trionics.collapse_eg(), # loose-ify trio.open_nursery() as relay_n,
trio.open_nursery() as relay_tn,
): ):
assert isinstance(first, Client) assert isinstance(first, Client)
@ -1630,7 +1625,7 @@ async def open_client_proxy(
continue continue
relay_tn.start_soon(relay_events) relay_n.start_soon(relay_events)
yield proxy yield proxy

View File

@ -34,7 +34,6 @@ import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
from tractor.to_asyncio import LinkedTaskChannel from tractor.to_asyncio import LinkedTaskChannel
from tractor import trionics
from ib_insync.contract import ( from ib_insync.contract import (
Contract, Contract,
) )
@ -416,7 +415,7 @@ async def update_and_audit_pos_msg(
# TODO: make this a "propaganda" log level? # TODO: make this a "propaganda" log level?
if ibpos.avgCost != msg.avg_price: if ibpos.avgCost != msg.avg_price:
log.debug( log.warning(
f'IB "FIFO" avg price for {msg.symbol} is DIFF:\n' f'IB "FIFO" avg price for {msg.symbol} is DIFF:\n'
f'ib: {ibfmtmsg}\n' f'ib: {ibfmtmsg}\n'
'---------------------------\n' '---------------------------\n'
@ -758,7 +757,7 @@ async def open_trade_dialog(
f'UNEXPECTED POSITION says IB => {msg.symbol}\n' f'UNEXPECTED POSITION says IB => {msg.symbol}\n'
'Maybe they LIQUIDATED YOU or your ledger is wrong?\n' 'Maybe they LIQUIDATED YOU or your ledger is wrong?\n'
) )
log.debug(logmsg) log.error(logmsg)
await ctx.started(( await ctx.started((
all_positions, all_positions,
@ -767,22 +766,21 @@ async def open_trade_dialog(
async with ( async with (
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,
trionics.collapse_eg(), trio.open_nursery() as n,
trio.open_nursery() as tn,
): ):
# relay existing open orders to ems # relay existing open orders to ems
for msg in order_msgs: for msg in order_msgs:
await ems_stream.send(msg) await ems_stream.send(msg)
for client in set(aioclients.values()): for client in set(aioclients.values()):
trade_event_stream: LinkedTaskChannel = await tn.start( trade_event_stream: LinkedTaskChannel = await n.start(
open_trade_event_stream, open_trade_event_stream,
client, client,
) )
# start order request handler **before** local trades # start order request handler **before** local trades
# event loop # event loop
tn.start_soon( n.start_soon(
handle_order_requests, handle_order_requests,
ems_stream, ems_stream,
accounts_def, accounts_def,
@ -790,7 +788,7 @@ async def open_trade_dialog(
) )
# allocate event relay tasks for each client connection # allocate event relay tasks for each client connection
tn.start_soon( n.start_soon(
deliver_trade_events, deliver_trade_events,
trade_event_stream, trade_event_stream,

View File

@ -25,10 +25,7 @@ from typing import TYPE_CHECKING
import trio import trio
import tractor import tractor
from tractor.trionics import ( from tractor.trionics import broadcast_receiver
broadcast_receiver,
collapse_eg,
)
from ._util import ( from ._util import (
log, # sub-sys logger log, # sub-sys logger
@ -288,11 +285,8 @@ async def open_ems(
client._ems_stream = trades_stream client._ems_stream = trades_stream
# start sync code order msg delivery task # start sync code order msg delivery task
async with ( async with trio.open_nursery() as n:
collapse_eg(), n.start_soon(
trio.open_nursery() as tn,
):
tn.start_soon(
relay_orders_from_sync_code, relay_orders_from_sync_code,
client, client,
fqme, fqme,
@ -308,4 +302,4 @@ async def open_ems(
) )
# stop the sync-msg-relay task on exit. # stop the sync-msg-relay task on exit.
tn.cancel_scope.cancel() n.cancel_scope.cancel()

View File

@ -42,7 +42,6 @@ from bidict import bidict
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
from tractor import trionics
from ._util import ( from ._util import (
log, # sub-sys logger log, # sub-sys logger
@ -77,6 +76,7 @@ if TYPE_CHECKING:
# TODO: numba all of this # TODO: numba all of this
def mk_check( def mk_check(
trigger_price: float, trigger_price: float,
known_last: float, known_last: float,
action: str, action: str,
@ -162,7 +162,7 @@ async def clear_dark_triggers(
router: Router, router: Router,
brokerd_orders_stream: tractor.MsgStream, brokerd_orders_stream: tractor.MsgStream,
quote_stream: tractor.MsgStream, quote_stream: tractor.ReceiveMsgStream, # noqa
broker: str, broker: str,
fqme: str, fqme: str,
@ -178,7 +178,6 @@ async def clear_dark_triggers(
''' '''
# XXX: optimize this for speed! # XXX: optimize this for speed!
# TODO: # TODO:
# - port to the new ringbuf stuff in `tractor.ipc`!
# - numba all this! # - numba all this!
# - this stream may eventually contain multiple symbols # - this stream may eventually contain multiple symbols
quote_stream._raise_on_lag = False quote_stream._raise_on_lag = False
@ -388,7 +387,6 @@ async def open_brokerd_dialog(
for ep_name in [ for ep_name in [
'open_trade_dialog', # probably final name? 'open_trade_dialog', # probably final name?
'trades_dialogue', # legacy 'trades_dialogue', # legacy
# ^!TODO, rm this since all backends ported no ?!?
]: ]:
trades_endpoint = getattr( trades_endpoint = getattr(
brokermod, brokermod,
@ -502,7 +500,7 @@ class Router(Struct):
''' '''
# setup at actor spawn time # setup at actor spawn time
_tn: trio.Nursery nursery: trio.Nursery
# broker to book map # broker to book map
books: dict[str, DarkBook] = {} books: dict[str, DarkBook] = {}
@ -672,7 +670,7 @@ class Router(Struct):
# dark book clearing loop, also lives with parent # dark book clearing loop, also lives with parent
# daemon to allow dark order clearing while no # daemon to allow dark order clearing while no
# client is connected. # client is connected.
self._tn.start_soon( self.nursery.start_soon(
clear_dark_triggers, clear_dark_triggers,
self, self,
relay.brokerd_stream, relay.brokerd_stream,
@ -695,7 +693,7 @@ class Router(Struct):
# spawn a ``brokerd`` order control dialog stream # spawn a ``brokerd`` order control dialog stream
# that syncs lifetime with the parent `emsd` daemon. # that syncs lifetime with the parent `emsd` daemon.
self._tn.start_soon( self.nursery.start_soon(
translate_and_relay_brokerd_events, translate_and_relay_brokerd_events,
broker, broker,
relay.brokerd_stream, relay.brokerd_stream,
@ -769,12 +767,10 @@ async def _setup_persistent_emsd(
global _router global _router
# open a root "service task-nursery" for the `emsd`-actor # open a root "service nursery" for the ``emsd`` actor
async with ( async with trio.open_nursery() as service_nursery:
trionics.collapse_eg(),
trio.open_nursery() as tn _router = Router(nursery=service_nursery)
):
_router = Router(_tn=tn)
# TODO: send back the full set of persistent # TODO: send back the full set of persistent
# orders/execs? # orders/execs?
@ -1028,18 +1024,8 @@ async def translate_and_relay_brokerd_events(
) )
if status == 'closed': if status == 'closed':
log.info( log.info(f'Execution for {oid} is complete!')
f'Execution is complete!\n' status_msg = book._active.pop(oid)
f'oid: {oid!r}\n'
)
status_msg = book._active.pop(oid, None)
if status_msg is None:
log.warning(
f'Order was already cleared from book ??\n'
f'oid: {oid!r}\n'
f'\n'
f'Maybe the order cancelled before submitted ??\n'
)
elif status == 'canceled': elif status == 'canceled':
log.cancel(f'Cancellation for {oid} is complete!') log.cancel(f'Cancellation for {oid} is complete!')
@ -1204,16 +1190,12 @@ async def process_client_order_cmds(
submitting live orders immediately if requested by the client. submitting live orders immediately if requested by the client.
''' '''
# TODO, only allow `msgspec.Struct` form! # cmd: dict
cmd: dict
async for cmd in client_order_stream: async for cmd in client_order_stream:
log.info( log.info(f'Received order cmd:\n{pformat(cmd)}')
f'Received order cmd:\n'
f'{pformat(cmd)}\n'
)
# CAWT DAMN we need struct support! # CAWT DAMN we need struct support!
oid: str = str(cmd['oid']) oid = str(cmd['oid'])
# register this stream as an active order dialog (msg flow) for # register this stream as an active order dialog (msg flow) for
# this order id such that translated message from the brokerd # this order id such that translated message from the brokerd
@ -1319,7 +1301,7 @@ async def process_client_order_cmds(
case { case {
'oid': oid, 'oid': oid,
'symbol': fqme, 'symbol': fqme,
'price': price, 'price': trigger_price,
'size': size, 'size': size,
'action': ('buy' | 'sell') as action, 'action': ('buy' | 'sell') as action,
'exec_mode': ('live' | 'paper'), 'exec_mode': ('live' | 'paper'),
@ -1351,7 +1333,7 @@ async def process_client_order_cmds(
symbol=sym, symbol=sym,
action=action, action=action,
price=price, price=trigger_price,
size=size, size=size,
account=req.account, account=req.account,
) )
@ -1373,11 +1355,7 @@ async def process_client_order_cmds(
# (``translate_and_relay_brokerd_events()`` above) will # (``translate_and_relay_brokerd_events()`` above) will
# handle relaying the ems side responses back to # handle relaying the ems side responses back to
# the client/cmd sender from this request # the client/cmd sender from this request
log.info( log.info(f'Sending live order to {broker}:\n{pformat(msg)}')
f'Sending live order to {broker}:\n'
f'{pformat(msg)}'
)
await brokerd_order_stream.send(msg) await brokerd_order_stream.send(msg)
# an immediate response should be ``BrokerdOrderAck`` # an immediate response should be ``BrokerdOrderAck``
@ -1393,7 +1371,7 @@ async def process_client_order_cmds(
case { case {
'oid': oid, 'oid': oid,
'symbol': fqme, 'symbol': fqme,
'price': price, 'price': trigger_price,
'size': size, 'size': size,
'exec_mode': exec_mode, 'exec_mode': exec_mode,
'action': action, 'action': action,
@ -1421,12 +1399,7 @@ async def process_client_order_cmds(
if isnan(last): if isnan(last):
last = flume.rt_shm.array[-1]['close'] last = flume.rt_shm.array[-1]['close']
trigger_price: float = float(price) pred = mk_check(trigger_price, last, action)
pred = mk_check(
trigger_price,
last,
action,
)
# NOTE: for dark orders currently we submit # NOTE: for dark orders currently we submit
# the triggered live order at a price 5 ticks # the triggered live order at a price 5 ticks
@ -1533,7 +1506,7 @@ async def maybe_open_trade_relays(
loglevel: str = 'info', loglevel: str = 'info',
): ):
fqme, relay, feed, client_ready = await _router._tn.start( fqme, relay, feed, client_ready = await _router.nursery.start(
_router.open_trade_relays, _router.open_trade_relays,
fqme, fqme,
exec_mode, exec_mode,
@ -1563,18 +1536,19 @@ async def maybe_open_trade_relays(
@tractor.context @tractor.context
async def _emsd_main( async def _emsd_main(
ctx: tractor.Context, # becomes `ems_ctx` below ctx: tractor.Context,
fqme: str, fqme: str,
exec_mode: str, # ('paper', 'live') exec_mode: str, # ('paper', 'live')
loglevel: str|None = None, loglevel: str | None = None,
) -> tuple[ # `ctx.started()` value! ) -> tuple[
dict[ # positions dict[
tuple[str, str], # brokername, acctid # brokername, acctid
tuple[str, str],
list[BrokerdPosition], list[BrokerdPosition],
], ],
list[str], # accounts list[str],
dict[str, Status], # dialogs dict[str, Status],
]: ]:
''' '''
EMS (sub)actor entrypoint providing the execution management EMS (sub)actor entrypoint providing the execution management

View File

@ -19,7 +19,6 @@ Clearing sub-system message and protocols.
""" """
from __future__ import annotations from __future__ import annotations
from decimal import Decimal
from typing import ( from typing import (
Literal, Literal,
) )
@ -72,15 +71,7 @@ class Order(Struct):
symbol: str # | MktPair symbol: str # | MktPair
account: str # should we set a default as '' ? account: str # should we set a default as '' ?
# https://docs.python.org/3/library/decimal.html#decimal-objects price: float
#
# ?TODO? decimal usage throughout?
# -[ ] possibly leverage the `Encoder(decimal_format='number')`
# bit?
# |_https://jcristharif.com/msgspec/supported-types.html#decimal
# -[ ] should we also use it for .size?
#
price: Decimal
size: float # -ve is "sell", +ve is "buy" size: float # -ve is "sell", +ve is "buy"
brokers: list[str] = [] brokers: list[str] = []
@ -187,7 +178,7 @@ class BrokerdOrder(Struct):
time_ns: int time_ns: int
symbol: str # fqme symbol: str # fqme
price: Decimal price: float
size: float size: float
# TODO: if we instead rely on a +ve/-ve size to determine # TODO: if we instead rely on a +ve/-ve size to determine
@ -301,9 +292,6 @@ class BrokerdError(Struct):
# TODO: yeah, so we REALLY need to completely deprecate # TODO: yeah, so we REALLY need to completely deprecate
# this and use the `.accounting.Position` msg-type instead.. # this and use the `.accounting.Position` msg-type instead..
# -[ ] an alternative might be to add a `Position.summary() ->
# `PositionSummary`-msg that we generate since `Position` has a lot
# of fields by default we likely don't want to send over the wire?
class BrokerdPosition(Struct): class BrokerdPosition(Struct):
''' '''
Position update event from brokerd. Position update event from brokerd.
@ -316,4 +304,3 @@ class BrokerdPosition(Struct):
avg_price: float avg_price: float
currency: str = '' currency: str = ''
name: str = 'position' name: str = 'position'
bs_mktid: str|int|None = None

View File

@ -510,7 +510,7 @@ async def handle_order_requests(
reqid = await client.submit_limit( reqid = await client.submit_limit(
oid=order.oid, oid=order.oid,
symbol=f'{order.symbol}.{client.broker}', symbol=f'{order.symbol}.{client.broker}',
price=float(order.price), price=order.price,
action=order.action, action=order.action,
size=order.size, size=order.size,
# XXX: by default 0 tells ``ib_insync`` methods that # XXX: by default 0 tells ``ib_insync`` methods that

View File

@ -134,65 +134,86 @@ def pikerd(
Spawn the piker broker-daemon. Spawn the piker broker-daemon.
''' '''
# from tractor.devx import maybe_open_crash_handler from tractor.devx import maybe_open_crash_handler
# with maybe_open_crash_handler(pdb=False): with maybe_open_crash_handler(pdb=pdb):
log = get_console_log(loglevel, name='cli') log = get_console_log(loglevel, name='cli')
if pdb: if pdb:
log.warning(( log.warning((
"\n" "\n"
"!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n" "!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n"
"When a `piker` daemon crashes it will block the " "When a `piker` daemon crashes it will block the "
"task-thread until resumed from console!\n" "task-thread until resumed from console!\n"
"\n" "\n"
))
# service-actor registry endpoint socket-address set
regaddrs: list[tuple[str, int]] = []
conf, _ = config.load(
conf_name='conf',
)
network: dict = conf.get('network')
if (
network is None
and not maddr
):
regaddrs = [(
_default_registry_host,
_default_registry_port,
)]
else:
eps: dict = load_trans_eps(
network,
maddr,
)
for layers in eps['pikerd']:
regaddrs.append((
layers['ipv4']['addr'],
layers['tcp']['port'],
)) ))
from .. import service # service-actor registry endpoint socket-address set
regaddrs: list[tuple[str, int]] = []
async def main(): conf, _ = config.load(
service_mngr: service.Services conf_name='conf',
async with ( )
service.open_pikerd( network: dict = conf.get('network')
registry_addrs=regaddrs, if (
loglevel=loglevel, network is None
debug_mode=pdb, and not maddr
# enable_transports=['uds'],
enable_transports=['tcp'],
) as service_mngr,
): ):
assert service_mngr regaddrs = [(
# ?TODO? spawn all other sub-actor daemons according to _default_registry_host,
# multiaddress endpoint spec defined by user config _default_registry_port,
await trio.sleep_forever() )]
trio.run(main) else:
eps: dict = load_trans_eps(
network,
maddr,
)
for layers in eps['pikerd']:
regaddrs.append((
layers['ipv4']['addr'],
layers['tcp']['port'],
))
from .. import service
async def main():
service_mngr: service.Services
async with (
service.open_pikerd(
registry_addrs=regaddrs,
loglevel=loglevel,
debug_mode=pdb,
) as service_mngr, # normally delivers a ``Services`` handle
# AsyncExitStack() as stack,
):
# TODO: spawn all other sub-actor daemons according to
# multiaddress endpoint spec defined by user config
assert service_mngr
# if tsdb:
# dname, conf = await stack.enter_async_context(
# service.marketstore.start_ahab_daemon(
# service_mngr,
# loglevel=loglevel,
# )
# )
# log.info(f'TSDB `{dname}` up with conf:\n{conf}')
# if es:
# dname, conf = await stack.enter_async_context(
# service.elastic.start_ahab_daemon(
# service_mngr,
# loglevel=loglevel,
# )
# )
# log.info(f'DB `{dname}` up with conf:\n{conf}')
await trio.sleep_forever()
trio.run(main)
@click.group(context_settings=config._context_defaults) @click.group(context_settings=config._context_defaults)
@ -307,10 +328,6 @@ def services(config, tl, ports):
if not ports: if not ports:
ports = [_default_registry_port] ports = [_default_registry_port]
addr = tractor._addr.wrap_address(
addr=(host, ports[0])
)
async def list_services(): async def list_services():
nonlocal host nonlocal host
async with ( async with (
@ -318,18 +335,16 @@ def services(config, tl, ports):
name='service_query', name='service_query',
loglevel=config['loglevel'] if tl else None, loglevel=config['loglevel'] if tl else None,
), ),
tractor.get_registry( tractor.get_arbiter(
addr=addr, host=host,
port=ports[0]
) as portal ) as portal
): ):
registry = await portal.run_from_ns( registry = await portal.run_from_ns('self', 'get_registry')
'self',
'get_registry',
)
json_d = {} json_d = {}
for key, socket in registry.items(): for key, socket in registry.items():
json_d[key] = f'{socket}' host, port = socket
json_d[key] = f'{host}:{port}'
click.echo(f"{colorize_json(json_d)}") click.echo(f"{colorize_json(json_d)}")
trio.run(list_services) trio.run(list_services)

View File

@ -41,13 +41,10 @@ from .log import get_logger
log = get_logger('broker-config') log = get_logger('broker-config')
# XXX NOTE: taken from `click` # XXX NOTE: taken from ``click`` since apparently they have some
# |_https://github.com/pallets/click/blob/main/src/click/utils.py#L449 # super weirdness with sigint and sudo..no clue
# # we're probably going to slowly just modify it to our own version over
# (since apparently they have some super weirdness with SIGINT and # time..
# sudo.. no clue we're probably going to slowly just modify it to our
# own version over time..)
#
def get_app_dir( def get_app_dir(
app_name: str, app_name: str,
roaming: bool = True, roaming: bool = True,
@ -264,7 +261,7 @@ def load(
MutableMapping, MutableMapping,
] = tomllib.loads, ] = tomllib.loads,
touch_if_dne: bool = True, touch_if_dne: bool = False,
**tomlkws, **tomlkws,
@ -273,7 +270,7 @@ def load(
Load config file by name. Load config file by name.
If desired config is not in the top level piker-user config path then If desired config is not in the top level piker-user config path then
pass the `path: Path` explicitly. pass the ``path: Path`` explicitly.
''' '''
# create the $HOME/.config/piker dir if dne # create the $HOME/.config/piker dir if dne
@ -288,8 +285,7 @@ def load(
if ( if (
not path.is_file() not path.is_file()
and and touch_if_dne
touch_if_dne
): ):
# only do a template if no path provided, # only do a template if no path provided,
# just touch an empty file with same name. # just touch an empty file with same name.

View File

@@ -740,7 +740,7 @@ async def sample_and_broadcast(
     log.warning(
         f'Feed OVERRUN {sub_key}'
-        f'@{bus.brokername} -> \n'
+        '@{bus.brokername} -> \n'
         f'feed @ {chan.uid}\n'
         f'throttle = {throttle} Hz'
     )

View File

@@ -91,18 +91,6 @@ class SymbologyCache(Struct):
     # provided by the backend pkg.
     mktmaps: dict[str, MktPair] = field(default_factory=dict)
-    def pformat(self) -> str:
-        return (
-            f'<{type(self).__name__}(\n'
-            f' .mod: {self.mod!r}\n'
-            f' .assets: {len(self.assets)!r}\n'
-            f' .pairs: {len(self.pairs)!r}\n'
-            f' .mktmaps: {len(self.mktmaps)!r}\n'
-            f')>'
-        )
-    __repr__ = pformat
     def write_config(self) -> None:
         # put the backend's pair-struct type ref at the top

View File

@@ -27,6 +27,7 @@ from functools import partial
 from types import ModuleType
 from typing import (
     Any,
+    Optional,
     Callable,
     AsyncContextManager,
     AsyncGenerator,
@@ -34,7 +35,6 @@ from typing import (
 )
 import json
-import tractor
 import trio
 from trio_typing import TaskStatus
 from trio_websocket import (
@@ -167,7 +167,7 @@ async def _reconnect_forever(
     async def proxy_msgs(
         ws: WebSocketConnection,
-        rent_cs: trio.CancelScope,  # parent cancel scope
+        pcs: trio.CancelScope,  # parent cancel scope
     ):
         '''
         Receive (under `timeout` deadline) all msgs from from underlying
@@ -192,7 +192,7 @@ async def _reconnect_forever(
                 f'{url} connection bail with:'
             )
             await trio.sleep(0.5)
-            rent_cs.cancel()
+            pcs.cancel()
             # go back to reonnect loop in parent task
             return
@@ -204,7 +204,7 @@ async def _reconnect_forever(
                 f'{src_mod}\n'
                 'WS feed seems down and slow af.. reconnecting\n'
             )
-            rent_cs.cancel()
+            pcs.cancel()
             # go back to reonnect loop in parent task
             return
@@ -228,12 +228,7 @@ async def _reconnect_forever(
     nobsws._connected = trio.Event()
     task_status.started()
-    mc_state: trio._channel.MemoryChannelState = snd._state
-    while (
-        mc_state.open_receive_channels > 0
-        and
-        mc_state.open_send_channels > 0
-    ):
+    while not snd._closed:
         log.info(
             f'{src_mod}\n'
             f'{url} trying (RE)CONNECT'
@@ -242,11 +237,10 @@ async def _reconnect_forever(
         ws: WebSocketConnection
         try:
             async with (
+                trio.open_nursery() as n,
                 open_websocket_url(url) as ws,
-                tractor.trionics.collapse_eg(),
-                trio.open_nursery() as tn,
             ):
-                cs = nobsws._cs = tn.cancel_scope
+                cs = nobsws._cs = n.cancel_scope
                 nobsws._ws = ws
                 log.info(
                     f'{src_mod}\n'
@@ -254,7 +248,7 @@ async def _reconnect_forever(
                 )
                 # begin relay loop to forward msgs
-                tn.start_soon(
+                n.start_soon(
                     proxy_msgs,
                     ws,
                     cs,
@@ -268,7 +262,7 @@ async def _reconnect_forever(
                     # TODO: should we return an explicit sub-cs
                     # from this fixture task?
-                    await tn.start(
+                    await n.start(
                         open_fixture,
                         fixture,
                         nobsws,
@@ -278,23 +272,11 @@ async def _reconnect_forever(
                 # to let tasks run **inside** the ws open block above.
                 nobsws._connected.set()
                 await trio.sleep_forever()
-        except (
-            HandshakeError,
-            ConnectionRejected,
-        ):
+        except HandshakeError:
             log.exception('Retrying connection')
-            await trio.sleep(0.5)  # throttle
-        except BaseException as _berr:
-            berr = _berr
-            log.exception(
-                'Reconnect-attempt failed ??\n'
-            )
-            await trio.sleep(0.2)  # throttle
-            raise berr
-        #|_ws & nursery block ends
+        # ws & nursery block ends
         nobsws._connected = trio.Event()
         if cs.cancelled_caught:
             log.cancel(
@@ -342,25 +324,21 @@ async def open_autorecon_ws(
    connetivity errors, or some user defined recv timeout.
    You can provide a ``fixture`` async-context-manager which will be
-    entered/exitted around each connection reset; eg. for
-    (re)requesting subscriptions without requiring streaming setup
-    code to rerun.
+    entered/exitted around each connection reset; eg. for (re)requesting
+    subscriptions without requiring streaming setup code to rerun.
    '''
    snd: trio.MemorySendChannel
    rcv: trio.MemoryReceiveChannel
    snd, rcv = trio.open_memory_channel(616)
-    async with (
-        tractor.trionics.collapse_eg(),
-        trio.open_nursery() as tn
-    ):
+    async with trio.open_nursery() as n:
        nobsws = NoBsWs(
            url,
            rcv,
            msg_recv_timeout=msg_recv_timeout,
        )
-        await tn.start(
+        await n.start(
            partial(
                _reconnect_forever,
                url,
@@ -373,10 +351,11 @@ async def open_autorecon_ws(
        await nobsws._connected.wait()
        assert nobsws._cs
        assert nobsws.connected()
        try:
            yield nobsws
        finally:
-            tn.cancel_scope.cancel()
+            n.cancel_scope.cancel()
 '''
@@ -389,8 +368,8 @@ of msgs over a `NoBsWs`.
 class JSONRPCResult(Struct):
     id: int
     jsonrpc: str = '2.0'
-    result: dict|None = None
-    error: dict|None = None
+    result: Optional[dict] = None
+    error: Optional[dict] = None
 @acm
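For orientation (not part of the diff): a minimal usage sketch of the auto-reconnecting client above. The endpoint url, the subscribe payload, and the `send_msg()`/`recv_msg()` helper names are assumptions not shown in this hunk:

```python
from contextlib import asynccontextmanager as acm

import trio

@acm
async def resub_fixture(nobsws):
    # hypothetical fixture: (re)entered around every (re)connect so
    # subscriptions are re-requested without restarting consumers
    await nobsws.send_msg({'method': 'subscribe', 'params': ['ticker']})
    yield

async def main():
    async with open_autorecon_ws(
        'wss://example.com/ws',  # placeholder endpoint
        fixture=resub_fixture,
    ) as ws:
        while True:
            msg = await ws.recv_msg()  # assumed recv helper
            print(msg)

trio.run(main)
```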

View File

@@ -39,7 +39,6 @@ from typing import (
     AsyncContextManager,
     Awaitable,
     Sequence,
-    TYPE_CHECKING,
 )
 import trio
@@ -76,10 +75,6 @@ from ._sampling import (
     uniform_rate_send,
 )
-if TYPE_CHECKING:
-    from tractor._addr import Address
-    from tractor.msg.types import Aid
 class Sub(Struct, frozen=True):
     '''
@@ -357,9 +352,7 @@ async def allocate_persistent_feed(
     # yield back control to starting nursery once we receive either
     # some history or a real-time quote.
-    log.info(
-        f'loading OHLCV history: {fqme!r}\n'
-    )
+    log.info(f'loading OHLCV history: {fqme}')
     await some_data_ready.wait()
     flume = Flume(
@@ -730,10 +723,7 @@ class Feed(Struct):
             async for msg in stream:
                 await tx.send(msg)
-        async with (
-            tractor.trionics.collapse_eg(),
-            trio.open_nursery() as nurse
-        ):
+        async with trio.open_nursery() as nurse:
             # spawn a relay task for each stream so that they all
             # multiplex to a common channel.
             for brokername in mods:
@@ -796,6 +786,7 @@ async def install_brokerd_search(
 @acm
 async def maybe_open_feed(
     fqmes: list[str],
     loglevel: str | None = None,
@@ -849,12 +840,13 @@ async def maybe_open_feed(
 @acm
 async def open_feed(
     fqmes: list[str],
-    loglevel: str|None = None,
+    loglevel: str | None = None,
     allow_overruns: bool = True,
     start_stream: bool = True,
-    tick_throttle: float|None = None,  # Hz
+    tick_throttle: float | None = None,  # Hz
     allow_remote_ctl_ui: bool = False,
@@ -907,19 +899,19 @@ async def open_feed(
         feed.portals[brokermod] = portal
         # fill out "status info" that the UI can show
-        chan: tractor.Channel = portal.chan
-        raddr: Address = chan.raddr
-        aid: Aid = chan.aid
-        # TAG_feed_status_update
+        host, port = portal.channel.raddr
+        if host == '127.0.0.1':
+            host = 'localhost'
         feed.status.update({
-            'actor_id': aid,
-            'actor_short_id': f'{aid.name}@{aid.pid}',
-            'ipc': chan.raddr.proto_key,
-            'ipc_addr': raddr,
+            'actor_name': portal.channel.uid[0],
+            'host': host,
+            'port': port,
             'hist_shm': 'NA',
             'rt_shm': 'NA',
-            'throttle_hz': tick_throttle,
+            'throttle_rate': tick_throttle,
         })
+        # feed.status.update(init_msg.pop('status', {}))
         # (allocate and) connect to any feed bus for this broker
         bus_ctxs.append(

View File

@@ -36,10 +36,10 @@ from ._sharedmem import (
     ShmArray,
     _Token,
 )
-from piker.accounting import MktPair
 if TYPE_CHECKING:
-    from piker.data.feed import Feed
+    from ..accounting import MktPair
+    from .feed import Feed
 class Flume(Struct):
@@ -82,7 +82,7 @@ class Flume(Struct):
     # TODO: do we need this really if we can pull the `Portal` from
     # ``tractor``'s internals?
-    feed: Feed|None = None
+    feed: Feed | None = None
     @property
     def rt_shm(self) -> ShmArray:

View File

@@ -113,9 +113,9 @@ def validate_backend(
     )
     if ep is None:
         log.warning(
-            f'Provider backend {mod.name!r} is missing '
-            f'{daemon_name!r} support?\n'
-            f'|_module endpoint-func missing: {name!r}\n'
+            f'Provider backend {mod.name} is missing '
+            f'{daemon_name} support :(\n'
+            f'The following endpoint is missing: {name}'
         )
     inits: list[

View File

@@ -498,7 +498,6 @@ async def cascade(
     func_name: str = func.__name__
     async with (
-        tractor.trionics.collapse_eg(),  # avoid multi-taskc tb in console
         trio.open_nursery() as tn,
     ):
         # TODO: might be better to just make a "restart" method where

View File

@@ -107,22 +107,17 @@ async def open_piker_runtime(
     async with (
         tractor.open_root_actor(
-            # passed through to `open_root_actor`
+            # passed through to ``open_root_actor``
             registry_addrs=registry_addrs,
             name=name,
+            start_method=start_method,
             loglevel=loglevel,
             debug_mode=debug_mode,
-            start_method=start_method,
-            # XXX NOTE MEMBER DAT der's a perf hit yo!!
-            # https://greenback.readthedocs.io/en/latest/principle.html#performance
-            maybe_enable_greenback=True,
             # TODO: eventually we should be able to avoid
             # having the root have more then permissions to
             # spawn other specialized daemons I think?
             enable_modules=enable_modules,
-            hide_tb=False,
             **tractor_kwargs,
         ) as actor,
@@ -205,8 +200,7 @@ async def open_pikerd(
             reg_addrs,
         ),
         tractor.open_nursery() as actor_nursery,
-        tractor.trionics.collapse_eg(),
-        trio.open_nursery() as service_tn,
+        trio.open_nursery() as service_nursery,
     ):
         for addr in reg_addrs:
             if addr not in root_actor.accept_addrs:
@@ -217,7 +211,7 @@ async def open_pikerd(
         # assign globally for future daemon/task creation
         Services.actor_n = actor_nursery
-        Services.service_n = service_tn
+        Services.service_n = service_nursery
         Services.debug_mode = debug_mode
         try:
@@ -227,7 +221,7 @@ async def open_pikerd(
             # TODO: is this more clever/efficient?
             # if 'samplerd' in Services.service_tasks:
             #     await Services.cancel_service('samplerd')
-            service_tn.cancel_scope.cancel()
+            service_nursery.cancel_scope.cancel()
     # TODO: do we even need this?
@@ -262,10 +256,7 @@ async def maybe_open_pikerd(
     loglevel: str | None = None,
     **kwargs,
-) -> (
-    tractor._portal.Portal
-    |ClassVar[Services]
-):
+) -> tractor._portal.Portal | ClassVar[Services]:
     '''
     If no ``pikerd`` daemon-root-actor can be found start it and
     yield up (we should probably figure out returning a portal to self
@@ -290,11 +281,10 @@ async def maybe_open_pikerd(
     registry_addrs: list[tuple[str, int]] = (
         registry_addrs
-        or
-        [_default_reg_addr]
+        or [_default_reg_addr]
     )
-    pikerd_portal: tractor.Portal|None
+    pikerd_portal: tractor.Portal | None
     async with (
         open_piker_runtime(
             name=query_name,

View File

@@ -28,7 +28,6 @@ from contextlib import (
 )
 import tractor
-from trio.lowlevel import current_task
 from ._util import (
     log,  # sub-sys logger
@@ -71,84 +70,69 @@ async def maybe_spawn_daemon(
     lock = Services.locks[service_name]
     await lock.acquire()
-    try:
-        async with find_service(
-            service_name,
-            registry_addrs=[('127.0.0.1', 6116)],
-        ) as portal:
-            if portal is not None:
-                lock.release()
-                yield portal
-                return
-        log.warning(
-            f"Couldn't find any existing {service_name}\n"
-            'Attempting to spawn new daemon-service..'
-        )
-        # ask root ``pikerd`` daemon to spawn the daemon we need if
-        # pikerd is not live we now become the root of the
-        # process tree
-        async with maybe_open_pikerd(
-            loglevel=loglevel,
-            **pikerd_kwargs,
-        ) as pikerd_portal:
-            # we are the root and thus are `pikerd`
-            # so spawn the target service directly by calling
-            # the provided target routine.
-            # XXX: this assumes that the target is well formed and will
-            # do the right things to setup both a sub-actor **and** call
-            # the ``_Services`` api from above to start the top level
-            # service task for that actor.
-            started: bool
-            if pikerd_portal is None:
-                started = await service_task_target(
-                    loglevel=loglevel,
-                    **spawn_args,
-                )
-            else:
-                # request a remote `pikerd` (service manager) to start the
-                # target daemon-task, the target can't return
-                # a non-serializable value since it is expected that service
-                # starting is non-blocking and the target task will persist
-                # running "under" or "within" the `pikerd` actor tree after
-                # the questing client disconnects. in other words this
-                # spawns a persistent daemon actor that continues to live
-                # for the lifespan of whatever the service manager inside
-                # `pikerd` says it should.
-                started = await pikerd_portal.run(
-                    service_task_target,
-                    loglevel=loglevel,
-                    **spawn_args,
-                )
-            if started:
-                log.info(f'Service {service_name} started!')
-            # block until we can discover (by IPC connection) to the newly
-            # spawned daemon-actor and then deliver the portal to the
-            # caller.
-            async with tractor.wait_for_actor(service_name) as portal:
-                lock.release()
-                yield portal
-                await portal.cancel_actor()
-    except BaseException as _err:
-        err = _err
-        if (
-            lock.locked()
-            and
-            lock.statistics().owner is current_task()
-        ):
-            log.exception(
-                f'Releasing stale lock after crash..?'
-                f'{err!r}\n'
-            )
-            lock.release()
-        raise err
+    async with find_service(
+        service_name,
+        registry_addrs=[('127.0.0.1', 6116)],
+    ) as portal:
+        if portal is not None:
+            lock.release()
+            yield portal
+            return
+    log.warning(
+        f"Couldn't find any existing {service_name}\n"
+        'Attempting to spawn new daemon-service..'
+    )
+    # ask root ``pikerd`` daemon to spawn the daemon we need if
+    # pikerd is not live we now become the root of the
+    # process tree
+    async with maybe_open_pikerd(
+        loglevel=loglevel,
+        **pikerd_kwargs,
+    ) as pikerd_portal:
+        # we are the root and thus are `pikerd`
+        # so spawn the target service directly by calling
+        # the provided target routine.
+        # XXX: this assumes that the target is well formed and will
+        # do the right things to setup both a sub-actor **and** call
+        # the ``_Services`` api from above to start the top level
+        # service task for that actor.
+        started: bool
+        if pikerd_portal is None:
+            started = await service_task_target(
+                loglevel=loglevel,
+                **spawn_args,
+            )
+        else:
+            # request a remote `pikerd` (service manager) to start the
+            # target daemon-task, the target can't return
+            # a non-serializable value since it is expected that service
+            # starting is non-blocking and the target task will persist
+            # running "under" or "within" the `pikerd` actor tree after
+            # the questing client disconnects. in other words this
+            # spawns a persistent daemon actor that continues to live
+            # for the lifespan of whatever the service manager inside
+            # `pikerd` says it should.
+            started = await pikerd_portal.run(
+                service_task_target,
+                loglevel=loglevel,
+                **spawn_args,
+            )
+        if started:
+            log.info(f'Service {service_name} started!')
+        # block until we can discover (by IPC connection) to the newly
+        # spawned daemon-actor and then deliver the portal to the
+        # caller.
+        async with tractor.wait_for_actor(service_name) as portal:
+            lock.release()
+            yield portal
+            await portal.cancel_actor()
 async def spawn_emsd(
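For orientation (not part of the diff): the `-` side guards the service lock so a crash during find-or-spawn can't leave it held forever. A standalone sketch of that ownership-checked release using only `trio` primitives:

```python
import trio
from trio.lowlevel import current_task

async def crash_safe_acquire(lock: trio.Lock):
    await lock.acquire()
    try:
        ...  # find-or-spawn the daemon here; may raise
    except BaseException:
        # release only if *this* task still owns the lock, mirroring
        # the `lock.statistics().owner is current_task()` check above
        if (
            lock.locked()
            and lock.statistics().owner is current_task()
        ):
            lock.release()
        raise
```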

View File

@@ -109,7 +109,7 @@ class Services:
             # wait on any context's return value
             # and any final portal result from the
             # sub-actor.
-            ctx_res: Any = await ctx.wait_for_result()
+            ctx_res: Any = await ctx.result()
             # NOTE: blocks indefinitely until cancelled
             # either by error from the target context

View File

@@ -101,15 +101,13 @@ async def open_registry(
     if (
         not tractor.is_root_process()
-        and
-        not Registry.addrs
+        and not Registry.addrs
     ):
         Registry.addrs.extend(actor.reg_addrs)
     if (
         ensure_exists
-        and
-        not Registry.addrs
+        and not Registry.addrs
     ):
         raise RuntimeError(
             f"`{uid}` registry should already exist but doesn't?"
@@ -148,7 +146,7 @@ async def find_service(
     | list[Portal]
     | None
 ):
-    # try:
     reg_addrs: list[tuple[str, int]]
     async with open_registry(
         addrs=(
@@ -159,39 +157,22 @@ async def find_service(
             or Registry.addrs
         ),
     ) as reg_addrs:
-        log.info(
-            f'Scanning for service {service_name!r}'
-        )
+        log.info(f'Scanning for service `{service_name}`')
+        maybe_portals: list[Portal] | Portal | None
         # attach to existing daemon by name if possible
-        maybe_portals: list[Portal]|Portal|None
         async with tractor.find_actor(
             service_name,
             registry_addrs=reg_addrs,
             only_first=first_only,  # if set only returns single ref
         ) as maybe_portals:
             if not maybe_portals:
-                # log.info(
-                print(
-                    f'Could NOT find service {service_name!r} -> {maybe_portals!r}'
-                )
                 yield None
                 return
-            # log.info(
-            print(
-                f'Found service {service_name!r} -> {maybe_portals}'
-            )
             yield maybe_portals
-    # except BaseException as _berr:
-    #     berr = _berr
-    #     log.exception(
-    #         'tractor.find_actor() failed with,\n'
-    #     )
-    #     raise berr
 async def check_for_service(
     service_name: str,
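For orientation (not part of the diff): callers consume `find_service()` as an async context manager that yields a portal (or `None`). A minimal sketch; the `'emsd'` name is illustrative and this must run inside the piker/tractor runtime:

```python
async def query_ems():
    async with find_service(
        'emsd',  # example service name
        registry_addrs=[('127.0.0.1', 6116)],
    ) as portal:
        if portal is None:
            print('service not registered')
        else:
            print(f'found portal: {portal}')
```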

View File

@@ -963,10 +963,7 @@ async def tsdb_backfill(
     # concurrently load the provider's most-recent-frame AND any
     # pre-existing tsdb history already saved in `piker` storage.
     dt_eps: list[DateTime, DateTime] = []
-    async with (
-        tractor.trionics.collapse_eg(),
-        trio.open_nursery() as tn
-    ):
+    async with trio.open_nursery() as tn:
         tn.start_soon(
             push_latest_frame,
             dt_eps,
@@ -1015,16 +1012,9 @@ async def tsdb_backfill(
         int,
         Duration,
     ]|None = config.get('frame_types', None)
     if def_frame_durs:
         def_frame_size: Duration = def_frame_durs[timeframe]
-        if def_frame_size != calced_frame_size:
-            log.warning(
-                f'Expected frame size {def_frame_size}\n'
-                f'Rxed frame {calced_frame_size}\n'
-            )
-            # await tractor.pause()
+        assert def_frame_size == calced_frame_size
     else:
         # use what we calced from first frame above.
         def_frame_size = calced_frame_size
@@ -1053,9 +1043,7 @@ async def tsdb_backfill(
         # if there is a gap to backfill from the first
         # history frame until the last datum loaded from the tsdb
         # continue that now in the background
-        async with trio.open_nursery(
-            strict_exception_groups=False,
-        ) as tn:
+        async with trio.open_nursery() as tn:
             bf_done = await tn.start(
                 partial(
@@ -1320,7 +1308,6 @@ async def manage_history(
         # sampling period) data set since normally differently
         # sampled timeseries can be loaded / process independently
         # ;)
-        tractor.trionics.collapse_eg(),
         trio.open_nursery() as tn,
     ):
         log.info(

View File

@@ -517,7 +517,7 @@ def with_dts(
     '''
     return df.with_columns([
-        pl.col(time_col).shift(1).name.suffix('_prev'),
+        pl.col(time_col).shift(1).suffix('_prev'),
         pl.col(time_col).diff().alias('s_diff'),
         pl.from_epoch(pl.col(time_col)).alias('dt'),
     ]).with_columns([
@@ -623,7 +623,7 @@ def detect_vlm_gaps(
 ) -> pl.DataFrame:
-    vnull: pl.DataFrame = df.filter(
+    vnull: pl.DataFrame = w_dts.filter(
         pl.col(col) == 0
     )
     return vnull
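For orientation (not part of the diff): the two sides differ only in the polars expression-rename API. A tiny self-contained comparison (the sample frame is made up):

```python
import polars as pl

df = pl.DataFrame({'time': [1, 2, 4, 7]})

# `-` side style, newer polars: rename via the `.name` namespace
out = df.with_columns([
    pl.col('time').shift(1).name.suffix('_prev'),
    pl.col('time').diff().alias('s_diff'),
])

# `+` side style, older polars (since deprecated):
# pl.col('time').shift(1).suffix('_prev')
print(out)
```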

View File

@@ -21,7 +21,6 @@ Main app startup and run.
 from functools import partial
 from types import ModuleType
-import tractor
 import trio
 from piker.ui.qt import (
@@ -117,7 +116,6 @@ async def _async_main(
         needed_brokermods[brokername] = brokers[brokername]
     async with (
-        tractor.trionics.collapse_eg(),
         trio.open_nursery() as root_n,
     ):
         # set root nursery and task stack for spawning other charts/feeds

View File

@@ -33,6 +33,7 @@ import trio
 from piker.ui.qt import (
     QtCore,
+    QtWidgets,
     Qt,
     QLineF,
     QFrame,

View File

@@ -1445,10 +1445,7 @@ async def display_symbol_data(
     # for pause/resume on mouse interaction
     rt_chart.feed = feed
-    async with (
-        tractor.trionics.collapse_eg(),
-        trio.open_nursery() as ln,
-    ):
+    async with trio.open_nursery() as ln:
         # if available load volume related built-in display(s)
         vlm_charts: dict[
             str,

View File

@@ -22,10 +22,7 @@ from contextlib import asynccontextmanager as acm
 from typing import Callable
 import trio
-from tractor.trionics import (
-    gather_contexts,
-    collapse_eg,
-)
+from tractor.trionics import gather_contexts
 from piker.ui.qt import (
     QtCore,
@@ -210,10 +207,7 @@ async def open_signal_handler(
         async for args in recv:
             await async_handler(*args)
-    async with (
-        collapse_eg(),
-        trio.open_nursery() as tn
-    ):
+    async with trio.open_nursery() as tn:
         tn.start_soon(proxy_to_handler)
         async with send:
             yield
@@ -248,7 +242,6 @@ async def open_handlers(
     widget: QWidget
     streams: list[trio.abc.ReceiveChannel]
     async with (
-        collapse_eg(),
         trio.open_nursery() as tn,
         gather_contexts([
             open_event_stream(

View File

@@ -18,11 +18,10 @@
 Feed status and controls widget(s) for embedding in a UI-pane.
 """
 from __future__ import annotations
-from typing import (
-    Any,
-    TYPE_CHECKING,
-)
+from textwrap import dedent
+from typing import TYPE_CHECKING
 # from PyQt5.QtCore import Qt
@@ -50,55 +49,35 @@ def mk_feed_label(
     a feed control protocol.
     '''
-    status: dict[str, Any] = feed.status
+    status = feed.status
     assert status
-    # SO tips on ws/nls,
-    # https://stackoverflow.com/a/15721400
-    ws: str = '&nbsp;'
-    # nl: str = '<br>'  # dun work?
-    actor_info_repr: str = (
-        f')> **{status["actor_short_id"]}**\n'
-        '\n'  # bc md?
-    )
-    # fields to select *IN* for display
-    # (see `.data.feed.open_feed()` status
-    # update -> TAG_feed_status_update)
-    for key in [
-        'ipc',
-        'hist_shm',
-        'rt_shm',
-        'throttle_hz',
-    ]:
-        # NOTE, the 2nd key is filled via `.format()` updates.
-        actor_info_repr += (
-            f'\n'  # bc md?
-            f'{ws}|_{key}: **{{{key}}}**\n'
-        )
-    # ^TODO? formatting and content..
-    # -[ ] showing which fqme is "forward" on the
-    #      chart/fsp/order-mode?
-    #      '|_ flows: **{symbols}**\n'
-    #
-    # -[x] why isn't the indent working?
-    #     => markdown, now solved..
+    msg = dedent("""
+        actor: **{actor_name}**\n
+        |_ @**{host}:{port}**\n
+    """)
+    for key, val in status.items():
+        if key in ('host', 'port', 'actor_name'):
+            continue
+        msg += f'\n|_ {key}: **{{{key}}}**\n'
     feed_label = FormatLabel(
-        fmt_str=actor_info_repr,
+        fmt_str=msg,
+        # |_ streams: **{symbols}**\n
         font=_font.font,
         font_size=_font_small.px_size,
         font_color='default_lightest',
     )
-    # ?TODO, remove this?
     # form.vbox.setAlignment(feed_label, Qt.AlignBottom)
     # form.vbox.setAlignment(Qt.AlignBottom)
-    # _ = chart.height() - (
-    #     form.height() +
-    #     form.fill_bar.height()
-    #     # feed_label.height()
-    # )
+    _ = chart.height() - (
+        form.height() +
+        form.fill_bar.height()
+        # feed_label.height()
+    )
     feed_label.format(**feed.status)
     return feed_label

View File

@@ -600,7 +600,6 @@ async def open_fsp_admin(
         kwargs=kwargs,
     ) as (cache_hit, cluster_map),
-    tractor.trionics.collapse_eg(),
     trio.open_nursery() as tn,
 ):
     if cache_hit:
@@ -614,8 +613,6 @@ async def open_fsp_admin(
     )
     try:
         yield admin
-    # ??TODO, does this *need* to be inside a finally?
     finally:
         # terminate all tasks via signals
         for key, entry in admin._registry.items():

View File

@@ -285,20 +285,18 @@ class FormatLabel(QLabel):
         font_size: int,
         font_color: str,
-        use_md: bool = True,
         parent=None,
     ) -> None:
         super().__init__(parent)
-        # by default set the format string verbatim and expect user
-        # to call ``.format()`` later (presumably they'll notice the
+        # by default set the format string verbatim and expect user to
+        # call ``.format()`` later (presumably they'll notice the
         # unformatted content if ``fmt_str`` isn't meant to be
         # unformatted).
         self.fmt_str = fmt_str
-        # self.setText(fmt_str)  # ?TODO, why here?
+        self.setText(fmt_str)
         self.setStyleSheet(
             f"""QLabel {{
@@ -308,10 +306,9 @@ class FormatLabel(QLabel):
             """
         )
         self.setFont(_font.font)
-        if use_md:
-            self.setTextFormat(
-                Qt.TextFormat.MarkdownText
-            )
+        self.setTextFormat(
+            Qt.TextFormat.MarkdownText
+        )
         self.setMargin(0)
         self.setSizePolicy(
@@ -319,10 +316,7 @@ class FormatLabel(QLabel):
             size_policy.Expanding,
         )
         self.setAlignment(
-            Qt.AlignLeft
-            |
-            Qt.AlignBottom
-            # Qt.AlignVCenter
+            Qt.AlignVCenter | Qt.AlignLeft
         )
         self.setText(self.fmt_str)
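For orientation (not part of the diff): the `use_md` flag on the `-` side just gates Qt's markdown text mode. A standalone PyQt6 sketch of the same toggle, outside piker's `FormatLabel` wrapper (the label text is made up):

```python
from PyQt6.QtCore import Qt
from PyQt6.QtWidgets import QApplication, QLabel

app = QApplication([])

use_md: bool = True  # mirrors the `use_md` kwarg added above
label = QLabel()
if use_md:
    # render `**bold**` and friends as rich text
    label.setTextFormat(Qt.TextFormat.MarkdownText)
label.setText(')> **chartd@1234**')
label.setAlignment(
    Qt.AlignmentFlag.AlignLeft
    | Qt.AlignmentFlag.AlignBottom
)
label.show()
# app.exec()  # uncomment to spin the event loop
```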

View File

@@ -15,8 +15,8 @@
 # along with this program. If not, see <https://www.gnu.org/licenses/>.
 '''
-Remote control tasks for sending annotations (and maybe more cmds) to
-a chart from some other actor.
+Remote control tasks for sending annotations (and maybe more cmds)
+to a chart from some other actor.
 '''
 from __future__ import annotations
@@ -32,7 +32,6 @@ from typing import (
 )
 import tractor
-import trio
 from tractor import trionics
 from tractor import (
     Portal,
@@ -317,9 +316,7 @@ class AnnotCtl(Struct):
             )
             yield aid
         finally:
-            # async ipc send op
-            with trio.CancelScope(shield=True):
-                await self.remove(aid)
+            await self.remove(aid)
     async def redraw(
         self,
View File

@@ -15,8 +15,7 @@
 # along with this program. If not, see <https://www.gnu.org/licenses/>.
 """
-qompleterz: embeddable search and complete using trio, Qt and
-rapidfuzz.
+qompleterz: embeddable search and complete using trio, Qt and rapidfuzz.
 """
@@ -47,7 +46,6 @@ import time
 from pprint import pformat
 from rapidfuzz import process as fuzzy
-import tractor
 import trio
 from trio_typing import TaskStatus
@@ -55,7 +53,7 @@ from piker.ui.qt import (
     size_policy,
     align_flag,
     Qt,
-    # QtCore,
+    QtCore,
     QtWidgets,
     QModelIndex,
     QItemSelectionModel,
@@ -922,10 +920,7 @@ async def fill_results(
     # issue multi-provider fan-out search request and place
     # "searching.." statuses on outstanding results providers
-    async with (
-        tractor.trionics.collapse_eg(),
-        trio.open_nursery() as tn
-    ):
+    async with trio.open_nursery() as n:
         for provider, (search, pause) in (
             _searcher_cache.copy().items()
@@ -949,7 +944,7 @@ async def fill_results(
                 status_field='-> searchin..',
             )
-            await tn.start(
+            await n.start(
                 pack_matches,
                 view,
                 has_results,
@@ -1009,14 +1004,12 @@ async def handle_keyboard_input(
     view.set_font_size(searchbar.dpi_font.px_size)
     send, recv = trio.open_memory_channel(616)
-    async with (
-        tractor.trionics.collapse_eg(),  # needed?
-        trio.open_nursery() as tn
-    ):
+    async with trio.open_nursery() as n:
         # start a background multi-searcher task which receives
         # patterns relayed from this keyboard input handler and
         # async updates the completer view's results.
-        tn.start_soon(
+        n.start_soon(
             partial(
                 fill_results,
                 searchw,

View File

@@ -269,8 +269,6 @@ def hcolor(name: str) -> str:
     # default ohlc-bars/curve gray
     'bracket': '#666666',  # like the logo
-    'pikers': '#616161',  # a trader shade of..
-    'beast': '#161616',  # in the dark alone.
     # bluish
     'charcoal': '#36454F',

View File

@@ -21,7 +21,6 @@ Chart trading, the only way to scalp.
 from __future__ import annotations
 from contextlib import asynccontextmanager
 from dataclasses import dataclass, field
-from decimal import Decimal
 from functools import partial
 from pprint import pformat
 import time
@@ -42,6 +41,7 @@ from piker.accounting import (
     Position,
     mk_allocator,
     MktPair,
+    Symbol,
 )
 from piker.clearing import (
     open_ems,
@@ -143,15 +143,6 @@ class OrderMode:
     }
     _staged_order: Order | None = None
-    @property
-    def curr_mkt(self) -> MktPair:
-        '''
-        Deliver the currently selected `MktPair` according
-        chart state.
-        '''
-        return self.chart.linked.mkt
     def on_level_change_update_next_order_info(
         self,
         level: float,
@@ -181,11 +172,7 @@ class OrderMode:
         line.update_labels(order_info)
         # update bound-in staged order
-        mkt: MktPair = self.curr_mkt
-        order.price: Decimal = mkt.quantize(
-            size=level,
-            quantity_type='price',
-        )
+        order.price = level
         order.size = order_info['size']
         # when an order is changed we flip the settings side-pane to
@@ -200,9 +187,7 @@ class OrderMode:
     ) -> LevelLine:
-        # TODO, if we instead just always decimalize at the ems layer
-        # we can avoid this back-n-forth casting?
-        level = float(order.price)
+        level = order.price
         line = order_line(
             chart or self.chart,
@@ -239,11 +224,7 @@ class OrderMode:
         # the order mode allocator but we still need to update the
         # "staged" order message we'll send to the ems
         def update_order_price(y: float) -> None:
-            mkt: MktPair = self.curr_mkt
-            order.price: Decimal = mkt.quantize(
-                size=y,
-                quantity_type='price',
-            )
+            order.price = y
         line._on_level_change = update_order_price
@@ -294,31 +275,34 @@ class OrderMode:
         chart = cursor.linked.chart
         if (
             not chart
-            and
-            cursor
-            and
-            cursor.active_plot
+            and cursor
+            and cursor.active_plot
         ):
             return
         chart = cursor.active_plot
-        price: float = cursor._datum_xy[1]
+        price = cursor._datum_xy[1]
         if not price:
             # zero prices are not supported by any means
             # since that's illogical / a no-op.
             return
+        mkt: MktPair = self.chart.linked.mkt
+        # NOTE : we could also use instead,
+        # mkt.quantize(price, quantity_type='price')
+        # but it returns a Decimal and it's probably gonna
+        # be slower?
         # TODO: should we be enforcing this precision
-        # at a different layer in the stack?
-        # |_ might require `MktPair` tracking in the EMS?
-        # |_ right now any precision error will be relayed
-        #    all the way back from the backend and vice-versa..
-        #
-        mkt: MktPair = self.curr_mkt
-        price: Decimal = mkt.quantize(
-            size=price,
-            quantity_type='price',
+        # at a different layer in the stack? right now
+        # any precision error will literally be relayed
+        # all the way back from the backend.
+        price = round(
+            price,
+            ndigits=mkt.price_tick_digits,
         )
         order = self._staged_order = Order(
             action=action,
             price=price,
@@ -394,7 +378,7 @@ class OrderMode:
             'oid': oid,
         })
-        if float(order.price) <= 0:
+        if order.price <= 0:
             log.error(
                 '*!? Invalid `Order.price <= 0` ?!*\n'
                 # TODO: make this present multi-line in object form
@@ -531,15 +515,14 @@ class OrderMode:
         # if an order msg is provided update the line
         # **from** that msg.
         if order:
-            price: float = float(order.price)
-            if price <= 0:
+            if order.price <= 0:
                 log.error(f'Order has 0 price, cancelling..\n{order}')
                 self.cancel_orders([order.oid])
                 return None
-            line.set_level(price)
+            line.set_level(order.price)
             self.on_level_change_update_next_order_info(
-                level=price,
+                level=order.price,
                 line=line,
                 order=order,
                 # use the corresponding position tracker for the
@@ -555,13 +538,14 @@ class OrderMode:
     def on_fill(
         self,
         uuid: str,
         price: float,
         time_s: float,
         pointing: str | None = None,
-    ) -> bool:
+    ) -> None:
         '''
         Fill msg handler.
@@ -574,83 +558,60 @@ class OrderMode:
         - update fill bar size
         '''
-        # XXX WARNING XXX
-        # if a `Status(resp='error')` arrives *before* this
-        # fill-status, the `.dialogs` entry may have already been
-        # popped and thus the below will skipped.
-        #
-        # NOTE, to avoid this confusing scenario ensure that any
-        # errors delivered thru from the broker-backend are not just
-        # "noisy reporting" (like is very common from IB..) and are
-        # instead ONLY errors-causing-order-dialog-cancellation!
-        if not (dialog := self.dialogs.get(uuid)):
-            log.warning(
-                f'Order was already cleared from `.dialogs` ??\n'
-                f'uuid: {uuid!r}\n'
-            )
-            return False
+        dialog = self.dialogs[uuid]
         lines = dialog.lines
         chart = self.chart
-        if not lines:
-            log.warn("No line(s) for order {uuid}!?")
-            return False
-        # update line state(s)
-        #
-        # ?XXX this fails on certain types of races?
+        # XXX: seems to fail on certain types of races?
         # assert len(lines) == 2
-        flume: Flume = self.feed.flumes[chart.linked.mkt.fqme]
-        _, _, ratio = flume.get_ds_info()
-        for chart, shm in [
-            (self.chart, flume.rt_shm),
-            (self.hist_chart, flume.hist_shm),
-        ]:
-            viz = chart.get_viz(chart.name)
-            index_field = viz.index_field
-            arr = shm.array
-            # TODO: borked for int index based..
-            index = flume.get_index(time_s, arr)
-            # get absolute index for arrow placement
-            arrow_index = arr[index_field][index]
-            self.arrows.add(
-                chart.plotItem,
-                uuid,
-                arrow_index,
-                price,
-                pointing=pointing,
-                color=lines[0].color
-            )
+        if lines:
+            flume: Flume = self.feed.flumes[chart.linked.mkt.fqme]
+            _, _, ratio = flume.get_ds_info()
+            for chart, shm in [
+                (self.chart, flume.rt_shm),
+                (self.hist_chart, flume.hist_shm),
+            ]:
+                viz = chart.get_viz(chart.name)
+                index_field = viz.index_field
+                arr = shm.array
+                # TODO: borked for int index based..
+                index = flume.get_index(time_s, arr)
+                # get absolute index for arrow placement
+                arrow_index = arr[index_field][index]
+                self.arrows.add(
+                    chart.plotItem,
+                    uuid,
+                    arrow_index,
+                    price,
+                    pointing=pointing,
+                    color=lines[0].color
+                )
+        else:
+            log.warn("No line(s) for order {uuid}!?")
     def on_cancel(
         self,
-        uuid: str,
-    ) -> bool:
-        msg: Order|None = self.client._sent_orders.pop(uuid, None)
-        if msg is None:
+        uuid: str
+    ) -> None:
+        msg: Order = self.client._sent_orders.pop(uuid, None)
+        if msg is not None:
+            self.lines.remove_line(uuid=uuid)
+            self.chart.linked.cursor.show_xhair()
+            dialog = self.dialogs.pop(uuid, None)
+            if dialog:
+                dialog.last_status_close()
+        else:
             log.warning(
                 f'Received cancel for unsubmitted order {pformat(msg)}'
             )
-            return False
-        # remove GUI line, show cursor.
-        self.lines.remove_line(uuid=uuid)
-        self.chart.linked.cursor.show_xhair()
-        # remove msg dialog (history)
-        dialog: Dialog|None = self.dialogs.pop(uuid, None)
-        if dialog:
-            dialog.last_status_close()
-        return True
     def cancel_orders_under_cursor(self) -> list[str]:
         return self.cancel_orders(
@@ -720,9 +681,9 @@ class OrderMode:
     ) -> Dialog | None:
         # NOTE: the `.order` attr **must** be set with the
         # equivalent order msg in order to be loaded.
-        order: Order = msg.req
+        order = msg.req
         oid = str(msg.oid)
-        symbol: str = order.symbol
+        symbol = order.symbol
         # TODO: MEGA UGGG ZONEEEE!
         src = msg.src
@@ -741,22 +702,13 @@ class OrderMode:
         order.oid = str(order.oid)
         order.brokers = [brokername]
-        # ?TODO? change this over to `MktPair`, but it's gonna be
-        # tough since we don't have any such data really in our
-        # clearing msg schema..
-        # BUT WAIT! WHY do we even want/need this!?
-        #
-        # order.symbol = self.curr_mkt
-        #
-        # XXX, the old approach.. which i don't quire member why..
-        # -[ ] verify we for sure don't require this any more!
-        #    |_https://github.com/pikers/piker/issues/517
-        #
-        # order.symbol = Symbol.from_fqme(
-        #     fqsn=fqme,
-        #     info={},
-        # )
+        # TODO: change this over to `MktPair`, but it's
+        # gonna be tough since we don't have any such data
+        # really in our clearing msg schema..
+        order.symbol = Symbol.from_fqme(
+            fqsn=fqme,
+            info={},
+        )
         maybe_dialog: Dialog | None = self.submit_order(
             send_msg=False,
             order=order,
@@ -814,7 +766,6 @@ async def open_order_mode(
             brokerd_accounts,
             ems_dialog_msgs,
         ),
-        tractor.trionics.collapse_eg(),
         trio.open_nursery() as tn,
     ):
@@ -1079,23 +1030,13 @@ async def process_trade_msg(
     if name in (
         'position',
     ):
-        mkt: MktPair = mode.chart.linked.mkt
+        sym: MktPair = mode.chart.linked.mkt
         pp_msg_symbol = msg['symbol'].lower()
-        pp_msg_bsmktid = msg['bs_mktid']
-        fqme = mkt.fqme
-        broker = mkt.broker
+        fqme = sym.fqme
+        broker = sym.broker
         if (
-            # match on any backed-specific(-unique)-ID first!
-            (
-                pp_msg_bsmktid
-                and
-                mkt.bs_mktid == pp_msg_bsmktid
-            )
-            or
-            # OW try against what's provided as an FQME..
             pp_msg_symbol == fqme
-            or
-            pp_msg_symbol == fqme.removesuffix(f'.{broker}')
+            or pp_msg_symbol == fqme.removesuffix(f'.{broker}')
         ):
             log.info(
                 f'Loading position for `{fqme}`:\n'
@@ -1118,7 +1059,7 @@ async def process_trade_msg(
         return
     msg = Status(**msg)
-    # resp: str = msg.resp
+    resp = msg.resp
     oid = msg.oid
     dialog: Dialog = mode.dialogs.get(oid)
@@ -1160,7 +1101,7 @@ async def process_trade_msg(
             )
         )
     ):
-        msg.req: Order = order
+        msg.req = order
         dialog: (
             Dialog
             # NOTE: on an invalid order submission (eg.
@@ -1182,33 +1123,20 @@ async def process_trade_msg(
             mode.on_submit(oid)
     case Status(resp='error'):
-        # TODO: parse into broker-side msg, or should we
-        # expect it to just be **that** msg verbatim (since
-        # we'd presumably have only 1 `Error` msg-struct)
-        broker_msg: dict = msg.brokerd_msg
-        # XXX NOTE, this presumes the rxed "error" is
-        # order-dialog-cancel-causing, THUS backends much ONLY
-        # relay errors of this "severity"!!
-        log.error(
-            f'Order errored ??\n'
-            f'oid: {oid!r}\n'
-            f'\n'
-            f'{pformat(broker_msg)}\n'
-            f'\n'
-            f'=> CANCELLING ORDER DIALOG <=\n'
-            # from tractor.devx.pformat import ppfmt
-            # !TODO LOL, wtf the msg is causing
-            # a recursion bug!
-            # -[ ] get this shit on msgspec stat!
-            # f'{ppfmt(broker_msg)}'
-        )
         # do all the things for a cancel:
         # - drop order-msg dialog from client table
         # - delete level line from view
         mode.on_cancel(oid)
+        # TODO: parse into broker-side msg, or should we
+        # expect it to just be **that** msg verbatim (since
+        # we'd presumably have only 1 `Error` msg-struct)
+        broker_msg: dict = msg.brokerd_msg
+        log.error(
+            f'Order {oid}->{resp} with:\n{pformat(broker_msg)}'
+        )
     case Status(resp='canceled'):
         # delete level line from view
         mode.on_cancel(oid)
@@ -1223,10 +1151,10 @@ async def process_trade_msg(
         # TODO: UX for a "pending" clear/live order
         log.info(f'Dark order triggered for {fmtmsg}')
-    # TODO: do the struct-msg version, blah blah..
-    # req=Order(exec_mode='live', action='alert') as req,
     case Status(
         resp='triggered',
+        # TODO: do the struct-msg version, blah blah..
+        # req=Order(exec_mode='live', action='alert') as req,
         req={
             'exec_mode': 'live',
             'action': 'alert',
@@ -1238,7 +1166,7 @@ async def process_trade_msg(
         tm = time.time()
         mode.on_fill(
             oid,
-            price=float(req.price),
+            price=req.price,
             time_s=tm,
         )
         mode.lines.remove_line(uuid=oid)
@@ -1293,7 +1221,7 @@ async def process_trade_msg(
         tm = details['broker_time']
         mode.on_fill(
             oid,
-            price=float(details['price']),
+            price=details['price'],
            time_s=tm,
            pointing='up' if action == 'buy' else 'down',
        )
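For orientation (not part of the diff): the `-` side snaps order prices onto the market's tick grid via `MktPair.quantize()`. A standalone sketch of equivalent tick-rounding with stdlib `decimal` (the tick size is illustrative):

```python
from decimal import Decimal, ROUND_HALF_EVEN

def quantize_price(
    price: float,
    price_tick: Decimal = Decimal('0.25'),  # example tick size
) -> Decimal:
    # snap a raw float (eg. a cursor y-level) onto the price grid,
    # comparable in spirit to `mkt.quantize(size=price,
    # quantity_type='price')` in the hunks above
    ticks: Decimal = (Decimal(str(price)) / price_tick).quantize(
        Decimal('1'),
        rounding=ROUND_HALF_EVEN,
    )
    return ticks * price_tick

assert quantize_price(100.13) == Decimal('100.25')
```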

View File

@@ -77,42 +77,54 @@ dependencies = [
     "pyvnc",
 ]
 # ------ dependencies ------
-# NOTE, by default we ship only a "headless" deps set bc
-# the `uis` group is not listed in the optional set.
-# [optional-dependencies]
-# uis = []
-# ?TODO? really we should be able to mv this `uis` group
-# to be under [optional-dependencies] and then include
-# it in the dev deps?
+# TODO: add an `--only daemon` group for running non-ui / pikerd
+# service tree in distributed mode B)
 # https://docs.astral.sh/uv/concepts/projects/dependencies/#optional-dependencies
-# -> uis should be included in pubbed pkgs.
-# [ ] uv seems to have no way to do this though?
-# TODO? move to a `uv.toml`?
-[tool.uv]
-# https://docs.astral.sh/uv/reference/settings/#python-preference
-python-preference = 'system'
-# https://docs.astral.sh/uv/reference/settings/#python-downloads
-python-downloads = 'manual'
-# https://docs.astral.sh/uv/concepts/projects/dependencies/#default-groups
-default-groups = [
-    'uis',
-]
-# ------ tool.uv ------
 [dependency-groups]
 uis = [
-    "pyqtgraph",
-    "qdarkstyle >=3.0.2, <4.0.0",
-    "pyqt6 >=6.7.0, <7.0.0",
-    # fuzzy search
-    "rapidfuzz >=3.2.0, <4.0.0",
+    # https://docs.astral.sh/uv/concepts/projects/dependencies/#optional-dependencies
+    # TODO: make sure the levenshtein shit compiles on nix..
+    # rapidfuzz = {extras = ["speedup"], version = "^0.18.0"}
+    "rapidfuzz >=3.2.0, <4.0.0",
+    "qdarkstyle >=3.0.2, <4.0.0",
+    "pyqt6 >=6.7.0, <7.0.0",
+    "pyqtgraph",
+    # for consideration,
+    # - 'visidata'
+    "qdarkstyle >=3.0.2, <4.0.0",
+    "pyqt6 >=6.7.0, <7.0.0",
+    "pyqtgraph",
 ]
-# dev deps enabled by `uv --dev`
-# https://docs.astral.sh/uv/concepts/projects/dependencies/#development-dependencies
+# TODO: a toolset that makes debugging a `pikerd` service (tree) easy
+# to hack on directly using more or less the local env:
+# - xonsh + xxh
+# - rsyscall + pdbp
+# - actor runtime control console like BEAM/OTP
+#
+# console ehancements and eventually remote debugging extras/helpers.
+# use `uv --dev` to enable
+repl = [
+    # debug
+    "pdbp >=1.5.0, <2.0.0",
+    "greenback >=1.1.1, <2.0.0",
+    "xonsh",
+    "prompt-toolkit ==3.0.40",
+    "pyperclip>=1.9.0",
+]
+testing = [
+    "pytest",
+]
+de = [
+    # DE-specific
+    "i3ipc>=2.2.1",
+]
 dev = [
     # https://docs.astral.sh/uv/concepts/projects/dependencies/#development-dependencies
     "cython >=3.0.0, <4.0.0",
@@ -124,34 +136,13 @@ dev = [
     {include-group = 'testing'},
     {include-group = 'de'},
 ]
-repl = [
-    # `tractor`'s debugger
-    "pdbp >=1.8.2, <2.0.0",
-    "greenback >=1.1.1, <2.0.0",
-    # @goodboy's preferred console toolz
-    "xonsh",
-    "prompt-toolkit ==3.0.40",
-    "pyperclip>=1.9.0",
-    # ?TODO, new stuff to consider..
-    # "visidata"  # console numerics
-    # "xxh"  # for remote `xonsh`-ing
-    # "rsyscall"  # (eventual) optional `tractor` backend
-    #   - an actor-runtime-ctl console like BEAM/OTP
-]
-testing = [
-    "pytest",
-]
-de = [  # (linux) specific DEs
-    "i3ipc>=2.2.1",
-]
 lint = [
     # XXX, with flake.nix needs to be from nixpkgs
     "ruff>=0.9.6"
-    #
-    # ^TODO? these markers don't work; use deps-flags for now?
     # ; os_name != 'nixos' and platform_system != 'NixOS'",
-    # ?TODO? since ^ markers won't work, use a deps-flags to toggle for
-    # now.
+    # ; defined('IN_NIX_SHELL')",
 ]
 dbs = [
     "elasticsearch >=8.9.0, <9.0.0",
@@ -186,6 +177,15 @@ include = ["piker"]
 # ------ tool.hatch ------
+# TODO? move to a `uv.toml`?
+[tool.uv]
+python-preference = 'system'
+python-downloads = 'manual'
+# https://docs.astral.sh/uv/concepts/projects/dependencies/#default-groups
+default-groups = ['uis', 'dev']
+# ------ tool.uv ------
 [tool.uv.sources]
 pyqtgraph = { git = "https://github.com/pikers/pyqtgraph.git" }
 tomlkit = { git = "https://github.com/pikers/tomlkit.git", branch ="piker_pin" }

View File

@@ -1,22 +1,4 @@
-# piker: trading gear for hackers
-# Copyright (C) Tyler Goodlet (in stewardship for pikers)
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see <https://www.gnu.org/licenses/>.
-'''
-A per-display, DPI (scaling) info dumper.
+"""
 Resource list for mucking with DPIs on multiple screens:
 - https://stackoverflow.com/questions/42141354/convert-pixel-size-to-point-size-for-fonts-on-multiple-platforms
@@ -30,86 +12,89 @@ Resource list for mucking with DPIs on multiple screens:
 - https://stackoverflow.com/questions/16561879/what-is-the-difference-between-logicaldpix-and-physicaldpix-in-qt
 - https://doc.qt.io/qt-5/qguiapplication.html#screenAt
-'''
+"""
 from pyqtgraph import QtGui
-from PyQt6 import (
-    QtCore,
-    QtWidgets,
-)
-from PyQt6.QtCore import (
-    Qt,
-    QCoreApplication,
-    QSize,
-    QRect,
+from PyQt5.QtCore import (
+    Qt, QCoreApplication
 )
 # Proper high DPI scaling is available in Qt >= 5.6.0. This attibute
 # must be set before creating the application
 if hasattr(Qt, 'AA_EnableHighDpiScaling'):
-    QCoreApplication.setAttribute(
-        Qt.AA_EnableHighDpiScaling,
-        True,
-    )
+    QCoreApplication.setAttribute(Qt.AA_EnableHighDpiScaling, True)
 if hasattr(Qt, 'AA_UseHighDpiPixmaps'):
-    QCoreApplication.setAttribute(
-        Qt.AA_UseHighDpiPixmaps,
-        True,
-    )
+    QCoreApplication.setAttribute(Qt.AA_UseHighDpiPixmaps, True)
-app = QtWidgets.QApplication([])
-window = QtWidgets.QMainWindow()
-main_widget = QtWidgets.QWidget()
+app = QtGui.QApplication([])
+window = QtGui.QMainWindow()
+main_widget = QtGui.QWidget()
 window.setCentralWidget(main_widget)
 window.show()
-pxr: float = main_widget.devicePixelRatioF()
+pxr = main_widget.devicePixelRatioF()
-# explicitly get main widget and primary displays
-current_screen: QtGui.QScreen = app.screenAt(
-    main_widget.geometry().center()
-)
-primary_screen: QtGui.QScreen = app.primaryScreen()
-screen: QtGui.QScreen
-for screen in app.screens():
-    name: str = screen.name()
-    model: str = screen.model().rstrip()
-    size: QSize = screen.size()
-    geo: QRect = screen.availableGeometry()
-    phydpi: float = screen.physicalDotsPerInch()
-    logdpi: float = screen.logicalDotsPerInch()
-    is_primary: bool = screen is primary_screen
-    is_current: bool = screen is current_screen
-    print(
-        f'------ screen name: {name} ------\n'
-        f'|_primary: {is_primary}\n'
-        f' _current: {is_current}\n'
-        f' _model: {model}\n'
-        f' _screen size: {size}\n'
-        f' _screen geometry: {geo}\n'
-        f' _devicePixelRationF(): {pxr}\n'
-        f' _physical dpi: {phydpi}\n'
-        f' _logical dpi: {logdpi}\n'
-    )
-# app-wide font info
+# screen_num = app.desktop().screenNumber()
+# screen = app.screens()[screen_num]
+screen = app.screenAt(main_widget.geometry().center())
+name = screen.name()
+size = screen.size()
+geo = screen.availableGeometry()
+phydpi = screen.physicalDotsPerInch()
+logdpi = screen.logicalDotsPerInch()
+print(
+    # f'screen number: {screen_num}\n',
+    f'screen name: {name}\n'
+    f'screen size: {size}\n'
+    f'screen geometry: {geo}\n\n'
+    f'devicePixelRationF(): {pxr}\n'
+    f'physical dpi: {phydpi}\n'
+    f'logical dpi: {logdpi}\n'
+)
+print('-'*50)
+screen = app.primaryScreen()
+name = screen.name()
+size = screen.size()
+geo = screen.availableGeometry()
+phydpi = screen.physicalDotsPerInch()
+logdpi = screen.logicalDotsPerInch()
+print(
+    # f'screen number: {screen_num}\n',
+    f'screen name: {name}\n'
+    f'screen size: {size}\n'
+    f'screen geometry: {geo}\n\n'
+    f'devicePixelRationF(): {pxr}\n'
+    f'physical dpi: {phydpi}\n'
+    f'logical dpi: {logdpi}\n'
+)
+# app-wide font
 font = QtGui.QFont("Hack")
 # use pixel size to be cross-resolution compatible?
 font.setPixelSize(6)
-fm = QtGui.QFontMetrics(font)
-fontdpi: float = fm.fontDpi()
-font_h: int = fm.height()
-string: str = '10000'
-str_br: QtCore.QRect = fm.boundingRect(string)
-str_w: int = str_br.width()
+fm = QtGui.QFontMetrics(font)
+fontdpi = fm.fontDpi()
+font_h = fm.height()
+string = '10000'
+str_br = fm.boundingRect(string)
+str_w = str_br.width()
 print(
-    f'------ global font settings ------\n'
+    # f'screen number: {screen_num}\n',
     f'font dpi: {fontdpi}\n'
     f'font height: {font_h}\n'
     f'string bounding rect: {str_br}\n'

tags
View File

@@ -1 +0,0 @@
-TAG_feed_status_update ./piker/data/feed.py /TAG_feed_status_update/

View File

@@ -15,12 +15,6 @@ from piker.service import (
 from piker.log import get_console_log
-# include `tractor`'s built-in fixtures!
-pytest_plugins: tuple[str] = (
-    "tractor._testing.pytest",
-)
 def pytest_addoption(parser):
     parser.addoption("--ll", action="store", dest='loglevel',
                      default=None, help="logging level to set when testing")

View File

@@ -12,12 +12,14 @@ from piker import config
 from piker.accounting import (
     Account,
     calc,
+    open_account,
+    load_account,
+    load_account_from_ledger,
+    open_trade_ledger,
     Position,
     TransactionLedger,
-    open_trade_ledger,
-    load_account,
-    load_account_from_ledger,
 )
+import tractor


 def test_root_conf_networking_section(

@@ -53,12 +55,17 @@
 )
 def test_paper_ledger_position_calcs(
     fq_acnt: tuple[str, str],
+    debug_mode: bool,
 ):
     broker: str
     acnt_name: str
     broker, acnt_name = fq_acnt
-    accounts_path: Path = config.repodir() / 'tests' / '_inputs'
+    accounts_path: Path = (
+        config.repodir()
+        / 'tests'
+        / '_inputs'  # tests-local-subdir
+    )

     ldr: TransactionLedger
     with (

@@ -77,6 +84,7 @@ def test_paper_ledger_position_calcs(
             ledger=ldr,
             _fp=accounts_path,
+            debug_mode=debug_mode,
         ) as (dfs, ledger),

@@ -102,3 +110,87 @@ def test_paper_ledger_position_calcs(
         df = dfs[xrp]
         assert df['cumsize'][-1] == 0
         assert pos.cumsize == 0
+
+
+@pytest.mark.parametrize(
+    'fq_acnt',
+    [
+        ('ib', 'algopaper'),
+    ],
+)
+def test_ib_account_with_duplicated_mktids(
+    fq_acnt: tuple[str, str],
+    debug_mode: bool,
+):
+    # ?TODO, once we start symcache-incremental-update-support?
+    # from piker.data import (
+    #     open_symcache,
+    # )
+    #
+    # async def main():
+    #     async with (
+    #         # TODO: do this as part of `open_account()`!?
+    #         open_symcache(
+    #             'ib',
+    #             only_from_memcache=True,
+    #         ) as symcache,
+    #     ):
+    from piker.brokers.ib.ledger import (
+        tx_sort,
+        # ?TODO, once we want to pull lowlevel txns and process them?
+        # norm_trade_records,
+        # update_ledger_from_api_trades,
+    )
+
+    broker: str
+    acnt_id: str = 'algopaper'
+    broker, acnt_id = fq_acnt
+    accounts_def = config.load_accounts([broker])
+    assert accounts_def[f'{broker}.{acnt_id}']
+
+    ledger: TransactionLedger
+    acnt: Account
+    with (
+        tractor.devx.maybe_open_crash_handler(pdb=debug_mode),
+        open_trade_ledger(
+            'ib',
+            acnt_id,
+            tx_sort=tx_sort,
+            # TODO, eventually incrementally updated for IB..
+            # symcache=symcache,
+            symcache=None,
+            allow_from_sync_code=True,
+        ) as ledger,
+        open_account(
+            'ib',
+            acnt_id,
+            write_on_exit=True,
+        ) as acnt,
+    ):
+        # per input params
+        symcache = ledger.symcache
+        assert not (
+            symcache.pairs
+            or
+            symcache.pairs
+            or
+            symcache.mktmaps
+        )
+
+        # re-compute all positions that have changed state.
+        # TODO: likely we should change the API to return the
+        # position updates from `.update_from_ledger()`?
+        active, closed = acnt.dump_active()
+        # breakpoint()
+
+        # TODO, (see above imports as well) incremental update from
+        # (updated) ledger?
+        # -[ ] pull some code from `.ib.broker` content.
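The new test stacks its crash handler, ledger and account managers in one parenthesized `with (...)` group, the Python 3.10+ form that keeps multiple `as` targets readable. A self-contained sketch of just that structure (the `open_thing()` helper is invented purely for illustration):

```python
from collections.abc import Iterator
from contextlib import contextmanager

@contextmanager
def open_thing(name: str) -> Iterator[str]:
    # stand-in for the `open_trade_ledger()`/`open_account()` pair above
    yield f'<{name}>'

with (
    open_thing('ledger') as ledger,
    open_thing('account') as acnt,
):
    assert (ledger, acnt) == ('<ledger>', '<account>')
```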

View File

@@ -42,7 +42,7 @@ from piker.accounting import (
     unpack_fqme,
 )
 from piker.accounting import (
-    open_pps,
+    open_account,
     Position,
 )

@@ -136,7 +136,7 @@ def load_and_check_pos(

 ) -> None:

-    with open_pps(ppmsg.broker, ppmsg.account) as table:
+    with open_account(ppmsg.broker, ppmsg.account) as table:

         if ppmsg.size == 0:
             assert ppmsg.symbol not in table.pps

@@ -179,7 +179,7 @@ def test_ems_err_on_bad_broker(
         # NOTE: emsd should error on the actor's enabled modules
         # import phase, when looking for a backend named `doggy`.
         except tractor.RemoteActorError as re:
-            assert re.type == ModuleNotFoundError
+            assert re.type is ModuleNotFoundError

     run_and_tollerate_cancels(load_bad_fqme)
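The `==` to `is` tweak matters because `re.type` hands back the boxed exception class itself, so an identity check is the exact assertion; equality would also pass for any object whose `__eq__` claims a match. A tractor-free sketch of the idiom:

```python
def is_module_missing(boxed_type: type[BaseException]) -> bool:
    # identity: True only for the exact class object, never for
    # a subclass or an equality-faking stand-in
    return boxed_type is ModuleNotFoundError

assert is_module_missing(ModuleNotFoundError)
assert not is_module_missing(ImportError)  # parent class, not identical
```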

View File

@@ -142,7 +142,12 @@ async def test_concurrent_tokens_refresh(us_symbols, loglevel):
         # async with tractor.open_nursery() as n:
         #     await n.run_in_actor('other', intermittently_refresh_tokens)

-        async with trio.open_nursery() as n:
+        async with (
+            tractor.trionics.collapse_eg(),
+            trio.open_nursery(
+                # strict_exception_groups=False,
+            ) as n
+        ):

             quoter = await qt.stock_quoter(client, us_symbols)

@@ -383,7 +388,9 @@ async def test_quote_streaming(tmx_symbols, loglevel, stream_what):
     else:
         symbols = [tmx_symbols]

-    async with trio.open_nursery() as n:
+    async with trio.open_nursery(
+        strict_exception_groups=False,
+    ) as n:
         for syms, func in zip(symbols, stream_what):
             n.start_soon(func, feed, syms)
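Both hunks deal with modern `trio`'s strict nurseries, which always raise `ExceptionGroup`s: the first test routes through `tractor.trionics.collapse_eg()` to unwrap single-error groups, while the second keeps the older `strict_exception_groups=False` escape hatch. A minimal `trio`-only sketch of the behavior these knobs exist to tame:

```python
import trio

async def boom() -> None:
    raise ValueError('lone task failure')

async def main() -> None:
    try:
        async with trio.open_nursery() as n:
            n.start_soon(boom)
    except ExceptionGroup as eg:
        # a strict nursery wraps even a single failure, so callers
        # must unpack (or collapse) the group themselves
        assert [type(e) for e in eg.exceptions] == [ValueError]

trio.run(main)
```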

10
uv.lock
View File

@@ -955,16 +955,16 @@ wheels = [
 [[package]]
 name = "pdbp"
-version = "1.8.1"
+version = "1.8.2"
 source = { registry = "https://pypi.org/simple" }
 dependencies = [
     { name = "colorama", marker = "sys_platform == 'win32'" },
     { name = "pygments" },
     { name = "tabcompleter" },
 ]
-sdist = { url = "https://files.pythonhosted.org/packages/fc/f5/794ef06a84b4aea5619cda8956aefb838c6b4849002079dca3bc8955f6e3/pdbp-1.8.1.tar.gz", hash = "sha256:e2acef6b14567b5599f624aec7378139cba086695c13a4e7e327ccb235c3a00b", size = 25812, upload-time = "2025-11-02T18:15:27.098Z" }
+sdist = { url = "https://files.pythonhosted.org/packages/50/91/2d614b0db12840d646159f65510415ade0db9db595d6dee3eac60dfe9302/pdbp-1.8.2.tar.gz", hash = "sha256:367c25c17555d3ac1f024b9ad494ff50e6e20f6494a84741487f3e6596d88f94", size = 25843, upload-time = "2026-01-14T03:10:28.134Z" }
 wheels = [
-    { url = "https://files.pythonhosted.org/packages/75/58/3af430d0de0b95d5adf7e576067e07d750ba76e28d142871982464fb40db/pdbp-1.8.1-py3-none-any.whl", hash = "sha256:643e8c84df7c09542c0c7c3f0f18a6c2fb4fb372f9f054ceca80a9037db986a5", size = 21950, upload-time = "2025-11-02T18:15:25.923Z" },
+    { url = "https://files.pythonhosted.org/packages/51/fe/53ac0cd932db5dcaf55961bc7cb7afdca8d80d8cc7406ed661f0c7dc111a/pdbp-1.8.2-py3-none-any.whl", hash = "sha256:d4fd05e177636b5ccd0b2e03e378cec57afc06149e5fd975de6f8ddb3d0109a8", size = 21969, upload-time = "2026-01-14T03:10:27.062Z" },
 ]

 [[package]]

@@ -1115,7 +1115,7 @@ dev = [
     { name = "cython", specifier = ">=3.0.0,<4.0.0" },
     { name = "greenback", specifier = ">=1.1.1,<2.0.0" },
     { name = "i3ipc", specifier = ">=2.2.1" },
-    { name = "pdbp", specifier = ">=1.5.0,<2.0.0" },
+    { name = "pdbp", specifier = ">=1.8.2,<2.0.0" },
     { name = "prompt-toolkit", specifier = "==3.0.40" },
     { name = "pyperclip", specifier = ">=1.9.0" },
     { name = "pyqt6", specifier = ">=6.7.0,<7.0.0" },

@@ -1128,7 +1128,7 @@ dev = [
 lint = [{ name = "ruff", specifier = ">=0.9.6" }]
 repl = [
     { name = "greenback", specifier = ">=1.1.1,<2.0.0" },
-    { name = "pdbp", specifier = ">=1.5.0,<2.0.0" },
+    { name = "pdbp", specifier = ">=1.8.2,<2.0.0" },
     { name = "prompt-toolkit", specifier = "==3.0.40" },
     { name = "pyperclip", specifier = ">=1.9.0" },
     { name = "xonsh" },