Compare commits

..

32 Commits

Author SHA1 Message Date
Tyler Goodlet 2074eeea4f Spurious first-draft of EG collapsing
Topically, throughout various (seemingly) console-UX-affecting or benign
spots in the code base; nothing that required more intervention beyond
things superficial. A few spots also include `trio.Nursery` ref renames
(always to something with a `tn` in it) and log-level reductions to
quiet (benign) console noise oriented around issues meant to be solved
long..

Note there's still a couple spots i left with the loose-ify flag because
i haven't fully tested them without using the latest version of
`tractor.trionics.collapse_eg()`, but more then likely they should flip
over fine.
2026-01-02 18:22:11 -05:00
Tyler Goodlet 1799171705 Use `.trionics.collapse_eg()` in `.deribit.api`
Commit this change separate from the (original) broader set applied to
the entire code base since the `.deribit.api` mod contained changes from
upstream max-pain work (from our very own @nt) which caused a noticeable
conflict and intros un-required changes from his work to re-enable
`deribit` support.

Note the original commit, "69eac7bb Spurious first-draft of EG
collapsing", applied similar changes through the rest of the code base.
AGAIN, this mod's change is only being broken out to minimize upstream
change conflicts due to updates to the `deribit` backend done earlier in
time-history.
2026-01-02 18:22:11 -05:00
Tyler Goodlet cffefac615 ib-related: cope with invalid txn timestamps
That is inside embedded `.accounting.calc.dyn_parse_to_dt()` closure add
an optional `_invalid: list` param to where we can report
bad-timestamped records which we instead override and return as
`from_timestamp(0.)` (when the parser loop falls through) and report
later (in summary ) from the `.accounting.calc.iter_by_dt()` caller. Add
some logging and an optional debug block for future tracing.

NOTE, this commit was re-edited during a conflict between the orig
branches: `dev/binance_api_3.1` & `dev/alt_tpts_for_perf`.
2026-01-02 18:19:50 -05:00
Tyler Goodlet fc2d727fdb Mk a `notes_to_self/` move orig file `ideas.rst' 2026-01-02 17:39:12 -05:00
Tyler Goodlet 05dde42f70 Drop old/masked ahab-docker daemon starting 2026-01-02 17:39:12 -05:00
Tyler Goodlet 04e423e6bd Try running daemons on UDS tpt
The root daemon, pikerd, needs to be adjusted to use diff default
registry addrs to also utilize non-TCP, but for now this gets us started
testing; so far so good B)
2026-01-02 17:37:55 -05:00
Tyler Goodlet 793a454463 Adjust feed status fields/display-pane to new actor-ID
That is to use the new `tractor.msg.types.Aid` struct to pull the
`brokerd` info from the `tractor.Channel.aid: Aid` attr as well as more
generally handling the new `Channel.raddr.proto_key: str` and no longer
assuming a TCP IPC transport; this per the recent `tractor.ipc`
subsys which adds multi-IPC-transports!

Downstream tweaks to match,
- use an "opt-in" field set to display in the `brokerd` info pane in
  `.ui._feedstatus.mk_feed_label()`.
 |_ also add some todos and drop some seemingly unneeded form sizing
    calcs?
- tweak `.ui._label` to allow not using markdown, though ended up not
  doing that since it looked too plain..
2026-01-02 17:36:13 -05:00
Tyler Goodlet b8b4f1b80f Adjust to `trio`'s strict eg nurseries throughout!
Using `tractor.trionics.collapse_eg()` as needed to avoid, at the least,
crash-worthy (in debug-mode REPL-ing terms) nested cancellation egs that
exhibit on SIGINT/ctl-c of each "app" (chart & daemon).

Also a bit of renaming of all `trio.Nursery`s to `tn`, the new "task
nursery" shorthand-var-name being used in all our other `tractor`
related projects.
2026-01-02 17:36:13 -05:00
Tyler Goodlet 1cf041d8e6 Start a manual `tags` file for internal refs 2026-01-02 17:36:13 -05:00
Tyler Goodlet 831b6cfb21 Add a couple new grays to the pallete 2026-01-02 17:25:29 -05:00
Tyler Goodlet e5f7e8de9d Bump to (latest) `polars`, the `0.20.6x` series B)
Since I was trying out the neat lookin `polars-fuzzy-match` (also added
for now as a core dep here) which requires the new plugin sys, plus it's
about time we synced with upstream!

Adjust some column syntax to the new `.name` sub-field-space and the
`uv` lock-file to match.

Other,
- add back `trio-typing` bc i guess something else needs it (debug
  tooling stuff in new `tractor`?)
- flip back to the `tractor` pre-main pin since the new `main`-branch
  requires new `trio` stuff we haven't ported yet..
2026-01-02 17:24:17 -05:00
Tyler Goodlet 871bb2620e Port to newer `tractor.get_registry()` 2026-01-02 17:00:23 -05:00
Tyler Goodlet de980a69e0 Update legacy type to `tractor.MsgStream` 2026-01-02 17:00:23 -05:00
Tyler Goodlet ab9f01caf2 Fix type-check assertion in ems test to use `is` 2026-01-02 17:00:23 -05:00
Tyler Goodlet d85632ba9b Cast to `float` as needed from order-mode and ems
Since we're not quite yet using automatic typed msging from
`tractor`/`msgspec` (i.e. still manually decoding order ctl msgs from
built-in types..`dict`s still not `msgspec.Struct`) this adds the
appropriate typecasting ops to ensure the required precision is attained
prior to processing and/or submission to a brokerd backend service.

For the `.clearing._ems`,
- flip all `trigger_price` previously presumed to be `float` to just
  the field-identical `price: Decimal` and ensure we cast to `float`
  for any `trigger_price` usage, like before passing to `mk_check()`.

For `.ui.order_mode.OrderMode`,
- add a new `.curr_mkt: MktPair` convenience property to get the
  chart-active value.
- ensure we always use the `.curr_mkt.quantize() -> Decimal` before
  setting any IPC-msg's `.price` field!
- always cast `float(Order.price)` before use in setting line-levels.
- don't bother setting `Order.symbol` to a (now fully removed) `Symbol`
  instance since it's not really required-for-use anywhere; leaving it
  a `str` (per the type-annot) is fine for now?
2026-01-02 17:00:23 -05:00
Tyler Goodlet 8294ca6487 Mk `Brokerd[Order].price` avoid `float`-errs
By re-typing to a `.price: Decimal` field on both legs of the EMS.

It seems we must do it ourselves since,
- these msg's (fields) are relayed through the clearing engine to each
  `brokerd` backend and,
- bc many (if not all) of those backends `.broker`-clients (nor their
  encapsulated "brokerage services") **are not** doing any
  precision-truncation themselves.

So, for now, instead we opt to expect rounding at the source. This means
we will explicitly require casting to/from `float` at the line-graphics
interface to the order-clearing-engine (as implemented throughout
`.ui.order_mode.OrderMode`); and this is coming shortly.
2026-01-02 17:00:23 -05:00
Tyler Goodlet 87385a4e2d ib: never relay "Warning:" errors to EMS..
You'd think they could be bothered to make either a "log" or "warning"
msg type instead of a `type='error'`.. but alas, this attempts to detect
all such "warning"-errors and never proxy them to the clearing engine
thus avoiding the cancellation of any associated (by `reqid`)
pre-existing orders (control dialogs).

Also update all surrounding log messages to a more multiline style.
2026-01-02 16:59:09 -05:00
Tyler Goodlet b3c5478017 ib: jig `.data_reset_hack()` with vnc-client failover
Since apparently porting to the new docker container enforces using
a vnc password and `asyncvnc` seems to have a bug/mis-config whenever
i've tried a pw over a wg tunnel..?

Soo, this tries out the old `i3ipc`-win-focus + `xdo` click hack when
the above fails.

Deats,
- add a mod-level `try_xdo_manual()` to wrap calling
  `i3ipc_xdotool_manual_click_hack()` with an oserr handler, ensure we
  don't bother trying if `i3ipc` import fails beforehand tho.
- call ^ from both the orig case block and the failover from the
  vnc-client case.
- factor the `+no_setup_msg: str` out to mod level and expect it to be
  `.format()`-ed.
- refresh todo around `asyncvnc` pw ish..
- add a new `i3ipc_fin_wins_titled()` window-title scanner which
  predicates input `titles` and delivers any matches alongside the orig
  focused win at call time.
- tweak `i3ipc_xdotool_manual_click_hack()` to call ^ and remove prior
  unfactored window scanning logic.
2026-01-02 16:59:09 -05:00
Tyler Goodlet 6c9a78c5a0 Add fix for binance API 3.1 rollout..
See https://developers.binance.com/docs/binance-spot-api-docs#2025-08-26
2026-01-02 16:59:09 -05:00
Tyler Goodlet da223f7a55 kraken: add crash-handling around `Pair()` init
Since it can otherwise be difficult to debug due to nursery cancellation
(we need that taskman yo!).
2026-01-02 16:59:09 -05:00
Tyler Goodlet 49fe0a3398 kraken: `Pair.costmin` is now optional?
Some pairs don't seem to define it but it's not listed as deprecated on
official API page (new one now linked in type def's doc string).
2026-01-02 16:59:09 -05:00
Tyler Goodlet 29fc3b8a8b binance: add new `permissionSets` to base `Pair` 2026-01-02 16:59:09 -05:00
Tyler Goodlet 1bfe777637 Update `binance` spot pairs with `amendAllowed`
As per API updates,
https://developers.binance.com/docs/binance-spot-api-docs
https://developers.binance.com/docs/binance-spot-api-docs/faqs/order_amend_keep_priority

I also slightly tweaked the filed mismatch exception note to include the
`repr(pair_type)` so the dev can know which pair types should be
changed.
2026-01-02 16:59:09 -05:00
Tyler Goodlet c694d915f1 `.kraken`: add masked pauses for order req debug
Such that the next time i inevitably must debug the some order-request
error status or precision discrepancy, i have the mkt-symbol branch
ready to go. Also, switch to `'action': 'buy'|'sell' as action,` style
`case` matching instead of the post-`if` predicate style.
2026-01-02 16:59:09 -05:00
Tyler Goodlet c120cb51a4 `.questrade`: link in ws-API issue! 2026-01-02 16:59:09 -05:00
Tyler Goodlet 7c20231f16 `.kraken.broker`: need to `await verify_balances()` .. 2026-01-02 16:59:09 -05:00
Tyler Goodlet d809c79788 `.brokers.ib.feed`: better `tractor.to_asyncio` typing and var naming throughout! 2026-01-02 16:59:09 -05:00
Tyler Goodlet 9f2f8a1664 `.brokers.cli`: module type and todo for `--pdb` flag to NOT src from sub-cmd 2026-01-02 16:59:09 -05:00
Tyler Goodlet 9f141635d1 Type loaded backend modules 2026-01-02 16:59:09 -05:00
Tyler Goodlet 0604ca7c82 Bump various `.brokers.core` doc string content/style 2026-01-02 16:59:09 -05:00
Tyler Goodlet 82c2256271 Add missing f-str prefix to log line 2026-01-02 16:55:15 -05:00
Tyler Goodlet a743fa28b5 Teensie `piker.data` styling tweaks
- use more compact optional value style with `|`-union
- fix `.flows` typing-only import since we need `MktPair` to be
  immediately defined for use on a `msgspec.Struct` field.
- more "tree-like" warning msg in `.validate()` reporting.
2026-01-02 16:55:15 -05:00
31 changed files with 600 additions and 1390 deletions

View File

@ -1,5 +1,6 @@
################
# ---- CEXY ----
################
[binance]
accounts.paper = 'paper'
@ -12,41 +13,28 @@ accounts.spot = 'spot'
spot.use_testnet = false
spot.api_key = ''
spot.api_secret = ''
# ------ binance ------
[deribit]
# std assets
key_id = ''
key_secret = ''
# options
accounts.option = 'option'
option.use_testnet = false
option.key_id = ''
option.key_secret = ''
# aux logging from `cryptofeed`
option.log.filename = 'cryptofeed.log'
option.log.level = 'DEBUG'
option.log.disabled = true
# ------ deribit ------
[kraken]
key_descr = ''
api_key = ''
secret = ''
# ------ kraken ------
[kucoin]
key_id = ''
key_secret = ''
key_passphrase = ''
# ------ kucoin ------
################
# -- BROKERZ ---
################
[questrade]
refresh_token = ''
access_token = ''
@ -54,55 +42,44 @@ api_server = 'https://api06.iq.questrade.com/'
expires_in = 1800
token_type = 'Bearer'
expires_at = 1616095326.355846
# ------ questrade ------
[ib]
# define the (set of) host-port socketaddrs that
# brokerd.ib will scan to connect to an API endpoint
# (ib-gw or ib-tws listening instances)
hosts = [
'127.0.0.1',
]
# XXX: the order in which ports will be scanned
# (by the `brokerd` daemon-actor)
# is determined # by the line order here.
# TODO: when we eventually spawn gateways in our
# container, we can just dynamically allocate these
# using IBC.
ports = [
4002, # gw
7497, # tws
]
# When API endpoints are being scanned durin startup, the order
# of user-defined-account "names" (as defined below) here
# determines which py-client connection is given priority to be
# used for data-feed-requests by according to whichever client
# connected to an API endpoing which reported the equivalent
# account number for that name.
# XXX: for a paper account the flex web query service
# is not supported so you have to manually download
# and XML report and put it in a location that can be
# accessed by the ``brokerd.ib`` backend code for parsing.
flex_token = ''
flex_trades_query_id = '' # live account
# when clients are being scanned this determines
# which clients are preferred to be used for data
# feeds based on the order of account names, if
# detected as active on an API client.
prefer_data_account = [
'paper',
'margin',
'ira',
]
# For long-term trades txn (transaction) history
# processing (i.e your txn ledger with IB) you can
# (automatically for live accounts) query the FLEX
# report system for past history.
#
# (For paper accounts the web query service
# is not supported so you have to manually download
# an XML report and put it in a location that can be
# accessed by our `brokerd.ib` backend code for parsing).
#
flex_token = ''
flex_trades_query_id = '' # live account
# define "aliases" (names) for each account number
# such that the names can be reffed and logged throughout
# `piker.accounting` subsys and more easily
# referred to by the user.
#
# These keys will be the set exposed through the order-mode
# account-selection UI so that numbers are never shown.
[ib.accounts]
paper = 'DU0000000' # <- literal account #
margin = 'U0000000'
ira = 'U0000000'
# ------ ib ------
# the order in which accounts will be selectable
# in the order mode UI (if found via clients during
# API-app scanning)when a new symbol is loaded.
paper = 'XX0000000'
margin = 'X0000000'
ira = 'X0000000'

View File

@ -1,9 +1,7 @@
[network]
pikerd = [
'/ipv4/127.0.0.1/tcp/6116', # std localhost daemon-actor tree
# '/uds/6116', # TODO std uds socket file
]
tsdb.backend = 'marketstore'
tsdb.host = 'localhost'
tsdb.grpc_port = 5995
[ui]
# set custom font + size which will scale entire UI

View File

@ -1,138 +1,30 @@
running ``ib`` gateway in ``docker``
------------------------------------
We have a config based on a well maintained community
image from `@gnzsnz`:
We have a config based on the (now defunct)
image from "waytrade":
https://github.com/gnzsnz/ib-gateway-docker
https://github.com/waytrade/ib-gateway-docker
To startup this image simply run the command::
To startup this image with our custom settings
simply run the command::
docker compose up
(For further usage^ see the official `docker-compose`_ docs)
And you should have the following socket-available services:
- ``x11vnc1@127.0.0.1:3003``
- ``ib-gw@127.0.0.1:4002``
And you should have the following socket-available services by
default:
You can attach to the container via a VNC client
without password auth.
- ``x11vnc1 @ 127.0.0.1:5900``
- ``ib-gw @ 127.0.0.1:4002``
You can now attach to the container via a VNC client with password-auth;
here is an example using ``vncclient`` on ``linux``::
vncviewer localhost:5900
now enter the pw (password) you set via an (see second code blob)
`.env file`_ or pw-file according to the `credentials section`_.
If you want to change away from their default config see the example
`docker-compose.yml`-config issue and config-section of the readme,
- https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#configuration
- https://github.com/gnzsnz/ib-gateway-docker/discussions/103
.. _.env file: https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#how-to-use-it
.. _docker-compose: https://docs.docker.com/compose/
.. _credentials section: https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#credentials
Connecting to the API from `piker`
---------------------------------
In order to expose the container's API endpoint to the
`brokerd/datad/ib` actor, we need to add a section to the user's
`brokers.toml` config (note the below is similar to the repo-shipped
template file),
.. code:: toml
[ib]
# define the (set of) host-port socketaddrs that
# brokerd.ib will scan to connect to an API endpoint
# (ib-gw or ib-tws listening instances)
hosts = [
'127.0.0.1',
]
ports = [
4002, # gw
7497, # tws
]
# When API endpoints are being scanned durin startup, the order
# of user-defined-account "names" (as defined below) here
# determines which py-client connection is given priority to be
# used for data-feed-requests by according to whichever client
# connected to an API endpoing which reported the equivalent
# account number for that name.
prefer_data_account = [
'paper',
'margin',
'ira',
]
# define "aliases" (names) for each account number
# such that the names can be reffed and logged throughout
# `piker.accounting` subsys and more easily
# referred to by the user.
#
# These keys will be the set exposed through the order-mode
# account-selection UI so that numbers are never shown.
[ib.accounts]
paper = 'XX0000000'
margin = 'X0000000'
ira = 'X0000000'
the broker daemon can also connect to the container's VNC server for
added functionalies including,
- viewing the API endpoint program's GUI for manual interventions,
- workarounds for historical data throttling using hotkey hacks,
Add a further section to `brokers.toml` which maps each API-ep's
port to a table of VNC server connection info like,
.. code:: toml
[ib.vnc_addrs]
4002 = {host = 'localhost', port = 5900, pw = 'doggy'}
The `pw = 'doggy'` here ^ should the same value as the particular
container instances `.env` file setting (when it was run),
.. code:: ini
VNC_SERVER_PASSWORD='doggy'
IF you also want to run ``TWS``
-------------------------------
You can also run it containerized,
https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#using-tws
SECURITY stuff (advanced, only if you're paranoid)
--------------------------------------------------
First and foremost if doing a "distributed" container setup where you
run the ``ib-gw`` docker container and your connecting API client
(likely ``ib_async`` from python) on **different hosts** be sure to
read the `security considerations`_ section!
And for a further (somewhat paranoid) perspective from
a long-time-ago serious devops eng..
Though "``ib``" claims they filter remote host connections outside
``localhost`` (aka ``127.0.0.1`` on ipv4) it's prolly justified if
you'd like to filter the socket at the *OS level* using a stateless
firewall rule::
SECURITY STUFF!?!?!
-------------------
Though "``ib``" claims they host filter connections outside
localhost (aka ``127.0.0.1``) it's probably better if you filter
the socket at the OS level using a stateless firewall rule::
ip rule add not unicast iif lo to 0.0.0.0/0 dport 4002
We will soon have this either baked into our own custom derivative
image (or patched into the current upstream one after further testin)
but for now you'll have to do it urself, diggity dawg.
.. _security considerations: https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#security-considerations
We will soon have this baked into our own custom image but for
now you'll have to do it urself dawgy.

View File

@ -1,15 +1,10 @@
# a community maintained IB API container!
#
# https://github.com/gnzsnz/ib-gateway-docker
#
# For piker we (currently) include some minor deviations
# for some config files in the `volumes` section.
#
# See full configuration settings @
# - https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#configuration
# - https://github.com/gnzsnz/ib-gateway-docker/discussions/103
# rework from the original @
# https://github.com/waytrade/ib-gateway-docker/blob/master/docker-compose.yml
version: "3.5"
services:
ib_gw_paper:
# apparently java is a mega cukc:
@ -55,22 +50,16 @@ services:
target: /root/scripts/run_x11_vnc.sh
read_only: true
# NOTE: an alt method to fill these out is to
# define an `.env` file in the same dir as
# this compose file.
# NOTE:to fill these out, define an `.env` file in the same dir as
# this compose file which looks something like:
# TWS_USERID='myuser'
# TWS_PASSWORD='guest'
environment:
TWS_USERID: ${TWS_USERID}
# TWS_USERID: 'myuser'
TWS_PASSWORD: ${TWS_PASSWORD}
# TWS_PASSWORD: 'guest'
TRADING_MODE: ${TRADING_MODE}
# TRADING_MODE: 'paper'
VNC_SERVER_PASSWORD: ${VNC_SERVER_PASSWORD}
# VNC_SERVER_PASSWORD: 'doggy'
# TODO, see if we can get this supported like it
# was on the old `waytrade` image?
# VNC_SERVER_PORT: '3003'
TRADING_MODE: 'paper'
VNC_SERVER_PASSWORD: 'doggy'
VNC_SERVER_PORT: '3003'
# ports:
# - target: 4002
@ -87,9 +76,6 @@ services:
# - "127.0.0.1:4002:4002"
# - "127.0.0.1:5900:5900"
# TODO, a masked but working example of dual paper + live
# ib-gw instances running in a single app run!
#
# ib_gw_live:
# image: waytrade/ib-gateway:1012.2i
# restart: no

View File

@ -33,6 +33,7 @@ from ._pos import (
Account,
load_account,
load_account_from_ledger,
open_pps,
open_account,
Position,
)
@ -67,6 +68,7 @@ __all__ = [
'load_account_from_ledger',
'mk_allocator',
'open_account',
'open_pps',
'open_trade_ledger',
'unpack_fqme',
'DerivTypes',

View File

@ -356,12 +356,13 @@ class Position(Struct):
) -> bool:
'''
Update clearing table by calculating the rolling ppu and
(accumulative) size in both the clears entry and local attrs
state.
(accumulative) size in both the clears entry and local
attrs state.
Inserts are always done in datetime sorted order.
'''
# added: bool = False
tid: str = t.tid
if tid in self._events:
log.debug(
@ -369,7 +370,7 @@ class Position(Struct):
f'\n'
f'{t}\n'
)
return False
# return added
# TODO: apparently this IS possible with a dict but not
# common and probably not that beneficial unless we're also
@ -450,12 +451,6 @@ class Position(Struct):
# def suggest_split(self) -> float:
# ...
# ?TODO, for sending rendered state over the wire?
# def summary(self) -> PositionSummary:
# do minimal conversion to a subset of fields
# currently defined in `.clearing._messages.BrokerdPosition`
class Account(Struct):
'''
@ -499,9 +494,9 @@ class Account(Struct):
def update_from_ledger(
self,
ledger: TransactionLedger|dict[str, Transaction],
ledger: TransactionLedger | dict[str, Transaction],
cost_scalar: float = 2,
symcache: SymbologyCache|None = None,
symcache: SymbologyCache | None = None,
_mktmap_table: dict[str, MktPair] | None = None,
@ -754,7 +749,7 @@ class Account(Struct):
# XXX WTF: if we use a tomlkit.Integer here we get this
# super weird --1 thing going on for cumsize!?1!
# NOTE: the fix was to always float() the size value loaded
# in open_account() below!
# in open_pps() below!
config.write(
config=self.conf,
path=self.conf_path,
@ -938,6 +933,7 @@ def open_account(
clears_table['dt'] = dt
trans.append(Transaction(
fqme=bs_mktid,
# sym=mkt,
bs_mktid=bs_mktid,
tid=tid,
# XXX: not sure why sometimes these are loaded as
@ -960,22 +956,11 @@ def open_account(
):
expiry: pendulum.DateTime = pendulum.parse(expiry)
# !XXX, should never be duplicates over
# a backend-(broker)-system's unique market-IDs!
if pos := pp_objs.get(bs_mktid):
if mkt != pos.mkt:
log.warning(
f'Duplicated position but diff `MktPair.fqme` ??\n'
f'bs_mktid: {bs_mktid!r}\n'
f'pos.mkt: {pos.mkt}\n'
f'mkt: {mkt}\n'
)
else:
pos = pp_objs[bs_mktid] = Position(
mkt,
split_ratio=split_ratio,
bs_mktid=bs_mktid,
)
pp = pp_objs[bs_mktid] = Position(
mkt,
split_ratio=split_ratio,
bs_mktid=bs_mktid,
)
# XXX: super critical, we need to be sure to include
# all pps.toml clears to avoid reusing clears that were
@ -983,13 +968,8 @@ def open_account(
# state, since today's records may have already been
# processed!
for t in trans:
added: bool = pos.add_clear(t)
if not added:
log.warning(
f'Txn already recorded in pp ??\n'
f'\n'
f'{t}\n'
)
pp.add_clear(t)
try:
yield acnt
finally:
@ -997,6 +977,20 @@ def open_account(
acnt.write_config()
# TODO: drop the old name and THIS!
@cm
def open_pps(
*args,
**kwargs,
) -> Generator[Account, None, None]:
log.warning(
'`open_pps()` is now deprecated!\n'
'Please use `with open_account() as cnt:`'
)
with open_account(*args, **kwargs) as acnt:
yield acnt
def load_account_from_ledger(
brokername: str,

View File

@ -32,7 +32,6 @@ from typing import (
TYPE_CHECKING,
)
from tractor.devx import maybe_open_crash_handler
import polars as pl
from pendulum import (
DateTime,
@ -294,11 +293,7 @@ def iter_by_dt(
)
continue
# XXX: we should never really get here bc it means some kinda
# bad txn-record (field) data..
#
# -> set the `debug_mode = True` if you want to trace such
# cases from REPL ;)
# XXX: should never get here..
else:
# XXX: we should really never get here..
# only if a ledger record has no expected sort(able)
@ -308,21 +303,16 @@ def iter_by_dt(
'No (time) sortable field for TXN:\n'
f'{tx!r}\n'
)
report: str = (
f'No supported time-field found in txn !?\n'
f'\n'
f'supported-time-fields: {parsers!r}\n'
f'\n'
f'txn: {tx!r}\n'
)
if debug:
with maybe_open_crash_handler(
pdb=debug,
raise_on_exit=False,
):
raise ValueError(report)
else:
log.error(report)
import tractor
with tractor.devx.maybe_open_crash_handler():
raise ValueError(
f'No supported time-field found in txn !?\n'
f'\n'
f'supported-time-fields: {parsers!r}\n'
f'\n'
f'txn: {tx!r}\n'
)
if _invalid is not None:
_invalid.append(tx)
@ -406,7 +396,6 @@ def open_ledger_dfs(
acctname: str,
ledger: TransactionLedger | None = None,
debug_mode: bool = False,
**kwargs,
@ -421,10 +410,8 @@ def open_ledger_dfs(
can update the ledger on exit.
'''
with maybe_open_crash_handler(
pdb=debug_mode,
# raise_on_exit=False,
):
from piker.toolz import open_crash_handler
with open_crash_handler():
if not ledger:
import time
from ._ledger import open_trade_ledger
@ -516,7 +503,7 @@ def ledger_to_dfs(
df = dfs[key] = ldf.with_columns([
pl.cum_sum('size').alias('cumsize'),
pl.cumsum('size').alias('cumsize'),
# amount of source asset "sent" (via buy txns in
# the market) to acquire the dst asset, PER txn.
@ -531,7 +518,7 @@ def ledger_to_dfs(
]).with_columns([
# rolling balance in src asset units
(pl.col('dst_bot').cum_sum() * -1).alias('src_balance'),
(pl.col('dst_bot').cumsum() * -1).alias('src_balance'),
# "position operation type" in terms of increasing the
# amount in the dst asset (entering) or decreasing the
@ -673,7 +660,7 @@ def ledger_to_dfs(
# cost that was included in the least-recently
# entered txn that is still part of the current CSi
# set.
# => we look up the cost-per-unit cum_sum and apply
# => we look up the cost-per-unit cumsum and apply
# if over the current txn size (by multiplication)
# and then reverse that previusly applied cost on
# the txn_cost for this record.

View File

@ -94,15 +94,13 @@ class L1(Struct):
# validation type
# https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams/Aggregate-Trade-Streams#response-example
class AggTrade(Struct, frozen=True):
e: str # Event type
E: int # Event time
s: str # Symbol
a: int # Aggregate trade ID
p: float # Price
q: float # Quantity with all the market trades
nq: float # Normal quantity without the trades involving RPI orders
q: float # Quantity
f: int # First trade ID
l: int # noqa Last trade ID
T: int # Trade time
@ -450,6 +448,7 @@ async def subscribe(
async def stream_quotes(
send_chan: trio.abc.SendChannel,
symbols: list[str],
feed_is_live: trio.Event,
@ -461,7 +460,6 @@ async def stream_quotes(
) -> None:
async with (
tractor.trionics.maybe_raise_from_masking_exc(),
send_chan as send_chan,
open_cached_client('binance') as client,
):

View File

@ -102,10 +102,7 @@ class Pair(Struct, frozen=True, kw_only=True):
# https://developers.binance.com/docs/binance-spot-api-docs#2025-08-26
# will become non-optional 2025-08-28?
# https://developers.binance.com/docs/binance-spot-api-docs#future-changes
pegInstructionsAllowed: bool = False
# https://developers.binance.com/docs/binance-spot-api-docs#2025-12-02
opoAllowed: bool = False
pegInstructionsAllowed: bool|None = None
filters: dict[
str,
@ -223,10 +220,7 @@ class FutesPair(Pair):
assert pair == self.pair # sanity
return f'{expiry}'
case (
'PERPETUAL'
| 'TRADIFI_PERPETUAL'
):
case 'PERPETUAL':
return 'PERP'
case '':
@ -255,10 +249,7 @@ class FutesPair(Pair):
margin: str = self.marginAsset
match ctype:
case (
'PERPETUAL'
| 'TRADIFI_PERPETUAL'
):
case 'PERPETUAL':
return f'{margin}M'
case (

View File

@ -20,11 +20,6 @@ runnable script-programs.
'''
from __future__ import annotations
from datetime import ( # noqa
datetime,
date,
tzinfo as TzInfo,
)
from functools import partial
from typing import (
Literal,
@ -38,6 +33,7 @@ from piker.brokers._util import get_logger
if TYPE_CHECKING:
from .api import Client
from ib_insync import IB
import i3ipc
log = get_logger('piker.brokers.ib')
@ -61,7 +57,7 @@ no_setup_msg:str = (
def try_xdo_manual(
client: Client,
vnc_sockaddr: str,
):
'''
Do the "manual" `xdo`-based screen switch + click
@ -78,14 +74,14 @@ def try_xdo_manual(
_reset_tech = 'i3ipc_xdotool'
return True
except OSError:
vnc_sockaddr: str = client.conf.vnc_addrs
log.exception(
no_setup_msg.format(vnc_sockaddr=vnc_sockaddr)
no_setup_msg.format(vnc_sockaddr)
)
return False
async def data_reset_hack(
# vnc_host: str,
client: Client,
reset_type: Literal['data', 'connection'],
@ -117,24 +113,36 @@ async def data_reset_hack(
that need to be wrangle.
'''
ib_client: IB = client.ib
# look up any user defined vnc socket address mapped from
# a particular API socket port.
vnc_addrs: tuple[str]|None = client.conf.get('vnc_addrs')
if not vnc_addrs:
api_port: str = str(ib_client.client.port)
vnc_host: str
vnc_port: int
vnc_sockaddr: tuple[str] | None = client.conf.get('vnc_addrs')
if not vnc_sockaddr:
log.warning(
no_setup_msg.format(vnc_sockaddr=client.conf)
no_setup_msg.format(vnc_sockaddr)
+
'REQUIRES A `vnc_addrs: array` ENTRY'
)
vnc_host, vnc_port = vnc_sockaddr.get(
api_port,
('localhost', 3003)
)
global _reset_tech
match _reset_tech:
case 'vnc':
try:
await tractor.to_asyncio.run_task(
partial(
vnc_click_hack,
client=client,
host=vnc_host,
port=vnc_port,
)
)
except (
@ -145,31 +153,29 @@ async def data_reset_hack(
import i3ipc # noqa (since a deps dynamic check)
except ModuleNotFoundError:
log.warning(
no_setup_msg.format(vnc_sockaddr=client.conf)
no_setup_msg.format(vnc_sockaddr)
)
return False
# XXX, Xorg only workaround..
# TODO? remove now that we have `pyvnc`?
# if vnc_host not in {
# 'localhost',
# '127.0.0.1',
# }:
# focussed, matches = i3ipc_fin_wins_titled()
# if not matches:
# log.warning(
# no_setup_msg.format(vnc_sockaddr=vnc_sockaddr)
# )
# return False
# else:
# try_xdo_manual(vnc_sockaddr)
if vnc_host not in {
'localhost',
'127.0.0.1',
}:
focussed, matches = i3ipc_fin_wins_titled()
if not matches:
log.warning(
no_setup_msg.format(vnc_sockaddr)
)
return False
else:
try_xdo_manual(vnc_sockaddr)
# localhost but no vnc-client or it borked..
else:
try_xdo_manual(client)
try_xdo_manual(vnc_sockaddr)
case 'i3ipc_xdotool':
try_xdo_manual(client)
try_xdo_manual(vnc_sockaddr)
# i3ipc_xdotool_manual_click_hack()
case _ as tech:
@ -180,66 +186,21 @@ async def data_reset_hack(
async def vnc_click_hack(
client: Client,
reset_type: str = 'data',
pw: str|None = None,
host: str,
port: int,
reset_type: str = 'data'
) -> None:
'''
Reset the data or network connection for the VNC attached
ib-gateway using a (magic) keybinding combo.
A vnc-server password can be set either by an input `pw` param or
set in the client's config with the latter loaded from the user's
`brokers.toml` in a vnc-addrs-port-mapping section,
.. code:: toml
[ib.vnc_addrs]
4002 = {host = 'localhost', port = 5900, pw = 'doggy'}
ib gateway using magic combos.
'''
api_port: str = str(client.ib.client.port)
conf: dict = client.conf
vnc_addrs: dict[int, tuple] = conf.get('vnc_addrs')
if not vnc_addrs:
return None
addr_entry: dict|tuple = vnc_addrs.get(
api_port,
('localhost', 5900) # a typical default
)
if pw is None:
match addr_entry:
case (
host,
port,
):
pass
case {
'host': host,
'port': port,
'pw': pw
}:
pass
case _:
raise ValueError(
f'Invalid `ib.vnc_addrs` entry ?\n'
f'{addr_entry!r}\n'
)
try:
from pyvnc import (
AsyncVNCClient,
VNCConfig,
Point,
MOUSE_BUTTON_LEFT,
)
import asyncvnc
except ModuleNotFoundError:
log.warning(
"In order to leverage `piker`'s built-in data reset hacks, install "
"the `pyvnc` project: https://github.com/regulad/pyvnc.git"
"the `asyncvnc` project: https://github.com/barneygale/asyncvnc"
)
return
@ -250,27 +211,24 @@ async def vnc_click_hack(
'connection': 'r'
}[reset_type]
with tractor.devx.open_crash_handler():
client = await AsyncVNCClient.connect(
VNCConfig(
host=host,
port=port,
password=pw,
)
async with asyncvnc.connect(
host,
port=port,
# TODO: doesn't work?
# see, https://github.com/barneygale/asyncvnc/issues/7
password='doggy',
) as client:
# move to middle of screen
# 640x1800
client.mouse.move(
x=500,
y=500,
)
async with client:
# move to middle of screen
# 640x1800
await client.move(
Point(
500,
500,
)
)
# ensure the ib-gw window is active
await client.click(MOUSE_BUTTON_LEFT)
# send the hotkeys combo B)
await client.press('Ctrl', 'Alt', key) # keys are stacked
client.mouse.click()
client.keyboard.press('Ctrl', 'Alt', key) # keys are stacked
def i3ipc_fin_wins_titled(
@ -379,99 +337,3 @@ def i3ipc_xdotool_manual_click_hack() -> None:
])
except subprocess.TimeoutExpired:
log.exception('xdotool timed out?')
def is_current_time_in_range(
start_dt: datetime,
end_dt: datetime,
) -> bool:
'''
Check if current time is within the datetime range.
Use any/the-same timezone as provided by `start_dt.tzinfo` value
in the range.
'''
now: datetime = datetime.now(start_dt.tzinfo)
return start_dt <= now <= end_dt
# TODO, put this into `._util` and call it from here!
#
# NOTE, this was generated by @guille from a gpt5 prompt
# and was originally thot to be needed before learning about
# `ib_insync.contract.ContractDetails._parseSessions()` and
# it's downstream meths..
#
# This is still likely useful to keep for now to parse the
# `.tradingHours: str` value manually if we ever decide
# to move off `ib_async` and implement our own `trio`/`anyio`
# based version Bp
#
# >attempt to parse the retarted ib "time stampy thing" they
# >do for "venue hours" with this.. written by
# >gpt5-"thinking",
#
def parse_trading_hours(
spec: str,
tz: TzInfo|None = None
) -> dict[
date,
tuple[datetime, datetime]
]|None:
'''
Parse venue hours like:
'YYYYMMDD:HHMM-YYYYMMDD:HHMM;YYYYMMDD:CLOSED;...'
Returns `dict[date] = (open_dt, close_dt)` or `None` if
closed.
'''
if (
not isinstance(spec, str)
or
not spec
):
raise ValueError('spec must be a non-empty string')
out: dict[
date,
tuple[datetime, datetime]
]|None = {}
for part in (p.strip() for p in spec.split(';') if p.strip()):
if part.endswith(':CLOSED'):
day_s, _ = part.split(':', 1)
d = datetime.strptime(day_s, '%Y%m%d').date()
out[d] = None
continue
try:
start_s, end_s = part.split('-', 1)
start_dt = datetime.strptime(start_s, '%Y%m%d:%H%M')
end_dt = datetime.strptime(end_s, '%Y%m%d:%H%M')
except ValueError as exc:
raise ValueError(f'invalid segment: {part}') from exc
if tz is not None:
start_dt = start_dt.replace(tzinfo=tz)
end_dt = end_dt.replace(tzinfo=tz)
out[start_dt.date()] = (start_dt, end_dt)
return out
# ORIG desired usage,
#
# TODO, for non-drunk tomorrow,
# - call above fn and check that `output[today] is not None`
# trading_hrs: dict = parse_trading_hours(
# details.tradingHours
# )
# liq_hrs: dict = parse_trading_hours(
# details.liquidHours
# )

View File

@ -334,15 +334,15 @@ class Client:
fqme: str,
# EST in ISO 8601 format is required... below is EPOCH
start_dt: datetime|str = "1970-01-01T00:00:00.000000-05:00",
end_dt: datetime|str = "",
start_dt: datetime | str = "1970-01-01T00:00:00.000000-05:00",
end_dt: datetime | str = "",
# ohlc sample period in seconds
sample_period_s: int = 1,
# optional "duration of time" equal to the
# length of the returned history frame.
duration: str|None = None,
duration: str | None = None,
**kwargs,
@ -716,8 +716,8 @@ class Client:
async def find_contracts(
self,
pattern: str|None = None,
contract: Contract|None = None,
pattern: str | None = None,
contract: Contract | None = None,
qualify: bool = True,
err_on_qualify: bool = True,
@ -862,7 +862,7 @@ class Client:
self,
fqme: str,
) -> datetime|None:
) -> datetime | None:
'''
Return the first datetime stamp for `fqme` or `None`
on request failure.
@ -918,7 +918,7 @@ class Client:
tries: int = 100,
raise_on_timeout: bool = False,
) -> Ticker|None:
) -> Ticker | None:
'''
Return a single (snap) quote for symbol.
@ -930,7 +930,7 @@ class Client:
ready: ticker.TickerUpdateEvent = ticker.updateEvent
# ensure a last price gets filled in before we deliver quote
timeouterr: Exception|None = None
timeouterr: Exception | None = None
warnset: bool = False
for _ in range(tries):
@ -944,7 +944,6 @@ class Client:
)
if tkr:
break
except TimeoutError as err:
timeouterr = err
await asyncio.sleep(0.01)
@ -953,9 +952,7 @@ class Client:
else:
if not warnset:
log.warning(
f'Quote req timed out..\n'
f'Maybe the venue is closed?\n'
f'\n'
f'Quote req timed out..maybe venue is closed?\n'
f'{asdict(contract)}'
)
warnset = True
@ -967,11 +964,9 @@ class Client:
)
break
else:
if (
timeouterr
and
raise_on_timeout
):
if timeouterr and raise_on_timeout:
import pdbp
pdbp.set_trace()
raise timeouterr
if not warnset:
@ -1368,7 +1363,9 @@ async def load_aio_clients(
async def load_clients_for_trio(
chan: tractor.to_asyncio.LinkedTaskChannel,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
'''
Pure async mngr proxy to ``load_aio_clients()``.
@ -1381,7 +1378,8 @@ async def load_clients_for_trio(
disconnect_on_exit=False,
) as accts2clients:
chan.started_nowait(accts2clients)
to_trio.send_nowait(accts2clients)
# TODO: maybe a sync event to wait on instead?
await asyncio.sleep(float('inf'))
@ -1507,7 +1505,7 @@ class MethodProxy:
self,
pattern: str,
) -> dict[str, Any]|trio.Event:
) -> dict[str, Any] | trio.Event:
ev = self.event_table.get(pattern)
@ -1528,22 +1526,23 @@ class MethodProxy:
async def open_aio_client_method_relay(
chan: tractor.to_asyncio.LinkedTaskChannel,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
client: Client,
event_consumers: dict[str, trio.Event],
) -> None:
# sync with `open_client_proxy()` caller
chan.started_nowait(client)
to_trio.send_nowait(client)
# TODO: separate channel for error handling?
client.inline_errors(chan)
client.inline_errors(to_trio)
# relay all method requests to ``asyncio``-side client and deliver
# back results
while not chan._to_trio._closed: # <- TODO, better check like `._web_bs`?
msg: tuple[str, dict]|dict|None = await chan.get()
while not to_trio._closed:
msg: tuple[str, dict] | dict | None = await from_trio.get()
match msg:
case None: # termination sentinel
log.info('asyncio `Client` method-proxy SHUTDOWN!')
@ -1556,7 +1555,7 @@ async def open_aio_client_method_relay(
try:
resp = await meth(**kwargs)
# echo the msg back
chan.send_nowait({'result': resp})
to_trio.send_nowait({'result': resp})
except (
RequestError,
@ -1564,10 +1563,10 @@ async def open_aio_client_method_relay(
# TODO: relay all errors to trio?
# BaseException,
) as err:
chan.send_nowait({'exception': err})
to_trio.send_nowait({'exception': err})
case {'error': content}:
chan.send_nowait({'exception': content})
to_trio.send_nowait({'exception': content})
case _:
raise ValueError(f'Unhandled msg {msg}')

View File

@ -117,11 +117,7 @@ def pack_position(
symbol=fqme,
currency=con.currency,
size=float(pos.position),
avg_price=(
float(pos.avgCost)
/
float(con.multiplier or 1.0)
),
avg_price=float(pos.avgCost) / float(con.multiplier or 1.0),
),
)
@ -362,10 +358,6 @@ async def update_and_audit_pos_msg(
size=ibpos.position,
avg_price=pikerpos.ppu,
# XXX ensures matching even if multiple venue-names
# in `.bs_fqme`, likely from txn records..
bs_mktid=mkt.bs_mktid,
)
ibfmtmsg: str = pformat(ibpos._asdict())
@ -434,8 +426,7 @@ async def aggr_open_orders(
) -> None:
'''
Collect all open orders from client and fill in `order_msgs:
list`.
Collect all open orders from client and fill in `order_msgs: list`.
'''
trades: list[Trade] = client.ib.openTrades()
@ -556,10 +547,7 @@ async def open_trade_dialog(
),
# TODO: do this as part of `open_account()`!?
open_symcache(
'ib',
only_from_memcache=True,
) as symcache,
open_symcache('ib', only_from_memcache=True) as symcache,
):
# Open a trade ledgers stack for appending trade records over
# multiple accounts.
@ -567,10 +555,8 @@ async def open_trade_dialog(
ledgers: dict[str, TransactionLedger] = {}
tables: dict[str, Account] = {}
order_msgs: list[Status] = []
conf: dict = get_config()
accounts_def_inv: bidict[str, str] = bidict(
conf['accounts']
).inverse
conf = get_config()
accounts_def_inv: bidict[str, str] = bidict(conf['accounts']).inverse
with (
ExitStack() as lstack,
@ -720,11 +706,7 @@ async def open_trade_dialog(
# client-account and build out position msgs to deliver to
# EMS.
for acctid, acnt in tables.items():
active_pps: dict[str, Position]
(
active_pps,
closed_pps,
) = acnt.dump_active()
active_pps, closed_pps = acnt.dump_active()
for pps in [active_pps, closed_pps]:
piker_pps: list[Position] = list(pps.values())
@ -740,7 +722,6 @@ async def open_trade_dialog(
)
if ibpos:
bs_mktid: str = str(ibpos.contract.conId)
msg = await update_and_audit_pos_msg(
acctid,
pikerpos,

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers
# Copyright (C) 2018-forever Tyler Goodlet (in stewardship for pikers)
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@ -13,12 +13,10 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Data feed endpoints pre-wrapped and ready for use with ``tractor``/``trio``.
'''
Data feed endpoints pre-wrapped and ready for use with `tractor`/`trio`
via "infected-asyncio-mode".
'''
"""
from __future__ import annotations
import asyncio
from contextlib import (
@ -28,6 +26,7 @@ from dataclasses import asdict
from datetime import datetime
from functools import partial
from pprint import pformat
from math import isnan
import time
from typing import (
Any,
@ -41,6 +40,7 @@ import numpy as np
from pendulum import (
now,
from_timestamp,
# DateTime,
Duration,
duration as mk_duration,
)
@ -69,10 +69,7 @@ from .api import (
Contract,
RequestError,
)
from ._util import (
data_reset_hack,
is_current_time_in_range,
)
from ._util import data_reset_hack
from .symbols import get_mkt_info
if TYPE_CHECKING:
@ -187,8 +184,7 @@ async def open_history_client(
if (
start_dt
and
start_dt.timestamp() == 0
and start_dt.timestamp() == 0
):
await tractor.pause()
@ -207,16 +203,14 @@ async def open_history_client(
):
count += 1
mean += latency / count
log.debug(
print(
f'HISTORY FRAME QUERY LATENCY: {latency}\n'
f'mean: {mean}'
)
# could be trying to retreive bars over weekend
if out is None:
log.error(
f"No bars starting at {end_dt!r} !?!?"
)
log.error(f"Can't grab bars starting at {end_dt}!?!?")
if (
end_dt
and head_dt
@ -291,9 +285,8 @@ _pacing: str = (
async def wait_on_data_reset(
proxy: MethodProxy,
reset_type: str = 'data',
timeout: float = 16,
timeout: float = 16, # float('inf'),
task_status: TaskStatus[
tuple[
@ -302,47 +295,29 @@ async def wait_on_data_reset(
]
] = trio.TASK_STATUS_IGNORED,
) -> bool:
'''
Wait on a (global-ish) "data-farm" event to be emitted
by the IB api server.
Allows syncing to reconnect event-messages emitted on the API
console, such as:
- 'HMDS data farm connection is OK:ushmds'
- 'Market data farm is connecting:usfuture'
- 'Market data farm connection is OK:usfuture'
Deliver a `(cs, done: Event)` pair to the caller to support it
waiting or cancelling the associated "data-reset-request";
normally a manual data-reset-req is expected to be the cause and
thus trigger such events (such as our click-hack-magic from
`.ib._util`).
'''
# ?TODO, do we need a task-lock around this method?
#
# register for an API "status event" wrapped for `trio`-sync.
hist_ev: trio.Event = proxy.status_event(
# TODO: we might have to put a task lock around this
# method..
hist_ev = proxy.status_event(
'HMDS data farm connection is OK:ushmds'
)
#
# ^TODO: other event-messages we might want to support waiting-for
# but i wasn't able to get reliable..
#
# TODO: other event messages we might want to try and
# wait for but i wasn't able to get any of this
# reliable..
# reconnect_start = proxy.status_event(
# 'Market data farm is connecting:usfuture'
# )
# live_ev = proxy.status_event(
# 'Market data farm connection is OK:usfuture'
# )
# try to wait on the reset event(s) to arrive, a timeout
# will trigger a retry up to 6 times (for now).
client: Client = proxy._aio_ns
done = trio.Event()
with trio.move_on_after(timeout) as cs:
task_status.started((cs, done))
log.warning(
@ -421,9 +396,8 @@ async def get_bars(
bool, # timed out hint
]:
'''
Request-n-retrieve historical data frames from a `trio.Task`
using a `MethoProxy` to query the `asyncio`-side's
`.ib.api.Client` methods.
Retrieve historical data from a ``trio``-side task using
a ``MethoProxy``.
'''
global _data_resetter_task, _failed_resets
@ -633,10 +607,7 @@ async def get_bars(
# such that simultaneous symbol queries don't try data resettingn
# too fast..
unset_resetter: bool = False
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn
):
async with trio.open_nursery() as tn:
# start history request that we allow
# to run indefinitely until a result is acquired
@ -682,12 +653,14 @@ async def get_bars(
)
# per-actor cache of inter-eventloop-chans
_quote_streams: dict[str, trio.abc.ReceiveStream] = {}
async def _setup_quote_stream(
chan: tractor.to_asyncio.LinkedTaskChannel,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
symbol: str,
opts: tuple[int] = (
'375', # RT trade volume (excludes utrades)
@ -705,13 +678,10 @@ async def _setup_quote_stream(
) -> trio.abc.ReceiveChannel:
'''
Stream L1 quotes via the `Ticker.updateEvent.connect(push)`
callback API by registering a `push` callback which simply
`chan.send_nowait()`s quote msgs back to the calling
parent-`trio.Task`-side.
Stream a ticker using the std L1 api.
NOTE, that this task-fn is run on the `asyncio.Task`-side ONLY
and is thus run via `tractor.to_asyncio.open_channel_from()`.
This task is ``asyncio``-side and must be called from
``tractor.to_asyncio.open_channel_from()``.
'''
global _quote_streams
@ -719,79 +689,39 @@ async def _setup_quote_stream(
async with load_aio_clients(
disconnect_on_exit=False,
) as accts2clients:
# XXX since this is an `asyncio.Task`, we must use
# tractor.pause_from_sync()
caccount_name, client = get_preferred_data_client(accts2clients)
contract = (
contract
or
(await client.find_contract(symbol))
)
chan.started_nowait(contract) # cuz why not
ticker: Ticker = client.ib.reqMktData(
contract,
','.join(opts),
)
maybe_exc: BaseException|None = None
handler_tries: int = 0
aio_task: asyncio.Task = asyncio.current_task()
contract = contract or (await client.find_contract(symbol))
to_trio.send_nowait(contract) # cuz why not
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
# ?TODO? this API is batch-wise and quite slow-af but,
# - seems to be 5s updates?
# - maybe we could use it for backchecking?
#
# NOTE: it's batch-wise and slow af but I guess could
# be good for backchecking? Seems to be every 5s maybe?
# ticker: Ticker = client.ib.reqTickByTickData(
# contract, 'Last',
# )
# define a very naive queue-pushing callback that relays
# quote-packets directly the calling (parent) `trio.Task`.
# Ensure on teardown we cancel the feed via their cancel API.
#
# # define a simple queue push routine that streams quote packets
# # to trio over the ``to_trio`` memory channel.
# to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore
def teardown():
'''
Disconnect our `push`-er callback and cancel the data-feed
for `contract`.
'''
nonlocal maybe_exc
ticker.updateEvent.disconnect(push)
report: str = f'Disconnected mkt-data for {symbol!r} due to '
if maybe_exc is not None:
report += (
'error,\n'
f'{maybe_exc!r}\n'
)
log.error(report)
else:
report += (
'cancellation.\n'
)
log.cancel(report)
log.error(
f'Disconnected stream for `{symbol}`'
)
client.ib.cancelMktData(contract)
# decouple broadcast mem chan
_quote_streams.pop(symbol, None)
def push(
t: Ticker,
tries_before_raise: int = 6,
) -> None:
'''
Push quotes verbatim to parent-side `trio.Task`.
def push(t: Ticker) -> None:
"""
Push quotes to trio task.
'''
nonlocal maybe_exc, handler_tries
# log.debug(f'new IB quote: {t}\n')
"""
# log.debug(t)
try:
chan.send_nowait(t)
to_trio.send_nowait(t)
# XXX TODO XXX replicate in `tractor` tests
# as per `CancelledError`-handler notes below!
# assert 0
except (
trio.BrokenResourceError,
@ -806,107 +736,38 @@ async def _setup_quote_stream(
# resulting in tracebacks spammed to console..
# Manually do the dereg ourselves.
teardown()
# for slow debugging purposes to avoid clobbering prompt
# with log msgs
except trio.WouldBlock:
log.exception(
f'Asyncio->Trio `chan.send_nowait()` blocked !?\n'
f'\n'
f'{chan._to_trio.statistics()}\n'
)
# ?TODO, handle re-connection attempts?
except BaseException as _berr:
berr = _berr
if handler_tries >= tries_before_raise:
# breakpoint()
maybe_exc = _berr
# task.set_exception(berr)
aio_task.cancel(msg=berr.args)
raise berr
else:
handler_tries += 1
log.exception(
f'Failed to push ticker quote !?\n'
f'handler_tries={handler_tries!r}\n'
f'ticker: {t!r}\n'
f'\n'
f'{chan._to_trio.statistics()}\n'
f'\n'
f'CAUSE: {berr}\n'
)
# log.warning(
# f'channel is blocking symbol feed for {symbol}?'
# f'\n{to_trio.statistics}'
# )
pass
# except trio.WouldBlock:
# # for slow debugging purposes to avoid clobbering prompt
# # with log msgs
# pass
ticker.updateEvent.connect(push)
try:
await asyncio.sleep(float('inf'))
# XXX, for debug.. TODO? can we rm again?
#
# tractor.pause_from_sync()
# while True:
# await asyncio.sleep(1.6)
# if ticker.ticks:
# log.debug(
# f'ticker.ticks = \n'
# f'{ticker.ticks}\n'
# )
# else:
# log.warning(
# 'UHH no ticker.ticks ??'
# )
# XXX TODO XXX !?!?
# apparently **without this handler** and the subsequent
# re-raising of `maybe_exc from _taskc` cancelling the
# `aio_task` from the `push()`-callback will cause a very
# strange chain of exc raising that breaks alll sorts of
# downstream callers, tasks and remote-actor tasks!?
#
# -[ ] we need some lowlevel reproducting tests to replicate
# those worst-case scenarios in `tractor` core!!
# -[ ] likely we should factor-out the `tractor.to_asyncio`
# attempts at workarounds in `.translate_aio_errors()`
# for failed `asyncio.Task.set_exception()` to either
# call `aio_task.cancel()` and/or
# `aio_task._fut_waiter.set_exception()` to a re-useable
# toolset in something like a `.to_asyncio._utils`??
#
except asyncio.CancelledError as _taskc:
if maybe_exc is not None:
raise maybe_exc from _taskc
raise _taskc
except BaseException as _berr:
# stash any crash cause for reporting in `teardown()`
maybe_exc = _berr
raise _berr
finally:
# always disconnect our `push()` and cancel the
# ib-"mkt-data-feed".
teardown()
# return from_aio
@acm
async def open_aio_quote_stream(
symbol: str,
contract: Contract|None = None,
contract: Contract | None = None,
) -> (
trio.abc.Channel| # iface
tractor.to_asyncio.LinkedTaskChannel # actually
):
'''
Open a real-time `Ticker` quote stream from an `asyncio.Task`
spawned via `tractor.to_asyncio.open_channel_from()`, deliver the
inter-event-loop channel to the `trio.Task` caller and cache it
globally for re-use.
'''
from tractor.trionics import broadcast_receiver
global _quote_streams
@ -932,10 +793,6 @@ async def open_aio_quote_stream(
assert contract
# TODO? de-reg on teardown of last consumer task?
# -> why aren't we using `.trionics.maybe_open_context()`
# here again?? (we are in `open_client_proxies()` tho?)
#
# cache feed for later consumers
_quote_streams[symbol] = from_aio
@ -950,12 +807,7 @@ def normalize(
calc_price: bool = False
) -> dict:
'''
Translate `ib_async`'s `Ticker.ticks` values to a `piker`
normalized `dict` form for transmit to downstream `.data` layer
consumers.
'''
# check for special contract types
con = ticker.contract
fqme, calc_price = con2fqme(con)
@ -974,7 +826,7 @@ def normalize(
tbt = ticker.tickByTicks
if tbt:
log.info(f'tickbyticks:\n {ticker.tickByTicks}')
print(f'tickbyticks:\n {ticker.tickByTicks}')
ticker.ticks = new_ticks
@ -1010,39 +862,27 @@ def normalize(
return data
# ?TODO? feels like this task-fn could be factored to reduce some
# indentation levels?
# -[ ] the reconnect while loop on ib-gw "data farm connection.."s
# -[ ] everything embedded under the `async with aclosing(stream):`
# as the "meat" of the quote delivery once the connection is
# stable.
#
async def stream_quotes(
send_chan: trio.abc.SendChannel,
symbols: list[str],
feed_is_live: trio.Event,
# TODO? we need to hook into the `ib_async` logger like
# we can with i3ipc from modden!
# loglevel: str|None = None,
loglevel: str = None,
# startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Stream `symbols[0]` quotes back via `send_chan`.
Stream symbol quotes.
The `feed_is_live: Event` is set to signal the caller that it can
begin processing msgs from the mem-chan.
This is a ``trio`` callable routine meant to be invoked
once the brokerd is up.
'''
# TODO: support multiple subscriptions
sym: str = symbols[0]
log.info(
f'request for real-time quotes\n'
f'sym: {sym!r}\n'
)
sym = symbols[0]
log.info(f'request for real-time quotes: {sym}')
init_msgs: list[FeedInit] = []
@ -1051,52 +891,34 @@ async def stream_quotes(
details: ibis.ContractDetails
async with (
open_data_client() as proxy,
# trio.open_nursery() as tn,
):
mkt, details = await get_mkt_info(
sym,
proxy=proxy, # passed to avoid implicit client load
)
# is venue active rn?
venue_is_open: bool = any(
is_current_time_in_range(
start_dt=sesh.start,
end_dt=sesh.end,
)
for sesh in details.tradingSessions()
)
init_msg = FeedInit(mkt_info=mkt)
# NOTE, tell sampler (via config) to skip vlm summing for dst
# assets which provide no vlm data..
if mkt.dst.atype in {
'fiat',
'index',
'commodity',
}:
# tell sampler config that it shouldn't do vlm summing.
init_msg.shm_write_opts['sum_tick_vlm'] = False
init_msg.shm_write_opts['has_vlm'] = False
init_msgs.append(init_msg)
con: Contract = details.contract
first_ticker: Ticker|None = None
timeout: float = 1.6
with trio.move_on_after(timeout) as quote_cs:
first_ticker: Ticker | None = None
with trio.move_on_after(1):
first_ticker: Ticker = await proxy.get_quote(
contract=con,
raise_on_timeout=False,
)
# XXX should never happen with this ep right?
# but if so then, more then likely mkt is closed?
if quote_cs.cancelled_caught:
log.warning(
f'First quote req timed out after {timeout!r}s'
)
if first_ticker:
first_quote: dict = normalize(first_ticker)
@ -1108,27 +930,28 @@ async def stream_quotes(
f'{pformat(first_quote)}\n'
)
# XXX NOTE: whenever we're "outside regular trading hours"
# (only relevant for assets coming from the "legacy markets"
# space) so we basically (from an API/runtime-operational
# perspective) "pretend the feed is live" even if it's
# actually closed.
#
# IOW, we signal to the effective caller (task) that the live
# feed is "already up" but really we're just indicating that
# the OHLCV history can start being loaded immediately by the
# `piker.data`/`.tsp` layers.
#
# XXX, deats: the "pretend we're live" is just done by
# a `feed_is_live.set()` even though nothing is actually live
# Bp
if not venue_is_open:
log.warning(
f'Venue is closed, unable to establish real-time feed.\n'
f'mkt: {mkt!r}\n'
f'\n'
f'first_ticker: {first_ticker}\n'
)
# NOTE: it might be outside regular trading hours for
# assets with "standard venue operating hours" so we
# only "pretend the feed is live" when the dst asset
# type is NOT within the NON-NORMAL-venue set: aka not
# commodities, forex or crypto currencies which CAN
# always return a NaN on a snap quote request during
# normal venue hours. In the case of a closed venue
# (equitiies, futes, bonds etc.) we at least try to
# grab the OHLC history.
if (
first_ticker
and
isnan(first_ticker.last)
# SO, if the last quote price value is NaN we ONLY
# "pretend to do" `feed_is_live.set()` if it's a known
# dst asset venue with a lot of closed operating hours.
and mkt.dst.atype not in {
'commodity',
'fiat',
'crypto',
}
):
task_status.started((
init_msgs,
first_quote,
@ -1139,12 +962,10 @@ async def stream_quotes(
feed_is_live.set()
# block and let data history backfill code run.
# XXX obvi given the venue is closed, we never expect feed
# to come up; a taskc should be the only way to
# terminate this task.
await trio.sleep_forever()
return # we never expect feed to come up?
# ?TODO, we could instead spawn a task that waits on a feed
# TODO: we should instead spawn a task that waits on a feed
# to start and let it wait indefinitely..instead of this
# hard coded stuff.
# async def wait_for_first_quote():
@ -1166,27 +987,24 @@ async def stream_quotes(
'Rxed init quote:\n'
f'{pformat(first_quote)}'
)
cs: trio.CancelScope|None = None
cs: trio.CancelScope | None = None
startup: bool = True
iter_quotes: trio.abc.Channel
while (
startup
or
cs.cancel_called
or cs.cancel_called
):
with trio.CancelScope() as cs:
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
open_aio_quote_stream(
symbol=sym,
contract=con,
) as iter_quotes,
):
# ?TODO? can we rm this - particularly for `ib_async`?
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash)
# first_ticker.ticks = []
first_ticker.ticks = []
# only on first entry at feed boot up
if startup:
@ -1200,8 +1018,8 @@ async def stream_quotes(
# data feed event.
async def reset_on_feed():
# ??TODO? this seems to be surpressed from the
# traceback in `tractor`?
# TODO: this seems to be surpressed from the
# traceback in ``tractor``?
# assert 0
rt_ev = proxy.status_event(
@ -1247,7 +1065,7 @@ async def stream_quotes(
# ugh, clear ticks since we've
# consumed them (ahem, ib_insync is
# truly stateful trash)
# ticker.ticks = []
ticker.ticks = []
# XXX: this works because we don't use
# ``aclosing()`` above?
@ -1269,12 +1087,8 @@ async def stream_quotes(
async for ticker in iter_quotes:
quote = normalize(ticker)
fqme: str = quote['fqme']
log.debug(
f'Sending quote\n'
f'{quote}'
)
await send_chan.send({fqme: quote})
# ugh, clear ticks since we've consumed them
# ticker.ticks = []
ticker.ticks = []
# last = time.time()

View File

@ -388,7 +388,6 @@ async def open_brokerd_dialog(
for ep_name in [
'open_trade_dialog', # probably final name?
'trades_dialogue', # legacy
# ^!TODO, rm this since all backends ported no ?!?
]:
trades_endpoint = getattr(
brokermod,
@ -1028,18 +1027,8 @@ async def translate_and_relay_brokerd_events(
)
if status == 'closed':
log.info(
f'Execution is complete!\n'
f'oid: {oid!r}\n'
)
status_msg = book._active.pop(oid, None)
if status_msg is None:
log.warning(
f'Order was already cleared from book ??\n'
f'oid: {oid!r}\n'
f'\n'
f'Maybe the order cancelled before submitted ??\n'
)
log.info(f'Execution for {oid} is complete!')
status_msg = book._active.pop(oid)
elif status == 'canceled':
log.cancel(f'Cancellation for {oid} is complete!')
@ -1563,18 +1552,19 @@ async def maybe_open_trade_relays(
@tractor.context
async def _emsd_main(
ctx: tractor.Context, # becomes `ems_ctx` below
ctx: tractor.Context,
fqme: str,
exec_mode: str, # ('paper', 'live')
loglevel: str|None = None,
) -> tuple[ # `ctx.started()` value!
dict[ # positions
tuple[str, str], # brokername, acctid
) -> tuple[
dict[
# brokername, acctid
tuple[str, str],
list[BrokerdPosition],
],
list[str], # accounts
dict[str, Status], # dialogs
list[str],
dict[str, Status],
]:
'''
EMS (sub)actor entrypoint providing the execution management

View File

@ -301,9 +301,6 @@ class BrokerdError(Struct):
# TODO: yeah, so we REALLY need to completely deprecate
# this and use the `.accounting.Position` msg-type instead..
# -[ ] an alternative might be to add a `Position.summary() ->
# `PositionSummary`-msg that we generate since `Position` has a lot
# of fields by default we likely don't want to send over the wire?
class BrokerdPosition(Struct):
'''
Position update event from brokerd.
@ -316,4 +313,3 @@ class BrokerdPosition(Struct):
avg_price: float
currency: str = ''
name: str = 'position'
bs_mktid: str|int|None = None

View File

@ -134,65 +134,65 @@ def pikerd(
Spawn the piker broker-daemon.
'''
# from tractor.devx import maybe_open_crash_handler
# with maybe_open_crash_handler(pdb=False):
log = get_console_log(loglevel, name='cli')
from tractor.devx import maybe_open_crash_handler
with maybe_open_crash_handler(pdb=pdb):
log = get_console_log(loglevel, name='cli')
if pdb:
log.warning((
"\n"
"!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n"
"When a `piker` daemon crashes it will block the "
"task-thread until resumed from console!\n"
"\n"
))
# service-actor registry endpoint socket-address set
regaddrs: list[tuple[str, int]] = []
conf, _ = config.load(
conf_name='conf',
)
network: dict = conf.get('network')
if (
network is None
and not maddr
):
regaddrs = [(
_default_registry_host,
_default_registry_port,
)]
else:
eps: dict = load_trans_eps(
network,
maddr,
)
for layers in eps['pikerd']:
regaddrs.append((
layers['ipv4']['addr'],
layers['tcp']['port'],
if pdb:
log.warning((
"\n"
"!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n"
"When a `piker` daemon crashes it will block the "
"task-thread until resumed from console!\n"
"\n"
))
from .. import service
# service-actor registry endpoint socket-address set
regaddrs: list[tuple[str, int]] = []
async def main():
service_mngr: service.Services
async with (
service.open_pikerd(
registry_addrs=regaddrs,
loglevel=loglevel,
debug_mode=pdb,
# enable_transports=['uds'],
enable_transports=['tcp'],
) as service_mngr,
conf, _ = config.load(
conf_name='conf',
)
network: dict = conf.get('network')
if (
network is None
and not maddr
):
assert service_mngr
# ?TODO? spawn all other sub-actor daemons according to
# multiaddress endpoint spec defined by user config
await trio.sleep_forever()
regaddrs = [(
_default_registry_host,
_default_registry_port,
)]
trio.run(main)
else:
eps: dict = load_trans_eps(
network,
maddr,
)
for layers in eps['pikerd']:
regaddrs.append((
layers['ipv4']['addr'],
layers['tcp']['port'],
))
from .. import service
async def main():
service_mngr: service.Services
async with (
service.open_pikerd(
registry_addrs=regaddrs,
loglevel=loglevel,
debug_mode=pdb,
enable_transports=['uds'],
) as service_mngr,
):
assert service_mngr
# ?TODO? spawn all other sub-actor daemons according to
# multiaddress endpoint spec defined by user config
await trio.sleep_forever()
trio.run(main)
@click.group(context_settings=config._context_defaults)
@ -307,10 +307,6 @@ def services(config, tl, ports):
if not ports:
ports = [_default_registry_port]
addr = tractor._addr.wrap_address(
addr=(host, ports[0])
)
async def list_services():
nonlocal host
async with (
@ -319,17 +315,15 @@ def services(config, tl, ports):
loglevel=config['loglevel'] if tl else None,
),
tractor.get_registry(
addr=addr,
host=host,
port=ports[0]
) as portal
):
registry = await portal.run_from_ns(
'self',
'get_registry',
)
registry = await portal.run_from_ns('self', 'get_registry')
json_d = {}
for key, socket in registry.items():
json_d[key] = f'{socket}'
host, port = socket
json_d[key] = f'{host}:{port}'
click.echo(f"{colorize_json(json_d)}")
trio.run(list_services)

View File

@ -41,13 +41,10 @@ from .log import get_logger
log = get_logger('broker-config')
# XXX NOTE: taken from `click`
# |_https://github.com/pallets/click/blob/main/src/click/utils.py#L449
#
# (since apparently they have some super weirdness with SIGINT and
# sudo.. no clue we're probably going to slowly just modify it to our
# own version over time..)
#
# XXX NOTE: taken from ``click`` since apparently they have some
# super weirdness with sigint and sudo..no clue
# we're probably going to slowly just modify it to our own version over
# time..
def get_app_dir(
app_name: str,
roaming: bool = True,
@ -264,7 +261,7 @@ def load(
MutableMapping,
] = tomllib.loads,
touch_if_dne: bool = True,
touch_if_dne: bool = False,
**tomlkws,
@ -273,7 +270,7 @@ def load(
Load config file by name.
If desired config is not in the top level piker-user config path then
pass the `path: Path` explicitly.
pass the ``path: Path`` explicitly.
'''
# create the $HOME/.config/piker dir if dne
@ -288,8 +285,7 @@ def load(
if (
not path.is_file()
and
touch_if_dne
and touch_if_dne
):
# only do a template if no path provided,
# just touch an empty file with same name.

View File

@ -91,18 +91,6 @@ class SymbologyCache(Struct):
# provided by the backend pkg.
mktmaps: dict[str, MktPair] = field(default_factory=dict)
def pformat(self) -> str:
return (
f'<{type(self).__name__}(\n'
f' .mod: {self.mod!r}\n'
f' .assets: {len(self.assets)!r}\n'
f' .pairs: {len(self.pairs)!r}\n'
f' .mktmaps: {len(self.mktmaps)!r}\n'
f')>'
)
__repr__ = pformat
def write_config(self) -> None:
# put the backend's pair-struct type ref at the top

View File

@ -27,6 +27,7 @@ from functools import partial
from types import ModuleType
from typing import (
Any,
Optional,
Callable,
AsyncContextManager,
AsyncGenerator,
@ -34,7 +35,6 @@ from typing import (
)
import json
import tractor
import trio
from trio_typing import TaskStatus
from trio_websocket import (
@ -167,7 +167,7 @@ async def _reconnect_forever(
async def proxy_msgs(
ws: WebSocketConnection,
rent_cs: trio.CancelScope, # parent cancel scope
pcs: trio.CancelScope, # parent cancel scope
):
'''
Receive (under `timeout` deadline) all msgs from from underlying
@ -192,7 +192,7 @@ async def _reconnect_forever(
f'{url} connection bail with:'
)
await trio.sleep(0.5)
rent_cs.cancel()
pcs.cancel()
# go back to reonnect loop in parent task
return
@ -204,7 +204,7 @@ async def _reconnect_forever(
f'{src_mod}\n'
'WS feed seems down and slow af.. reconnecting\n'
)
rent_cs.cancel()
pcs.cancel()
# go back to reonnect loop in parent task
return
@ -228,12 +228,7 @@ async def _reconnect_forever(
nobsws._connected = trio.Event()
task_status.started()
mc_state: trio._channel.MemoryChannelState = snd._state
while (
mc_state.open_receive_channels > 0
and
mc_state.open_send_channels > 0
):
while not snd._closed:
log.info(
f'{src_mod}\n'
f'{url} trying (RE)CONNECT'
@ -242,11 +237,10 @@ async def _reconnect_forever(
ws: WebSocketConnection
try:
async with (
trio.open_nursery() as n,
open_websocket_url(url) as ws,
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
):
cs = nobsws._cs = tn.cancel_scope
cs = nobsws._cs = n.cancel_scope
nobsws._ws = ws
log.info(
f'{src_mod}\n'
@ -254,7 +248,7 @@ async def _reconnect_forever(
)
# begin relay loop to forward msgs
tn.start_soon(
n.start_soon(
proxy_msgs,
ws,
cs,
@ -268,7 +262,7 @@ async def _reconnect_forever(
# TODO: should we return an explicit sub-cs
# from this fixture task?
await tn.start(
await n.start(
open_fixture,
fixture,
nobsws,
@ -278,23 +272,11 @@ async def _reconnect_forever(
# to let tasks run **inside** the ws open block above.
nobsws._connected.set()
await trio.sleep_forever()
except (
HandshakeError,
ConnectionRejected,
):
except HandshakeError:
log.exception('Retrying connection')
await trio.sleep(0.5) # throttle
except BaseException as _berr:
berr = _berr
log.exception(
'Reconnect-attempt failed ??\n'
)
await trio.sleep(0.2) # throttle
raise berr
# ws & nursery block ends
#|_ws & nursery block ends
nobsws._connected = trio.Event()
if cs.cancelled_caught:
log.cancel(
@ -342,25 +324,21 @@ async def open_autorecon_ws(
connetivity errors, or some user defined recv timeout.
You can provide a ``fixture`` async-context-manager which will be
entered/exitted around each connection reset; eg. for
(re)requesting subscriptions without requiring streaming setup
code to rerun.
entered/exitted around each connection reset; eg. for (re)requesting
subscriptions without requiring streaming setup code to rerun.
'''
snd: trio.MemorySendChannel
rcv: trio.MemoryReceiveChannel
snd, rcv = trio.open_memory_channel(616)
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn
):
async with trio.open_nursery() as n:
nobsws = NoBsWs(
url,
rcv,
msg_recv_timeout=msg_recv_timeout,
)
await tn.start(
await n.start(
partial(
_reconnect_forever,
url,
@ -373,10 +351,11 @@ async def open_autorecon_ws(
await nobsws._connected.wait()
assert nobsws._cs
assert nobsws.connected()
try:
yield nobsws
finally:
tn.cancel_scope.cancel()
n.cancel_scope.cancel()
'''
@ -389,8 +368,8 @@ of msgs over a `NoBsWs`.
class JSONRPCResult(Struct):
id: int
jsonrpc: str = '2.0'
result: dict|None = None
error: dict|None = None
result: Optional[dict] = None
error: Optional[dict] = None
@acm

View File

@ -357,9 +357,7 @@ async def allocate_persistent_feed(
# yield back control to starting nursery once we receive either
# some history or a real-time quote.
log.info(
f'loading OHLCV history: {fqme!r}\n'
)
log.info(f'loading OHLCV history: {fqme}')
await some_data_ready.wait()
flume = Flume(

View File

@ -107,22 +107,17 @@ async def open_piker_runtime(
async with (
tractor.open_root_actor(
# passed through to `open_root_actor`
# passed through to ``open_root_actor``
registry_addrs=registry_addrs,
name=name,
start_method=start_method,
loglevel=loglevel,
debug_mode=debug_mode,
# XXX NOTE MEMBER DAT der's a perf hit yo!!
# https://greenback.readthedocs.io/en/latest/principle.html#performance
maybe_enable_greenback=True,
start_method=start_method,
# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
enable_modules=enable_modules,
hide_tb=False,
**tractor_kwargs,
) as actor,
@ -262,10 +257,7 @@ async def maybe_open_pikerd(
loglevel: str | None = None,
**kwargs,
) -> (
tractor._portal.Portal
|ClassVar[Services]
):
) -> tractor._portal.Portal | ClassVar[Services]:
'''
If no ``pikerd`` daemon-root-actor can be found start it and
yield up (we should probably figure out returning a portal to self
@ -290,11 +282,10 @@ async def maybe_open_pikerd(
registry_addrs: list[tuple[str, int]] = (
registry_addrs
or
[_default_reg_addr]
or [_default_reg_addr]
)
pikerd_portal: tractor.Portal|None
pikerd_portal: tractor.Portal | None
async with (
open_piker_runtime(
name=query_name,

View File

@ -28,7 +28,6 @@ from contextlib import (
)
import tractor
from trio.lowlevel import current_task
from ._util import (
log, # sub-sys logger
@ -71,84 +70,69 @@ async def maybe_spawn_daemon(
lock = Services.locks[service_name]
await lock.acquire()
try:
async with find_service(
service_name,
registry_addrs=[('127.0.0.1', 6116)],
) as portal:
if portal is not None:
lock.release()
yield portal
return
log.warning(
f"Couldn't find any existing {service_name}\n"
'Attempting to spawn new daemon-service..'
)
# ask root ``pikerd`` daemon to spawn the daemon we need if
# pikerd is not live we now become the root of the
# process tree
async with maybe_open_pikerd(
loglevel=loglevel,
**pikerd_kwargs,
) as pikerd_portal:
# we are the root and thus are `pikerd`
# so spawn the target service directly by calling
# the provided target routine.
# XXX: this assumes that the target is well formed and will
# do the right things to setup both a sub-actor **and** call
# the ``_Services`` api from above to start the top level
# service task for that actor.
started: bool
if pikerd_portal is None:
started = await service_task_target(
loglevel=loglevel,
**spawn_args,
)
else:
# request a remote `pikerd` (service manager) to start the
# target daemon-task, the target can't return
# a non-serializable value since it is expected that service
# starting is non-blocking and the target task will persist
# running "under" or "within" the `pikerd` actor tree after
# the questing client disconnects. in other words this
# spawns a persistent daemon actor that continues to live
# for the lifespan of whatever the service manager inside
# `pikerd` says it should.
started = await pikerd_portal.run(
service_task_target,
loglevel=loglevel,
**spawn_args,
)
if started:
log.info(f'Service {service_name} started!')
# block until we can discover (by IPC connection) to the newly
# spawned daemon-actor and then deliver the portal to the
# caller.
async with tractor.wait_for_actor(service_name) as portal:
lock.release()
yield portal
await portal.cancel_actor()
except BaseException as _err:
err = _err
if (
lock.locked()
and
lock.statistics().owner is current_task()
):
log.exception(
f'Releasing stale lock after crash..?'
f'{err!r}\n'
)
async with find_service(
service_name,
registry_addrs=[('127.0.0.1', 6116)],
) as portal:
if portal is not None:
lock.release()
raise err
yield portal
return
log.warning(
f"Couldn't find any existing {service_name}\n"
'Attempting to spawn new daemon-service..'
)
# ask root ``pikerd`` daemon to spawn the daemon we need if
# pikerd is not live we now become the root of the
# process tree
async with maybe_open_pikerd(
loglevel=loglevel,
**pikerd_kwargs,
) as pikerd_portal:
# we are the root and thus are `pikerd`
# so spawn the target service directly by calling
# the provided target routine.
# XXX: this assumes that the target is well formed and will
# do the right things to setup both a sub-actor **and** call
# the ``_Services`` api from above to start the top level
# service task for that actor.
started: bool
if pikerd_portal is None:
started = await service_task_target(
loglevel=loglevel,
**spawn_args,
)
else:
# request a remote `pikerd` (service manager) to start the
# target daemon-task, the target can't return
# a non-serializable value since it is expected that service
# starting is non-blocking and the target task will persist
# running "under" or "within" the `pikerd` actor tree after
# the questing client disconnects. in other words this
# spawns a persistent daemon actor that continues to live
# for the lifespan of whatever the service manager inside
# `pikerd` says it should.
started = await pikerd_portal.run(
service_task_target,
loglevel=loglevel,
**spawn_args,
)
if started:
log.info(f'Service {service_name} started!')
# block until we can discover (by IPC connection) to the newly
# spawned daemon-actor and then deliver the portal to the
# caller.
async with tractor.wait_for_actor(service_name) as portal:
lock.release()
yield portal
await portal.cancel_actor()
async def spawn_emsd(

View File

@ -109,7 +109,7 @@ class Services:
# wait on any context's return value
# and any final portal result from the
# sub-actor.
ctx_res: Any = await ctx.wait_for_result()
ctx_res: Any = await ctx.result()
# NOTE: blocks indefinitely until cancelled
# either by error from the target context

View File

@ -101,15 +101,13 @@ async def open_registry(
if (
not tractor.is_root_process()
and
not Registry.addrs
and not Registry.addrs
):
Registry.addrs.extend(actor.reg_addrs)
if (
ensure_exists
and
not Registry.addrs
and not Registry.addrs
):
raise RuntimeError(
f"`{uid}` registry should already exist but doesn't?"
@ -148,7 +146,7 @@ async def find_service(
| list[Portal]
| None
):
# try:
reg_addrs: list[tuple[str, int]]
async with open_registry(
addrs=(
@ -159,39 +157,22 @@ async def find_service(
or Registry.addrs
),
) as reg_addrs:
log.info(f'Scanning for service `{service_name}`')
log.info(
f'Scanning for service {service_name!r}'
)
maybe_portals: list[Portal] | Portal | None
# attach to existing daemon by name if possible
maybe_portals: list[Portal]|Portal|None
async with tractor.find_actor(
service_name,
registry_addrs=reg_addrs,
only_first=first_only, # if set only returns single ref
) as maybe_portals:
if not maybe_portals:
# log.info(
print(
f'Could NOT find service {service_name!r} -> {maybe_portals!r}'
)
yield None
return
# log.info(
print(
f'Found service {service_name!r} -> {maybe_portals}'
)
yield maybe_portals
# except BaseException as _berr:
# berr = _berr
# log.exception(
# 'tractor.find_actor() failed with,\n'
# )
# raise berr
async def check_for_service(
service_name: str,

View File

@ -15,8 +15,8 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Remote control tasks for sending annotations (and maybe more cmds) to
a chart from some other actor.
Remote control tasks for sending annotations (and maybe more cmds)
to a chart from some other actor.
'''
from __future__ import annotations
@ -32,7 +32,6 @@ from typing import (
)
import tractor
import trio
from tractor import trionics
from tractor import (
Portal,
@ -317,9 +316,7 @@ class AnnotCtl(Struct):
)
yield aid
finally:
# async ipc send op
with trio.CancelScope(shield=True):
await self.remove(aid)
await self.remove(aid)
async def redraw(
self,

View File

@ -15,8 +15,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
qompleterz: embeddable search and complete using trio, Qt and
rapidfuzz.
qompleterz: embeddable search and complete using trio, Qt and rapidfuzz.
"""
@ -47,7 +46,6 @@ import time
from pprint import pformat
from rapidfuzz import process as fuzzy
import tractor
import trio
from trio_typing import TaskStatus
@ -55,7 +53,7 @@ from piker.ui.qt import (
size_policy,
align_flag,
Qt,
# QtCore,
QtCore,
QtWidgets,
QModelIndex,
QItemSelectionModel,
@ -922,10 +920,7 @@ async def fill_results(
# issue multi-provider fan-out search request and place
# "searching.." statuses on outstanding results providers
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn
):
async with trio.open_nursery() as n:
for provider, (search, pause) in (
_searcher_cache.copy().items()
@ -949,7 +944,7 @@ async def fill_results(
status_field='-> searchin..',
)
await tn.start(
await n.start(
pack_matches,
view,
has_results,
@ -1009,14 +1004,12 @@ async def handle_keyboard_input(
view.set_font_size(searchbar.dpi_font.px_size)
send, recv = trio.open_memory_channel(616)
async with (
tractor.trionics.collapse_eg(), # needed?
trio.open_nursery() as tn
):
async with trio.open_nursery() as n:
# start a background multi-searcher task which receives
# patterns relayed from this keyboard input handler and
# async updates the completer view's results.
tn.start_soon(
n.start_soon(
partial(
fill_results,
searchw,

View File

@ -555,13 +555,14 @@ class OrderMode:
def on_fill(
self,
uuid: str,
price: float,
time_s: float,
pointing: str | None = None,
) -> bool:
) -> None:
'''
Fill msg handler.
@ -574,83 +575,60 @@ class OrderMode:
- update fill bar size
'''
# XXX WARNING XXX
# if a `Status(resp='error')` arrives *before* this
# fill-status, the `.dialogs` entry may have already been
# popped and thus the below will skipped.
#
# NOTE, to avoid this confusing scenario ensure that any
# errors delivered thru from the broker-backend are not just
# "noisy reporting" (like is very common from IB..) and are
# instead ONLY errors-causing-order-dialog-cancellation!
if not (dialog := self.dialogs.get(uuid)):
log.warning(
f'Order was already cleared from `.dialogs` ??\n'
f'uuid: {uuid!r}\n'
)
return False
dialog = self.dialogs[uuid]
lines = dialog.lines
chart = self.chart
if not lines:
log.warn("No line(s) for order {uuid}!?")
return False
# update line state(s)
#
# ?XXX this fails on certain types of races?
# XXX: seems to fail on certain types of races?
# assert len(lines) == 2
flume: Flume = self.feed.flumes[chart.linked.mkt.fqme]
_, _, ratio = flume.get_ds_info()
if lines:
flume: Flume = self.feed.flumes[chart.linked.mkt.fqme]
_, _, ratio = flume.get_ds_info()
for chart, shm in [
(self.chart, flume.rt_shm),
(self.hist_chart, flume.hist_shm),
]:
viz = chart.get_viz(chart.name)
index_field = viz.index_field
arr = shm.array
for chart, shm in [
(self.chart, flume.rt_shm),
(self.hist_chart, flume.hist_shm),
]:
viz = chart.get_viz(chart.name)
index_field = viz.index_field
arr = shm.array
# TODO: borked for int index based..
index = flume.get_index(time_s, arr)
# TODO: borked for int index based..
index = flume.get_index(time_s, arr)
# get absolute index for arrow placement
arrow_index = arr[index_field][index]
# get absolute index for arrow placement
arrow_index = arr[index_field][index]
self.arrows.add(
chart.plotItem,
uuid,
arrow_index,
price,
pointing=pointing,
color=lines[0].color
)
self.arrows.add(
chart.plotItem,
uuid,
arrow_index,
price,
pointing=pointing,
color=lines[0].color
)
else:
log.warn("No line(s) for order {uuid}!?")
def on_cancel(
self,
uuid: str,
uuid: str
) -> bool:
) -> None:
msg: Order|None = self.client._sent_orders.pop(uuid, None)
if msg is None:
msg: Order = self.client._sent_orders.pop(uuid, None)
if msg is not None:
self.lines.remove_line(uuid=uuid)
self.chart.linked.cursor.show_xhair()
dialog = self.dialogs.pop(uuid, None)
if dialog:
dialog.last_status_close()
else:
log.warning(
f'Received cancel for unsubmitted order {pformat(msg)}'
)
return False
# remove GUI line, show cursor.
self.lines.remove_line(uuid=uuid)
self.chart.linked.cursor.show_xhair()
# remove msg dialog (history)
dialog: Dialog|None = self.dialogs.pop(uuid, None)
if dialog:
dialog.last_status_close()
return True
def cancel_orders_under_cursor(self) -> list[str]:
return self.cancel_orders(
@ -1079,23 +1057,13 @@ async def process_trade_msg(
if name in (
'position',
):
mkt: MktPair = mode.chart.linked.mkt
sym: MktPair = mode.chart.linked.mkt
pp_msg_symbol = msg['symbol'].lower()
pp_msg_bsmktid = msg['bs_mktid']
fqme = mkt.fqme
broker = mkt.broker
fqme = sym.fqme
broker = sym.broker
if (
# match on any backed-specific(-unique)-ID first!
(
pp_msg_bsmktid
and
mkt.bs_mktid == pp_msg_bsmktid
)
or
# OW try against what's provided as an FQME..
pp_msg_symbol == fqme
or
pp_msg_symbol == fqme.removesuffix(f'.{broker}')
or pp_msg_symbol == fqme.removesuffix(f'.{broker}')
):
log.info(
f'Loading position for `{fqme}`:\n'
@ -1118,7 +1086,7 @@ async def process_trade_msg(
return
msg = Status(**msg)
# resp: str = msg.resp
resp = msg.resp
oid = msg.oid
dialog: Dialog = mode.dialogs.get(oid)
@ -1182,33 +1150,20 @@ async def process_trade_msg(
mode.on_submit(oid)
case Status(resp='error'):
# TODO: parse into broker-side msg, or should we
# expect it to just be **that** msg verbatim (since
# we'd presumably have only 1 `Error` msg-struct)
broker_msg: dict = msg.brokerd_msg
# XXX NOTE, this presumes the rxed "error" is
# order-dialog-cancel-causing, THUS backends much ONLY
# relay errors of this "severity"!!
log.error(
f'Order errored ??\n'
f'oid: {oid!r}\n'
f'\n'
f'{pformat(broker_msg)}\n'
f'\n'
f'=> CANCELLING ORDER DIALOG <=\n'
# from tractor.devx.pformat import ppfmt
# !TODO LOL, wtf the msg is causing
# a recursion bug!
# -[ ] get this shit on msgspec stat!
# f'{ppfmt(broker_msg)}'
)
# do all the things for a cancel:
# - drop order-msg dialog from client table
# - delete level line from view
mode.on_cancel(oid)
# TODO: parse into broker-side msg, or should we
# expect it to just be **that** msg verbatim (since
# we'd presumably have only 1 `Error` msg-struct)
broker_msg: dict = msg.brokerd_msg
log.error(
f'Order {oid}->{resp} with:\n{pformat(broker_msg)}'
)
case Status(resp='canceled'):
# delete level line from view
mode.on_cancel(oid)
@ -1223,10 +1178,10 @@ async def process_trade_msg(
# TODO: UX for a "pending" clear/live order
log.info(f'Dark order triggered for {fmtmsg}')
# TODO: do the struct-msg version, blah blah..
# req=Order(exec_mode='live', action='alert') as req,
case Status(
resp='triggered',
# TODO: do the struct-msg version, blah blah..
# req=Order(exec_mode='live', action='alert') as req,
req={
'exec_mode': 'live',
'action': 'alert',

View File

@ -1,22 +1,4 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
A per-display, DPI (scaling) info dumper.
"""
Resource list for mucking with DPIs on multiple screens:
- https://stackoverflow.com/questions/42141354/convert-pixel-size-to-point-size-for-fonts-on-multiple-platforms
@ -30,86 +12,89 @@ Resource list for mucking with DPIs on multiple screens:
- https://stackoverflow.com/questions/16561879/what-is-the-difference-between-logicaldpix-and-physicaldpix-in-qt
- https://doc.qt.io/qt-5/qguiapplication.html#screenAt
'''
"""
from pyqtgraph import QtGui
from PyQt6 import (
QtCore,
QtWidgets,
)
from PyQt6.QtCore import (
Qt,
QCoreApplication,
QSize,
QRect,
from PyQt5.QtCore import (
Qt, QCoreApplication
)
# Proper high DPI scaling is available in Qt >= 5.6.0. This attibute
# must be set before creating the application
if hasattr(Qt, 'AA_EnableHighDpiScaling'):
QCoreApplication.setAttribute(
Qt.AA_EnableHighDpiScaling,
True,
)
QCoreApplication.setAttribute(Qt.AA_EnableHighDpiScaling, True)
if hasattr(Qt, 'AA_UseHighDpiPixmaps'):
QCoreApplication.setAttribute(
Qt.AA_UseHighDpiPixmaps,
True,
)
QCoreApplication.setAttribute(Qt.AA_UseHighDpiPixmaps, True)
app = QtWidgets.QApplication([])
window = QtWidgets.QMainWindow()
main_widget = QtWidgets.QWidget()
app = QtGui.QApplication([])
window = QtGui.QMainWindow()
main_widget = QtGui.QWidget()
window.setCentralWidget(main_widget)
window.show()
pxr: float = main_widget.devicePixelRatioF()
pxr = main_widget.devicePixelRatioF()
# explicitly get main widget and primary displays
current_screen: QtGui.QScreen = app.screenAt(
main_widget.geometry().center()
# screen_num = app.desktop().screenNumber()
# screen = app.screens()[screen_num]
screen = app.screenAt(main_widget.geometry().center())
name = screen.name()
size = screen.size()
geo = screen.availableGeometry()
phydpi = screen.physicalDotsPerInch()
logdpi = screen.logicalDotsPerInch()
print(
# f'screen number: {screen_num}\n',
f'screen name: {name}\n'
f'screen size: {size}\n'
f'screen geometry: {geo}\n\n'
f'devicePixelRationF(): {pxr}\n'
f'physical dpi: {phydpi}\n'
f'logical dpi: {logdpi}\n'
)
primary_screen: QtGui.QScreen = app.primaryScreen()
screen: QtGui.QScreen
for screen in app.screens():
name: str = screen.name()
model: str = screen.model().rstrip()
size: QSize = screen.size()
geo: QRect = screen.availableGeometry()
phydpi: float = screen.physicalDotsPerInch()
logdpi: float = screen.logicalDotsPerInch()
is_primary: bool = screen is primary_screen
is_current: bool = screen is current_screen
print('-'*50)
print(
f'------ screen name: {name} ------\n'
f'|_primary: {is_primary}\n'
f' _current: {is_current}\n'
f' _model: {model}\n'
f' _screen size: {size}\n'
f' _screen geometry: {geo}\n'
f' _devicePixelRationF(): {pxr}\n'
f' _physical dpi: {phydpi}\n'
f' _logical dpi: {logdpi}\n'
)
screen = app.primaryScreen()
# app-wide font info
name = screen.name()
size = screen.size()
geo = screen.availableGeometry()
phydpi = screen.physicalDotsPerInch()
logdpi = screen.logicalDotsPerInch()
print(
# f'screen number: {screen_num}\n',
f'screen name: {name}\n'
f'screen size: {size}\n'
f'screen geometry: {geo}\n\n'
f'devicePixelRationF(): {pxr}\n'
f'physical dpi: {phydpi}\n'
f'logical dpi: {logdpi}\n'
)
# app-wide font
font = QtGui.QFont("Hack")
# use pixel size to be cross-resolution compatible?
font.setPixelSize(6)
fm = QtGui.QFontMetrics(font)
fontdpi: float = fm.fontDpi()
font_h: int = fm.height()
string: str = '10000'
str_br: QtCore.QRect = fm.boundingRect(string)
str_w: int = str_br.width()
fm = QtGui.QFontMetrics(font)
fontdpi = fm.fontDpi()
font_h = fm.height()
string = '10000'
str_br = fm.boundingRect(string)
str_w = str_br.width()
print(
f'------ global font settings ------\n'
# f'screen number: {screen_num}\n',
f'font dpi: {fontdpi}\n'
f'font height: {font_h}\n'
f'string bounding rect: {str_br}\n'

View File

@ -15,12 +15,6 @@ from piker.service import (
from piker.log import get_console_log
# include `tractor`'s built-in fixtures!
pytest_plugins: tuple[str] = (
"tractor._testing.pytest",
)
def pytest_addoption(parser):
parser.addoption("--ll", action="store", dest='loglevel',
default=None, help="logging level to set when testing")

View File

@ -12,14 +12,12 @@ from piker import config
from piker.accounting import (
Account,
calc,
open_account,
load_account,
load_account_from_ledger,
open_trade_ledger,
Position,
TransactionLedger,
open_trade_ledger,
load_account,
load_account_from_ledger,
)
import tractor
def test_root_conf_networking_section(
@ -55,17 +53,12 @@ def test_account_file_default_empty(
)
def test_paper_ledger_position_calcs(
fq_acnt: tuple[str, str],
debug_mode: bool,
):
broker: str
acnt_name: str
broker, acnt_name = fq_acnt
accounts_path: Path = (
config.repodir()
/ 'tests'
/ '_inputs' # tests-local-subdir
)
accounts_path: Path = config.repodir() / 'tests' / '_inputs'
ldr: TransactionLedger
with (
@ -84,7 +77,6 @@ def test_paper_ledger_position_calcs(
ledger=ldr,
_fp=accounts_path,
debug_mode=debug_mode,
) as (dfs, ledger),
@ -110,87 +102,3 @@ def test_paper_ledger_position_calcs(
df = dfs[xrp]
assert df['cumsize'][-1] == 0
assert pos.cumsize == 0
@pytest.mark.parametrize(
'fq_acnt',
[
('ib', 'algopaper'),
],
)
def test_ib_account_with_duplicated_mktids(
fq_acnt: tuple[str, str],
debug_mode: bool,
):
# ?TODO, once we start symcache-incremental-update-support?
# from piker.data import (
# open_symcache,
# )
#
# async def main():
# async with (
# # TODO: do this as part of `open_account()`!?
# open_symcache(
# 'ib',
# only_from_memcache=True,
# ) as symcache,
# ):
from piker.brokers.ib.ledger import (
tx_sort,
# ?TODO, once we want to pull lowlevel txns and process them?
# norm_trade_records,
# update_ledger_from_api_trades,
)
broker: str
acnt_id: str = 'algopaper'
broker, acnt_id = fq_acnt
accounts_def = config.load_accounts([broker])
assert accounts_def[f'{broker}.{acnt_id}']
ledger: TransactionLedger
acnt: Account
with (
tractor.devx.maybe_open_crash_handler(pdb=debug_mode),
open_trade_ledger(
'ib',
acnt_id,
tx_sort=tx_sort,
# TODO, eventually incrementally updated for IB..
# symcache=symcache,
symcache=None,
allow_from_sync_code=True,
) as ledger,
open_account(
'ib',
acnt_id,
write_on_exit=True,
) as acnt,
):
# per input params
symcache = ledger.symcache
assert not (
symcache.pairs
or
symcache.pairs
or
symcache.mktmaps
)
# re-compute all positions that have changed state.
# TODO: likely we should change the API to return the
# position updates from `.update_from_ledger()`?
active, closed = acnt.dump_active()
# breakpoint()
# TODO, (see above imports as well) incremental update from
# (updated) ledger?
# -[ ] pull some code from `.ib.broker` content.

View File

@ -42,7 +42,7 @@ from piker.accounting import (
unpack_fqme,
)
from piker.accounting import (
open_account,
open_pps,
Position,
)
@ -136,7 +136,7 @@ def load_and_check_pos(
) -> None:
with open_account(ppmsg.broker, ppmsg.account) as table:
with open_pps(ppmsg.broker, ppmsg.account) as table:
if ppmsg.size == 0:
assert ppmsg.symbol not in table.pps