Skip to content

Commit 3cb281e

Browse files
Nexus cancellation sample (#281)
Add sample for cancellation
1 parent 2fd1407 commit 3cb281e

File tree

10 files changed

+321
-0
lines changed

10 files changed

+321
-0
lines changed

nexus_cancel/README.md

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
# Nexus Cancellation
2+
3+
This sample shows how a caller workflow can fan out multiple Nexus operations concurrently, take the first result, and cancel the rest using `WAIT_REQUESTED` cancellation semantics.
4+
5+
With `WAIT_REQUESTED`, the caller proceeds once the handler has received the cancel request — it does not wait for the handler to finish processing the cancellation.
6+
7+
Start a Temporal server. (See the main samples repo [README](../README.md)).
8+
9+
Run the following:
10+
11+
```
12+
temporal operator namespace create --namespace nexus-cancel-handler-namespace
13+
temporal operator namespace create --namespace nexus-cancel-caller-namespace
14+
15+
temporal operator nexus endpoint create \
16+
--name nexus-cancel-endpoint \
17+
--target-namespace nexus-cancel-handler-namespace \
18+
--target-task-queue nexus-cancel-handler-task-queue
19+
```
20+
21+
Next, in separate terminal windows:
22+
23+
## Nexus Handler Worker
24+
25+
```bash
26+
uv run nexus_cancel/handler/worker.py
27+
```
28+
29+
## Nexus Caller App
30+
31+
```bash
32+
uv run nexus_cancel/caller/app.py
33+
```
34+
35+
## Expected Output
36+
37+
On the caller side, you should see a greeting in whichever language completed first:
38+
```
39+
Hello Nexus 👋
40+
```
41+
42+
On the handler side, you should see cancellation log messages for the remaining operations:
43+
```
44+
HelloHandlerWorkflow was cancelled successfully.
45+
HelloHandlerWorkflow was cancelled successfully.
46+
HelloHandlerWorkflow was cancelled successfully.
47+
HelloHandlerWorkflow was cancelled successfully.
48+
```
49+
50+
The caller workflow returns before all handler workflows have completed their cancellation cleanup. This demonstrates `WAIT_REQUESTED` semantics: the caller didn't wait for the handler workflows to finish, but still guaranteed that all handlers received the cancellation request.

nexus_cancel/__init__.py

Whitespace-only changes.

nexus_cancel/caller/__init__.py

Whitespace-only changes.

nexus_cancel/caller/app.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import asyncio
2+
import uuid
3+
from typing import Optional
4+
5+
from temporalio.client import Client
6+
from temporalio.envconfig import ClientConfig
7+
from temporalio.worker import Worker
8+
9+
from nexus_cancel.caller.workflows import HelloCallerWorkflow
10+
11+
NAMESPACE = "nexus-cancel-caller-namespace"
12+
TASK_QUEUE = "nexus-cancel-caller-task-queue"
13+
14+
15+
async def execute_caller_workflow(
16+
client: Optional[Client] = None,
17+
) -> str:
18+
if client is None:
19+
config = ClientConfig.load_client_connect_config()
20+
config.setdefault("target_host", "localhost:7233")
21+
config.setdefault("namespace", NAMESPACE)
22+
client = await Client.connect(**config)
23+
24+
async with Worker(
25+
client,
26+
task_queue=TASK_QUEUE,
27+
workflows=[HelloCallerWorkflow],
28+
):
29+
return await client.execute_workflow(
30+
HelloCallerWorkflow.run,
31+
"Nexus",
32+
id=f"hello-caller-{uuid.uuid4()}",
33+
task_queue=TASK_QUEUE,
34+
)
35+
36+
37+
if __name__ == "__main__":
38+
loop = asyncio.new_event_loop()
39+
try:
40+
result = loop.run_until_complete(execute_caller_workflow())
41+
print(result)
42+
except KeyboardInterrupt:
43+
loop.run_until_complete(loop.shutdown_asyncgens())

nexus_cancel/caller/workflows.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
"""
2+
Caller workflow that demonstrates Nexus operation cancellation.
3+
4+
Fans out 5 concurrent Nexus hello operations (one per language), takes the first
5+
result, and cancels the rest using WAIT_REQUESTED cancellation semantics.
6+
"""
7+
8+
import asyncio
9+
from datetime import timedelta
10+
11+
from temporalio import workflow
12+
from temporalio.exceptions import CancelledError, NexusOperationError
13+
14+
with workflow.unsafe.imports_passed_through():
15+
from nexus_cancel.service import HelloInput, Language, NexusService
16+
17+
NEXUS_ENDPOINT = "nexus-cancel-endpoint"
18+
19+
20+
@workflow.defn
21+
class HelloCallerWorkflow:
22+
def __init__(self) -> None:
23+
self.nexus_client = workflow.create_nexus_client(
24+
service=NexusService,
25+
endpoint=NEXUS_ENDPOINT,
26+
)
27+
28+
@workflow.run
29+
async def run(self, message: str) -> str:
30+
# Fan out 5 concurrent Nexus calls, one per language.
31+
# Each task starts and awaits its own operation so all race concurrently.
32+
async def run_operation(language: Language):
33+
handle = await self.nexus_client.start_operation(
34+
NexusService.hello,
35+
HelloInput(name=message, language=language),
36+
schedule_to_close_timeout=timedelta(seconds=10),
37+
cancellation_type=workflow.NexusOperationCancellationType.WAIT_REQUESTED,
38+
)
39+
return await handle
40+
41+
tasks = [asyncio.create_task(run_operation(lang)) for lang in Language]
42+
43+
# Wait for the first operation to complete
44+
workflow.logger.info(
45+
f"Started {len(tasks)} operations, waiting for first to complete..."
46+
)
47+
done, pending = await workflow.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
48+
49+
# Get the result from the first completed operation
50+
result = await done.pop()
51+
workflow.logger.info(f"First operation completed with: {result.message}")
52+
53+
# Cancel all remaining operations
54+
workflow.logger.info(f"Cancelling {len(pending)} remaining operations...")
55+
for task in pending:
56+
task.cancel()
57+
58+
# Wait for all cancellations to be acknowledged.
59+
# If the workflow completes before cancellation requests are delivered,
60+
# the server drops them. Waiting ensures all handlers receive the
61+
# cancellation.
62+
for task in pending:
63+
try:
64+
await task
65+
except (NexusOperationError, CancelledError):
66+
# Expected: the operation was cancelled
67+
workflow.logger.info("Operation was cancelled")
68+
69+
return result.message

nexus_cancel/handler/__init__.py

Whitespace-only changes.
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
"""
2+
Nexus service handler for the cancellation sample.
3+
4+
The hello operation is backed by a workflow, using the Nexus request ID as the
5+
workflow ID for idempotency across retries.
6+
"""
7+
8+
from __future__ import annotations
9+
10+
import nexusrpc
11+
from temporalio import nexus
12+
13+
from nexus_cancel.handler.workflows import HelloHandlerWorkflow
14+
from nexus_cancel.service import HelloInput, HelloOutput, NexusService
15+
16+
17+
@nexusrpc.handler.service_handler(service=NexusService)
18+
class NexusServiceHandler:
19+
@nexus.workflow_run_operation
20+
async def hello(
21+
self, ctx: nexus.WorkflowRunOperationContext, input: HelloInput
22+
) -> nexus.WorkflowHandle[HelloOutput]:
23+
return await ctx.start_workflow(
24+
HelloHandlerWorkflow.run,
25+
input,
26+
id=ctx.request_id,
27+
)

nexus_cancel/handler/worker.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
"""
2+
Worker for the handler namespace that processes Nexus operations and workflows.
3+
"""
4+
5+
import asyncio
6+
import logging
7+
from typing import Optional
8+
9+
from temporalio.client import Client
10+
from temporalio.envconfig import ClientConfig
11+
from temporalio.worker import Worker
12+
13+
from nexus_cancel.handler.service_handler import NexusServiceHandler
14+
from nexus_cancel.handler.workflows import HelloHandlerWorkflow
15+
16+
interrupt_event = asyncio.Event()
17+
18+
NAMESPACE = "nexus-cancel-handler-namespace"
19+
TASK_QUEUE = "nexus-cancel-handler-task-queue"
20+
21+
22+
async def main(client: Optional[Client] = None):
23+
logging.basicConfig(level=logging.INFO)
24+
25+
if not client:
26+
config = ClientConfig.load_client_connect_config()
27+
config.setdefault("target_host", "localhost:7233")
28+
config.setdefault("namespace", NAMESPACE)
29+
client = await Client.connect(**config)
30+
31+
async with Worker(
32+
client,
33+
task_queue=TASK_QUEUE,
34+
workflows=[HelloHandlerWorkflow],
35+
nexus_service_handlers=[NexusServiceHandler()],
36+
):
37+
logging.info("Worker started, ctrl+c to exit")
38+
await interrupt_event.wait()
39+
logging.info("Shutting down")
40+
41+
42+
if __name__ == "__main__":
43+
loop = asyncio.new_event_loop()
44+
try:
45+
loop.run_until_complete(main())
46+
except KeyboardInterrupt:
47+
interrupt_event.set()
48+
loop.run_until_complete(loop.shutdown_asyncgens())

nexus_cancel/handler/workflows.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
"""
2+
Handler workflow started by the hello Nexus operation.
3+
4+
Demonstrates how to handle cancellation from the caller workflow using a
5+
detached cancellation scope (asyncio.shield) for cleanup work.
6+
"""
7+
8+
import asyncio
9+
10+
from temporalio import workflow
11+
12+
with workflow.unsafe.imports_passed_through():
13+
from nexus_cancel.service import HelloInput, HelloOutput, Language
14+
15+
GREETINGS = {
16+
Language.EN: "Hello {name} 👋",
17+
Language.FR: "Bonjour {name} 👋",
18+
Language.DE: "Hallo {name} 👋",
19+
Language.ES: "¡Hola! {name} 👋",
20+
Language.TR: "Merhaba {name} 👋",
21+
}
22+
23+
24+
@workflow.defn
25+
class HelloHandlerWorkflow:
26+
@workflow.run
27+
async def run(self, input: HelloInput) -> HelloOutput:
28+
try:
29+
# Sleep for a random duration to simulate work (0-5 seconds)
30+
random_seconds = workflow.random().randint(0, 5)
31+
workflow.logger.info(f"Working for {random_seconds} seconds...")
32+
await asyncio.sleep(random_seconds)
33+
34+
# Return a greeting based on the language
35+
greeting = GREETINGS[input.language].format(name=input.name)
36+
return HelloOutput(message=greeting)
37+
38+
except asyncio.CancelledError:
39+
# Perform cleanup in a detached cancellation scope.
40+
# asyncio.shield prevents the cleanup work from being cancelled.
41+
workflow.logger.info("Received cancellation request, performing cleanup...")
42+
try:
43+
cleanup_seconds = workflow.random().randint(0, 5)
44+
await asyncio.shield(asyncio.sleep(cleanup_seconds))
45+
except asyncio.CancelledError:
46+
pass
47+
workflow.logger.info("HelloHandlerWorkflow was cancelled successfully.")
48+
# Re-raise the cancellation error
49+
raise

nexus_cancel/service.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
"""
2+
Nexus service definition for the cancellation sample.
3+
4+
Defines a NexusService with a single `hello` operation that takes a name and
5+
language, and returns a greeting message.
6+
"""
7+
8+
from dataclasses import dataclass
9+
from enum import IntEnum
10+
11+
import nexusrpc
12+
13+
14+
class Language(IntEnum):
15+
EN = 0
16+
FR = 1
17+
DE = 2
18+
ES = 3
19+
TR = 4
20+
21+
22+
@dataclass
23+
class HelloInput:
24+
name: str
25+
language: Language
26+
27+
28+
@dataclass
29+
class HelloOutput:
30+
message: str
31+
32+
33+
@nexusrpc.service
34+
class NexusService:
35+
hello: nexusrpc.Operation[HelloInput, HelloOutput]

0 commit comments

Comments
 (0)