
Integrate Ray execution engine and enable passing of options [ENG-231]#77

Merged
eywalker merged 6 commits into walkerlab:dev from brian-arnold:claude/ray-pipeline-integration
Mar 11, 2026

Conversation

@brian-arnold
Collaborator

This PR restores the API for Ray integration that broke with the recent refactor. The major changes that were made:

  • function_pod now returns a wrapper closure instead of mutating the original func, keeping func.__dict__ clean for Ray serialization
  • Pipeline.run() defaults to async channels mode when execution_engine is provided, removing the need for an explicit PipelineConfig
  • Re-adds execution_engine / execution_engine_opts kwargs to Pipeline.run() and PipelineConfig. _apply_execution_engine() distributes the engine to every PersistentFunctionNode, merging per-node opt overrides.
  • Adds tests for the function_pod wrapper pattern and the pipeline execution-engine API
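For context, the wrapper-closure pattern described above can be sketched as follows. This is a simplified illustration, not orcapod's actual implementation: `FunctionPod` here is a minimal stand-in for the real pod class.

```python
import functools

class FunctionPod:
    """Simplified stand-in for orcapod's pod object (hypothetical)."""
    def __init__(self, fn):
        self.fn = fn

def function_pod(func):
    pod = FunctionPod(func)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)

    # Attach metadata to the wrapper, not to func itself, so that
    # func.__dict__ stays empty and the bare function can be shipped
    # to Ray workers without orcapod installed there.
    wrapper.pod = pod
    return wrapper

@function_pod
def add(a, b):
    return a + b

assert add(1, 2) == 3
assert add.__wrapped__.__dict__ == {}  # original callable untouched
assert hasattr(add, "pod")
```

`functools.wraps` also exposes the pristine function via `__wrapped__`, which is what the new decorator tests check.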

The git history is not the cleanest. The initial changes to incorporate the Ray execution engine had to work around the fact that function pods mutated the functions they decorated, so serializing them and sending them over the wire to Ray required orcapod to be installed on the Ray cluster, which we did not want. Some code was therefore written, and subsequently deleted, to purify the function of anything orcapod had added.

Brian Arnold added 4 commits March 7, 2026 00:41
…isation

  Re-adds execution_engine / execution_engine_opts kwargs to Pipeline.run()
  and PipelineConfig. _apply_execution_engine() distributes the engine to
  every PersistentFunctionNode, merging per-node opt overrides. RayExecutor
  gains _clean_fn() to strip the FunctionPod __dict__ attachment before
  pickling, and _make_run_fn() to serialise the worker function by value so
  orcapod is not required on Ray workers.
  - function_pod now returns a wrapper closure instead of mutating the
    original func, keeping func.__dict__ clean for Ray serialisation
  - RayExecutor drops _clean_fn/_make_run_fn; execute() calls
    ray.remote(fn).remote(**kwargs) directly on the pristine func
  - Pipeline.run() defaults to async channels mode when execution_engine
    is provided, removing the need for an explicit PipelineConfig
  - ray_sleeper.py simplified accordingly with mode-override docs
@brian-arnold changed the title from "Claude/ray pipeline integration" to "Claude/ray pipeline integration [ENG 231]" on Mar 10, 2026
@brian-arnold changed the title from "Claude/ray pipeline integration [ENG 231]" to "Claude/ray pipeline integration [ENG-231]" on Mar 10, 2026
Copilot AI (Contributor) left a comment

Pull request overview

Restores the public API for Ray (and other) execution-engine integration after a refactor, focusing on safe function serialization and simpler Pipeline.run() ergonomics.

Changes:

  • Updates function_pod to return a @wraps wrapper closure instead of mutating the original function object.
  • Extends Pipeline.run() / PipelineConfig to accept an execution_engine + execution_engine_opts, and applies the engine to all PersistentFunctionNodes (with per-node option overrides).
  • Expands Ray executor option handling and adds tests covering the wrapper pattern and execution-engine behavior (including async defaulting).

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 5 comments.

Summary per file:
  • tests/test_pipeline/test_pipeline.py: Adds tests for engine propagation, async default behavior, and per-node option overrides.
  • tests/test_core/function_pod/test_function_pod_decorator.py: Adds tests ensuring decorated functions preserve a clean original callable via __wrapped__.
  • src/orcapod/types.py: Adds execution_engine and execution_engine_opts to PipelineConfig.
  • src/orcapod/pipeline/graph.py: Extends Pipeline.run() signature and adds _apply_execution_engine() helper.
  • src/orcapod/core/nodes/function_node.py: Introduces per-node execution_engine_opts storage.
  • src/orcapod/core/function_pod.py: Switches decorator behavior to return a wrapper and attach .pod to it.
  • src/orcapod/core/executors/ray.py: Refactors Ray execution to pass arbitrary Ray remote options and execute underlying functions remotely.
  • src/orcapod/core/executors/base.py: Adds a default with_options() hook for executor option specialization.


Comment on lines +325 to +331
# Explicit kwargs take precedence over values baked into config.
effective_engine = execution_engine or config.execution_engine
effective_opts = (
    execution_engine_opts
    if execution_engine_opts is not None
    else config.execution_engine_opts
)
Copilot AI commented Mar 10, 2026

effective_engine = execution_engine or config.execution_engine relies on truthiness, so a valid executor instance that is falsy (e.g., implements __bool__/__len__ and returns False/0) will be ignored and the config engine used instead. Use an explicit is not None check so precedence is based on presence, not truthiness.
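The pitfall Copilot describes can be demonstrated with a minimal sketch; `FalsyEngine` is a contrived stand-in, not a real orcapod type.

```python
class FalsyEngine:
    """Contrived executor whose truthiness is False (e.g. via __bool__)."""
    def __bool__(self):
        return False

config_engine = "engine-from-config"   # stand-in for config.execution_engine
explicit_engine = FalsyEngine()        # explicitly passed by the caller

# Truthiness-based resolution silently drops the explicit engine:
assert (explicit_engine or config_engine) is config_engine

# Presence-based resolution honors it:
effective = explicit_engine if explicit_engine is not None else config_engine
assert effective is explicit_engine
```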

Comment on lines +94 to +100
fn = packet_function._function # type: ignore[attr-defined]
kwargs = packet.as_dict()

ref = _run.remote(packet_function, packet)
return ray.get(ref)
remote_fn = ray.remote(**self._build_remote_opts())(fn)
ref = remote_fn.remote(**kwargs)
raw_result = ray.get(ref)
return packet_function._build_output_packet(raw_result) # type: ignore[attr-defined]
Copilot AI commented Mar 10, 2026

RayExecutor.execute() bypasses PythonPacketFunction.direct_call() and calls the underlying function directly. This means PythonPacketFunction.set_active(False) is ignored and inactive functions will still run remotely. Consider short-circuiting when the packet function is inactive (e.g., via the public is_active() method when available) so Ray execution preserves the same semantics as in-process execution.
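A hedged sketch of the suggested short-circuit, with `StubPacketFunction` standing in for `PythonPacketFunction` and a plain callable standing in for Ray dispatch (neither is the real orcapod code):

```python
class StubPacketFunction:
    """Minimal stand-in for PythonPacketFunction (assumed attributes)."""
    def __init__(self, fn, active=True):
        self._function = fn
        self._active = active
    def is_active(self):
        return self._active

def execute(packet_function, kwargs, dispatch):
    # Short-circuit so inactive functions never reach the remote path,
    # matching in-process direct_call() semantics.
    if not packet_function.is_active():
        return None
    return dispatch(packet_function._function, kwargs)

dispatched = []
def fake_remote(fn, kwargs):
    dispatched.append(kwargs)
    return fn(**kwargs)

assert execute(StubPacketFunction(lambda x: x * 2), {"x": 3}, fake_remote) == 6
assert execute(StubPacketFunction(lambda x: x, active=False), {"x": 3}, fake_remote) is None
assert dispatched == [{"x": 3}]  # only the active function was dispatched
```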

Comment on lines +111 to +117
fn = packet_function._function # type: ignore[attr-defined]
kwargs = packet.as_dict()

remote_fn = ray.remote(**self._build_remote_opts())(fn)
ref = remote_fn.remote(**kwargs)
raw_result = await asyncio.wrap_future(ref.future())
return packet_function._build_output_packet(raw_result) # type: ignore[attr-defined]
Copilot AI commented Mar 10, 2026

RayExecutor.execute/async_execute depend on private attributes (_function, _build_output_packet) with type: ignore. Since supported_function_type_ids() only checks a string type id, it’s still possible for a different PacketFunctionProtocol implementation to claim the same id and then fail at runtime. Add a runtime guard (e.g., hasattr checks or an isinstance check against the concrete Python packet-function type) and raise a clear TypeError if unsupported.

Comment on lines +385 to +389
node_opts = node.execution_engine_opts or {}
merged = {**pipeline_opts, **node_opts}
node.executor = (
    execution_engine.with_options(**merged) if merged else execution_engine
)
Copilot AI commented Mar 10, 2026

_apply_execution_engine() unconditionally calls execution_engine.with_options(**merged) when opts are present. If a caller passes an engine that doesn't implement with_options, this will fail at runtime despite the docstring saying it "must implement" it. Consider validating the interface up front (e.g., hasattr(execution_engine, "with_options")) and raising a TypeError with a clear message so failures are immediate and easier to diagnose.

Comment on lines 305 to 310
Args:
    config: Pipeline configuration. When ``config.executor`` is
        ``ExecutorType.ASYNC_CHANNELS``, or when an
        ``execution_engine`` is provided, the pipeline runs
        asynchronously via the orchestrator. Otherwise nodes are
        executed synchronously in topological order.
Copilot AI commented Mar 10, 2026

The run() docstring says the pipeline runs asynchronously whenever an execution_engine is provided, but the implementation keeps synchronous execution when the caller explicitly supplies config with executor=SYNCHRONOUS (see explicit_config / use_async). Update the docstring to reflect this precedence so the public API contract matches behavior.

@brian-arnold changed the title from "Claude/ray pipeline integration [ENG-231]" to "Integrate Ray execution engine and enable passing of options [ENG-231]" on Mar 10, 2026
@codecov-commenter

codecov-commenter commented Mar 10, 2026

⚠️ Please install the Codecov app to ensure uploads and comments are reliably processed by Codecov.

Codecov Report

❌ Patch coverage is 47.94521% with 38 lines in your changes missing coverage. Please review.

Files with missing lines                          | Patch % | Missing lines
src/orcapod/core/executors/ray.py                 |   5.40% | 35 ⚠️
src/orcapod/core/executors/base.py                |  50.00% |  1 ⚠️
src/orcapod/protocols/core_protocols/executor.py  |  66.66% |  1 ⚠️
src/orcapod/types.py                              |  80.00% |  1 ⚠️


"""Return the Ray remote options dict."""
return self._remote_opts

def execute(
Contributor commented:

Actually it'd make more sense to inverse the logic of executor invocation, letting the packet function make use of the executor however it sees fit -- this however changes the logic surround the packet function and executor quite a bit. Since this PR is in a working state, I'll accept the PR with little to no design change around executor logic but will follow up with a redesign.

Collaborator (Author) replied:

as you suggest, we can revisit this down the road

# Per-node Ray (or other engine) resource overrides. When a pipeline
# is run with an execution_engine, these opts are merged on top of the
# pipeline-level execution_engine_opts for this node only.
self.execution_engine_opts: dict[str, Any] | None = None
Contributor commented:

Why not defer the execution engine opts to the wrapped FunctionPod?

Collaborator (Author) replied:

I agree giving them to FunctionPod would make sense; I thought FunctionNode was reasonable since I was thinking more about pipelines, as we give the executor to pipeline.run(). Should we also defer this?

def run(
    self,
    config: PipelineConfig | None = None,
    execution_engine: Any = None,
Contributor commented:

actually I'm not so sure about passing execution_engine and execution_engine_opts separately. It'd be more canonical to set the options to the execution engine first and then pass that into run method. Also, let's not use type Any unless strictly required. In this case, execution_engine should be of type ExecutionEngineProtocol
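The calling convention suggested here could look roughly like the sketch below; `RayEngine` is a toy stand-in and the real executor API may differ:

```python
class RayEngine:
    """Toy stand-in for a configurable execution engine."""
    def __init__(self, **opts):
        self.opts = opts
    def with_options(self, **opts):
        # Return a reconfigured copy rather than mutating in place.
        return RayEngine(**{**self.opts, **opts})

# Configure the engine first, then hand the finished engine to run():
engine = RayEngine(num_cpus=1).with_options(num_gpus=1)
assert engine.opts == {"num_cpus": 1, "num_gpus": 1}
# pipeline.run(execution_engine=engine)  # no separate opts kwarg needed
```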

Collaborator (Author) replied:

I like the idea of configuring the execution engine with options, but then the executor would need to store node-specific options, no?

But I now have execution_engine specified as type PacketFunctionExecutorProtocol

# Per-node Ray (or other engine) resource overrides. When a pipeline
# is run with an execution_engine, these opts are merged on top of the
# pipeline-level execution_engine_opts for this node only.
self.execution_engine_opts: dict[str, Any] | None = None
Contributor commented:

FunctionNode holding its own set of execution_engine_opts is quite awkward and muddles the separation between FunctionNode/FunctionPod, the ExecutionEngine, and the ExecutionEngine's configuration. Ideally, FunctionNode/FunctionPod should not know anything about the ExecutionEngine's internal configuration. Let's discuss and consider a redesign in a future iteration.

  - Use is not None instead of truthiness for effective_engine resolution
  - Update Pipeline.run() docstring to reflect sync config override precedence
  - Add RayExecutor.__init__ docstring explaining all parameters
  - Return a dict copy from _build_remote_opts() to prevent mutation
  - Add @wraps annotation/docstring preservation tests
  - Add with_options() to PacketFunctionExecutorProtocol
  - Add _as_python_packet_function() guard in RayExecutor for clear TypeError
    on unsupported packet function types; removes type: ignore comments
  - Check is_active() in RayExecutor.execute/async_execute so inactive
    functions return None instead of running remotely
  - Replace Any with PacketFunctionExecutorProtocol | None on Pipeline.run(),
    _apply_execution_engine(), and PipelineConfig.execution_engine
Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 9 out of 9 changed files in this pull request and generated 2 comments.



Comment on lines +154 to +158
kwargs = packet.as_dict()
remote_fn = ray.remote(**self._build_remote_opts())(pf._function)
ref = remote_fn.remote(**kwargs)
raw_result = await asyncio.wrap_future(ref.future())
return pf._build_output_packet(raw_result)
Copilot AI commented Mar 10, 2026

Async path also reconstructs remote_fn = ray.remote(...)(pf._function) per packet, incurring the same overhead as the sync path. Consider reusing a cached RemoteFunction and only varying options via .options() to avoid repeated remote function construction.

Comment on lines +135 to +139
kwargs = packet.as_dict()
remote_fn = ray.remote(**self._build_remote_opts())(pf._function)
ref = remote_fn.remote(**kwargs)
raw_result = ray.get(ref)
return pf._build_output_packet(raw_result)
Copilot AI commented Mar 10, 2026

ray.remote(**opts)(pf._function) is constructed on every packet. This adds per-packet overhead and can lead to excessive Ray task-function registrations for large streams. Cache the constructed RemoteFunction (e.g., keyed by (pf._function, frozenset(remote_opts.items()))) and reuse it, or create it once and apply per-call overrides via .options(**remote_opts).

@eywalker (Contributor) left a comment

Thanks for addressing all the comments!

"""
...

def with_options(self, **opts: Any) -> "PacketFunctionExecutorProtocol":
Contributor commented:

actually, since we have from __future__ import annotations already, you don't have to surround the return type "PacketFunctionExecutorProtocol" in quotes: all type annotations are treated as strings once you have from __future__ import annotations. That being said, I recommend that you actually change the signature here to

def with_options(self, **opts: Any) -> Self:
    ...

using from typing import Self

One key feature of this is that Self is always replaced with the class that currently satisfies the Protocol. So if class X implements the protocol, with_options will return an instance of Self, which is X. If on the other hand you just type it as "PacketFunctionExecutorProtocol", then the protocol doesn't require that a new instance of the same class be returned. I honestly think either is fine, but it's worth considering.

@eywalker eywalker merged commit 8454681 into walkerlab:dev Mar 11, 2026
4 checks passed
