Compare commits
No commits in common. "54658016f1a00135c08b5145e04d16f981f330f0" and "07781e38cd5dd0b60b071b5d9ca6f68457ff38da" have entirely different histories.
54658016f1
...
07781e38cd
|
@ -24,10 +24,14 @@ from tractor._testing import (
|
||||||
)
|
)
|
||||||
|
|
||||||
# XXX TODO cases:
|
# XXX TODO cases:
|
||||||
|
# - [ ] peer cancelled itself - so other peers should
|
||||||
|
# get errors reflecting that the peer was itself the .canceller?
|
||||||
|
|
||||||
# - [x] WE cancelled the peer and thus should not see any raised
|
# - [x] WE cancelled the peer and thus should not see any raised
|
||||||
# `ContextCancelled` as it should be reaped silently?
|
# `ContextCancelled` as it should be reaped silently?
|
||||||
# => pretty sure `test_context_stream_semantics::test_caller_cancels()`
|
# => pretty sure `test_context_stream_semantics::test_caller_cancels()`
|
||||||
# already covers this case?
|
# already covers this case?
|
||||||
|
|
||||||
# - [x] INTER-PEER: some arbitrary remote peer cancels via
|
# - [x] INTER-PEER: some arbitrary remote peer cancels via
|
||||||
# Portal.cancel_actor().
|
# Portal.cancel_actor().
|
||||||
# => all other connected peers should get that cancel requesting peer's
|
# => all other connected peers should get that cancel requesting peer's
|
||||||
|
@ -40,6 +44,16 @@ from tractor._testing import (
|
||||||
# that also spawned a remote task task in that same peer-parent.
|
# that also spawned a remote task task in that same peer-parent.
|
||||||
|
|
||||||
|
|
||||||
|
# def test_self_cancel():
|
||||||
|
# '''
|
||||||
|
# 2 cases:
|
||||||
|
# - calls `Actor.cancel()` locally in some task
|
||||||
|
# - calls LocalPortal.cancel_actor()` ?
|
||||||
|
|
||||||
|
# '''
|
||||||
|
# ...
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def open_stream_then_sleep_forever(
|
async def open_stream_then_sleep_forever(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
|
@ -792,7 +806,7 @@ async def basic_echo_server(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
peer_name: str = 'wittle_bruv',
|
peer_name: str = 'wittle_bruv',
|
||||||
|
|
||||||
err_after_imsg: int|None = None,
|
err_after: int|None = None,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -821,9 +835,8 @@ async def basic_echo_server(
|
||||||
await ipc.send(resp)
|
await ipc.send(resp)
|
||||||
|
|
||||||
if (
|
if (
|
||||||
err_after_imsg
|
err_after
|
||||||
and
|
and i > err_after
|
||||||
i > err_after_imsg
|
|
||||||
):
|
):
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
f'Simulated error in `{peer_name}`'
|
f'Simulated error in `{peer_name}`'
|
||||||
|
@ -965,8 +978,7 @@ async def tell_little_bro(
|
||||||
actor_name: str,
|
actor_name: str,
|
||||||
|
|
||||||
caller: str = '',
|
caller: str = '',
|
||||||
err_after: float|None = None,
|
err_after: int|None = None,
|
||||||
rng_seed: int = 50,
|
|
||||||
):
|
):
|
||||||
# contact target actor, do a stream dialog.
|
# contact target actor, do a stream dialog.
|
||||||
async with (
|
async with (
|
||||||
|
@ -977,18 +989,14 @@ async def tell_little_bro(
|
||||||
basic_echo_server,
|
basic_echo_server,
|
||||||
|
|
||||||
# XXX proxy any delayed err condition
|
# XXX proxy any delayed err condition
|
||||||
err_after_imsg=(
|
err_after=err_after,
|
||||||
err_after * rng_seed
|
|
||||||
if err_after is not None
|
|
||||||
else None
|
|
||||||
),
|
|
||||||
) as (sub_ctx, first),
|
) as (sub_ctx, first),
|
||||||
|
|
||||||
sub_ctx.open_stream() as echo_ipc,
|
sub_ctx.open_stream() as echo_ipc,
|
||||||
):
|
):
|
||||||
actor: Actor = current_actor()
|
actor: Actor = current_actor()
|
||||||
uid: tuple = actor.uid
|
uid: tuple = actor.uid
|
||||||
for i in range(rng_seed):
|
for i in range(100):
|
||||||
msg: tuple = (
|
msg: tuple = (
|
||||||
uid,
|
uid,
|
||||||
i,
|
i,
|
||||||
|
@ -1013,13 +1021,13 @@ async def tell_little_bro(
|
||||||
)
|
)
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
'raise_sub_spawn_error_after',
|
'raise_sub_spawn_error_after',
|
||||||
[None, 0.5],
|
[None, 50],
|
||||||
)
|
)
|
||||||
def test_peer_spawns_and_cancels_service_subactor(
|
def test_peer_spawns_and_cancels_service_subactor(
|
||||||
debug_mode: bool,
|
debug_mode: bool,
|
||||||
raise_client_error: str,
|
raise_client_error: str,
|
||||||
reg_addr: tuple[str, int],
|
reg_addr: tuple[str, int],
|
||||||
raise_sub_spawn_error_after: float|None,
|
raise_sub_spawn_error_after: int|None,
|
||||||
):
|
):
|
||||||
# NOTE: this tests for the modden `mod wks open piker` bug
|
# NOTE: this tests for the modden `mod wks open piker` bug
|
||||||
# discovered as part of implementing workspace ctx
|
# discovered as part of implementing workspace ctx
|
||||||
|
@ -1033,7 +1041,6 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
# and the server's spawned child should cancel and terminate!
|
# and the server's spawned child should cancel and terminate!
|
||||||
peer_name: str = 'little_bro'
|
peer_name: str = 'little_bro'
|
||||||
|
|
||||||
|
|
||||||
def check_inner_rte(rae: RemoteActorError):
|
def check_inner_rte(rae: RemoteActorError):
|
||||||
'''
|
'''
|
||||||
Validate the little_bro's relayed inception!
|
Validate the little_bro's relayed inception!
|
||||||
|
@ -1127,7 +1134,8 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
res = await client_ctx.wait_for_result(hide_tb=False)
|
res = await client_ctx.result(hide_tb=False)
|
||||||
|
|
||||||
# in remote (relayed inception) error
|
# in remote (relayed inception) error
|
||||||
# case, we should error on the line above!
|
# case, we should error on the line above!
|
||||||
if raise_sub_spawn_error_after:
|
if raise_sub_spawn_error_after:
|
||||||
|
@ -1138,23 +1146,6 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
assert isinstance(res, ContextCancelled)
|
assert isinstance(res, ContextCancelled)
|
||||||
assert client_ctx.cancel_acked
|
assert client_ctx.cancel_acked
|
||||||
assert res.canceller == root.uid
|
assert res.canceller == root.uid
|
||||||
assert not raise_sub_spawn_error_after
|
|
||||||
|
|
||||||
# cancelling the spawner sub should
|
|
||||||
# transitively cancel it's sub, the little
|
|
||||||
# bruv.
|
|
||||||
print('root cancelling server/client sub-actors')
|
|
||||||
await spawn_ctx.cancel()
|
|
||||||
async with tractor.find_actor(
|
|
||||||
name=peer_name,
|
|
||||||
) as sub:
|
|
||||||
assert not sub
|
|
||||||
|
|
||||||
# XXX, only for tracing
|
|
||||||
# except BaseException as _berr:
|
|
||||||
# berr = _berr
|
|
||||||
# await tractor.pause(shield=True)
|
|
||||||
# raise berr
|
|
||||||
|
|
||||||
except RemoteActorError as rae:
|
except RemoteActorError as rae:
|
||||||
_err = rae
|
_err = rae
|
||||||
|
@ -1183,8 +1174,19 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
raise
|
raise
|
||||||
# await tractor.pause()
|
# await tractor.pause()
|
||||||
|
|
||||||
|
else:
|
||||||
|
assert not raise_sub_spawn_error_after
|
||||||
|
|
||||||
|
# cancelling the spawner sub should
|
||||||
|
# transitively cancel it's sub, the little
|
||||||
|
# bruv.
|
||||||
|
print('root cancelling server/client sub-actors')
|
||||||
|
await spawn_ctx.cancel()
|
||||||
|
async with tractor.find_actor(
|
||||||
|
name=peer_name,
|
||||||
|
) as sub:
|
||||||
|
assert not sub
|
||||||
|
|
||||||
# await tractor.pause()
|
|
||||||
# await server.cancel_actor()
|
# await server.cancel_actor()
|
||||||
|
|
||||||
except RemoteActorError as rae:
|
except RemoteActorError as rae:
|
||||||
|
@ -1197,7 +1199,7 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
|
|
||||||
# since we called `.cancel_actor()`, `.cancel_ack`
|
# since we called `.cancel_actor()`, `.cancel_ack`
|
||||||
# will not be set on the ctx bc `ctx.cancel()` was not
|
# will not be set on the ctx bc `ctx.cancel()` was not
|
||||||
# called directly for this confext.
|
# called directly fot this confext.
|
||||||
except ContextCancelled as ctxc:
|
except ContextCancelled as ctxc:
|
||||||
_ctxc = ctxc
|
_ctxc = ctxc
|
||||||
print(
|
print(
|
||||||
|
@ -1237,19 +1239,12 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
|
|
||||||
# assert spawn_ctx.cancelled_caught
|
# assert spawn_ctx.cancelled_caught
|
||||||
|
|
||||||
async def _main():
|
|
||||||
with trio.fail_after(
|
|
||||||
3 if not debug_mode
|
|
||||||
else 999
|
|
||||||
):
|
|
||||||
await main()
|
|
||||||
|
|
||||||
if raise_sub_spawn_error_after:
|
if raise_sub_spawn_error_after:
|
||||||
with pytest.raises(RemoteActorError) as excinfo:
|
with pytest.raises(RemoteActorError) as excinfo:
|
||||||
trio.run(_main)
|
trio.run(main)
|
||||||
|
|
||||||
rae: RemoteActorError = excinfo.value
|
rae: RemoteActorError = excinfo.value
|
||||||
check_inner_rte(rae)
|
check_inner_rte(rae)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
trio.run(_main)
|
trio.run(main)
|
||||||
|
|
|
@ -1,239 +0,0 @@
|
||||||
'''
|
|
||||||
Define the details of inter-actor "out-of-band" (OoB) cancel
|
|
||||||
semantics, that is how cancellation works when a cancel request comes
|
|
||||||
from the different concurrency (primitive's) "layer" then where the
|
|
||||||
eventual `trio.Task` actually raises a signal.
|
|
||||||
|
|
||||||
'''
|
|
||||||
from functools import partial
|
|
||||||
# from contextlib import asynccontextmanager as acm
|
|
||||||
# import itertools
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
import trio
|
|
||||||
import tractor
|
|
||||||
from tractor import ( # typing
|
|
||||||
ActorNursery,
|
|
||||||
Portal,
|
|
||||||
Context,
|
|
||||||
# ContextCancelled,
|
|
||||||
# RemoteActorError,
|
|
||||||
)
|
|
||||||
# from tractor._testing import (
|
|
||||||
# tractor_test,
|
|
||||||
# expect_ctxc,
|
|
||||||
# )
|
|
||||||
|
|
||||||
# XXX TODO cases:
|
|
||||||
# - [ ] peer cancelled itself - so other peers should
|
|
||||||
# get errors reflecting that the peer was itself the .canceller?
|
|
||||||
|
|
||||||
# def test_self_cancel():
|
|
||||||
# '''
|
|
||||||
# 2 cases:
|
|
||||||
# - calls `Actor.cancel()` locally in some task
|
|
||||||
# - calls LocalPortal.cancel_actor()` ?
|
|
||||||
#
|
|
||||||
# things to ensure!
|
|
||||||
# -[ ] the ctxc raised in a child should ideally show the tb of the
|
|
||||||
# underlying `Cancelled` checkpoint, i.e.
|
|
||||||
# `raise scope_error from ctxc`?
|
|
||||||
#
|
|
||||||
# -[ ] a self-cancelled context, if not allowed to block on
|
|
||||||
# `ctx.result()` at some point will hang since the `ctx._scope`
|
|
||||||
# is never `.cancel_called`; cases for this include,
|
|
||||||
# - an `open_ctx()` which never starteds before being OoB actor
|
|
||||||
# cancelled.
|
|
||||||
# |_ parent task will be blocked in `.open_context()` for the
|
|
||||||
# `Started` msg, and when the OoB ctxc arrives `ctx._scope`
|
|
||||||
# will never have been signalled..
|
|
||||||
|
|
||||||
# '''
|
|
||||||
# ...
|
|
||||||
|
|
||||||
# TODO, sanity test against the case in `/examples/trio/lockacquire_not_unmasked.py`
|
|
||||||
# but with the `Lock.acquire()` from a `@context` to ensure the
|
|
||||||
# implicit ignore-case-non-unmasking.
|
|
||||||
#
|
|
||||||
# @tractor.context
|
|
||||||
# async def acquire_actor_global_lock(
|
|
||||||
# ctx: tractor.Context,
|
|
||||||
# ignore_special_cases: bool,
|
|
||||||
# ):
|
|
||||||
|
|
||||||
# async with maybe_unmask_excs(
|
|
||||||
# ignore_special_cases=ignore_special_cases,
|
|
||||||
# ):
|
|
||||||
# await ctx.started('locked')
|
|
||||||
|
|
||||||
# # block til cancelled
|
|
||||||
# await trio.sleep_forever()
|
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
|
||||||
async def sleep_forever(
|
|
||||||
ctx: tractor.Context,
|
|
||||||
# ignore_special_cases: bool,
|
|
||||||
do_started: bool,
|
|
||||||
):
|
|
||||||
|
|
||||||
# async with maybe_unmask_excs(
|
|
||||||
# ignore_special_cases=ignore_special_cases,
|
|
||||||
# ):
|
|
||||||
# await ctx.started('locked')
|
|
||||||
if do_started:
|
|
||||||
await ctx.started()
|
|
||||||
|
|
||||||
# block til cancelled
|
|
||||||
print('sleepin on child-side..')
|
|
||||||
await trio.sleep_forever()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
|
||||||
'cancel_ctx',
|
|
||||||
[True, False],
|
|
||||||
)
|
|
||||||
def test_cancel_ctx_with_parent_side_entered_in_bg_task(
|
|
||||||
debug_mode: bool,
|
|
||||||
loglevel: str,
|
|
||||||
cancel_ctx: bool,
|
|
||||||
):
|
|
||||||
'''
|
|
||||||
The most "basic" out-of-band-task self-cancellation case where
|
|
||||||
`Portal.open_context()` is entered in a bg task and the
|
|
||||||
parent-task (of the containing nursery) calls `Context.cancel()`
|
|
||||||
without the child knowing; the `Context._scope` should be
|
|
||||||
`.cancel_called` when the IPC ctx's child-side relays
|
|
||||||
a `ContextCancelled` with a `.canceller` set to the parent
|
|
||||||
actor('s task).
|
|
||||||
|
|
||||||
'''
|
|
||||||
async def main():
|
|
||||||
with trio.fail_after(
|
|
||||||
2 if not debug_mode else 999,
|
|
||||||
):
|
|
||||||
an: ActorNursery
|
|
||||||
async with (
|
|
||||||
tractor.open_nursery(
|
|
||||||
debug_mode=debug_mode,
|
|
||||||
loglevel='devx',
|
|
||||||
enable_stack_on_sig=True,
|
|
||||||
) as an,
|
|
||||||
trio.open_nursery() as tn,
|
|
||||||
):
|
|
||||||
ptl: Portal = await an.start_actor(
|
|
||||||
'sub',
|
|
||||||
enable_modules=[__name__],
|
|
||||||
)
|
|
||||||
|
|
||||||
async def _open_ctx_async(
|
|
||||||
do_started: bool = True,
|
|
||||||
task_status=trio.TASK_STATUS_IGNORED,
|
|
||||||
):
|
|
||||||
# do we expect to never enter the
|
|
||||||
# `.open_context()` below.
|
|
||||||
if not do_started:
|
|
||||||
task_status.started()
|
|
||||||
|
|
||||||
async with ptl.open_context(
|
|
||||||
sleep_forever,
|
|
||||||
do_started=do_started,
|
|
||||||
) as (ctx, first):
|
|
||||||
task_status.started(ctx)
|
|
||||||
await trio.sleep_forever()
|
|
||||||
|
|
||||||
# XXX, this is the key OoB part!
|
|
||||||
#
|
|
||||||
# - start the `.open_context()` in a bg task which
|
|
||||||
# blocks inside the embedded scope-body,
|
|
||||||
#
|
|
||||||
# - when we call `Context.cancel()` it **is
|
|
||||||
# not** from the same task which eventually runs
|
|
||||||
# `.__aexit__()`,
|
|
||||||
#
|
|
||||||
# - since the bg "opener" task will be in
|
|
||||||
# a `trio.sleep_forever()`, it must be interrupted
|
|
||||||
# by the `ContextCancelled` delivered from the
|
|
||||||
# child-side; `Context._scope: CancelScope` MUST
|
|
||||||
# be `.cancel_called`!
|
|
||||||
#
|
|
||||||
print('ASYNC opening IPC context in subtask..')
|
|
||||||
maybe_ctx: Context|None = await tn.start(partial(
|
|
||||||
_open_ctx_async,
|
|
||||||
))
|
|
||||||
|
|
||||||
if (
|
|
||||||
maybe_ctx
|
|
||||||
and
|
|
||||||
cancel_ctx
|
|
||||||
):
|
|
||||||
print('cancelling first IPC ctx!')
|
|
||||||
await maybe_ctx.cancel()
|
|
||||||
|
|
||||||
# XXX, note that despite `maybe_context.cancel()`
|
|
||||||
# being called above, it's the parent (bg) task
|
|
||||||
# which was originally never interrupted in
|
|
||||||
# the `ctx._scope` body due to missing case logic in
|
|
||||||
# `ctx._maybe_cancel_and_set_remote_error()`.
|
|
||||||
#
|
|
||||||
# It didn't matter that the subactor process was
|
|
||||||
# already terminated and reaped, nothing was
|
|
||||||
# cancelling the ctx-parent task's scope!
|
|
||||||
#
|
|
||||||
print('cancelling subactor!')
|
|
||||||
await ptl.cancel_actor()
|
|
||||||
|
|
||||||
if maybe_ctx:
|
|
||||||
try:
|
|
||||||
await maybe_ctx.wait_for_result()
|
|
||||||
except tractor.ContextCancelled as ctxc:
|
|
||||||
assert not cancel_ctx
|
|
||||||
assert (
|
|
||||||
ctxc.canceller
|
|
||||||
==
|
|
||||||
tractor.current_actor().aid.uid
|
|
||||||
)
|
|
||||||
# don't re-raise since it'll trigger
|
|
||||||
# an EG from the above tn.
|
|
||||||
|
|
||||||
if cancel_ctx:
|
|
||||||
# graceful self-cancel
|
|
||||||
trio.run(main)
|
|
||||||
|
|
||||||
else:
|
|
||||||
# ctx parent task should see OoB ctxc due to
|
|
||||||
# `ptl.cancel_actor()`.
|
|
||||||
with pytest.raises(tractor.ContextCancelled) as excinfo:
|
|
||||||
trio.run(main)
|
|
||||||
|
|
||||||
'root' in excinfo.value.canceller[0]
|
|
||||||
|
|
||||||
|
|
||||||
# def test_parent_actor_cancels_subactor_with_gt1_ctxs_open_to_it(
|
|
||||||
# debug_mode: bool,
|
|
||||||
# loglevel: str,
|
|
||||||
# ):
|
|
||||||
# '''
|
|
||||||
# Demos OoB cancellation from the perspective of a ctx opened with
|
|
||||||
# a child subactor where the parent cancels the child at the "actor
|
|
||||||
# layer" using `Portal.cancel_actor()` and thus the
|
|
||||||
# `ContextCancelled.canceller` received by the ctx's parent-side
|
|
||||||
# task will appear to be a "self cancellation" even though that
|
|
||||||
# specific task itself was not cancelled and thus
|
|
||||||
# `Context.cancel_called ==False`.
|
|
||||||
# '''
|
|
||||||
# TODO, do we have an existing implied ctx
|
|
||||||
# cancel test like this?
|
|
||||||
# with trio.move_on_after(0.5):# as cs:
|
|
||||||
# await _open_ctx_async(
|
|
||||||
# do_started=False,
|
|
||||||
# )
|
|
||||||
|
|
||||||
|
|
||||||
# in-line ctx scope should definitely raise
|
|
||||||
# a ctxc with `.canceller = 'root'`
|
|
||||||
# async with ptl.open_context(
|
|
||||||
# sleep_forever,
|
|
||||||
# do_started=True,
|
|
||||||
# ) as pair:
|
|
||||||
|
|
|
@ -446,12 +446,12 @@ class ActorNursery:
|
||||||
@acm
|
@acm
|
||||||
async def _open_and_supervise_one_cancels_all_nursery(
|
async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
actor: Actor,
|
actor: Actor,
|
||||||
hide_tb: bool = True,
|
tb_hide: bool = False,
|
||||||
|
|
||||||
) -> typing.AsyncGenerator[ActorNursery, None]:
|
) -> typing.AsyncGenerator[ActorNursery, None]:
|
||||||
|
|
||||||
# normally don't need to show user by default
|
# normally don't need to show user by default
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = tb_hide
|
||||||
|
|
||||||
outer_err: BaseException|None = None
|
outer_err: BaseException|None = None
|
||||||
inner_err: BaseException|None = None
|
inner_err: BaseException|None = None
|
||||||
|
|
|
@ -613,9 +613,10 @@ async def drain_to_final_msg(
|
||||||
# msg: dict = await ctx._rx_chan.receive()
|
# msg: dict = await ctx._rx_chan.receive()
|
||||||
# if res_cs.cancelled_caught:
|
# if res_cs.cancelled_caught:
|
||||||
#
|
#
|
||||||
# -[x] make sure pause points work here for REPLing
|
# -[ ] make sure pause points work here for REPLing
|
||||||
# the runtime itself; i.e. ensure there's no hangs!
|
# the runtime itself; i.e. ensure there's no hangs!
|
||||||
# |_see masked code below in .cancel_called path
|
# |_from tractor.devx.debug import pause
|
||||||
|
# await pause()
|
||||||
|
|
||||||
# NOTE: we get here if the far end was
|
# NOTE: we get here if the far end was
|
||||||
# `ContextCancelled` in 2 cases:
|
# `ContextCancelled` in 2 cases:
|
||||||
|
@ -651,10 +652,6 @@ async def drain_to_final_msg(
|
||||||
f'IPC ctx cancelled externally during result drain ?\n'
|
f'IPC ctx cancelled externally during result drain ?\n'
|
||||||
f'{ctx}'
|
f'{ctx}'
|
||||||
)
|
)
|
||||||
# XXX, for tracing `Cancelled`..
|
|
||||||
# from tractor.devx.debug import pause
|
|
||||||
# await pause(shield=True)
|
|
||||||
|
|
||||||
# CASE 2: mask the local cancelled-error(s)
|
# CASE 2: mask the local cancelled-error(s)
|
||||||
# only when we are sure the remote error is
|
# only when we are sure the remote error is
|
||||||
# the source cause of this local task's
|
# the source cause of this local task's
|
||||||
|
|
Loading…
Reference in New Issue