Compare commits
	
		
			9 Commits 
		
	
	
		
			310_plus
			...
			gb_kraken_
		
	
	| Author | SHA1 | Date | 
|---|---|---|
| 
							
							
								 | 
						628f2a6473 | |
| 
							
							
								 | 
						8b7f605416 | |
| 
							
							
								 | 
						ca8ef26ea5 | |
| 
							
							
								 | 
						4a3515541d | |
| 
							
							
								 | 
						ecd53459f6 | |
| 
							
							
								 | 
						e5a3b8643f | |
| 
							
							
								 | 
						369fd45c8e | |
| 
							
							
								 | 
						3ac48656a9 | |
| 
							
							
								 | 
						0c537a67a8 | 
| 
						 | 
					@ -8,8 +8,8 @@ expires_at = 1616095326.355846
 | 
				
			||||||
 | 
					
 | 
				
			||||||
[kraken]
 | 
					[kraken]
 | 
				
			||||||
key_descr = "api_0"
 | 
					key_descr = "api_0"
 | 
				
			||||||
public_key = ""
 | 
					api_key = ""
 | 
				
			||||||
private_key = ""
 | 
					secret = ""
 | 
				
			||||||
 | 
					
 | 
				
			||||||
[ib]
 | 
					[ib]
 | 
				
			||||||
host = "127.0.0.1"
 | 
					host = "127.0.0.1"
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -20,7 +20,7 @@ Kraken backend.
 | 
				
			||||||
"""
 | 
					"""
 | 
				
			||||||
from contextlib import asynccontextmanager
 | 
					from contextlib import asynccontextmanager
 | 
				
			||||||
from dataclasses import asdict, field
 | 
					from dataclasses import asdict, field
 | 
				
			||||||
from typing import List, Dict, Any, Tuple, Optional
 | 
					from typing import List, Dict, Any, Tuple, Optional, AsyncIterator
 | 
				
			||||||
import time
 | 
					import time
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from trio_typing import TaskStatus
 | 
					from trio_typing import TaskStatus
 | 
				
			||||||
| 
						 | 
					@ -34,11 +34,23 @@ from pydantic.dataclasses import dataclass
 | 
				
			||||||
from pydantic import BaseModel
 | 
					from pydantic import BaseModel
 | 
				
			||||||
import wsproto
 | 
					import wsproto
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from .. import config
 | 
				
			||||||
from .._cacheables import open_cached_client
 | 
					from .._cacheables import open_cached_client
 | 
				
			||||||
from ._util import resproc, SymbolNotFound, BrokerError
 | 
					from ._util import resproc, SymbolNotFound, BrokerError
 | 
				
			||||||
from ..log import get_logger, get_console_log
 | 
					from ..log import get_logger, get_console_log
 | 
				
			||||||
from ..data import ShmArray
 | 
					from ..data import ShmArray
 | 
				
			||||||
from ..data._web_bs import open_autorecon_ws
 | 
					from ..data._web_bs import open_autorecon_ws
 | 
				
			||||||
 | 
					from ..clearing._messages import (
 | 
				
			||||||
 | 
					    BrokerdPosition, BrokerdOrder, BrokerdStatus,
 | 
				
			||||||
 | 
					    BrokerdOrderAck, BrokerdError, BrokerdCancel, BrokerdFill
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import urllib.parse
 | 
				
			||||||
 | 
					import hashlib
 | 
				
			||||||
 | 
					import hmac
 | 
				
			||||||
 | 
					import base64
 | 
				
			||||||
 | 
					import pandas as pd
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
log = get_logger(__name__)
 | 
					log = get_logger(__name__)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -129,6 +141,41 @@ class OHLC:
 | 
				
			||||||
    ticks: List[Any] = field(default_factory=list)
 | 
					    ticks: List[Any] = field(default_factory=list)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def get_config() -> dict[str, Any]:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    conf, path = config.load()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    section = conf.get('kraken')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    if section is None:
 | 
				
			||||||
 | 
					        log.warning(f'No config section found for kraken in {path}')
 | 
				
			||||||
 | 
					        return {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    return section
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def get_kraken_signature(
 | 
				
			||||||
 | 
					    urlpath: str,
 | 
				
			||||||
 | 
					    data: Dict[str, Any], 
 | 
				
			||||||
 | 
					    secret: str
 | 
				
			||||||
 | 
					) -> str:
 | 
				
			||||||
 | 
					    postdata = urllib.parse.urlencode(data)
 | 
				
			||||||
 | 
					    encoded = (str(data['nonce']) + postdata).encode()
 | 
				
			||||||
 | 
					    message = urlpath.encode() + hashlib.sha256(encoded).digest()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    mac = hmac.new(base64.b64decode(secret), message, hashlib.sha512)
 | 
				
			||||||
 | 
					    sigdigest = base64.b64encode(mac.digest())
 | 
				
			||||||
 | 
					    return sigdigest.decode()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class InvalidKey(ValueError):
 | 
				
			||||||
 | 
					    """EAPI:Invalid key
 | 
				
			||||||
 | 
					        This error is returned when the API key used for the call is 
 | 
				
			||||||
 | 
					        either expired or disabled, please review the API key in your 
 | 
				
			||||||
 | 
					        Settings -> API tab of account management or generate a new one 
 | 
				
			||||||
 | 
					        and update your application."""
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class Client:
 | 
					class Client:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def __init__(self) -> None:
 | 
					    def __init__(self) -> None:
 | 
				
			||||||
| 
						 | 
					@ -139,6 +186,9 @@ class Client:
 | 
				
			||||||
                'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)'
 | 
					                'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)'
 | 
				
			||||||
        })
 | 
					        })
 | 
				
			||||||
        self._pairs: list[str] = []
 | 
					        self._pairs: list[str] = []
 | 
				
			||||||
 | 
					        self._name = ''
 | 
				
			||||||
 | 
					        self._api_key = ''
 | 
				
			||||||
 | 
					        self._secret = ''
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @property
 | 
					    @property
 | 
				
			||||||
    def pairs(self) -> Dict[str, Any]:
 | 
					    def pairs(self) -> Dict[str, Any]:
 | 
				
			||||||
| 
						 | 
					@ -162,6 +212,71 @@ class Client:
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
        return resproc(resp, log)
 | 
					        return resproc(resp, log)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    async def _private(
 | 
				
			||||||
 | 
					        self,
 | 
				
			||||||
 | 
					        method: str,
 | 
				
			||||||
 | 
					        data: dict,
 | 
				
			||||||
 | 
					        uri_path: str
 | 
				
			||||||
 | 
					    ) -> Dict[str, Any]:
 | 
				
			||||||
 | 
					        headers = {
 | 
				
			||||||
 | 
					            'Content-Type': 
 | 
				
			||||||
 | 
					                'application/x-www-form-urlencoded',
 | 
				
			||||||
 | 
					            'API-Key':
 | 
				
			||||||
 | 
					                self._api_key,
 | 
				
			||||||
 | 
					            'API-Sign':
 | 
				
			||||||
 | 
					                get_kraken_signature(uri_path, data, self._secret)
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        resp = await self._sesh.post(
 | 
				
			||||||
 | 
					            path=f'/private/{method}',
 | 
				
			||||||
 | 
					            data=data,
 | 
				
			||||||
 | 
					            headers=headers,
 | 
				
			||||||
 | 
					            timeout=float('inf')
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        return resproc(resp, log)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    async def get_user_data(
 | 
				
			||||||
 | 
					        self,
 | 
				
			||||||
 | 
					        method: str,
 | 
				
			||||||
 | 
					        data: Dict[str, Any]
 | 
				
			||||||
 | 
					    ) -> Dict[str, Any]:
 | 
				
			||||||
 | 
					        uri_path = f'/0/private/{method}'
 | 
				
			||||||
 | 
					        data['nonce'] = str(int(1000*time.time()))
 | 
				
			||||||
 | 
					        resp = await self._private(method, data, uri_path)
 | 
				
			||||||
 | 
					        err = resp['error']
 | 
				
			||||||
 | 
					        if err:
 | 
				
			||||||
 | 
					            print(err)
 | 
				
			||||||
 | 
					        return resp['result']
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    async def get_positions(
 | 
				
			||||||
 | 
					        self,
 | 
				
			||||||
 | 
					        data: Dict[str, Any] = {}
 | 
				
			||||||
 | 
					    ) -> Dict[str, Any]:
 | 
				
			||||||
 | 
					        balances = await self.get_user_data('Balance', data)
 | 
				
			||||||
 | 
					        ## TODO: grab all entries, not just first 50
 | 
				
			||||||
 | 
					        traders = await self.get_user_data('TradesHistory', data)
 | 
				
			||||||
 | 
					        positions = {}
 | 
				
			||||||
 | 
					        vols = {}
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
 | 
					        # positions
 | 
				
			||||||
 | 
					        ## TODO: Make sure to add option to include fees in positions calc
 | 
				
			||||||
 | 
					        for trade in traders['trades'].values():
 | 
				
			||||||
 | 
					            sign = -1 if trade['type'] == 'sell' else 1
 | 
				
			||||||
 | 
					            try:
 | 
				
			||||||
 | 
					                positions[trade['pair']] += sign * float(trade['cost'])
 | 
				
			||||||
 | 
					                vols[trade['pair']] += sign * float(trade['vol'])
 | 
				
			||||||
 | 
					            except KeyError:
 | 
				
			||||||
 | 
					                positions[trade['pair']] = sign * float(trade['cost'])
 | 
				
			||||||
 | 
					                vols[trade['pair']] = sign * float(trade['vol'])
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					        for pair in positions.keys():
 | 
				
			||||||
 | 
					            asset_balance = vols[pair]
 | 
				
			||||||
 | 
					            if asset_balance == 0:
 | 
				
			||||||
 | 
					                positions[pair] = 0
 | 
				
			||||||
 | 
					            else:
 | 
				
			||||||
 | 
					                positions[pair] /= asset_balance
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        return positions, vols
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async def symbol_info(
 | 
					    async def symbol_info(
 | 
				
			||||||
        self,
 | 
					        self,
 | 
				
			||||||
        pair: Optional[str] = None,
 | 
					        pair: Optional[str] = None,
 | 
				
			||||||
| 
						 | 
					@ -275,12 +390,155 @@ class Client:
 | 
				
			||||||
async def get_client() -> Client:
 | 
					async def get_client() -> Client:
 | 
				
			||||||
    client = Client()
 | 
					    client = Client()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    ## TODO: maybe add conditional based on section
 | 
				
			||||||
 | 
					    section = get_config()
 | 
				
			||||||
 | 
					    client._name = section['key_descr']
 | 
				
			||||||
 | 
					    client._api_key = section['api_key']
 | 
				
			||||||
 | 
					    client._secret = section['secret']
 | 
				
			||||||
 | 
					    ## TODO: Add a client attribute to hold this info
 | 
				
			||||||
 | 
					    #data = {
 | 
				
			||||||
 | 
					    #    # add non-nonce and non-ofs vars
 | 
				
			||||||
 | 
					    #}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # at startup, load all symbols locally for fast search
 | 
					    # at startup, load all symbols locally for fast search
 | 
				
			||||||
    await client.cache_symbols()
 | 
					    await client.cache_symbols()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    yield client
 | 
					    yield client
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def pack_position(
 | 
				
			||||||
 | 
					    acc: str,
 | 
				
			||||||
 | 
					    symkey: str,
 | 
				
			||||||
 | 
					    pos: float,
 | 
				
			||||||
 | 
					    vol: float
 | 
				
			||||||
 | 
					) -> dict[str, Any]:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    return BrokerdPosition(
 | 
				
			||||||
 | 
					        broker='kraken',
 | 
				
			||||||
 | 
					        account=acc,
 | 
				
			||||||
 | 
					        symbol=symkey,
 | 
				
			||||||
 | 
					        currency=symkey[-3:],
 | 
				
			||||||
 | 
					        size=float(vol),
 | 
				
			||||||
 | 
					        avg_price=float(pos),
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def normalize_symbol(
 | 
				
			||||||
 | 
					    ticker: str
 | 
				
			||||||
 | 
					) -> str:
 | 
				
			||||||
 | 
					    symlen = len(ticker)
 | 
				
			||||||
 | 
					    if symlen == 6:
 | 
				
			||||||
 | 
					        return ticker.lower()
 | 
				
			||||||
 | 
					    else:
 | 
				
			||||||
 | 
					        for sym in ['XXBT', 'XXMR', 'ZEUR']:
 | 
				
			||||||
 | 
					            if sym in ticker:
 | 
				
			||||||
 | 
					                ticker = ticker.replace(sym, sym[1:])
 | 
				
			||||||
 | 
					        return ticker.lower()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					async def handle_order_requests(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        client: #kraken,
 | 
				
			||||||
 | 
					        ems_order_stream: tractor.MsgStream,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					) -> None:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # order_request: dict
 | 
				
			||||||
 | 
					    async for request_msg in ems_order_stream:
 | 
				
			||||||
 | 
					        log.info(f'Received order request {request_msg}')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        action = request_msg['action']
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if action in {'buy', 'sell'}:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            account = request_msg['account']
 | 
				
			||||||
 | 
					            if account != 'kraken.spot':
 | 
				
			||||||
 | 
					                log.error(
 | 
				
			||||||
 | 
					                    'This is a kraken account, \
 | 
				
			||||||
 | 
					                    only a `kraken.spot` selection is valid'
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					                await ems_order_stream.send(BrokerError(
 | 
				
			||||||
 | 
					                    oid=request_msg['oid']
 | 
				
			||||||
 | 
					                    symbol=request_msg['symbol']
 | 
				
			||||||
 | 
					                    reason=f'Kraken only, No account found: `{account}` ?',
 | 
				
			||||||
 | 
					                ).dict())
 | 
				
			||||||
 | 
					                continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # validate
 | 
				
			||||||
 | 
					            order = BrokerdOrder(**request_msg)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # call our client api to submit the order
 | 
				
			||||||
 | 
					            ## TODO: look into the submit_limit method, do it write my own?
 | 
				
			||||||
 | 
					            reqid = await client.submit_limit(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                oid=order.oid,
 | 
				
			||||||
 | 
					                symbol=order.symbol,
 | 
				
			||||||
 | 
					                price=order.price,
 | 
				
			||||||
 | 
					                action=order.action,
 | 
				
			||||||
 | 
					                size=order.size,
 | 
				
			||||||
 | 
					                ## XXX: how do I handle new orders
 | 
				
			||||||
 | 
					                reqid=order.reqid,
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # deliver ack that order has been submitted to broker routing
 | 
				
			||||||
 | 
					            await ems_order_stream.send(
 | 
				
			||||||
 | 
					                BrokerdOrderAck(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    # ems order request id
 | 
				
			||||||
 | 
					                    oid=order.oid,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    # broker specific request id
 | 
				
			||||||
 | 
					                    reqid=reqid,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                ).dict()
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        elif action == 'cancel':
 | 
				
			||||||
 | 
					             msg = BrokerdCancel(**request_msg)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					             await client.submit_cancel(
 | 
				
			||||||
 | 
					                reqid=msg.reqid
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            log.error(f'Unknown order command: {request_msg}')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@tractor.context
 | 
				
			||||||
 | 
					async def trades_dialogue(
 | 
				
			||||||
 | 
					    ctx: tractor.Context,
 | 
				
			||||||
 | 
					    loglevel: str = None,
 | 
				
			||||||
 | 
					) -> AsyncIterator[Dict[str, Any]]:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # XXX: required to propagate ``tractor`` loglevel to piker logging
 | 
				
			||||||
 | 
					    get_console_log(loglevel or tractor.current_actor().loglevel)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Authenticated block
 | 
				
			||||||
 | 
					    async with get_client() as client:
 | 
				
			||||||
 | 
					        acc_name = 'kraken.' + client._name
 | 
				
			||||||
 | 
					        positions, vols = await client.get_positions()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        all_positions = []
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        for ticker, pos in positions.items():
 | 
				
			||||||
 | 
					            norm_sym = normalize_symbol(ticker)
 | 
				
			||||||
 | 
					            if float(vols[ticker]) != 0:
 | 
				
			||||||
 | 
					                msg = pack_position(acc_name, norm_sym, pos, vols[ticker])
 | 
				
			||||||
 | 
					                all_positions.append(msg.dict())
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
 | 
					        open_orders = await client.get_user_data('OpenOrders', {})
 | 
				
			||||||
 | 
					        await tractor.breakpoint()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        await ctx.started((all_positions, (acc_name,)))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        await trio.sleep_forever()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					       # async with (
 | 
				
			||||||
 | 
					       #     ctx.open_stream() as ems_stream,
 | 
				
			||||||
 | 
					       #     trio.open_nursery as n,
 | 
				
			||||||
 | 
					       # ):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def stream_messages(ws):
 | 
					async def stream_messages(ws):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    too_slow_count = last_hb = 0
 | 
					    too_slow_count = last_hb = 0
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -489,7 +489,8 @@ async def open_brokerd_trades_dialogue(
 | 
				
			||||||
        finally:
 | 
					        finally:
 | 
				
			||||||
            # parent context must have been closed
 | 
					            # parent context must have been closed
 | 
				
			||||||
            # remove from cache so next client will respawn if needed
 | 
					            # remove from cache so next client will respawn if needed
 | 
				
			||||||
            _router.relays.pop(broker)
 | 
					            ## TODO: Maybe add a warning
 | 
				
			||||||
 | 
					            _router.relays.pop(broker, None)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@tractor.context
 | 
					@tractor.context
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -389,7 +389,7 @@ async def handle_order_requests(
 | 
				
			||||||
            account = request_msg['account']
 | 
					            account = request_msg['account']
 | 
				
			||||||
            if account != 'paper':
 | 
					            if account != 'paper':
 | 
				
			||||||
                log.error(
 | 
					                log.error(
 | 
				
			||||||
                    'On a paper account, only a `paper` selection is valid'
 | 
					                    'This is a paper account, only a `paper` selection is valid'
 | 
				
			||||||
                )
 | 
					                )
 | 
				
			||||||
                await ems_order_stream.send(BrokerdError(
 | 
					                await ems_order_stream.send(BrokerdError(
 | 
				
			||||||
                    oid=request_msg['oid'],
 | 
					                    oid=request_msg['oid'],
 | 
				
			||||||
| 
						 | 
					@ -463,7 +463,8 @@ async def trades_dialogue(
 | 
				
			||||||
    ):
 | 
					    ):
 | 
				
			||||||
        # TODO: load paper positions per broker from .toml config file
 | 
					        # TODO: load paper positions per broker from .toml config file
 | 
				
			||||||
        # and pass as symbol to position data mapping: ``dict[str, dict]``
 | 
					        # and pass as symbol to position data mapping: ``dict[str, dict]``
 | 
				
			||||||
        await ctx.started(({}, ['paper']))
 | 
					        # await ctx.started(all_positions)
 | 
				
			||||||
 | 
					        await ctx.started(({}, {'paper',}))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        async with (
 | 
					        async with (
 | 
				
			||||||
            ctx.open_stream() as ems_stream,
 | 
					            ctx.open_stream() as ems_stream,
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue