Port broker core to tractor; fix chan unsub bug
parent
31b8277f08
commit
6c977cfb7b
|
@ -9,10 +9,9 @@ from types import ModuleType
|
||||||
from typing import Coroutine, Callable
|
from typing import Coroutine, Callable
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
|
import tractor
|
||||||
|
|
||||||
from .. import tractor
|
from ..log import get_logger, get_console_log
|
||||||
from ..log import get_logger
|
|
||||||
from ..ipc import Channel
|
|
||||||
from . import get_brokermod
|
from . import get_brokermod
|
||||||
|
|
||||||
|
|
||||||
|
@ -75,7 +74,7 @@ async def wait_for_network(net_func: Callable, sleep: int = 1) -> dict:
|
||||||
async def stream_quotes(
|
async def stream_quotes(
|
||||||
brokermod: ModuleType,
|
brokermod: ModuleType,
|
||||||
get_quotes: Coroutine,
|
get_quotes: Coroutine,
|
||||||
tickers2chans: {str: Channel},
|
tickers2chans: {str: tractor.Channel},
|
||||||
rate: int = 5, # delay between quote requests
|
rate: int = 5, # delay between quote requests
|
||||||
diff_cached: bool = True, # only deliver "new" quotes to the queue
|
diff_cached: bool = True, # only deliver "new" quotes to the queue
|
||||||
cid: str = None,
|
cid: str = None,
|
||||||
|
@ -246,8 +245,10 @@ def modify_quote_stream(broker, tickers, chan=None, cid=None):
|
||||||
lambda ticker: ticker not in tickers, tickers2chans.copy()
|
lambda ticker: ticker not in tickers, tickers2chans.copy()
|
||||||
):
|
):
|
||||||
chanset = tickers2chans.get(ticker)
|
chanset = tickers2chans.get(ticker)
|
||||||
if chanset:
|
# XXX: cid will be different on unsub call
|
||||||
chanset.discard((chan, cid))
|
for item in chanset.copy():
|
||||||
|
if chan in item:
|
||||||
|
chanset.discard(item)
|
||||||
|
|
||||||
if not chanset:
|
if not chanset:
|
||||||
# pop empty sets which will trigger bg quoter task termination
|
# pop empty sets which will trigger bg quoter task termination
|
||||||
|
@ -257,7 +258,7 @@ def modify_quote_stream(broker, tickers, chan=None, cid=None):
|
||||||
async def start_quote_stream(
|
async def start_quote_stream(
|
||||||
broker: str,
|
broker: str,
|
||||||
tickers: [str],
|
tickers: [str],
|
||||||
chan: 'Channel' = None,
|
chan: tractor.Channel = None,
|
||||||
cid: str = None,
|
cid: str = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Handle per-broker quote stream subscriptions.
|
"""Handle per-broker quote stream subscriptions.
|
||||||
|
@ -266,8 +267,11 @@ async def start_quote_stream(
|
||||||
Since most brokers seems to support batch quote requests we
|
Since most brokers seems to support batch quote requests we
|
||||||
limit to one task per process for now.
|
limit to one task per process for now.
|
||||||
"""
|
"""
|
||||||
|
actor = tractor.current_actor()
|
||||||
|
# set log level after fork
|
||||||
|
get_console_log(actor.loglevel)
|
||||||
# pull global vars from local actor
|
# pull global vars from local actor
|
||||||
ss = tractor.current_actor().statespace
|
ss = actor.statespace
|
||||||
broker2tickersubs = ss['broker2tickersubs']
|
broker2tickersubs = ss['broker2tickersubs']
|
||||||
clients = ss['clients']
|
clients = ss['clients']
|
||||||
dtasks = ss['dtasks']
|
dtasks = ss['dtasks']
|
||||||
|
|
Loading…
Reference in New Issue