Compare commits

...

6 Commits

Author SHA1 Message Date
Tyler Goodlet c7c823046e `.accounting._ledger`: typing anda more multiline styling 2025-02-13 21:00:49 -05:00
Tyler Goodlet 41c35c07d3 Drop some bps and style logic to multiline 2025-02-13 21:00:49 -05:00
Tyler Goodlet 23eb089a2f `.accounting` add synopsis section to readme 2025-02-13 21:00:49 -05:00
Tyler Goodlet 1c072b3ccd `.questrade`: link in ws-API issue! 2025-02-13 20:20:21 -05:00
Tyler Goodlet 45731e1bea `.kraken.broker`: need to `await verify_balances()` .. 2025-02-13 20:20:21 -05:00
Tyler Goodlet aaf7796cf2 `.brokers.ib.feed`: better `tractor.to_asyncio` typing and var naming throughout! 2025-02-13 20:20:21 -05:00
8 changed files with 110 additions and 30 deletions

View File

@ -1,8 +1,40 @@
.accounting
-----------
piker.accounting
________________
A subsystem for transaction processing, storage and historical
measurement.
synopsis
--------
The big question for any trader is this:
*what is the price that determines whether i take a loss or a gain on my
trade?*
In other words, at any given state of accounting your current assets,
what is the price between any 2 assets you've transacted that determines
at which price you can conduct **the next** transaction and know if you
are making or losing more (or less) of the *source* asset versus the
*destination* asset?
Let's do a very simple example:
> Joe wants to buy some tacos bc they're super hungo.
> Joe has a friend who also likes tacos but doesn't mind if they're fresh; he doesn't mind having day old tacos.
> Inflation is rampant and taco prices are trending up for no good reason besides everyone thinks prices are going up.
> Joe goes to the taco stand and buys 4 tacos at 25 mxn.
> This makes Joe's net cost `4 * 25 = 200` mxn.
> Joe eats 3 tacos and realizes that he can't finish the last, so he puts it in the fridge to save for the next day (since he owns a comal).
> The next day the price of tacos goes up to 30 mxn (for no good reason > besides the taco stand noticing Joe is a tourist and that > "inflation" is some thing that's used as an excuse for price changes).
> Joe's friend from before got lit up (like he does every morning) and msgs Joe to buy him 2 tacos for when he shows up in the late morning.
> Joe says "sure, but i also have a leftover if you want it, and I'm fasting today so you can have my sobras and i'll buy you a new one".
> The friend coughs a couple times, and says "yee no problem man, just make sure you get them"
>
Prior *suit* definitions:
- the canucks equiv of the IRS call this idea ["Adjusted cost base"](https://www.canada.ca/en/revenue-agency/services/tax/individuals/topics/about-your-tax-return/tax-return/completing-a-tax-return/personal-income/line-12700-capital-gains/definitions-capital-gains.html#Adjustedcostbase)
.pnl
----

View File

@ -40,7 +40,7 @@ import tomli_w # for fast ledger writing
from piker.types import Struct
from piker import config
from ..log import get_logger
from piker.log import get_logger
from .calc import (
iter_by_dt,
)
@ -239,7 +239,9 @@ class TransactionLedger(UserDict):
symcache: SymbologyCache = self._symcache
towrite: dict[str, Any] = {}
for tid, txdict in self.tx_sort(self.data.copy()):
for tid, txdict in self.tx_sort(
self.data.copy()
):
# write blank-str expiry for non-expiring assets
if (
'expiry' in txdict
@ -377,7 +379,7 @@ def open_trade_ledger(
account,
dirpath=_fp,
)
cpy = ledger_dict.copy()
cpy: dict = ledger_dict.copy()
# XXX NOTE: if not provided presume we are being called from
# sync code and need to maybe run `trio` to generate..
@ -406,7 +408,13 @@ def open_trade_ledger(
account=account,
mod=mod,
symcache=symcache,
tx_sort=getattr(mod, 'tx_sort', tx_sort),
# NOTE: allow backends to provide custom ledger sorting
tx_sort=getattr(
mod,
'tx_sort',
tx_sort,
),
)
try:
yield ledger

View File

@ -305,8 +305,8 @@ class MktPair(Struct, frozen=True):
# config right?
# src_type: AssetTypeName
# for derivs, info describing contract, egs.
# strike price, call or put, swap type, exercise model, etc.
# for derivs, info describing contract, egs. strike price, call
# or put, swap type, exercise model, etc.
contract_info: list[str] | None = None
# TODO: rename to sectype since all of these can

View File

@ -251,10 +251,16 @@ def iter_by_dt(
for k in parsers:
if (
isdict and k in tx
or getattr(tx, k, None)
or
getattr(tx, k, None)
):
v = tx[k] if isdict else tx.dt
assert v is not None, f'No valid value for `{k}`!?'
v = (
tx[k] if isdict
else tx.dt
)
assert v is not None, (
f'No valid value for `{k}`!?'
)
# only call parser on the value if not None from
# the `parsers` table above (when NOT using
@ -269,8 +275,21 @@ def iter_by_dt(
return v
else:
# XXX: should never get here..
breakpoint()
# TODO: move to top?
from piker.log import get_logger
log = get_logger(__name__)
# XXX: we should really never get here..
# only if a ledger record has no expected sort(able)
# field will we likely hit this.. like with ze IB.
# if no sortable field just deliver epoch?
log.warning(
'No (time) sortable field for TXN:\n'
f'{tx}\n'
)
return from_timestamp(0)
# breakpoint()
entry: tuple[str, dict] | Transaction
for entry in sorted(

View File

@ -300,7 +300,8 @@ def disect(
assert not df.is_empty()
# muck around in pdbp REPL
breakpoint()
# tractor.devx.mk_pdb().set_trace()
# breakpoint()
# TODO: we REALLY need a better console REPL for this
# kinda thing..

View File

@ -587,7 +587,7 @@ async def get_bars(
data_cs.cancel()
# spawn new data reset task
data_cs, reset_done = await nurse.start(
data_cs, reset_done = await tn.start(
partial(
wait_on_data_reset,
proxy,
@ -607,11 +607,11 @@ async def get_bars(
# such that simultaneous symbol queries don't try data resettingn
# too fast..
unset_resetter: bool = False
async with trio.open_nursery() as nurse:
async with trio.open_nursery() as tn:
# start history request that we allow
# to run indefinitely until a result is acquired
nurse.start_soon(query)
tn.start_soon(query)
# start history reset loop which waits up to the timeout
# for a result before triggering a data feed reset.
@ -631,7 +631,7 @@ async def get_bars(
unset_resetter: bool = True
# spawn new data reset task
data_cs, reset_done = await nurse.start(
data_cs, reset_done = await tn.start(
partial(
wait_on_data_reset,
proxy,
@ -705,7 +705,9 @@ async def _setup_quote_stream(
# to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore
def teardown():
ticker.updateEvent.disconnect(push)
log.error(f"Disconnected stream for `{symbol}`")
log.error(
f'Disconnected stream for `{symbol}`'
)
client.ib.cancelMktData(contract)
# decouple broadcast mem chan
@ -761,7 +763,10 @@ async def open_aio_quote_stream(
symbol: str,
contract: Contract | None = None,
) -> trio.abc.ReceiveStream:
) -> (
trio.abc.Channel| # iface
tractor.to_asyncio.LinkedTaskChannel # actually
):
from tractor.trionics import broadcast_receiver
global _quote_streams
@ -778,6 +783,7 @@ async def open_aio_quote_stream(
yield from_aio
return
from_aio: tractor.to_asyncio.LinkedTaskChannel
async with tractor.to_asyncio.open_channel_from(
_setup_quote_stream,
symbol=symbol,
@ -983,17 +989,18 @@ async def stream_quotes(
)
cs: trio.CancelScope | None = None
startup: bool = True
iter_quotes: trio.abc.Channel
while (
startup
or cs.cancel_called
):
with trio.CancelScope() as cs:
async with (
trio.open_nursery() as nurse,
trio.open_nursery() as tn,
open_aio_quote_stream(
symbol=sym,
contract=con,
) as stream,
) as iter_quotes,
):
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash)
@ -1021,9 +1028,9 @@ async def stream_quotes(
await rt_ev.wait()
cs.cancel() # cancel called should now be set
nurse.start_soon(reset_on_feed)
tn.start_soon(reset_on_feed)
async with aclosing(stream):
async with aclosing(iter_quotes):
# if syminfo.get('no_vlm', False):
if not init_msg.shm_write_opts['has_vlm']:
@ -1038,19 +1045,21 @@ async def stream_quotes(
# wait for real volume on feed (trading might be
# closed)
while True:
ticker = await stream.receive()
ticker = await iter_quotes.receive()
# for a real volume contract we rait for
# the first "real" trade to take place
if (
# not calc_price
# and not ticker.rtTime
not ticker.rtTime
False
# not ticker.rtTime
):
# spin consuming tickers until we
# get a real market datum
log.debug(f"New unsent ticker: {ticker}")
continue
else:
log.debug("Received first volume tick")
# ugh, clear ticks since we've
@ -1066,13 +1075,18 @@ async def stream_quotes(
log.debug(f"First ticker received {quote}")
# tell data-layer spawner-caller that live
# quotes are now streaming.
# quotes are now active desptie not having
# necessarily received a first vlm/clearing
# tick.
ticker = await iter_quotes.receive()
feed_is_live.set()
fqme: str = quote['fqme']
await send_chan.send({fqme: quote})
# last = time.time()
async for ticker in stream:
async for ticker in iter_quotes:
quote = normalize(ticker)
fqme = quote['fqme']
fqme: str = quote['fqme']
await send_chan.send({fqme: quote})
# ugh, clear ticks since we've consumed them

View File

@ -544,7 +544,7 @@ async def open_trade_dialog(
# to be reloaded.
balances: dict[str, float] = await client.get_balances()
verify_balances(
await verify_balances(
acnt,
src_fiat,
balances,

View File

@ -37,6 +37,12 @@ import tractor
from async_generator import asynccontextmanager
import numpy as np
import wrapt
# TODO, port to `httpx`/`trio-websocket` whenver i get back to
# writing a proper ws-api streamer for this backend (since the data
# feeds are free now) as per GH feat-req:
# https://github.com/pikers/piker/issues/509
#
import asks
from ..calc import humanize, percent_change