Parallel programming with PyCOMPSs

PyCOMPSs is a parallel programming model that aims to ease the development of applications for distributed computing platforms (e.g., HPC clusters, clouds, or container-managed infrastructures).

This poster will give an overview of PyCOMPSs, including new features such as the ability to read and write streamed data, failure management at task level, and integration with Numba.

The poster will also include a section on dislib, a machine learning library parallelized with PyCOMPSs.

This poster is divided into the following sections:

PyCOMPSs overview

PyCOMPSs is a parallel task-based programming model for distributed computing platforms. Applications are written following a sequential interface and, at execution time, the COMPSs runtime exploits their inherent parallelism at task level.

A PyCOMPSs application is composed of tasks: methods annotated with decorators. The decorator identifies the tasks and declares the directionality of their parameters. At execution time, the runtime builds a task graph that takes into account the data dependencies between tasks, and then schedules and executes the tasks on the distributed infrastructure, also taking care of the required data transfers.

_images/tdg.png

PyCOMPSs/COMPSs applications are executed in distributed mode following the master-worker paradigm. The computational infrastructure is described in an XML file.

The application starts in the master node and tasks are offloaded to the worker nodes. All scheduling decisions and data transfers are performed by the runtime.

With regard to the execution environment, the COMPSs runtime supports the execution of applications in large clusters, clouds and federated clouds, and container-managed infrastructures (Docker and Singularity). The runtime provides features such as elasticity (both in clouds and in SLURM-managed clusters), task-level failure management, and task exception handling.

_images/execution_flow.png

PyCOMPSs/COMPSs is not only the programming model and runtime but comes with a set of tools that provides the user with a full environment. The runtime is instrumented with the Extrae library (also from BSC) and post-mortem tracefiles of the applications can be generated using an execution flag. These traces can be visualized with the Paraver browser, offering a very powerful environment to analyze the performance of the applications.

Another component is the COMPSs monitor, a web-based application that enables monitoring the execution of applications: it visualizes the task graph under execution, the available resources, the workload, etc.

Jupyter notebooks are also integrated with the COMPSs runtime, enabling PyCOMPSs notebooks to run both on in-house infrastructures and on Binder servers.
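As an illustration, a minimal notebook sketch, assuming the pycompss.interactive module used in the PyCOMPSs tutorial notebooks:

import pycompss.interactive as ipycompss
from pycompss.api.task import task
from pycompss.api.api import compss_wait_on

ipycompss.start(graph=True)   # start the COMPSs runtime from the notebook

@task(returns=1)
def increment(value):
    return value + 1

result = compss_wait_on(increment(1))
ipycompss.stop()              # stop the runtime and synchronize pending data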

_images/environment.png

PyCOMPSs decorators

The @task decorator is the basic one: it identifies a method that will become a task at execution time. The directionality of the method parameters is indicated in the @task annotation as "IN" (the parameter is read; this is the default), "OUT" (the parameter is written), or "INOUT" (the parameter is read and written).

from pycompss.api.task import task
from pycompss.api.parameter import INOUT

@task(c=INOUT)
def multiply(a, b, c):
    c += a*b
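For reference, a minimal sketch of how a main program could invoke this task and retrieve the result; it builds on the multiply task above, uses compss_wait_on from the PyCOMPSs API, and the matrix sizes are arbitrary.

import numpy as np
from pycompss.api.api import compss_wait_on

a = np.random.rand(4, 4)
b = np.random.rand(4, 4)
c = np.zeros((4, 4))

multiply(a, b, c)       # submitted asynchronously as a task
c = compss_wait_on(c)   # synchronize the INOUT parameter in the main program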

Constraints

PyCOMPSs supports the definition of constraints on the tasks, i.e., specific hardware or software that is required to execute the tasks. Sample constraints can be a given number of CPU cores or a minimum amount of memory to be allocated in the node.

@constraint(memory_size=6.0, processor_performance="5000")
@task(c=INOUT)
def myfunc(a, b, c):
    # ...

Versions

PyCOMPSs also supports the definition of multiple versions of the same task type, e.g., a specific version for a general-purpose processor and another one for a GPU.

@implement(source_class="myclass", method="myfunc")
@constraint(memory_size=1.0, processor_type="ARM")
@task(c=INOUT)
def myfunc_in_the_edge(a, b, c):
    # ...

Linking with other programming models

A task can be more than a sequential function: it can be a sequential task run on a single core, a multicore task, or a task spanning multiple nodes of a cluster or a cloud.

PyCOMPSs is also very flexible with regard to the nature of the tasks that compose its applications, with features to support other programming models, such as OpenMP, OmpSs or MPI, as well as external binaries. In this sense, a PyCOMPSs application can be a large workflow that orchestrates multiple MPI simulations and then executes some analytics in Python, for example.

@constraint(computing_units="248")
@mpi(runner="mpirun", computingNodes="16", ...)
@task(returns=int, stdOutFile=FILE_OUT_STDOUT, ...)
def nems(stdOutFile, stdErrFile):
    pass
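External binaries can be orchestrated in a similar way. The sketch below assumes the @binary decorator and the FILE_IN_STDIN/FILE_OUT_STDOUT parameter types; the wrapped binary (grep) and the function name are illustrative.

from pycompss.api.binary import binary
from pycompss.api.task import task
from pycompss.api.parameter import FILE_IN_STDIN, FILE_OUT_STDOUT

@binary(binary="grep")
@task(infile=FILE_IN_STDIN, result=FILE_OUT_STDOUT)
def grep(keyword, infile, result):
    pass  # the body is empty: the binary is executed instead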

Integration with Numba

PyCOMPSs has been integrated with Numba's just-in-time (JIT) compilation. Tasks in PyCOMPSs can be annotated with the @jit decorator below the @task decorator. An alternative syntax is to use the numba='jit' clause inside the @task decorator. The code of the task is passed to the Numba compiler and the compiled version is used at execution time.

The two alternative syntaxes are shown below:

from numba import jit

@task(returns=1)
@jit()
def numba_func(a, b):
    ...

@task(numba='jit')
def jit_func(a, b):
    ...

Recent extensions

Failure management and exceptions

A recent extension to PyCOMPSs is an interface that enables the programmer to give hints about how to proceed in case of failure of individual tasks. This new feature enables the workflow execution to continue despite individual task failures. The programmer can also define timeouts for tasks.

@task(file_path=FILE_INOUT, on_failure='CANCEL_SUCCESSORS')
def task(file_path):
    # ...
    if cond:
        raise Exception()
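Timeouts are declared in a similar way. The sketch below assumes the time_out keyword of the @task decorator (value in seconds) and an illustrative task name.

@task(file_path=FILE_IN, time_out=200)
def may_hang(file_path):
    # If the task runs longer than 200 seconds, it is cancelled and handled
    # according to its failure-management policy
    ...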

Another new mechanism enables tasks to throw exceptions that are captured by the main program. This mechanism is combined with the definition of task groups: when a task in the group throws an exception, the outstanding tasks in the group are cancelled and the main program can define an alternative behaviour.

This enables the definition of very dynamic workflows that depend on the actual results of the tasks.

@task(file_path=FILE_INOUT)
def comp_task(file_path):
    # ...
    raise COMPSsException("Exception")

def test_cancellation(file_name):
    try:
        with TaskGroup('failedGroup'):
            long_task(file_name)
            long_task(file_name)
            executed_task(file_name)
            comp_task(file_name)
    except COMPSsException:
        print("COMPSsException caught")
        write_two(file_name)
    write_two(file_name)

Support for data streams

A new interface to support streaming data in tasks has been defined. Tasks that exchange streamed data persist while their streams are not closed (data-flow tasks).

This extension enables the combination of Task and Data Flows (Hybrid Flows) using the same programming model and allows developers to build complex Data Science pipelines with different approaches.

@task(fds=STREAM_OUT)
def sensor(fds):
    # ...
    while not end():
        data = get_data_from_sensor()
        fds.write(data)
    fds.close()

@task(fds_sensor=STREAM_IN, filtered=OUT)
def filter(fds_sensor, filtered):
    # ...
    while not fds_sensor.is_closed():
        get_and_filter(fds_sensor, filtered)
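For completeness, a sketch of how the main program could connect the two tasks through a stream object; it assumes the FileDistroStream class from pycompss.streams.distro_stream, and the base directory and output name are illustrative.

from pycompss.streams.distro_stream import FileDistroStream

# Create the stream and hand it to the producer and consumer tasks above
fds = FileDistroStream(base_dir="/tmp/sensor_stream")  # illustrative path
filtered = "filtered_results"                           # illustrative output
sensor(fds)
filter(fds, filtered)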

Dislib

Dislib is a distributed machine learning library parallelized with PyCOMPSs that enables large-scale data analytics on HPC infrastructures. Inspired by scikit-learn, dislib provides an estimator-based interface that improves productivity by making algorithms easy to use and interchangeable. This interface also makes programming with dislib very easy for scientists already familiar with scikit-learn. Dislib also provides a distributed data structure that can be operated on as a regular Python object. The combination of this data structure and the estimator-based interface makes dislib a distributed version of scikit-learn, where communications, data transfers, and parallelism are automatically handled behind the scenes.
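As an illustration, a minimal sketch of the estimator-based interface, using dislib's ds.array and the KMeans estimator; the array sizes and number of clusters are arbitrary.

import numpy as np
import dislib as ds
from dislib.cluster import KMeans

# Load data into a distributed ds-array, partitioned in 100x100 blocks
x = ds.array(np.random.random((1000, 100)), block_size=(100, 100))

# scikit-learn-like estimator interface; fit/predict run as PyCOMPSs tasks
kmeans = KMeans(n_clusters=10)
kmeans.fit(x)
labels = kmeans.predict(x).collect()  # collect() synchronizes the result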

_images/dislib_clustering.png

Comparison with MLlib and Dask-ML

Dislib performance has been compared with MLlib and Dask-ML. The performance of dislib is not only better, but for very large problem sizes dislib can also obtain results while MLlib and Dask-ML fail to finish the execution.

The figure below shows the execution time of a K-means clustering of 500 million samples with 100 features and 500 clusters. The execution is done on the MareNostrum supercomputer, whose nodes have 48 cores, using from 1 to 32 nodes. Core counts for which there is no information indicate cases we were not able to run due to memory issues.

500 million samples and 100 features

The figure below shows the execution time for 2 billion samples with 100 features and 500 clusters. We were not able to run this case with either Dask-ML or MLlib.

2 billion samples and 100 features

Use cases

Example use of constraints: Guidance

A sample application where constraints are useful is Guidance, a tool for Genome-Wide Association Studies (GWAS) developed by the BSC Life Sciences department with other collaborators. This workflow composes multiple external binaries and scripts.

Many of its tasks, although sequential, have high and differing memory requirements. With individual memory constraints per task type, the runtime can decide how many tasks can run concurrently in a given node without exceeding the node's memory capacity.

_images/guidance.png
@constraint(memory_size=6.0)
@task(gmapFile=FILE_IN, knownHapFile=FILE_IN, ...)
def imputeWithImpute(gmapFile, knownHapFile, theChromo, ...):
    # ...

NMMB-Monarch: Weather and dust forecast

An example of the usage of this idea is the Multiscale Online Nonhydrostatic Atmosphere Chemistry model (NMMB-Monarch), which provides short- to medium-range weather, dust, and gas-phase chemistry forecasts from regional to global scales. NMMB-Monarch is used as an operational tool to provide information services at BSC.

The application combines multiple sequential scripts and MPI simulations. PyCOMPSs enables the smooth orchestration of all of them as a single workflow.

_images/monarch_result.png
_images/monarch_graph.png

Use case: MPI simulations, analytics and streaming

In this section, we illustrate a sample case where multiple MPI simulations generate data at given steps that are then processed by analytics tasks.

For instance, the graph below shows the case of a pure task-based application that launches a given number of MPI simulations (blue nodes). Each simulation produces output files at different time steps (i.e., an output file every iteration of the simulation). The results of these simulations are processed separately (white and red nodes) and merged into a single resulting output per simulation (pink nodes).

_images/streaming_graph1.png

With a regular task-based execution model, the tasks processing the results need to wait until the end of the simulations. The figure below shows a tracefile of the execution of a sample application with such behavior. Each line represents a core executing the different tasks. The simulation tasks, although they could run on multiple cores/nodes, are represented in this trace as single-core tasks.

_images/streaming_trace1.png

In the graph below, we represent the hybrid task-based/data-flow workflow, where the simulation tasks write into a stream. The data is then read by the main program and forwarded to the processing tasks. Another alternative would have been to connect the processing tasks directly to the stream, reading the results of the simulations directly.

_images/streaming_graph2.png

The trace below shows the improvement obtained by the use of streams: the processing tasks do not need to wait until the end of the simulation tasks to start processing the results. This enables the overlapping of the two types of tasks.

_images/streaming_trace2.png

Evaluation of the task exception mechanism with dislib

The task exception mechanism has been evaluated with the grid search model selection algorithm executed with the Cascade-SVM estimator.

Cascade-SVM performs a convergence check at the end of each iteration. The convergence check requires a data synchronization that prevents concurrent execution of multiple estimators.

The convergence check has been encapsulated into a task that throws an exception if the convergence criterion is met. The main program cancels the remaining iterations when the exception is captured.
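The sketch below illustrates the pattern (it is not dislib's actual code): the convergence check is a task that raises a COMPSsException, and the fitting loop runs inside a TaskGroup so that outstanding iterations are cancelled; the helpers cascade_iteration and converged are hypothetical.

from pycompss.api.task import task
from pycompss.api.api import TaskGroup
from pycompss.api.exceptions import COMPSsException

@task()
def check_convergence(old_sv, new_sv, tol):
    if converged(old_sv, new_sv, tol):        # hypothetical helper
        raise COMPSsException("Converged")

def fit(data, max_iter, tol):
    sv = None
    try:
        with TaskGroup('csvm_fit'):
            for _ in range(max_iter):
                new_sv = cascade_iteration(data, sv)  # hypothetical task
                check_convergence(sv, new_sv, tol)
                sv = new_sv
    except COMPSsException:
        pass  # remaining iterations of the group were cancelled
    return sv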

This new implementation (Except) shows better performance results than the previous versions, either with the convergence check (Synch) or without it, running a maximum number of iterations (Max it).

_images/dislib_exceptions.png

Demos

A larger set of interactive notebooks is available at mybinder.
