Execute a workflow with pypushflow#
This page presents benchmarks of the execution of an Ewoks workflow with and without the pypushflow (ppf) engine.
Initial setup#
Install ewoks[ppf] and numpy
[1]:
!pip install ewoks[ppf]
!pip install numpy
Task and workflow definition#
For the demonstration, let’s create a workflow with two branches, each performing a simple matrix operation:
Branch 1 will generate a matrix and transpose it
Branch 2 will generate a matrix and flip it
Both branches will join in a single node that will multiply the resulting matrices.
Below, we define the Ewoks tasks needed for the workflow:
[2]:
import numpy
from ewokscore import Task


class MatrixGeneration(
    Task, input_names=["rows", "cols", "fill"], output_names=["matrix"]
):
    """Create a (rows, cols) matrix filled with a constant value."""

    def run(self):
        rows = self.inputs.rows
        cols = self.inputs.cols
        val = self.inputs.fill
        self.outputs.matrix = numpy.full((rows, cols), val)


class MatrixTranspose(Task, input_names=["M"], output_names=["Mt"]):
    """Mt = transpose of M"""

    def run(self):
        M = self.inputs.M
        self.outputs.Mt = M.transpose()


class MatrixVerticalFlip(Task, input_names=["M"], output_names=["Mf"]):
    """Mf = M with the row order reversed (flip along axis 0)"""

    def run(self):
        M = self.inputs.M
        self.outputs.Mf = numpy.flip(M, 0)


class MatrixMultiplication(Task, input_names=["A", "B"], output_names=["C"]):
    """C = A @ B (matrix product)"""

    def run(self):
        A = self.inputs.A
        B = self.inputs.B
        self.outputs.C = A @ B
Now, we create a workflow out of these tasks:
Generation matrix A -> Transpose
                                \
                                 Matrix multiplication
                                /
Generation matrix B ------> Flip
[3]:
nodes = [
    {
        "id": "matrixGenerationA",
        "task_identifier": "__main__.MatrixGeneration",
        "task_type": "class",
    },
    {
        "id": "matrixTransposeA",
        "task_identifier": "__main__.MatrixTranspose",
        "task_type": "class",
    },
    {
        "id": "matrixGenerationB",
        "task_identifier": "__main__.MatrixGeneration",
        "task_type": "class",
    },
    {
        "id": "matrixVerticalFlipB",
        "task_identifier": "__main__.MatrixVerticalFlip",
        "task_type": "class",
    },
    {
        "id": "matrixMultiplication",
        "task_identifier": "__main__.MatrixMultiplication",
        "task_type": "class",
    },
]
links = [
    {
        "source": "matrixGenerationA",
        "target": "matrixTransposeA",
        "data_mapping": [{"source_output": "matrix", "target_input": "M"}],
    },
    {
        "source": "matrixTransposeA",
        "target": "matrixMultiplication",
        "data_mapping": [{"source_output": "Mt", "target_input": "A"}],
    },
    {
        "source": "matrixGenerationB",
        "target": "matrixVerticalFlipB",
        "data_mapping": [{"source_output": "matrix", "target_input": "M"}],
    },
    {
        "source": "matrixVerticalFlipB",
        "target": "matrixMultiplication",
        "data_mapping": [{"source_output": "Mf", "target_input": "B"}],
    },
]
workflow = {
    "graph": {"id": "parallelMatrixWorkflow"},
    "nodes": nodes,
    "links": links,
}
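The link list above fixes which nodes depend on which. As a rough illustration of why the two branches can run concurrently, the sketch below groups the nodes into batches whose members have no pending dependencies. It uses Python's standard `graphlib` module (Python 3.9+) rather than Ewoks or pypushflow, and the `edges` list is a hand-copied restatement of the links, so it illustrates the dependency structure only and does not show how either engine schedules tasks internally.

```python
from graphlib import TopologicalSorter

# (source, target) pairs restating the "links" list of the workflow above.
edges = [
    ("matrixGenerationA", "matrixTransposeA"),
    ("matrixTransposeA", "matrixMultiplication"),
    ("matrixGenerationB", "matrixVerticalFlipB"),
    ("matrixVerticalFlipB", "matrixMultiplication"),
]

sorter = TopologicalSorter()
for source, target in edges:
    sorter.add(target, source)  # target depends on source

sorter.prepare()
batches = []
while sorter.is_active():
    # Nodes whose predecessors are all done; these are mutually independent.
    ready = sorted(sorter.get_ready())
    batches.append(ready)
    sorter.done(*ready)

for level, batch in enumerate(batches):
    print(level, batch)
```

Each batch contains nodes that do not depend on one another: the two generation nodes first, then the transpose and the flip, and finally the multiplication on its own.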
Parallel execution with pypushflow against standard Ewoks execution#
We will now benchmark the performance of the workflow by running it with and without ppf and with and without NumPy’s internal multithreading.
Run with default NumPy behavior (multi-threaded BLAS/CBLAS)#
[4]:
import os
from ewoksutils.task_utils import task_inputs
from ewoks import execute_graph
inputs = [
    *task_inputs(
        id="matrixGenerationA", inputs={"rows": 8000, "cols": 8000, "fill": 0.2}
    ),
    *task_inputs(
        id="matrixGenerationB", inputs={"rows": 8000, "cols": 80, "fill": 0.1}
    ),
]
[5]:
%%time
# Cell magic: times the whole cell (a bare %time line would time an empty statement).
execute_graph(workflow, inputs=inputs, merge_outputs=True)
[5]:
{'C': array([[160., 160., 160., ..., 160., 160., 160.],
[160., 160., 160., ..., 160., 160., 160.],
[160., 160., 160., ..., 160., 160., 160.],
...,
[160., 160., 160., ..., 160., 160., 160.],
[160., 160., 160., ..., 160., 160., 160.],
[160., 160., 160., ..., 160., 160., 160.]], shape=(8000, 80))}
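Every entry of C equals 160 because each one is a sum of 8000 products 0.2 × 0.1 (the transpose and the flip do not change a constant matrix). The following pure-Python sketch reproduces the same pipeline with a small inner dimension; the helper names (`full`, `transpose`, `vertical_flip`, `matmul`) are illustrative stand-ins for the NumPy calls used in the tasks and are not part of Ewoks.

```python
import math


def full(rows, cols, value):
    """Constant matrix as a list of rows (stand-in for numpy.full)."""
    return [[value] * cols for _ in range(rows)]


def transpose(m):
    return [list(col) for col in zip(*m)]


def vertical_flip(m):
    """Reverse the row order (stand-in for numpy.flip(m, 0))."""
    return m[::-1]


def matmul(a, b):
    b_cols = list(zip(*b))
    return [[sum(x * y for x, y in zip(row, col)) for col in b_cols] for row in a]


k = 8  # inner dimension; the notebook uses 8000
A = transpose(full(k, k, 0.2))
B = vertical_flip(full(k, 3, 0.1))
C = matmul(A, B)

expected = k * 0.2 * 0.1  # each entry is a sum of k identical products
print(len(C), len(C[0]), expected)
```

With `k = 8000` the same formula gives 8000 × 0.2 × 0.1 = 160, up to floating-point rounding.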
Now let’s execute the workflow using the ppf engine, which enables concurrent workflow execution:
[6]:
%%time
execute_graph(workflow, engine="ppf", inputs=inputs, merge_outputs=True)
[6]:
{'C': array([[160., 160., 160., ..., 160., 160., 160.],
[160., 160., 160., ..., 160., 160., 160.],
[160., 160., 160., ..., 160., 160., 160.],
...,
[160., 160., 160., ..., 160., 160., 160.],
[160., 160., 160., ..., 160., 160., 160.],
[160., 160., 160., ..., 160., 160., 160.]], shape=(8000, 80))}
The time difference between the two may not be significant. On some systems (especially laptops), NumPy’s internal multithreading can mask or even outperform workflow-level parallelism, thanks to efficient cache usage and the highly optimized nature of BLAS operations.
To isolate the difference between workflow execution engines, we will repeat the runs with NumPy’s own internal parallelism disabled.
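Outside a notebook, the same comparison can be made with a small timing helper. The sketch below is one possible approach using only the standard library; `best_wall_time` is a hypothetical helper name, and the commented usage assumes the `workflow` and `inputs` objects defined on this page.

```python
import time


def best_wall_time(func, repeats=3):
    """Return the shortest wall-clock duration (in seconds) over several calls."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        func()
        durations.append(time.perf_counter() - start)
    return min(durations)


# Stand-in workload so that the sketch runs on its own.
calls = []
elapsed = best_wall_time(lambda: calls.append(sum(range(100_000))), repeats=3)
print(f"best of {len(calls)} runs: {elapsed:.6f} s")

# With Ewoks installed, the two engines could be compared like this (assumption):
# t_default = best_wall_time(lambda: execute_graph(workflow, inputs=inputs))
# t_ppf = best_wall_time(lambda: execute_graph(workflow, engine="ppf", inputs=inputs))
```

Taking the minimum over a few repeats reduces the influence of one-off delays such as imports or process start-up on the first run.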
Run with NumPy restricted to a single thread#
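We can restrict NumPy to a single thread by setting some environment variables (ideally before NumPy is first imported, since threading libraries typically read them when they are loaded):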
[7]:
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["NUMEXPR_NUM_THREADS"] = "1"
os.environ["VECLIB_MAXIMUM_THREADS"] = "1"
os.environ["BLIS_NUM_THREADS"] = "1"
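If NumPy has already been imported in the session, starting a fresh interpreter with these variables already set is a safer way to make sure they apply. The sketch below does this with the standard library; `single_thread_env` is a hypothetical helper, and the child process only echoes one variable so that the example runs without NumPy installed.

```python
import os
import subprocess
import sys

# Same variables as in the cell above.
THREAD_VARS = (
    "OMP_NUM_THREADS",
    "OPENBLAS_NUM_THREADS",
    "MKL_NUM_THREADS",
    "NUMEXPR_NUM_THREADS",
    "VECLIB_MAXIMUM_THREADS",
    "BLIS_NUM_THREADS",
)


def single_thread_env(base=None):
    """Copy an environment mapping and pin every threading variable to "1"."""
    env = dict(os.environ if base is None else base)
    for name in THREAD_VARS:
        env[name] = "1"
    return env


# The child interpreter sees the variables from its very first import.
child_code = "import os; print(os.environ['OMP_NUM_THREADS'])"
result = subprocess.run(
    [sys.executable, "-c", child_code],
    env=single_thread_env(),
    capture_output=True,
    text=True,
    check=True,
)
print(result.stdout.strip())
```

In practice the child command would be the benchmark script itself. The `threadpoolctl` package is another option for limiting BLAS threads at runtime.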
Let’s now re-execute the workflow without and with ppf:
[8]:
%%time
execute_graph(workflow, inputs=inputs, merge_outputs=True)
[8]:
{'C': array([[160., 160., 160., ..., 160., 160., 160.],
[160., 160., 160., ..., 160., 160., 160.],
[160., 160., 160., ..., 160., 160., 160.],
...,
[160., 160., 160., ..., 160., 160., 160.],
[160., 160., 160., ..., 160., 160., 160.],
[160., 160., 160., ..., 160., 160., 160.]], shape=(8000, 80))}
[9]:
%%time
execute_graph(workflow, engine="ppf", inputs=inputs, merge_outputs=True)
[9]:
{'C': array([[160., 160., 160., ..., 160., 160., 160.],
[160., 160., 160., ..., 160., 160., 160.],
[160., 160., 160., ..., 160., 160., 160.],
...,
[160., 160., 160., ..., 160., 160., 160.],
[160., 160., 160., ..., 160., 160., 160.],
[160., 160., 160., ..., 160., 160., 160.]], shape=(8000, 80))}
Performance Notes#
This workflow is structured to allow for parallel execution of the different branches. Specifically, the matrix generation and matrix transformation (transpose and flip) can run concurrently before converging at the final matrix multiplication.
However, the actual performance gain from using ppf depends heavily on your system:
On laptops, the default engine (without ppf) may perform better than ppf, since the shared memory cache and uniform compute-intensive operations benefit from long uninterrupted CPU execution.
On servers or machines with many cores, where processor affinity and independent caches are more favorable, the ppf engine typically performs better.
In summary, ppf provides true workflow-level concurrency and parallelism, which is advantageous for heterogeneous workflows or IO-bound tasks.
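To illustrate why IO-bound branches benefit from concurrency, the sketch below runs two waiting tasks first one after the other and then side by side. It uses a standard-library thread pool as a stand-in and is not pypushflow; `io_task` is a hypothetical placeholder that simulates waiting on a file or network with `time.sleep`.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def io_task(delay):
    """Simulate an IO-bound task that mostly waits."""
    time.sleep(delay)
    return delay


delays = [0.2, 0.2]  # two independent "branches"

# Sequential execution: total time is roughly the sum of the delays.
start = time.perf_counter()
sequential_results = [io_task(d) for d in delays]
sequential = time.perf_counter() - start

# Concurrent execution: total time is roughly the longest single delay.
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(delays)) as pool:
    concurrent_results = list(pool.map(io_task, delays))
concurrent = time.perf_counter() - start

print(f"sequential: {sequential:.2f} s, concurrent: {concurrent:.2f} s")
```

The results are identical in both cases; only the elapsed time differs, because the waiting periods overlap in the concurrent run.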