Integrate Ray execution engine and enable passing of options [ENG-231]#77
Conversation
Re-adds `execution_engine` / `execution_engine_opts` kwargs to `Pipeline.run()` and `PipelineConfig`. `_apply_execution_engine()` distributes the engine to every `PersistentFunctionNode`, merging per-node opt overrides. `RayExecutor` gains `_clean_fn()` to strip the `FunctionPod` `__dict__` attachment before pickling, and `_make_run_fn()` to serialise the worker function by value so orcapod is not required on Ray workers.
- function_pod now returns a wrapper closure instead of mutating the
original func, keeping func.__dict__ clean for Ray serialisation
- RayExecutor drops _clean_fn/_make_run_fn; execute() calls
ray.remote(fn).remote(**kwargs) directly on the pristine func
- Pipeline.run() defaults to async channels mode when execution_engine
is provided, removing the need for an explicit PipelineConfig
- ray_sleeper.py simplified accordingly with mode-override docs
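The wrapper-closure pattern described in the first bullet can be sketched as follows. This is a minimal illustration, not orcapod's actual implementation: the real `function_pod` decorator has a richer signature, and the `.pod` payload here is purely hypothetical.

```python
import functools


def function_pod(func):
    """Illustrative sketch: return a @wraps wrapper closure instead of
    mutating func, so func.__dict__ stays clean for Ray serialisation."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)

    # Attach pod metadata to the wrapper, never to the original func.
    # The {"name": ...} payload is purely illustrative.
    wrapper.pod = {"name": func.__name__}
    return wrapper


@function_pod
def add(a, b):
    return a + b
```

Because `functools.wraps` sets `__wrapped__`, tests can reach the pristine original callable via `add.__wrapped__`, and `add.__wrapped__.__dict__` stays free of orcapod attachments.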
Pull request overview
Restores the public API for Ray (and other) execution-engine integration after a refactor, focusing on safe function serialization and simpler Pipeline.run() ergonomics.
Changes:
- Updates `function_pod` to return a `@wraps` wrapper closure instead of mutating the original function object.
- Extends `Pipeline.run()`/`PipelineConfig` to accept an `execution_engine` + `execution_engine_opts`, and applies the engine to all `PersistentFunctionNode`s (with per-node option overrides).
- Expands Ray executor option handling and adds tests covering the wrapper pattern and execution-engine behavior (including async defaulting).
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| tests/test_pipeline/test_pipeline.py | Adds tests for engine propagation, async default behavior, and per-node option overrides. |
| tests/test_core/function_pod/test_function_pod_decorator.py | Adds tests ensuring decorated functions preserve a clean original callable via __wrapped__. |
| src/orcapod/types.py | Adds execution_engine and execution_engine_opts to PipelineConfig. |
| src/orcapod/pipeline/graph.py | Extends Pipeline.run() signature and adds _apply_execution_engine() helper. |
| src/orcapod/core/nodes/function_node.py | Introduces per-node execution_engine_opts storage. |
| src/orcapod/core/function_pod.py | Switches decorator behavior to return a wrapper and attach .pod to it. |
| src/orcapod/core/executors/ray.py | Refactors Ray execution to pass arbitrary Ray remote options and execute underlying functions remotely. |
| src/orcapod/core/executors/base.py | Adds a default with_options() hook for executor option specialization. |
```python
# Explicit kwargs take precedence over values baked into config.
effective_engine = execution_engine or config.execution_engine
effective_opts = (
    execution_engine_opts
    if execution_engine_opts is not None
    else config.execution_engine_opts
)
```
`effective_engine = execution_engine or config.execution_engine` relies on truthiness, so a valid executor instance that is falsy (e.g., one implementing `__bool__`/`__len__` that returns `False`/`0`) will be ignored and the config engine used instead. Use an explicit `is not None` check so precedence is based on presence, not truthiness.
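A minimal illustration of the pitfall; `FalsyEngine` is a hypothetical stand-in for an executor that happens to be falsy:

```python
class FalsyEngine:
    """Hypothetical executor-like object that is falsy,
    e.g. because it reports an empty internal queue."""

    def __bool__(self):
        return False


config_engine = object()   # stand-in for config.execution_engine
explicit = FalsyEngine()   # explicitly passed by the caller

# Truthiness-based resolution silently drops the falsy explicit engine:
buggy = explicit or config_engine          # -> config_engine

# Presence-based resolution keeps the caller's choice:
fixed = explicit if explicit is not None else config_engine  # -> explicit
```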
src/orcapod/core/executors/ray.py
Outdated
```diff
  fn = packet_function._function  # type: ignore[attr-defined]
  kwargs = packet.as_dict()

- ref = _run.remote(packet_function, packet)
- return ray.get(ref)
+ remote_fn = ray.remote(**self._build_remote_opts())(fn)
+ ref = remote_fn.remote(**kwargs)
+ raw_result = ray.get(ref)
+ return packet_function._build_output_packet(raw_result)  # type: ignore[attr-defined]
```
`RayExecutor.execute()` bypasses `PythonPacketFunction.direct_call()` and calls the underlying function directly. This means `PythonPacketFunction.set_active(False)` is ignored and inactive functions will still run remotely. Consider short-circuiting when the packet function is inactive (e.g., via the public `is_active()` method when available) so Ray execution preserves the same semantics as in-process execution.
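A sketch of the suggested short-circuit, assuming only that the packet function may expose a public `is_active()` method; `maybe_execute` and `DummyPF` are hypothetical names used for illustration, not orcapod API:

```python
def maybe_execute(packet_function, run_remote):
    """Hypothetical guard: skip remote execution when the packet
    function reports it is inactive, mirroring direct_call() semantics."""
    is_active = getattr(packet_function, "is_active", None)
    if callable(is_active) and not is_active():
        return None  # same outcome as in-process execution of an inactive fn
    return run_remote()


class DummyPF:
    """Stand-in packet function with a toggleable active flag."""

    def __init__(self, active):
        self._active = active

    def is_active(self):
        return self._active
```

With this guard, `maybe_execute(DummyPF(False), ...)` returns `None` without ever dispatching to Ray, while an active function runs normally.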
src/orcapod/core/executors/ray.py
Outdated
```python
fn = packet_function._function  # type: ignore[attr-defined]
kwargs = packet.as_dict()

remote_fn = ray.remote(**self._build_remote_opts())(fn)
ref = remote_fn.remote(**kwargs)
raw_result = await asyncio.wrap_future(ref.future())
return packet_function._build_output_packet(raw_result)  # type: ignore[attr-defined]
```
`RayExecutor.execute`/`async_execute` depend on private attributes (`_function`, `_build_output_packet`) with `type: ignore`. Since `supported_function_type_ids()` only checks a string type id, it's still possible for a different `PacketFunctionProtocol` implementation to claim the same id and then fail at runtime. Add a runtime guard (e.g., `hasattr` checks or an `isinstance` check against the concrete Python packet-function type) and raise a clear `TypeError` if unsupported.
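One possible shape for such a guard. The helper name echoes the `_as_python_packet_function()` later added in this PR, but the body here is an assumed sketch using `hasattr` checks rather than the actual implementation:

```python
def as_python_packet_function(packet_function):
    """Hypothetical guard: type ids are just strings, so verify the
    private attributes the executor relies on actually exist before use."""
    if not (
        hasattr(packet_function, "_function")
        and hasattr(packet_function, "_build_output_packet")
    ):
        raise TypeError(
            "RayExecutor requires a Python packet function exposing "
            "_function and _build_output_packet; got "
            f"{type(packet_function).__name__}"
        )
    return packet_function


class GoodPF:
    """Stand-in that satisfies the attribute contract."""

    _function = staticmethod(lambda: None)

    def _build_output_packet(self, raw):
        return raw
```

An unrelated object that merely claimed the right type id now fails immediately with a descriptive `TypeError` instead of an opaque `AttributeError` deep inside Ray dispatch.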
```python
node_opts = node.execution_engine_opts or {}
merged = {**pipeline_opts, **node_opts}
node.executor = (
    execution_engine.with_options(**merged) if merged else execution_engine
)
```
`_apply_execution_engine()` unconditionally calls `execution_engine.with_options(**merged)` when opts are present. If a caller passes an engine that doesn't implement `with_options`, this will fail at runtime despite the docstring saying it "must implement" it. Consider validating the interface up front (e.g., `hasattr(execution_engine, "with_options")`) and raising a `TypeError` with a clear message so failures are immediate and easier to diagnose.
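A sketch of the up-front validation; the names `apply_execution_engine`, `Engine`, and `Node` are illustrative, while the merge logic mirrors the snippet above:

```python
def apply_execution_engine(nodes, execution_engine, pipeline_opts):
    """Hypothetical variant that fails fast with a clear error
    instead of raising AttributeError deep in the per-node loop."""
    if not hasattr(execution_engine, "with_options"):
        raise TypeError(
            f"{type(execution_engine).__name__} does not implement "
            "with_options(); execution engines must support it"
        )
    for node in nodes:
        node_opts = getattr(node, "execution_engine_opts", None) or {}
        # Per-node opts override pipeline-level opts.
        merged = {**pipeline_opts, **node_opts}
        node.executor = (
            execution_engine.with_options(**merged) if merged else execution_engine
        )


class Engine:
    """Stand-in engine whose with_options returns a configured copy."""

    def __init__(self, **opts):
        self.opts = opts

    def with_options(self, **opts):
        return Engine(**opts)


class Node:
    execution_engine_opts = {"num_cpus": 2}
    executor = None
```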
src/orcapod/pipeline/graph.py
Outdated
```diff
  Args:
      config: Pipeline configuration. When ``config.executor`` is
-         ``ExecutorType.ASYNC_CHANNELS``, the pipeline runs
+         ``ExecutorType.ASYNC_CHANNELS``, or when an
+         ``execution_engine`` is provided, the pipeline runs
          asynchronously via the orchestrator. Otherwise nodes are
          executed synchronously in topological order.
```
The run() docstring says the pipeline runs asynchronously whenever an execution_engine is provided, but the implementation keeps synchronous execution when the caller explicitly supplies config with executor=SYNCHRONOUS (see explicit_config / use_async). Update the docstring to reflect this precedence so the public API contract matches behavior.
| """Return the Ray remote options dict.""" | ||
| return self._remote_opts | ||
|
|
||
| def execute( |
Actually it'd make more sense to invert the logic of executor invocation, letting the packet function make use of the executor however it sees fit -- this, however, changes the logic surrounding the packet function and executor quite a bit. Since this PR is in a working state, I'll accept it with little to no design change around executor logic, but will follow up with a redesign.
as you suggest, we can revisit this down the road
```python
# Per-node Ray (or other engine) resource overrides. When a pipeline
# is run with an execution_engine, these opts are merged on top of the
# pipeline-level execution_engine_opts for this node only.
self.execution_engine_opts: dict[str, Any] | None = None
```
Why not defer the execution engine opts to the wrapped FunctionPod?
I agree giving them to FunctionPod would make sense; I thought FunctionNode was reasonable since I was thinking more about pipelines, as we give the executor to pipeline.run(). Should we also defer this?
src/orcapod/pipeline/graph.py
Outdated
```python
def run(
    self,
    config: PipelineConfig | None = None,
    execution_engine: Any = None,
```
actually I'm not so sure about passing `execution_engine` and `execution_engine_opts` separately. It'd be more canonical to set the options on the execution engine first and then pass that into the `run` method. Also, let's not use type `Any` unless strictly required. In this case, `execution_engine` should be of type `ExecutionEngineProtocol`.
I like the idea of configuring the execution engine with options, but then wouldn't the executor need to store the node-specific options?
I now have `execution_engine` typed as `PacketFunctionExecutorProtocol`.
```python
# Per-node Ray (or other engine) resource overrides. When a pipeline
# is run with an execution_engine, these opts are merged on top of the
# pipeline-level execution_engine_opts for this node only.
self.execution_engine_opts: dict[str, Any] | None = None
```
`FunctionNode` holding its own set of `execution_engine_opts` is quite awkward and muddles the separation between `FunctionNode`/`FunctionPod`, `ExecutionEngine`, and the `ExecutionEngine`'s configuration. Ideally, `FunctionNode`/`FunctionPod` should not know anything about the `ExecutionEngine`'s internal configuration. Let's discuss and consider a redesign in a future iteration.
- Use `is not None` instead of truthiness for effective_engine resolution
- Update Pipeline.run() docstring to reflect sync config override precedence
- Add RayExecutor.__init__ docstring explaining all parameters
- Return a dict copy from _build_remote_opts() to prevent mutation
- Add @wraps annotation/docstring preservation tests
- Add with_options() to PacketFunctionExecutorProtocol
- Add _as_python_packet_function() guard in RayExecutor for a clear TypeError on unsupported packet function types; removes type: ignore comments
- Check is_active() in RayExecutor.execute/async_execute so inactive functions return None instead of running remotely
- Replace Any with PacketFunctionExecutorProtocol | None on Pipeline.run(), _apply_execution_engine(), and PipelineConfig.execution_engine
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated 2 comments.
```python
kwargs = packet.as_dict()
remote_fn = ray.remote(**self._build_remote_opts())(pf._function)
ref = remote_fn.remote(**kwargs)
raw_result = await asyncio.wrap_future(ref.future())
return pf._build_output_packet(raw_result)
```
Async path also reconstructs `remote_fn = ray.remote(...)(pf._function)` per packet, incurring the same overhead as the sync path. Consider reusing a cached `RemoteFunction` and only varying options via `.options()` to avoid repeated remote function construction.
```python
kwargs = packet.as_dict()
remote_fn = ray.remote(**self._build_remote_opts())(pf._function)
ref = remote_fn.remote(**kwargs)
raw_result = ray.get(ref)
return pf._build_output_packet(raw_result)
```
`ray.remote(**opts)(pf._function)` is constructed on every packet. This adds per-packet overhead and can lead to excessive Ray task-function registrations for large streams. Cache the constructed `RemoteFunction` (e.g., keyed by `(pf._function, frozenset(remote_opts.items()))`) and reuse it, or create it once and apply per-call overrides via `.options(**remote_opts)`.
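The caching suggestion can be sketched generically without a Ray dependency, memoizing on the function plus its (hashable) option items. `make_remote_fn_cache` and `fake_ray_remote` are hypothetical stand-ins; with real Ray, the decorator call would be `ray.remote(**opts)(fn)`:

```python
def make_remote_fn_cache(remote_decorator):
    """Cache wrapped remote functions so each distinct (fn, opts)
    pair is constructed exactly once."""
    cache = {}

    def get(fn, opts):
        # frozenset of items makes the opts dict usable as a cache key.
        key = (fn, frozenset(opts.items()))
        if key not in cache:
            cache[key] = remote_decorator(fn, opts)
        return cache[key]

    return get


construction_calls = []


def fake_ray_remote(fn, opts):
    """Stand-in for ray.remote(**opts)(fn); records each construction."""
    construction_calls.append((fn, opts))
    return fn


get_remote = make_remote_fn_cache(fake_ray_remote)
```

Repeated lookups with the same function and options now reuse the cached wrapper instead of re-registering a task function per packet.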
eywalker
left a comment
Thanks for addressing all the comments!
| """ | ||
| ... | ||
|
|
||
| def with_options(self, **opts: Any) -> "PacketFunctionExecutorProtocol": |
actually since we have `from __future__ import annotations` already, you don't have to quote the return type `"PacketFunctionExecutorProtocol"` -- all type annotations are treated as strings once you have `from __future__ import annotations`. That being said, I recommend that you actually change the signature here to

```python
def with_options(self, **opts: Any) -> Self:
    ...
```

using `from typing import Self`.
One key property of this is that `Self` is always replaced with the class that's currently used as satisfying the Protocol. So if class `X` implements the protocol, `with_options` will return an instance of `Self`, which is `X`. If on the other hand you just type it as `"PacketFunctionExecutorProtocol"`, then the protocol doesn't require that a new instance of the same class be returned. I honestly think either is fine, but worth considering this.
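A self-contained illustration of the `Self` semantics described above. `ExecutorProtocol` and `LocalExecutor` are hypothetical names; `typing.Self` needs Python 3.11+, so this sketch imports it only under `TYPE_CHECKING` and relies on `from __future__ import annotations` to keep the annotation unevaluated at runtime:

```python
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Protocol

if TYPE_CHECKING:
    from typing import Self  # 3.11+; only the type checker needs it


class ExecutorProtocol(Protocol):
    # Self binds to the implementing class, so the protocol promises
    # that with_options returns the same concrete type it was called on.
    def with_options(self, **opts: Any) -> Self: ...


class LocalExecutor:
    """Concrete implementation: with_options returns a configured copy."""

    def __init__(self, **opts: Any) -> None:
        self.opts = opts

    def with_options(self, **opts: Any) -> LocalExecutor:
        return LocalExecutor(**{**self.opts, **opts})


ex: ExecutorProtocol = LocalExecutor(num_cpus=1)
ex2 = ex.with_options(num_gpus=2)
```

Under the `Self` typing, a checker infers `ex2` as `LocalExecutor`, not merely as the protocol type; with a plain protocol-typed return, that guarantee is lost.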
This PR restores the API for Ray integration that broke with the recent refactor; the major changes are summarized above.
The git history is not the cleanest: the initial changes made to incorporate the Ray execution engine had to deal with the fact that function pods mutated the functions they decorated, so serializing them and sending them over the wire to Ray required orcapod to be installed on the Ray cluster, which we did not want. Consequently, some code was written, and subsequently deleted, to strip the function of anything orcapod added.