First working `brokerd` -> `trades_dialogue()` ep loader
							parent
							
								
									f234483a1f
								
							
						
					
					
						commit
						312c4cdec7
					
				| 
						 | 
				
			
			@ -18,24 +18,137 @@
 | 
			
		|||
CLI front end for trades ledger and position tracking management.
 | 
			
		||||
 | 
			
		||||
'''
 | 
			
		||||
from typing import (
 | 
			
		||||
    Any,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
import tractor
 | 
			
		||||
import trio
 | 
			
		||||
import typer
 | 
			
		||||
 | 
			
		||||
from ._pos import open_pps
 | 
			
		||||
from ..service import (
 | 
			
		||||
    open_piker_runtime,
 | 
			
		||||
)
 | 
			
		||||
# from ._pos import open_pps
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
ledger = typer.Typer()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def broker_init(
 | 
			
		||||
    brokername: str,
 | 
			
		||||
    loglevel: str | None = None,
 | 
			
		||||
 | 
			
		||||
    **start_actor_kwargs,
 | 
			
		||||
 | 
			
		||||
) -> dict:
 | 
			
		||||
    '''
 | 
			
		||||
    Given an input broker name, load all named arguments
 | 
			
		||||
    which can be passed to a daemon + context spawn for
 | 
			
		||||
    the relevant `brokerd` service endpoint.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    # log.info(f'Spawning {brokername} broker daemon')
 | 
			
		||||
    from ..brokers import get_brokermod
 | 
			
		||||
    brokermod = get_brokermod(brokername)
 | 
			
		||||
    modpath = brokermod.__name__
 | 
			
		||||
 | 
			
		||||
    start_actor_kwargs['name'] = f'brokerd.{brokername}'
 | 
			
		||||
    start_actor_kwargs.update(
 | 
			
		||||
        getattr(
 | 
			
		||||
            brokermod,
 | 
			
		||||
            '_spawn_kwargs',
 | 
			
		||||
            {},
 | 
			
		||||
        )
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    # lookup actor-enabled modules declared by the backend offering the
 | 
			
		||||
    # `brokerd` endpoint(s).
 | 
			
		||||
    enabled = start_actor_kwargs['enable_modules'] = [modpath]
 | 
			
		||||
    for submodname in getattr(
 | 
			
		||||
        brokermod,
 | 
			
		||||
        '__enable_modules__',
 | 
			
		||||
        [],
 | 
			
		||||
    ):
 | 
			
		||||
        subpath = f'{modpath}.{submodname}'
 | 
			
		||||
        enabled.append(subpath)
 | 
			
		||||
 | 
			
		||||
    # non-blocking setup of brokerd service nursery
 | 
			
		||||
    from ..data import _setup_persistent_brokerd
 | 
			
		||||
 | 
			
		||||
    return (
 | 
			
		||||
        start_actor_kwargs,  # to `ActorNursery.start_actor()`
 | 
			
		||||
        _setup_persistent_brokerd,  # service task ep
 | 
			
		||||
        getattr(  # trades endpoint
 | 
			
		||||
            brokermod,
 | 
			
		||||
            'trades_dialogue',
 | 
			
		||||
            None,
 | 
			
		||||
        ),
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ledger.command()
 | 
			
		||||
def sync(
 | 
			
		||||
    brokername: str,
 | 
			
		||||
    account: str,
 | 
			
		||||
 | 
			
		||||
    loglevel: str = 'cancel',
 | 
			
		||||
):
 | 
			
		||||
    with open_pps(
 | 
			
		||||
 | 
			
		||||
    start_kwargs, _, trades_ep = broker_init(
 | 
			
		||||
        brokername,
 | 
			
		||||
        account,
 | 
			
		||||
    ) as table:
 | 
			
		||||
        breakpoint()
 | 
			
		||||
        loglevel=loglevel,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    async def main():
 | 
			
		||||
 | 
			
		||||
        async with (
 | 
			
		||||
            open_piker_runtime(
 | 
			
		||||
                name='ledger_cli',
 | 
			
		||||
                loglevel=loglevel,
 | 
			
		||||
            ) as (actor, sockaddr),
 | 
			
		||||
 | 
			
		||||
            tractor.open_nursery() as an,
 | 
			
		||||
        ):
 | 
			
		||||
            portal = await an.start_actor(
 | 
			
		||||
                loglevel=loglevel,
 | 
			
		||||
                debug_mode=True,
 | 
			
		||||
                **start_kwargs,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            if (
 | 
			
		||||
                brokername == 'paper'
 | 
			
		||||
                or trades_ep is None
 | 
			
		||||
            ):
 | 
			
		||||
                # from . import _paper_engine as paper
 | 
			
		||||
                # open_trades_endpoint = paper.open_paperboi(
 | 
			
		||||
                #     fqme='.'.join([symbol, broker]),
 | 
			
		||||
                #     loglevel=loglevel,
 | 
			
		||||
                # )
 | 
			
		||||
                RuntimeError('Paper mode not supported for sync!')
 | 
			
		||||
            else:
 | 
			
		||||
                # open live brokerd trades endpoint
 | 
			
		||||
                open_trades_endpoint = portal.open_context(
 | 
			
		||||
                    trades_ep,
 | 
			
		||||
                    loglevel=loglevel,
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
            positions: dict[str, Any]
 | 
			
		||||
            accounts: list[str]
 | 
			
		||||
            # brokerd_trades_stream: tractor.MsgStream
 | 
			
		||||
            async with (
 | 
			
		||||
                open_trades_endpoint as (
 | 
			
		||||
                    brokerd_ctx,
 | 
			
		||||
                    (positions, accounts,),
 | 
			
		||||
                ),
 | 
			
		||||
                # brokerd_ctx.open_stream() as brokerd_trades_stream,
 | 
			
		||||
            ):
 | 
			
		||||
                await tractor.breakpoint()
 | 
			
		||||
                await brokerd_ctx.cancel()
 | 
			
		||||
 | 
			
		||||
            await portal.cancel_actor()
 | 
			
		||||
 | 
			
		||||
    trio.run(main)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue