{ "cells": [ { "cell_type": "markdown", "id": "a5e74c39", "metadata": {}, "source": [ "# Execute a workflow with pypushflow\n", "\n", "This page presents benchmarks of the execution of an Ewoks workflow with and without the pypushflow (`ppf`) engine.\n", "\n", "## Initial setup\n", "\n", "Install *ewoks[ppf]* and *numpy*:" ] }, { "cell_type": "code", "execution_count": null, "id": "61e749b1", "metadata": {}, "outputs": [], "source": [ "!pip install ewoks[ppf]\n", "!pip install numpy" ] }, { "cell_type": "markdown", "id": "9f5cd5c7", "metadata": { "vscode": { "languageId": "plaintext" } }, "source": [ "## Task and workflow definition\n", "\n", "For the demonstration, let's create a workflow with two branches, each performing a simple matrix operation:\n", "- Branch 1 will generate a matrix and transpose it\n", "- Branch 2 will generate a matrix and flip it\n", "\n", "Both branches will join in a single node that will multiply the resulting matrices.\n", "\n", "Below, we define the Ewoks tasks needed for the workflow:" ] }, { "cell_type": "code", "execution_count": null, "id": "e5e81b87", "metadata": {}, "outputs": [], "source": [ "import numpy\n", "from ewokscore import Task\n", "\n", "\n", "class MatrixGeneration(\n", "    Task, input_names=[\"rows\", \"cols\", \"fill\"], output_names=[\"matrix\"]\n", "):\n", "    def run(self):\n", "        rows = self.inputs.rows\n", "        cols = self.inputs.cols\n", "        val = self.inputs.fill\n", "        self.outputs.matrix = numpy.full((rows, cols), val)\n", "\n", "\n", "class MatrixTranspose(Task, input_names=[\"M\"], output_names=[\"Mt\"]):\n", "    def run(self):\n", "        M = self.inputs.M\n", "        self.outputs.Mt = M.transpose()\n", "\n", "\n", "class MatrixVerticalFlip(Task, input_names=[\"M\"], output_names=[\"Mf\"]):\n", "    def run(self):\n", "        M = self.inputs.M\n", "\n", "        self.outputs.Mf = numpy.flip(M, 0)\n", "\n", "\n", "class MatrixMultiplication(Task, input_names=[\"A\", \"B\"], output_names=[\"C\"]):\n", "    \"\"\"C = A @ B (matrix product)\"\"\"\n", "\n", "    def run(self):\n", "        A = self.inputs.A\n", "        B = self.inputs.B\n", "        self.outputs.C = A @ B" ] }, { "cell_type": "markdown", "id": "a0c2a70f", "metadata": {}, "source": [ "Now, we create a workflow out of these tasks:\n", "\n", "```\n", "Generation matrix A -> Transpose\n", "                                 \\\n", "                                  Matrix multiplication\n", "                                 /\n", "Generation matrix B ------> Flip\n", "```" ] }, { "cell_type": "code", "execution_count": null, "id": "b3bb1dab", "metadata": {}, "outputs": [], "source": [ "nodes = [\n", "    {\n", "        \"id\": \"matrixGenerationA\",\n", "        \"task_identifier\": \"__main__.MatrixGeneration\",\n", "        \"task_type\": \"class\",\n", "    },\n", "    {\n", "        \"id\": \"matrixTransposeA\",\n", "        \"task_identifier\": \"__main__.MatrixTranspose\",\n", "        \"task_type\": \"class\",\n", "    },\n", "    {\n", "        \"id\": \"matrixGenerationB\",\n", "        \"task_identifier\": \"__main__.MatrixGeneration\",\n", "        \"task_type\": \"class\",\n", "    },\n", "    {\n", "        \"id\": \"matrixVerticalFlipB\",\n", "        \"task_identifier\": \"__main__.MatrixVerticalFlip\",\n", "        \"task_type\": \"class\",\n", "    },\n", "    {\n", "        \"id\": \"matrixMultiplication\",\n", "        \"task_identifier\": \"__main__.MatrixMultiplication\",\n", "        \"task_type\": \"class\",\n", "    },\n", "]\n", "\n", "links = [\n", "    {\n", "        \"source\": \"matrixGenerationA\",\n", "        \"target\": \"matrixTransposeA\",\n", "        \"data_mapping\": [{\"source_output\": \"matrix\", \"target_input\": \"M\"}],\n", "    },\n", "    {\n", "        \"source\": \"matrixTransposeA\",\n", "        \"target\": \"matrixMultiplication\",\n", "        \"data_mapping\": [{\"source_output\": \"Mt\", \"target_input\": \"A\"}],\n", "    },\n", "    {\n", "        \"source\": \"matrixGenerationB\",\n", "        \"target\": \"matrixVerticalFlipB\",\n", "        \"data_mapping\": [{\"source_output\": \"matrix\", \"target_input\": \"M\"}],\n", "    },\n", "    {\n", "        \"source\": \"matrixVerticalFlipB\",\n", "        \"target\": \"matrixMultiplication\",\n", "        \"data_mapping\": [{\"source_output\": \"Mf\", \"target_input\": \"B\"}],\n", "    },\n", "]\n", "\n", "workflow = {\n", " 
\"graph\": {\"id\": \"parallelMatrixWorkflow\"},\n", "    \"nodes\": nodes,\n", "    \"links\": links,\n", "}" ] }, { "cell_type": "markdown", "id": "66177d8f", "metadata": {}, "source": [ "\n", "## Parallel execution with pypushflow versus standard Ewoks execution\n", "\n", "We will now benchmark the performance of the workflow by running it with and without `ppf` and with and without NumPy's internal multithreading.\n", "\n", "## Run with default NumPy behavior (multi-threaded BLAS/CBLAS)" ] }, { "cell_type": "code", "execution_count": null, "id": "b4c15be9", "metadata": {}, "outputs": [], "source": [ "import os\n", "\n", "from ewoksutils.task_utils import task_inputs\n", "\n", "from ewoks import execute_graph\n", "\n", "inputs = [\n", "    *task_inputs(\n", "        id=\"matrixGenerationA\", inputs={\"rows\": 8000, \"cols\": 8000, \"fill\": 0.2}\n", "    ),\n", "    *task_inputs(\n", "        id=\"matrixGenerationB\", inputs={\"rows\": 8000, \"cols\": 80, \"fill\": 0.1}\n", "    ),\n", "]" ] }, { "cell_type": "code", "execution_count": null, "id": "9bbd4027", "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "execute_graph(workflow, inputs=inputs, merge_outputs=True)" ] }, { "cell_type": "markdown", "id": "5d7c6746", "metadata": {}, "source": [ "Now let's execute the workflow using the `ppf` engine, which enables concurrent workflow execution:" ] }, { "cell_type": "code", "execution_count": null, "id": "1ffeb8f9", "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "execute_graph(workflow, engine=\"ppf\", inputs=inputs, merge_outputs=True)" ] }, { "cell_type": "markdown", "id": "222290be", "metadata": {}, "source": [ "The time difference between the two may not be significant. 
The reason is that, on some systems (especially laptops), NumPy's internal multithreading can mask or even outperform workflow-level parallelism, thanks to efficient cache usage and the highly optimized nature of BLAS operations.\n", "\n", "To properly compare the workflow execution engines, we will rerun the workflow with NumPy's internal parallel processing disabled.\n", "\n", "## Run with NumPy restricted to a single thread\n", "\n", "We can restrict NumPy to a single thread by setting some environment variables. Note that BLAS libraries generally read these variables when they are first loaded, so they may need to be set before NumPy is imported (for example after restarting the kernel) to take effect:" ] }, { "cell_type": "code", "execution_count": null, "id": "6cb2700d", "metadata": {}, "outputs": [], "source": [ "os.environ[\"OMP_NUM_THREADS\"] = \"1\"\n", "os.environ[\"OPENBLAS_NUM_THREADS\"] = \"1\"\n", "os.environ[\"MKL_NUM_THREADS\"] = \"1\"\n", "os.environ[\"NUMEXPR_NUM_THREADS\"] = \"1\"\n", "os.environ[\"VECLIB_MAXIMUM_THREADS\"] = \"1\"\n", "os.environ[\"BLIS_NUM_THREADS\"] = \"1\"" ] }, { "cell_type": "markdown", "id": "63570aa8", "metadata": {}, "source": [ "Let's now re-execute the workflow without and with `ppf`:" ] }, { "cell_type": "code", "execution_count": null, "id": "2bb05f61", "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "execute_graph(workflow, inputs=inputs, merge_outputs=True)" ] }, { "cell_type": "code", "execution_count": null, "id": "9c9335d0", "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "execute_graph(workflow, engine=\"ppf\", inputs=inputs, merge_outputs=True)" ] }, { "cell_type": "markdown", "id": "7fd1a7bd", "metadata": {}, "source": [ "## Performance Notes\n", "\n", "This workflow is structured to allow for parallel execution of the different branches. 
Specifically, the matrix generation and matrix transformation (transpose and flip) steps of the two branches can run concurrently before converging at the final matrix multiplication.\n", "\n", "However, the actual performance gain from using `ppf` depends heavily on your system:\n", "- On laptops, the default execution (without `ppf`) may perform better than `ppf`, since the shared memory cache and uniform compute-intensive operations benefit from long, uninterrupted CPU execution.\n", "- On servers or multi-core machines, where processor affinity and independent caches are more favorable, the `ppf` engine typically performs better.\n", "\n", "In summary, `ppf` provides true workflow-level concurrency/parallelism, which is advantageous for heterogeneous workflows or IO-bound tasks." ] } ], "metadata": { "kernelspec": { "display_name": "ewoks (3.12.3)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.3" } }, "nbformat": 4, "nbformat_minor": 5 }