Turns out the EMS can support this as originally expected: you can
update a `brokerd`-side `.reqid` through a `BrokerdAck` msg and the ems
which update its cross-dialog (leg) tracking correctly! The issue was
a bug in the `editOrderStatus` msg handling and appropriate tracking
of the correct `.oid` (ems uid) on the kraken side. This unfortunately
required adding a `emsflow: dict[str, list[BrokerdOrder]]` msg flow
tracing table which means the broker daemon is tracking all the msg flow
with the ems, though I'm wondering now if this is just good practise
anyway and maybe we should offer a small primitive type from our msging
utils to aid with this? I've used such constructs in event handling
systems prior.
There's a lot more factoring that can be done after these changes as
well but the quick detailed summary is,
- rework the `handle_order_requests()` loop to use `match:` syntax and
update the new `emsflow` table on every new request from the ems.
- fix the `editOrderStatus` case pattern to not include an error msg and
thus actually be triggered to respond to the ems with a `BrokerdAck`
containing the new `.reqid`, the new kraken side `txid`.
- skip any `openOrders` msgs which are detected as being kraken's
internal order "edits" by matching on the `cancel_reason` field.
- update the `emsflow` table in all ws-stream msg handling blocks
with responses sent to the ems.
Relates to #290
Move to using the websocket API for all order control ops and dropping
the sync rest api approach which resulted in a bunch of buggy races.
Further this gets us must faster (batch) order cancellation for free
and a simpler ems request handler loop. We now heavily leverage the new
py3.10 `match:` syntax for all kraken-side API msg parsing and
processing and handle both the `openOrders` and `ownTrades` subscription
streams.
We also block "order editing" (by immediate cancellation) for now since
the EMS isn't entirely yet equipped to handle brokerd side `.reqid`
changes (which is how kraken implements so called order "updates" or
"edits") for a given order-request dialog and we may want to even
consider just implementing "updates" ourselves via independent cancel
and submit requests? Definitely something to ponder. Alternatively we
can "masquerade" such updates behind the count-style `.oid` remapping we
had to implement anyway (kraken's limitation) and maybe everything will
just work?
Further details in this patch:
- create 2 tables for tracking the EMS's `.oid` (uui4) value to `int`s
that kraken expects (for `reqid`s): `ids` and `reqmsgs` which enable
local lookup of ems uids to piker-backend-client-side request ids and
received order messages.
- add `openOrders` sub support which more or less directly relays to
equivalent `BrokerdStatus` updates and calc the `.filled` and
`.remaining` values based on cleared vlm updates.
- add handler blocks for `[add/edit/cancel]OrderStatus` events including
error msg cases.
- don't do any order request response processing in
`handle_order_requests()` since responses are always received via one
(or both?) of the new ws subs: `ownTrades` and `openOrders` and thus
such msgs are now handled in the response relay loop.
Relates to #290Resolves#310, #296
This drops the use of `pp.update_pps_conf()` (and friends) and instead
moves to using the context style `open_trade_ledger()` and `open_pps()`
managers for faster pp msg gen due to delayed file writing (which was
the main source update latency).
In order to make this work with potentially multiple accounts this also
uses an exit stack which loads each ledger / `pps.toml` into an account
id mapped `dict`; a POC for likely how we should implement some higher
level position manager api.
Change `.find_contract()` -> `.find_contracts()` to allow multi-search
for so called "ambiguous" contracts (like for `Future`s) such that the
method now returns a `list` of tracts and populates the contract cache
with all specific tracts retrieved. Let it take in an (unvalidated)
contract that will be fqsn-style-tokenized such that it can be called
from `.search_symbols()` (though we're not quite yet XD).
More stuff,
- add `Client.parse_patt2fqsn()` which is an fqsn to token unpacker
built from the original logic in the old `.find_contract()`.
- handle fiat/forex pairs with the `'CASH'` sectype.
- add a flag to allow unqualified contracts to fail with a warning msg.
- populate the client's contract cache with all expiries of
an ambiguous derivative.
- allow `.con_deats()` to warn msg instead of raise on def-not-found.
- add commented `assert 0` which was triggering a debugger deadlock in
`tractor` which we still haven't been able to create a unit test for.
Not sure this didn't get caught in usage, but basically real-time
updates got broken by a rework of `update_ledger_from_api_trades()`.
The issue is that the ledger was being updated **before** calling
`piker.pp.update_pps_conf()` which resulted in the `Position.size`
not being updated correctly since the [latest added] clears passed
in via the `trade_records` arg were already found in the `.clears` table
and thus were causing the loop to skip the `Position.lifo_update()`
call..
The solution here is to not update the ledger **until after** we call
`update_pps_conf()` - it's more read/writes but it's correct and we
figure out a less io heavy way to do the file writing later.
Further this includes a fix to avoid double emitting a pp update caused
by non-thorough logic that waits for a commission report to arrive
during a fill event; previously we were emitting the same message twice
due to the lack of a check for an existing comms report in the case
where the report arrives *after* the fill.
Moves to using the new `piker.pp` apis to both store real-time trade
events in a ledger file as well emit position update msgs (which were
not in this backend at all prior) when new orders clear (aka fill).
In terms of outstanding issues,
- solves the pp update part of the bugs reported in #310
- starts a msg case block in prep for #293
Details of rework:
- move the `subscribe()` ws fixture to module level and `partial()` in
the client token instead of passing it to the instance; in prep for
removal of the `.token` attr from the `NoBsWs` wrapper.
- drop `make_auth_sub()` since it was too thin and we can just
do it all succinctly in `subscribe()`
- filter trade update msgs to those not yet stored int the toml ledger
- much better kraken api msg unpacking using new `match:` synax B)
Resolves#311
No real-time update support (yet) but this is the first draft at writing
trades ledgers and `pps.toml` entries for the kraken backend.
Deatz:
- drop `pack_positions()`, no longer used.
- use `piker.pp` apis to both write a trades ledger file and update the
`pps.toml` inside the `trades_dialogue()` endpoint startup.
- drop the weird paper engine swap over if auth can't be done, we should
be doing something with messaging in the ems over this..
- more web API error response raising.
- pass the `pp.Transaction` set loaded from ledger into
`process_trade_msgs()` do avoid duplicate sends of already collected
trades msgs.
- add `norm_trade_records()` public endpoing (used by `piker.pp` api)
and `update_ledger()` helper.
- rejig `process_trade_msgs()` to drop the weird `try:` assertion block
and skip already-recorded-in-ledger trade msgs as well as yield *each*
trade instead of sub-sequences.
Before we weren't emitting pp msgs when a position went back to "net
zero" (aka the size is zero) nor when a new one was opened (wasn't
previously loaded from the `pps.toml`). This reworks a bunch of the
incremental update logic as well as ports to the changes in the
`piker.pp` module:
- rename a few of the normalizing helpers to be more explicit.
- drop calling `pp.get_pps()` in the trades dialog task and instead
create msgs iteratively, per account, by iterating through collected
position and API trade records and calling instead
`pp.update_pps_conf()`.
- always from-ledger-update both positions reported from ib's pp sys and
session api trades detected on ems-trade-dialog startup.
- `update_ledger_from_api_trades()` now does **just** that: only updates
the trades ledger and returns the transaction set.
- `update_and_audit_msgs()` now only the input list of msgs and properly
generates new msgs for newly created positions that weren't previously
loaded from the `pps.toml`.
We can probably make this better (and with less file sys accesses) later
such that we keep a consistent pps state in mem and only write async
maybe from another side-task?
What a nightmare this was.. main holdup was that cost (commissions)
reports are fired independent from "fills" so you can't really emit
a proper full position update until they both arrive.
Deatz:
- move `push_tradesies()` and relay loop in `deliver_trade_events()` to
the new py3.10 `match:` syntax B)
- subscribe for, and handle `CommissionReport` events from `ib_insync`
and repack as a `cost` event type.
- handle cons with no primary/listing exchange (like futes) in
`update_ledger_from_api_trades()` by falling back to the plain
'exchange' field.
- drop reverse fqsn lookup from ib positions map; just use contract
lookup for api trade logs since we're already connected..
- make validation in `update_and_audit()` optional via flag.
- pass in the accounts def, ib pp msg table and the proxies table to the
trade event relay task-loop.
- add `emit_pp_update()` too encapsulate a full api trade entry
incremental update which calls into the `piker.pp` apis to,
- update the ledger
- update the pps.toml
- generate a new `BrokerdPosition` msg to send to the ems
- adjust trades relay loop to only emit pp updates when a cost report
arrives for the fill/execution by maintaining a small table per exec
id.
I don't want to rant too much any more since it's pretty clear `ib` has
either zero concern for its (api) user's or a severely terrible data
management team and/or general inter-team coordination system, but this
patch more or less hacks the flex report records to be similar enough to
API "execution" / "fill" records such that they can be similarly
normalized and stored as well as processed for position calculations..
Dirty deats,
- use the `IB.fills()` method for pulling current session trade events
since it's both recommended in the docs and does seem to capture
more extensive meta-data.
- add a `update_ledger_from_api()` helper which does all the insane work
of making sure api trade entries are usable both within piker's global
fqsn system but also compatible with incremental updates of positions
computed from trade ledgers derived from ib's "flex reports".
- add "auditting" of `ib`'s reported positioning API messages by
comparison with piker's new "traders first" breakeven price style and
complain via logging on mismatches.
- handle buy vs. sell arithmetic (via a +ve or -ve multiplier) to make
"size" arithmetic work for API trade entries..
- draft out options contract transaction parsing but skip in pps
generation for now.
- always use the "execution id" as ledger keys both in flex and api
trade processing.
- for whatever weird reason `ib_insync` doesn't include the so called
"primary exchange" in contracts reported in fill events, so do manual
contract lookups in such cases such that pps entries can be placed
in the right fqsn section...
Still ToDo:
- incremental update on trade clears / position updates
- pps audit from ledger depending on user config?
Since "flex reports" are only available for the current session's trades
the day after, this adds support for also collecting trade execution
records for the current session and writing them to the equivalent
ledger file.
Summary:
- add `trades_to_records()` to handle parsing both flex and API event
objects into a common record form.
- add `norm_trade_records()` to handle converting ledger entries into
`TradeRecord` types from the new `piker.pps` mod (coming in next
commit).
The single-file module was getting way out of hand size-wise with the
new flex report parsing stuff so this starts the process of breaking
things up into smaller modules oriented around trade, data, and ledger
related endpoints.
Add support for backends to declare sub-modules to enable in
a `__enable_modules__: list[str]` module var which is parsed by the
daemon spawning code passed to `tractor`'s `enable_modules: list[str]`
input.
Relates to the bug discovered in #310, this should avoid out-of-order
msgs which do not have a `.reqid` set to be error logged to console.
Further, add `pformat()` to kraken logging of ems msging.
Given that naming the port map is mostly pointless, since accounts can
be detected once the client connects, just expect a `brokers.toml` to
define a simple sequence of port numbers. Toss in a warning for using
the old map/`dict` style.
Now that we have working client auth thanks to:
https://github.com/barneygale/asyncvnc/pull/4 and related issue,
we can use a pw for the vnc server, though we should eventually
auto-generate a random one from a docker super obviously.
Add logic to the data reset hack loop to do a connection reset after
2 failed/timeout attempts at the regular data reset. We need to also add
this logic around reconnectionn events that are due to the host
network connection: aka roaming that's faster then timing logic
builtin to the gateway.
`ib-gw` seems particularly fragile to connections from clients with the
same id (can result in weird connect hangs and even crashes) and
`ib_insync` doesn't handle intermittent tcp disconnects that
well..(especially on dockerized IBC setups). This adds a bunch of
changes to our client caching and scan loop as well a proper
task-locking-to-cache-proxies so that,
- `asyncio`-side clients aren't double-loaded/connected even when
explicitly trying to reconnect repeatedly with a given client to work
around the unreliability of the `asyncio.Transport` design in
`ib_insync`.
- we can use `tractor.trionics.maybe_open_context()` to lock the `trio`
side from loading more then one `Client` on the `asyncio` side and
instead on cache hits only making a new `MethodProxy` around the
reused `asyncio`-side client (since each `trio` task needs its own
inter-task msg channel).
- a `finally:` block teardown on all clients loaded in the scan loop
avoids stale connections.
- the connect params are now exposed as named args to
`load_aio_clients()` can be easily controlled from caller code.
Oh, and we properly hooked up the internal `ib_insync` logging to our
own internal schema - makes it a lot easier to debug wtf is going on XD
In order to expose more `asyncio` powered `Client` methods to endpoint
task-code this adds a more extensive and layered set of `MethodProxy`
loading routines, in dependency order these are:
- `load_clients_for_trio()` a `tractor.to_asyncio.open_channel_from()`
entry-point factory for loading all scanned clients on the `asyncio` side
and delivering them over the inter-task channel to a `trio`-side task.
- `get_preferred_data_client()` a simple client instance loading routine
which reads from the users `brokers.toml -> `prefer_data_account:
list[str]` which must list account names, in priority order, that are
acceptable to be used as the main "data connection client" such that
only one of the detected clients is used for data (whereas the rest
are used only for order entry).
- `open_client_proxies()` which delivers the detected `Client` set
wrapped each in a `MethodProxy`.
- `open_data_client()` which directly delivers the preferred data client
as a proxy for `trio` tasks.
- update `open_client_method_proxy()` and `open_client_proxy` to require
an input `Client` instance.
Further impl details:
- add `MethodProxy._aio_ns` to ref the original `asyncio` side proxied instance
- add `Client.trades()` to pull executions from the last day/session
- load proxies inside `trades_dialogue` and use the new `.trades()`
method to try and pull a fill ledger for eventual correct pp price
calcs (pertains to #307)..