Use different engines#

This notebook demonstrates how to execute a workflow with different engines.

[1]:
import os
import shutil

from ewoks import execute_graph
from ewokscore.variable import value_from_transfer
from ewokscore.tests.utils.show import show_graph
from ewokscore.tests.examples.graphs import get_graph

Select and configure a task scheduler#

[2]:
scheduler = None  # select any of the schedulers shown below

# Extra keyword arguments for ``execute_graph``: they select the execution
# engine and, for dask, which scheduler distributes the tasks.
execute_options = dict()

if scheduler == "dask_multithreading":
    # tasks are distributed by a local scheduler over multiple threads
    execute_options["engine"] = "dask"
    execute_options["scheduler"] = "multithreading"
elif scheduler == "dask_multiprocessing":
    # tasks are distributed by a local scheduler over multiple processes
    execute_options["engine"] = "dask"
    execute_options["scheduler"] = "multiprocessing"
elif scheduler == "dask_local_cluster":
    # tasks are distributed by a local cluster scheduler
    execute_options["engine"] = "dask"
    execute_options["scheduler"] = "cluster"
elif scheduler == "dask_remote_cluster":
    # tasks are distributed by a remote scheduler reachable at this address
    execute_options["engine"] = "dask"
    execute_options["scheduler"] = "127.0.0.1:40331"

    # There are different ways to start a remote scheduler
    #
    # Start a dask scheduler on any host:
    #  >>> from ewoksdask.schedulers import local_scheduler
    #  >>> scheduler = local_scheduler(n_workers=5)
    #
    # Start a dask scheduler on a host that can launch slurm jobs:
    #  >>> from ewoksdask.schedulers import slurm_scheduler
    #  >>> scheduler = slurm_scheduler(maximum_jobs=5)

elif scheduler == "pypushflow":
    # tasks are distributed by a local scheduler over multiple processes
    execute_options["engine"] = "ppf"
else:
    # unselected or unrecognized value: normalize to ``None`` so that tasks
    # are executed sequentially in the current thread (the default engine)
    scheduler = None

Select a task graph#

[3]:
# Load one of the ewokscore example graphs and render it inline.
graph_name = "acyclic1"
taskgraph, expected_results = get_graph(graph_name)
show_graph(taskgraph, plot=True, stdout=False)
../_images/tutorials_running_workflows_5_0.png

Configure storage of results (if any)#

[4]:
def prepare_storage(clean=True, varinfo=None):
    """Prepare the folder where ewoks persists task results.

    :param clean: when True, delete any previously stored results first.
    :param varinfo: mapping with a ``"root_uri"`` key pointing to the
        storage folder. Defaults to the module-level ``varinfo`` dict
        (backward compatible with the original no-argument call).
    """
    if varinfo is None:
        # fall back to the module-level configuration defined below
        varinfo = globals().get("varinfo", {})
    root_uri = varinfo.get("root_uri")
    if not root_uri:
        return  # results are not persisted by ewoks
    if clean:
        shutil.rmtree(root_uri, ignore_errors=True)
    os.makedirs(root_uri, exist_ok=True)


# Folder where all results will be saved; ``None`` disables persistence.
varinfo = dict(root_uri=None)
execute_options["varinfo"] = varinfo
prepare_storage(clean=True)

Execute the workflow#

[5]:
# Show how the workflow will be executed, then run it.
print("Execution options:", execute_options, sep="\n")
results = execute_graph(taskgraph, **execute_options)
Execution options:
{'varinfo': {'root_uri': None}}

Inspect results#

[6]:
# Resolve each (possibly persisted) result transfer to its in-memory value.
for name, transferred in results.items():
    print(f" {name} = {value_from_transfer(transferred, varinfo=varinfo)}")
 result = 16