`.clearing`: broad rename of `fqsn` -> `fqme`

pre_overruns_ctxcancelled
Tyler Goodlet 2023-03-21 16:59:45 -04:00
parent d7c1e5e188
commit 76fe9018cf
2 changed files with 51 additions and 50 deletions

View File

@ -27,7 +27,7 @@ import trio
import tractor
from tractor.trionics import broadcast_receiver
from ..accounting._mktinfo import unpack_fqsn
from ..accounting._mktinfo import unpack_fqme
from ..log import get_logger
from ..data.types import Struct
from ..service import maybe_open_emsd
@ -177,7 +177,7 @@ async def relay_order_cmds_from_sync_code(
@acm
async def open_ems(
fqsn: str,
fqme: str,
mode: str = 'live',
loglevel: str = 'error',
@ -229,7 +229,7 @@ async def open_ems(
# ready for order commands
book = get_orders()
broker, symbol, suffix = unpack_fqsn(fqsn)
broker, symbol, suffix = unpack_fqme(fqme)
async with maybe_open_emsd(broker) as portal:
@ -246,7 +246,7 @@ async def open_ems(
portal.open_context(
_emsd_main,
fqsn=fqsn,
fqme=fqme,
exec_mode=mode,
loglevel=loglevel,
@ -266,7 +266,7 @@ async def open_ems(
async with trio.open_nursery() as n:
n.start_soon(
relay_order_cmds_from_sync_code,
fqsn,
fqme,
trades_stream
)

View File

@ -44,7 +44,7 @@ import tractor
from ..log import get_logger
from ..data._normalize import iterticks
from ..accounting._mktinfo import (
unpack_fqsn,
unpack_fqme,
float_digits,
)
from ..data.feed import (
@ -156,7 +156,7 @@ async def clear_dark_triggers(
brokerd_orders_stream: tractor.MsgStream,
quote_stream: tractor.ReceiveMsgStream, # noqa
broker: str,
fqsn: str,
fqme: str,
book: DarkBook,
@ -231,7 +231,7 @@ async def clear_dark_triggers(
account=account,
size=size,
):
bfqsn: str = symbol.replace(f'.{broker}', '')
bfqme: str = symbol.replace(f'.{broker}', '')
submit_price = price + abs_diff_away
resp = 'triggered' # hidden on client-side
@ -244,7 +244,7 @@ async def clear_dark_triggers(
oid=oid,
account=account,
time_ns=time.time_ns(),
symbol=bfqsn,
symbol=bfqme,
price=submit_price,
size=size,
)
@ -287,14 +287,14 @@ async def clear_dark_triggers(
# send response to client-side
await router.client_broadcast(
fqsn,
fqme,
status,
)
else: # condition scan loop complete
log.debug(f'execs are {execs}')
if execs:
book.triggers[fqsn] = execs
book.triggers[fqme] = execs
# print(f'execs scan took: {time.time() - start}')
@ -335,7 +335,7 @@ class Router(Struct):
# sets of clients mapped from subscription keys
subscribers: defaultdict[
str, # sub key, default fqsn
str, # sub key, default fqme
set[tractor.MsgStream], # unique client streams
] = defaultdict(set)
@ -424,7 +424,7 @@ class Router(Struct):
# actor to simulate the real IPC load it'll have when also
# pulling data from feeds
open_trades_endpoint = paper.open_paperboi(
fqsn='.'.join([symbol, broker]),
fqme='.'.join([symbol, broker]),
loglevel=loglevel,
)
@ -506,7 +506,7 @@ class Router(Struct):
async def open_trade_relays(
self,
fqsn: str,
fqme: str,
exec_mode: str,
loglevel: str,
@ -520,24 +520,24 @@ class Router(Struct):
none already exists.
'''
broker, symbol, suffix = unpack_fqsn(fqsn)
broker, symbol, suffix = unpack_fqme(fqme)
async with (
maybe_open_feed(
[fqsn],
[fqme],
loglevel=loglevel,
) as feed,
):
brokername, _, _ = unpack_fqsn(fqsn)
brokername, _, _ = unpack_fqme(fqme)
brokermod = feed.mods[brokername]
broker = brokermod.name
portal = feed.portals[brokermod]
# XXX: this should be initial price quote from target provider
flume = feed.flumes[fqsn]
flume = feed.flumes[fqme]
first_quote: dict = flume.first_quote
book: DarkBook = self.get_dark_book(broker)
book.lasts[fqsn]: float = first_quote['last']
book.lasts[fqme]: float = first_quote['last']
async with self.maybe_open_brokerd_dialog(
brokermod=brokermod,
@ -556,7 +556,7 @@ class Router(Struct):
relay.brokerd_stream,
flume.stream,
broker,
fqsn, # form: <name>.<venue>.<suffix>.<broker>
fqme, # form: <name>.<venue>.<suffix>.<broker>
book
)
@ -945,7 +945,7 @@ async def translate_and_relay_brokerd_events(
# may end up with collisions?
status_msg = Status(**brokerd_msg)
# NOTE: be sure to pack an fqsn for the client side!
# NOTE: be sure to pack an fqme for the client side!
order = Order(**status_msg.req)
order.symbol = f'{order.symbol}.{broker}'
@ -1022,7 +1022,7 @@ async def process_client_order_cmds(
client_order_stream: tractor.MsgStream,
brokerd_order_stream: tractor.MsgStream,
fqsn: str,
fqme: str,
flume: Flume,
dark_book: DarkBook,
router: Router,
@ -1049,11 +1049,11 @@ async def process_client_order_cmds(
# backend can be routed and relayed to subscribed clients.
subs = router.dialogs[oid]
# add all subscribed clients for this fqsn (should eventually be
# add all subscribed clients for this fqme (should eventually be
# a more generalize subscription system) to received order msg
# updates (and thus show stuff in the UI).
subs.add(client_order_stream)
subs.update(router.subscribers[fqsn])
subs.update(router.subscribers[fqme])
reqid = dark_book._ems2brokerd_ids.inverse.get(oid)
@ -1111,7 +1111,7 @@ async def process_client_order_cmds(
and status.resp == 'dark_open'
):
# remove from dark book clearing
entry = dark_book.triggers[fqsn].pop(oid, None)
entry = dark_book.triggers[fqme].pop(oid, None)
if entry:
(
pred,
@ -1127,7 +1127,7 @@ async def process_client_order_cmds(
status.req = cmd
await router.client_broadcast(
fqsn,
fqme,
status,
)
@ -1137,7 +1137,7 @@ async def process_client_order_cmds(
dark_book._active.pop(oid)
else:
log.exception(f'No dark order for {fqsn}?')
log.exception(f'No dark order for {fqme}?')
# TODO: eventually we should be receiving
# this struct on the wire unpacked in a scoped protocol
@ -1146,7 +1146,7 @@ async def process_client_order_cmds(
# LIVE order REQUEST
case {
'oid': oid,
'symbol': fqsn,
'symbol': fqme,
'price': trigger_price,
'size': size,
'action': ('buy' | 'sell') as action,
@ -1159,7 +1159,7 @@ async def process_client_order_cmds(
# remove the broker part before creating a message
# to send to the specific broker since they probably
# aren't expectig their own name, but should they?
sym = fqsn.replace(f'.{broker}', '')
sym = fqme.replace(f'.{broker}', '')
if status is not None:
# if we already had a broker order id then
@ -1216,7 +1216,7 @@ async def process_client_order_cmds(
# DARK-order / alert REQUEST
case {
'oid': oid,
'symbol': fqsn,
'symbol': fqme,
'price': trigger_price,
'size': size,
'exec_mode': exec_mode,
@ -1238,7 +1238,7 @@ async def process_client_order_cmds(
# price received from the feed, instead of being
# like every other shitty tina platform that makes
# the user choose the predicate operator.
last = dark_book.lasts[fqsn]
last = dark_book.lasts[fqme]
# sometimes the real-time feed hasn't come up
# so just pull from the latest history.
@ -1280,7 +1280,7 @@ async def process_client_order_cmds(
# NOTE: this may result in an override of an existing
# dark book entry if the order id already exists
dark_book.triggers.setdefault(
fqsn, {}
fqme, {}
)[oid] = (
pred,
tickfilter,
@ -1305,7 +1305,7 @@ async def process_client_order_cmds(
# broadcast status to all subscribed clients
await router.client_broadcast(
fqsn,
fqme,
status,
)
@ -1316,35 +1316,36 @@ async def process_client_order_cmds(
@acm
async def maybe_open_trade_relays(
router: Router,
fqsn: str,
fqme: str,
exec_mode: str, # ('paper', 'live')
loglevel: str = 'info',
) -> tuple:
def cache_on_fqsn_unless_paper(
def cache_on_fqme_unless_paper(
router: Router,
fqsn: str,
fqme: str,
exec_mode: str, # ('paper', 'live')
loglevel: str = 'info',
) -> Hashable:
if exec_mode == 'paper':
return f'paper_{fqsn}'
return f'paper_{fqme}'
else:
return fqsn
return fqme
# XXX: closure to enable below use of
# ``tractor.trionics.maybe_open_context()``
@acm
async def cached_mngr(
router: Router,
fqsn: str,
fqme: str,
exec_mode: str, # ('paper', 'live')
loglevel: str = 'info',
):
relay, feed, client_ready = await _router.nursery.start(
_router.open_trade_relays,
fqsn,
fqme,
exec_mode,
loglevel,
)
@ -1354,11 +1355,11 @@ async def maybe_open_trade_relays(
acm_func=cached_mngr,
kwargs={
'router': _router,
'fqsn': fqsn,
'fqme': fqme,
'exec_mode': exec_mode,
'loglevel': loglevel,
},
key=cache_on_fqsn_unless_paper,
key=cache_on_fqme_unless_paper,
) as (
cache_hit,
(relay, feed, client_ready)
@ -1369,7 +1370,7 @@ async def maybe_open_trade_relays(
@tractor.context
async def _emsd_main(
ctx: tractor.Context,
fqsn: str,
fqme: str,
exec_mode: str, # ('paper', 'live')
loglevel: str = 'info',
@ -1426,7 +1427,7 @@ async def _emsd_main(
global _router
assert _router
broker, symbol, suffix = unpack_fqsn(fqsn)
broker, symbol, suffix = unpack_fqme(fqme)
# TODO: would be nice if in tractor we can require either a ctx arg,
# or a named arg with ctx in it and a type annotation of
@ -1443,7 +1444,7 @@ async def _emsd_main(
# few duplicate streams as necessary per ems actor.
async with maybe_open_trade_relays(
_router,
fqsn,
fqme,
exec_mode,
loglevel,
) as (relay, feed, client_ready):
@ -1466,28 +1467,28 @@ async def _emsd_main(
# register the client side before starting the
# brokerd-side relay task to ensure the client is
# delivered all exisiting open orders on startup.
# TODO: instead of by fqsn we need a subscription
# TODO: instead of by fqme we need a subscription
# system/schema here to limit what each new client is
# allowed to see in terms of broadcasted order flow
# updates per dialog.
_router.subscribers[fqsn].add(client_stream)
_router.subscribers[fqme].add(client_stream)
client_ready.set()
# start inbound (from attached client) order request processing
# main entrypoint, run here until cancelled.
try:
flume = feed.flumes[fqsn]
flume = feed.flumes[fqme]
await process_client_order_cmds(
client_stream,
brokerd_stream,
fqsn,
fqme,
flume,
dark_book,
_router,
)
finally:
# try to remove client from subscription registry
_router.subscribers[fqsn].remove(client_stream)
_router.subscribers[fqme].remove(client_stream)
for oid, client_streams in _router.dialogs.items():
client_streams.discard(client_stream)