Parallel programming with PyCOMPSs
PyCOMPSs is a parallel programming model that aims to ease the development of applications for distributed computing platforms (e.g., HPC clusters, clouds, or container-managed infrastructures).
This poster gives an overview of PyCOMPSs, including new features such as the ability to read and write streamed data, failure management at the task level, and integration with Numba.
The poster also includes a section on dislib, a machine learning library parallelized with PyCOMPSs.
The poster is divided into the following sections:
PyCOMPSs overview
PyCOMPSs is a parallel task-based programming model for distributed computing platforms. Applications are written with a sequential interface, and at execution time the COMPSs runtime exploits their inherent parallelism at the task level.
A PyCOMPSs application is composed of tasks: methods annotated with decorators. The decorator identifies the tasks and declares the directionality of their parameters. At execution time, the runtime builds a task graph from the data dependencies between tasks, schedules and executes the tasks on the distributed infrastructure, and takes care of the required data transfers.
PyCOMPSs/COMPSs applications are executed in distributed mode following the master-worker paradigm. The computational infrastructure is described in an XML file.
The application starts in the master node and tasks are offloaded to worker nodes. All data scheduling decisions and data transfers are performed by the runtime.
With regard to the execution environment, the COMPSs runtime supports the execution of applications on large clusters, clouds and federated clouds, and container-managed infrastructures (Docker and Singularity). The runtime supports different features, such as elasticity (both in clouds and in SLURM-managed clusters), failure management at the task level, and task exception management.
PyCOMPSs/COMPSs is not only the programming model and runtime, but comes with a set of tools that provide the user with a full environment. The runtime is instrumented with the Extrae library (also from BSC), and post-mortem tracefiles of the applications can be generated using an execution flag. These traces can be visualized with the Paraver browser, offering a very powerful environment to analyze the performance of the applications.
Another component is the COMPSs monitor, a web-based application that enables monitoring of application executions. It visualizes the task graph under execution, the available resources, the workload, etc.
Jupyter notebooks are also integrated with the COMPSs runtime, making it possible to run PyCOMPSs notebooks both on in-house infrastructures and on Binder servers.
PyCOMPSs decorators
@task is the basic decorator; it identifies a method that will become a task at execution time. The directionality of each method parameter is indicated in the @task annotation as IN (the parameter is read; the default), OUT (the parameter is written), or INOUT (the parameter is read and written).
from pycompss.api.task import task
from pycompss.api.parameter import INOUT

@task(c=INOUT)
def multiply(a, b, c):
    c += a * b
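Tasks are invoked as regular Python calls from the main program; the runtime spawns them asynchronously and the results are synchronized on demand. A minimal usage sketch (assuming NumPy arrays for the parameters, since an INOUT parameter must be a mutable object):
import numpy as np
from pycompss.api.api import compss_wait_on

a = np.random.rand(4, 4)
b = np.random.rand(4, 4)
c = np.zeros((4, 4))

multiply(a, b, c)      # spawns the task asynchronously and returns immediately
c = compss_wait_on(c)  # synchronizes: waits for the task and transfers c back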
Constraints
PyCOMPSs supports the definition of constraints on the tasks, i.e., specific hardware or software features required to execute them. Sample constraints are a given number of CPU cores or a minimum amount of memory available in the node.
@constraint(memory_size=6.0, processor_performance="5000")
@task(c=INOUT)
def myfunc(a, b, c):
    # ...
Versions
PyCOMPSs also supports the definition of multiple versions of the same task type: e.g., a specific version for a general-purpose processor and another one for a GPU.
@implement(source_class="myclass", method="myfunc")
@constraint(memory_size=1.0, processor_type="ARM")
@task(c=INOUT)
def myfunc_in_the_edge(a, b, c):
    # ...
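From the application's point of view nothing changes: the main code always calls the original task (myfunc), and the runtime transparently executes the version whose constraints match the resources where the task is scheduled. A minimal sketch (data values are illustrative):
import numpy as np
from pycompss.api.api import compss_wait_on

# The caller always invokes myfunc; the runtime may run myfunc_in_the_edge
# instead when a node matching its constraints (e.g., an ARM processor) is selected.
c = np.zeros((4, 4))
myfunc(np.ones((4, 4)), np.ones((4, 4)), c)
c = compss_wait_on(c)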
Linking with other programming models
A task can be more than a sequential function: it can be a sequential task run on a single core, a multi-core task, or a task spanning multiple nodes of a cluster or a cloud.
PyCOMPSs is also very flexible with regard to the nature of the tasks that compose an application, with features to support other programming models, such as OpenMP, OmpSs or MPI, as well as external binaries. In this sense, a PyCOMPSs application can be, for example, a large workflow that orchestrates multiple MPI simulations and then runs some analytics in Python.
@constraint(computing_units="248")
@mpi(runner="mpirun", computingNodes="16", ...)
@task(returns=int, stdOutFile=FILE_OUT_STDOUT, ...)
def nems(stdOutFile, stdErrFile):
    pass
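External binaries can be wrapped in the same way with the @binary decorator; each invocation then runs the executable on a worker node, with the files transferred by the runtime. A minimal sketch (the script path and parameters are illustrative):
from pycompss.api.binary import binary
from pycompss.api.task import task
from pycompss.api.parameter import FILE_IN, FILE_OUT

@binary(binary="./my_preprocess.sh")   # illustrative executable
@task(input_file=FILE_IN, output_file=FILE_OUT)
def preprocess(input_file, output_file):
    pass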
Integration with Numba
PyCOMPSs has been integrated with Numba just-in-time (JIT) compilation. Tasks can be annotated with the @jit decorator below the @task decorator; an alternative syntax is to use the numba='jit' clause inside the @task decorator. The code of the task is passed to the Numba compiler and the compiled version is used at execution time.
The two alternative syntaxes are shown below:
@task(returns=1)
@jit()
def numba_func(a, b):
    ...

@task(numba='jit')
def jit_func(a, b):
    ...
Recent extensions
Failure management and exceptions
A recent extension to PyCOMPSs is an interface that enables the programmer to give hints on how to proceed in case of failure of individual tasks. This new feature allows the workflow execution to continue when individual tasks fail. The programmer can also define timeouts for tasks.
@task(file_path=FILE_INOUT, on_failure='CANCEL_SUCCESSORS')
def task(file_path):
    # ...
    if cond:
        raise Exception()
Another new mechanism enables tasks to throw exceptions that are captured by the main program. This mechanism is combined with the definition of task groups: when a task in the group throws an exception, the outstanding tasks in the group are cancelled and the main program can define an alternative behaviour.
This enables the definition of very dynamic workflows whose shape depends on the actual results of the tasks.
@task(file_path=FILE_INOUT)
def comp_task(file_path):
    # ...
    raise COMPSsException("Exception")

def test_cancellation(file_name):
    try:
        with TaskGroup('failedGroup'):
            long_task(file_name)
            long_task(file_name)
            executed_task(file_name)
            comp_task(file_name)
    except COMPSsException:
        print("COMPSsException caught")
        write_two(file_name)
    write_two(file_name)
Support for data streams
A new interface to support streaming data in tasks has been defined. Tasks that exchange streamed data persist while the streams are not closed (data-flow tasks).
This extension enables the combination of Task and Data Flows (Hybrid Flows) using the same programming model and allows developers to build complex Data Science pipelines with different approaches.
@task(fds=STREAM_OUT)
def sensor(fds):
    # ...
    while not end():
        data = get_data_from_sensor()
        fds.write(data)
    fds.close()

@task(fds_sensor=STREAM_IN, filtered=OUT)
def filter(fds_sensor, filtered):
    # ...
    while not fds_sensor.is_closed():
        get_and_filter(fds_sensor, filtered)
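The main program connects producer and consumer through a stream object and launches both tasks; the consumer starts processing as soon as elements arrive. A minimal sketch, assuming PyCOMPSs' distributed stream objects (the ObjectDistroStream import path below is an assumption; check the PyCOMPSs streams documentation for the exact interface):
from pycompss.api.api import compss_wait_on
from pycompss.streams.distro_stream import ObjectDistroStream  # assumed import path

fds = ObjectDistroStream()
filtered = []
sensor(fds)            # producer: writes elements until it closes the stream
filter(fds, filtered)  # consumer: runs while the stream remains open
filtered = compss_wait_on(filtered)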
Dislib
Dislib is a distributed machine learning library parallelized with PyCOMPSs that enables large-scale data analytics on HPC infrastructures. Inspired by scikit-learn, dislib provides an estimator-based interface that improves productivity by making algorithms easy to use and interchangeable. This interface also makes programming with dislib very easy for scientists already familiar with scikit-learn. Dislib also provides a distributed data structure that can be operated as a regular Python object. The combination of this data structure and the estimator-based interface makes dislib a distributed version of scikit-learn, where communications, data transfers, and parallelism are automatically handled behind the scenes.
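As a minimal illustration of this estimator-based interface (the same pattern is shown step by step in the dislib tutorial below):
import dislib as ds
from dislib.cluster import KMeans

# Create a distributed array and fit an estimator, scikit-learn style.
x = ds.random_array(shape=(10000, 50), block_size=(1000, 50))
km = KMeans(n_clusters=10)
km.fit(x)
labels = km.predict(x)     # labels is a ds-array of cluster labels
labels = labels.collect()  # synchronizes and returns the equivalent NumPy array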
Comparison with MLlib and Dask-ML
Dislib performance has been compared with MLlib and Dask-ML. Not only is dislib's performance better, but for very large problem sizes dislib obtains results while MLlib and Dask-ML fail to finish the execution.
The figure below shows the execution time of a K-means clustering of 500 million samples with 100 features and 500 clusters. The executions were run on the MareNostrum supercomputer, whose nodes have 48 cores each, using from 1 to 32 nodes. Core counts with no data point indicate cases that could not be run due to memory issues.
The figure below shows the execution time for 2 billion samples with 100 features and 500 clusters. We were not able to run this case with either Dask-ML or MLlib.
Use cases
Example use of constraints: Guidance
A sample application where constraints are useful is Guidance, a tool for Genome-Wide Association Studies (GWAS) developed by the BSC Life Sciences department with other collaborators. This workflow composes multiple external binaries and scripts.
Many of its tasks, although sequential, have large and differing memory requirements. With individual memory constraints per task type, the runtime can decide how many tasks to run concurrently in a given node without exceeding the node's memory capacity.
@constraint(memory_size=6.0)
@task(gmapFile=FILE_IN, knownHapFile=FILE_IN, ...)
def imputeWithImpute(gmapFile, knownHapFile, theChromo, ...):
    # ...
NMMB-Monarch: Weather and dust forecast
An example of this approach is the Multiscale Online Nonhydrostatic Atmosphere Chemistry model (NMMB-Monarch), an application that provides short- to medium-range weather, gas-phase chemistry, and dust forecasts, from regional to global scales. NMMB-Monarch is used as an operational tool to provide information services at BSC.
The application combines multiple sequential scripts and MPI simulations. PyCOMPSs enables the smooth orchestration of all of them as a single workflow.
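Conceptually, such a workflow mixes script tasks and MPI tasks and chains them with plain Python calls; PyCOMPSs extracts the file dependencies between the steps. A schematic, hypothetical sketch (task names, scripts, resources and file names are illustrative, not the actual NMMB-Monarch code):
from pycompss.api.binary import binary
from pycompss.api.mpi import mpi
from pycompss.api.task import task
from pycompss.api.parameter import FILE_IN, FILE_OUT

@binary(binary="./prepare_inputs.sh")                                # illustrative script
@task(config=FILE_IN, fixed_fields=FILE_OUT)
def prepare_inputs(config, fixed_fields):
    pass

@mpi(runner="mpirun", binary="./nmmb_model.x", computingNodes="16")  # illustrative MPI step
@task(fixed_fields=FILE_IN, forecast=FILE_OUT)
def simulation(fixed_fields, forecast):
    pass

@task(forecast=FILE_IN, report=FILE_OUT)
def postprocess(forecast, report):
    # illustrative Python analytics on the simulation output
    pass

# Plain Python calls; the runtime chains the tasks through the shared files.
prepare_inputs("model.cfg", "fixed.nc")
simulation("fixed.nc", "forecast.nc")
postprocess("forecast.nc", "report.txt")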
Use case: MPI simulations, analytics and streaming
In this section, we illustrate a sample case where multiple MPI simulations generate data at given time steps that is then processed by some analytics.
For instance, the graph below shows the case of a pure task-based application that launches a given number of MPI simulations (blue nodes). Each simulation produces output files at different time steps of the simulation (i.e., an output file every iteration of the simulation). The results of these simulations are processed separately (white and red nodes) and merged to a single resulting output per simulation (pink nodes).
With a regular task-based execution model, the tasks processing the results need to wait until the end of the simulations. The figure below shows a tracefile of the execution of a sample application with such behavior. Each line represents a core executing the different tasks. The simulation tasks, although they could run on multiple cores/nodes, are represented in this trace as single-core tasks.
In the graph below, we represent the hybrid task-based/data-flow workflow, where the simulation tasks write into a stream. The data is then read by the main program and forwarded to the process tasks. An alternative would have been to connect the process tasks directly to the stream, reading the results of the simulations as they are produced.
The trace below shows the improvement obtained by using streams: the process tasks do not need to wait until the end of the simulation tasks to start processing the results. This enables the overlapping of the two types of tasks.
Evaluation of task exception mechanism with the dislib
The task exception mechanism has been evaluated with the Grid search model selection algorithm executed with the Cascade-SVM estimator.
Cascade-SVM performs a convergence check at the end of each iteration. The convergence check requires a data synchronization that prevents concurrent execution of multiple estimators.
The convergence check has been encapsulated into a task that throws an exception if the convergence criterion is met. The main program cancels the remaining iterations when the exception is captured.
This new implementation (Except) shows better performance than the previous versions, both the one with a synchronizing convergence check (Synch) and the one without a convergence check that runs a fixed maximum number of iterations (Max it).
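Schematically, the pattern combines the task groups and task exceptions described above. A simplified sketch, not the actual dislib code (init_state, fit_iteration, converged and max_iterations are illustrative placeholders, and the import paths are an assumption; see the PyCOMPSs documentation):
from pycompss.api.task import task
from pycompss.api.exceptions import COMPSsException
from pycompss.api.api import TaskGroup

@task()
def check_convergence(state):
    # converged() stands for the actual convergence criterion
    if converged(state):
        raise COMPSsException("Converged")

state = init_state()
try:
    with TaskGroup('csvm_iterations'):
        for it in range(max_iterations):
            state = fit_iteration(state)  # spawns the training tasks of one iteration
            check_convergence(state)      # raises when the criterion is met
except COMPSsException:
    pass  # outstanding tasks in the group are cancelled; training stops early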
Demos
Wordcount
Counts the number of appearances of each word from a set of files.
[1]:
import pycompss.interactive as ipycompss
[2]:
import os
if 'BINDER_SERVICE_HOST' in os.environ:
    ipycompss.start(graph=True, trace=True, debug=False,
                    project_xml='../xml/project.xml',
                    resources_xml='../xml/resources.xml')
else:
    ipycompss.start(graph=True, monitor=1000)  # trace=True
******************************************************
*************** PyCOMPSs Interactive *****************
******************************************************
* .-~~-.--. _____ ________ *
* : ) |____ \ |____ / *
* .~ ~ -.\ /.- ~~ . ___) | / / *
* > `. .' < / ___/ / / *
* ( .- -. ) | |___ _ / / *
* `- -.-~ `- -' ~-.- -' |_____| |_| /__/ *
* ( : ) _ _ .-: *
* ~--. : .--~ .-~ .-~ } *
* ~-.-^-.-~ \_ .~ .-~ .~ *
* \ \ ' \ '_ _ -~ *
* \`.\`. // *
* . - ~ ~-.__\`.\`-.// *
* .-~ . - ~ }~ ~ ~-.~-. *
* .' .-~ .-~ :/~-.~-./: *
* /_~_ _ . - ~ ~-.~-._ *
* ~-.< *
******************************************************
* - Starting COMPSs runtime... *
* - Log path : /home/user/.COMPSs/Interactive_01/
* - PyCOMPSs Runtime started... Have fun! *
******************************************************
[3]:
from pycompss.api.task import task
from pycompss.api.parameter import *
[4]:
@task(returns=dict, file_path=FILE_IN)
def word_count(file_path):
    """
    Read the given file and construct a frequency word dictionary from its words.
    :param file_path: path of the file to read
    :return: a dictionary where key=word and value=#appearances
    """
    # Read the given file
    data = []
    with open(file_path, 'r') as f:
        for line in f:
            data += line.split()
    # Parse the content
    partialResult = {}
    for entry in data:
        if entry in partialResult:
            partialResult[entry] += 1
        else:
            partialResult[entry] = 1
    return partialResult
Found task: word_count
[5]:
@task(returns=dict, priority=True)
def merge_two_dicts(dic1, dic2):
    """
    Update a dictionary with another dictionary.
    :param dic1: first dictionary
    :param dic2: second dictionary
    :return: dic1 += dic2
    """
    for k in dic2:
        if k in dic1:
            dic1[k] += dic2[k]
        else:
            dic1[k] = dic2[k]
    return dic1
Found task: merge_two_dicts
Main wordcount code:
[6]:
from pycompss.api.api import compss_wait_on

# Get the dataset path
path_dataset = os.getcwd() + '/dataset'

# Launch a word_count task per file in the dataset and collect the partial results
partial_result = []
for file_name in os.listdir(path_dataset):
    f = os.path.join(path_dataset, file_name)
    partial_result.append(word_count(f))

# Accumulate the partial results to get the final result.
result = {}
for partial in partial_result:
    result = merge_two_dicts(result, partial)

# Wait for result
result = compss_wait_on(result)
Now let’s see the results:
[7]:
from pprint import pprint
print("Result:")
pprint(result)
print("Total words: {}".format(sum(result.values())))
Result:
{'Adipisci': 227,
'Aliquam': 233,
'Amet': 207,
'Consectetur': 201,
'Dolor': 198,
'Dolore': 236,
'Dolorem': 232,
'Eius': 251,
'Est': 197,
'Etincidunt': 232,
'Ipsum': 228,
'Labore': 229,
'Magnam': 195,
'Modi': 201,
'Neque': 205,
'Non': 226,
'Numquam': 253,
'Porro': 205,
'Quaerat': 217,
'Quiquia': 212,
'Quisquam': 214,
'Sed': 225,
'Sit': 220,
'Tempora': 189,
'Ut': 217,
'Velit': 218,
'Voluptatem': 235,
'adipisci': 1078,
'aliquam': 1107,
'amet': 1044,
'consectetur': 1073,
'dolor': 1120,
'dolore': 1065,
'dolorem': 1107,
'eius': 1048,
'est': 1101,
'etincidunt': 1114,
'ipsum': 1061,
'labore': 1070,
'magnam': 1096,
'modi': 1127,
'neque': 1093,
'non': 1099,
'numquam': 1094,
'porro': 1101,
'quaerat': 1086,
'quiquia': 1079,
'quisquam': 1144,
'sed': 1109,
'sit': 1130,
'tempora': 1064,
'ut': 1070,
'velit': 1105,
'voluptatem': 1121}
Total words: 35409
Plot the results:
[8]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
words = result.keys()
y_pos = np.arange(len(words))
appearances = result.values()
plt.rcParams['figure.figsize'] = [17, 5]
plt.bar(y_pos, appearances, align='center', alpha=0.5)
plt.grid(axis='y')
plt.xticks(y_pos, words, rotation=90)
plt.ylabel('# appearances')
plt.xlabel('Word')
plt.title('Wordcount')
plt.show()

[9]:
ipycompss.stop()
****************************************************
*************** STOPPING PyCOMPSs ******************
****************************************************
Warning: some of the variables used with PyCOMPSs may
have not been brought to the master.
****************************************************
KMeans
KMeans is a machine-learning clustering algorithm (the underlying problem is NP-hard), popularly employed for cluster analysis in data mining, and interesting for benchmarking and performance evaluation.
The objective of the KMeans algorithm is to group a set of multidimensional points into a predefined number of clusters, where each point belongs to the cluster with the nearest mean, in an iterative process.
[1]:
import pycompss.interactive as ipycompss
[2]:
import os
if 'BINDER_SERVICE_HOST' in os.environ:
    ipycompss.start(graph=True,  # trace=True
                    project_xml='../xml/project.xml',
                    resources_xml='../xml/resources.xml')
else:
    ipycompss.start(graph=True, monitor=1000)  # trace=True
******************************************************
*************** PyCOMPSs Interactive *****************
******************************************************
* .-~~-.--. _____ ________ *
* : ) |____ \ |____ / *
* .~ ~ -.\ /.- ~~ . ___) | / / *
* > `. .' < / ___/ / / *
* ( .- -. ) | |___ _ / / *
* `- -.-~ `- -' ~-.- -' |_____| |_| /__/ *
* ( : ) _ _ .-: *
* ~--. : .--~ .-~ .-~ } *
* ~-.-^-.-~ \_ .~ .-~ .~ *
* \ \ ' \ '_ _ -~ *
* \`.\`. // *
* . - ~ ~-.__\`.\`-.// *
* .-~ . - ~ }~ ~ ~-.~-. *
* .' .-~ .-~ :/~-.~-./: *
* /_~_ _ . - ~ ~-.~-._ *
* ~-.< *
******************************************************
* - Starting COMPSs runtime... *
* - Log path : /home/user/.COMPSs/Interactive_01/
* - PyCOMPSs Runtime started... Have fun! *
******************************************************
[3]:
from pycompss.api.task import task
[4]:
import numpy as np
[5]:
def init_random(numV, dim, seed):
    np.random.seed(seed)
    c = [np.random.uniform(-3.5, 3.5, dim)]
    while len(c) < numV:
        p = np.random.uniform(-3.5, 3.5, dim)
        distance = [np.linalg.norm(p - i) for i in c]
        if min(distance) > 2:
            c.append(p)
    return c
[6]:
#@task(returns=list)  # Not a task for plotting
def genFragment(numV, K, c, dim, mode='gauss'):
    if mode == "gauss":
        n = int(float(numV) / K)
        r = numV % K
        data = []
        for k in range(K):
            s = np.random.uniform(0.05, 0.75)
            for i in range(n + r):
                d = np.array([np.random.normal(c[k][j], s) for j in range(dim)])
                data.append(d)
        return np.array(data)[:numV]
    else:
        return [np.random.random(dim) for _ in range(numV)]
[7]:
@task(returns=dict)
def cluster_points_partial(XP, mu, ind):
    dic = {}
    for x in enumerate(XP):
        bestmukey = min([(i[0], np.linalg.norm(x[1] - mu[i[0]])) for i in enumerate(mu)], key=lambda t: t[1])[0]
        if bestmukey not in dic:
            dic[bestmukey] = [x[0] + ind]
        else:
            dic[bestmukey].append(x[0] + ind)
    return dic
Found task: cluster_points_partial
[8]:
@task(returns=dict)
def partial_sum(XP, clusters, ind):
    p = [(i, [(XP[j - ind]) for j in clusters[i]]) for i in clusters]
    dic = {}
    for i, l in p:
        dic[i] = (len(l), np.sum(l, axis=0))
    return dic
Found task: partial_sum
[9]:
@task(returns=dict, priority=True)
def reduceCentersTask(a, b):
    for key in b:
        if key not in a:
            a[key] = b[key]
        else:
            a[key] = (a[key][0] + b[key][0], a[key][1] + b[key][1])
    return a
Found task: reduceCentersTask
[10]:
def mergeReduce(function, data):
    from collections import deque
    q = deque(list(range(len(data))))
    while len(q):
        x = q.popleft()
        if len(q):
            y = q.popleft()
            data[x] = function(data[x], data[y])
            q.append(x)
        else:
            return data[x]
[11]:
def has_converged(mu, oldmu, epsilon, iter, maxIterations):
    print("iter: " + str(iter))
    print("maxIterations: " + str(maxIterations))
    if oldmu != []:
        if iter < maxIterations:
            aux = [np.linalg.norm(oldmu[i] - mu[i]) for i in range(len(mu))]
            distancia = sum(aux)
            if distancia < epsilon * epsilon:
                print("Distance_T: " + str(distancia))
                return True
            else:
                print("Distance_F: " + str(distancia))
                return False
        else:
            # Reached the max amount of iterations
            return True
[12]:
def plotKMEANS(dim, mu, clusters, data):
    import pylab as plt
    colors = ['b', 'g', 'r', 'c', 'm', 'y', 'k']
    if dim == 2 and len(mu) <= len(colors):
        from matplotlib.patches import Circle
        from matplotlib.collections import PatchCollection
        fig, ax = plt.subplots(figsize=(10, 10))
        patches = []
        pcolors = []
        for i in range(len(clusters)):
            for key in clusters[i].keys():
                d = clusters[i][key]
                for j in d:
                    j = j - i * len(data[0])
                    C = Circle((data[i][j][0], data[i][j][1]), .05)
                    pcolors.append(colors[key])
                    patches.append(C)
        collection = PatchCollection(patches)
        collection.set_facecolor(pcolors)
        ax.add_collection(collection)
        x, y = zip(*mu)
        plt.plot(x, y, '*', c='y', markersize=20)
        plt.autoscale(enable=True, axis='both', tight=False)
        plt.show()
    elif dim == 3 and len(mu) <= len(colors):
        from mpl_toolkits.mplot3d import Axes3D
        fig = plt.figure()
        ax = fig.add_subplot(111, projection='3d')
        for i in range(len(clusters)):
            for key in clusters[i].keys():
                d = clusters[i][key]
                for j in d:
                    j = j - i * len(data[0])
                    ax.scatter(data[i][j][0], data[i][j][1], data[i][j][2], 'o', c=colors[key])
        x, y, z = zip(*mu)
        for i in range(len(mu)):
            ax.scatter(x[i], y[i], z[i], s=80, c='y', marker='D')
        plt.show()
    else:
        print("No representable dim or not enough colours")
MAIN
[13]:
%matplotlib inline
import ipywidgets as widgets
from pycompss.api.api import compss_wait_on

w_numV = widgets.IntText(value=10000)        # Number of Vectors - with 1000 it is feasible to see the evolution across iterations
w_dim = widgets.IntText(value=2)             # Number of Dimensions
w_k = widgets.IntText(value=4)               # Centers
w_numFrag = widgets.IntText(value=16)        # Fragments
w_epsilon = widgets.FloatText(value=1e-10)   # Convergence condition
w_maxIterations = widgets.IntText(value=20)  # Max number of iterations
w_seed = widgets.IntText(value=8)            # Random seed

def kmeans(numV, dim, k, numFrag, epsilon, maxIterations, seed):
    size = int(numV / numFrag)
    cloudCenters = init_random(k, dim, seed)  # centers to create data groups
    X = [genFragment(size, k, cloudCenters, dim, mode='gauss') for _ in range(numFrag)]
    mu = init_random(k, dim, seed - 1)  # First centers
    oldmu = []
    n = 0
    while not has_converged(mu, oldmu, epsilon, n, maxIterations):
        oldmu = mu
        clusters = [cluster_points_partial(X[f], mu, f * size) for f in range(numFrag)]
        partialResult = [partial_sum(X[f], clusters[f], f * size) for f in range(numFrag)]
        mu = mergeReduce(reduceCentersTask, partialResult)
        mu = compss_wait_on(mu)
        mu = [mu[c][1] / mu[c][0] for c in mu]
        while len(mu) < k:
            # Add new random center if one of the centers has no points.
            indP = np.random.randint(0, size)
            indF = np.random.randint(0, numFrag)
            mu.append(X[indF][indP])
        n += 1
    clusters = compss_wait_on(clusters)
    plotKMEANS(dim, mu, clusters, X)
    print("--------------------")
    print("Result:")
    print("Iterations: ", n)
    print("Centers: ", mu)
    print("--------------------")

widgets.interact_manual(kmeans, numV=w_numV, dim=w_dim, k=w_k, numFrag=w_numFrag, epsilon=w_epsilon, maxIterations=w_maxIterations, seed=w_seed)
[13]:
<function __main__.kmeans>
[14]:
ipycompss.stop()
****************************************************
*************** STOPPING PyCOMPSs ******************
****************************************************
Warning: some of the variables used with PyCOMPSs may
have not been brought to the master.
****************************************************
Dislib tutorial
This tutorial will show the basics of using dislib.
Setup
First, we need to start an interactive PyCOMPSs session:
[1]:
import pycompss.interactive as ipycompss
import os
if 'BINDER_SERVICE_HOST' in os.environ:
    ipycompss.start(graph=True,
                    project_xml='../xml/project.xml',
                    resources_xml='../xml/resources.xml')
else:
    ipycompss.start(graph=True, monitor=1000)
******************************************************
*************** PyCOMPSs Interactive *****************
******************************************************
* .-~~-.--. _____ ________ *
* : ) |____ \ |____ / *
* .~ ~ -.\ /.- ~~ . ___) | / / *
* > `. .' < / ___/ / / *
* ( .- -. ) | |___ _ / / *
* `- -.-~ `- -' ~-.- -' |_____| |_| /__/ *
* ( : ) _ _ .-: *
* ~--. : .--~ .-~ .-~ } *
* ~-.-^-.-~ \_ .~ .-~ .~ *
* \ \ ' \ '_ _ -~ *
* \`.\`. // *
* . - ~ ~-.__\`.\`-.// *
* .-~ . - ~ }~ ~ ~-.~-. *
* .' .-~ .-~ :/~-.~-./: *
* /_~_ _ . - ~ ~-.~-._ *
* ~-.< *
******************************************************
* - Starting COMPSs runtime... *
* - Log path : /home/user/.COMPSs/Interactive_01/
* - PyCOMPSs Runtime started... Have fun! *
******************************************************
Next, we import dislib and we are all set to start working!
[2]:
import dislib as ds
Distributed arrays
The main data structure in dislib is the distributed array (or ds-array). These arrays are a distributed representation of a 2-dimensional array that can be operated as a regular Python object. Usually, rows in the array represent samples, while columns represent features.
To create a random array we can run the following NumPy-like command:
[3]:
x = ds.random_array(shape=(500, 500), block_size=(100, 100))
print(x.shape)
x
(500, 500)
[3]:
ds-array(blocks=(...), top_left_shape=(100, 100), reg_shape=(100, 100), shape=(500, 500), sparse=False)
Now x is a 500x500 ds-array of random numbers stored in blocks of 100x100 elements. Note that x is not stored in memory. Instead, random_array generates the contents of the array in tasks that are usually executed remotely. This allows the creation of really big arrays.
The content of x is a list of Futures that represent the actual data (wherever it is stored).
To see this, we can access the _blocks field of x:
[4]:
x._blocks[0][0]
[4]:
<pycompss.runtime.binding.Future at 0x7fb45006c4a8>
block_size is useful to control the granularity of dislib algorithms.
To retrieve the actual contents of x, we use collect, which synchronizes the data and returns the equivalent NumPy array:
[5]:
x.collect()
[5]:
array([[0.64524889, 0.69250058, 0.97374899, ..., 0.36928149, 0.75461806,
0.98784805],
[0.61753454, 0.092557 , 0.4967433 , ..., 0.87482708, 0.44454572,
0.0022951 ],
[0.78473344, 0.15288068, 0.70022708, ..., 0.82488172, 0.16980005,
0.30608108],
...,
[0.16257112, 0.94326181, 0.26206143, ..., 0.49725598, 0.80564738,
0.69616444],
[0.25089352, 0.10652958, 0.79657793, ..., 0.86936011, 0.67382938,
0.78140887],
[0.17716041, 0.24354163, 0.52866266, ..., 0.12053584, 0.9071268 ,
0.55058659]])
Another way of creating ds-arrays is using array-like structures like NumPy arrays or lists:
[6]:
x1 = ds.array([[1, 2, 3], [4, 5, 6]], block_size=(1, 3))
x1
[6]:
ds-array(blocks=(...), top_left_shape=(1, 3), reg_shape=(1, 3), shape=(2, 3), sparse=False)
Distributed arrays can also store sparse data in CSR format:
[7]:
from scipy.sparse import csr_matrix
sp = csr_matrix([[0, 0, 1], [1, 0, 1]])
x_sp = ds.array(sp, block_size=(1, 3))
x_sp
[7]:
ds-array(blocks=(...), top_left_shape=(1, 3), reg_shape=(1, 3), shape=(2, 3), sparse=True)
In this case, collect returns a CSR matrix as well:
[8]:
x_sp.collect()
[8]:
<2x3 sparse matrix of type '<class 'numpy.int64'>'
with 3 stored elements in Compressed Sparse Row format>
Loading data
A typical way of creating ds-arrays is to load data from disk. Dislib currently supports reading data in CSV and SVMLight formats like this:
[9]:
x, y = ds.load_svmlight_file("./files/libsvm/1", block_size=(20, 100), n_features=780, store_sparse=True)
print(x)
csv = ds.load_txt_file("./files/csv/1", block_size=(500, 122))
print(csv)
ds-array(blocks=(...), top_left_shape=(20, 100), reg_shape=(20, 100), shape=(61, 780), sparse=True)
ds-array(blocks=(...), top_left_shape=(500, 122), reg_shape=(500, 122), shape=(4235, 122), sparse=False)
Slicing
Similar to NumPy, ds-arrays support the following types of slicing:
(Note that slicing a ds-array creates a new ds-array)
[10]:
x = ds.random_array((50, 50), (10, 10))
Get a single row:
[11]:
x[4]
[11]:
ds-array(blocks=(...), top_left_shape=(10, 10), reg_shape=(10, 10), shape=(1, 50), sparse=False)
Get a single element:
[12]:
x[2, 3]
[12]:
ds-array(blocks=(...), top_left_shape=(1, 1), reg_shape=(1, 1), shape=(1, 1), sparse=False)
Get a set of rows or a set of columns:
[13]:
# Consecutive rows
print(x[10:20])
# Consecutive columns
print(x[:, 10:20])
# Non consecutive rows
print(x[[3, 7, 22]])
# Non consecutive columns
print(x[:, [5, 9, 48]])
ds-array(blocks=(...), top_left_shape=(10, 10), reg_shape=(10, 10), shape=(10, 50), sparse=False)
ds-array(blocks=(...), top_left_shape=(10, 10), reg_shape=(10, 10), shape=(50, 10), sparse=False)
ds-array(blocks=(...), top_left_shape=(10, 10), reg_shape=(10, 10), shape=(3, 50), sparse=False)
ds-array(blocks=(...), top_left_shape=(10, 10), reg_shape=(10, 10), shape=(50, 3), sparse=False)
Get any set of elements:
[14]:
x[0:5, 40:45]
[14]:
ds-array(blocks=(...), top_left_shape=(10, 10), reg_shape=(10, 10), shape=(5, 5), sparse=False)
Other functions
Apart from this, ds-arrays also provide other useful operations like transpose and mean:
[15]:
x.mean(axis=0).collect()
[15]:
array([0.551327 , 0.49044018, 0.47707326, 0.505077 , 0.48609203,
0.5046837 , 0.48576743, 0.50085461, 0.59721606, 0.52625186,
0.54670041, 0.51213254, 0.4790549 , 0.56172745, 0.54560812,
0.52904012, 0.54524971, 0.52370167, 0.44268389, 0.58325103,
0.4754059 , 0.4990062 , 0.55135695, 0.53576298, 0.44728801,
0.57918722, 0.46289081, 0.48786927, 0.46635653, 0.53229369,
0.50648353, 0.53486796, 0.5159688 , 0.50703502, 0.46818427,
0.59886016, 0.48181209, 0.51001161, 0.46914881, 0.57437929,
0.52491673, 0.49711231, 0.50817903, 0.50102322, 0.42250693,
0.51456321, 0.53393824, 0.47924624, 0.49860827, 0.49424366])
[16]:
x.transpose().collect()
[16]:
array([[0.75899854, 0.83108977, 0.28083382, ..., 0.65348721, 0.10747938,
0.13453309],
[0.70515755, 0.22656129, 0.60863163, ..., 0.10640133, 0.3311688 ,
0.50884584],
[0.71224037, 0.95907871, 0.6010006 , ..., 0.41099068, 0.5671029 ,
0.06170055],
...,
[0.39789773, 0.69988175, 0.93784369, ..., 0.24439267, 0.45685381,
0.93017544],
[0.22410234, 0.13491992, 0.75906239, ..., 0.96917569, 0.96204333,
0.14629864],
[0.81496796, 0.96925576, 0.58510411, ..., 0.65520011, 0.05744591,
0.78974985]])
Machine learning with dislib
Dislib provides an estimator-based API very similar to scikit-learn. To run an algorithm, we first create an estimator. For example, a K-means estimator:
[17]:
from dislib.cluster import KMeans
km = KMeans(n_clusters=3)
Now, we create a ds-array with some blob data, and fit the estimator:
[18]:
from sklearn.datasets import make_blobs
# create ds-array
x, y = make_blobs(n_samples=1500)
x_ds = ds.array(x, block_size=(500, 2))
km.fit(x_ds)
[18]:
KMeans(arity=50, init='random', max_iter=10, n_clusters=3,
random_state=<mtrand.RandomState object at 0x7fb51401f9d8>, tol=0.0001,
verbose=False)
Finally, we can make predictions on new (or the same) data:
[19]:
y_pred = km.predict(x_ds)
y_pred
[19]:
ds-array(blocks=(...), top_left_shape=(500, 1), reg_shape=(500, 1), shape=(1500, 1), sparse=False)
y_pred is a ds-array of predicted labels for x_ds.
Let’s plot the results:
[20]:
%matplotlib inline
import matplotlib.pyplot as plt
centers = km.centers
# set the color of each sample to the predicted label
plt.scatter(x[:, 0], x[:, 1], c=y_pred.collect())
# plot the computed centers in red
plt.scatter(centers[:, 0], centers[:, 1], c='red')
[20]:
<matplotlib.collections.PathCollection at 0x7fb42b4fc518>

Note that we need to call y_pred.collect() to retrieve the actual labels and plot them. The rest is the same as if we were using scikit-learn.
Now let’s try a more complex example that uses some preprocessing tools.
First, we load a classification data set from scikit-learn into ds-arrays.
Note that this step is only necessary for demonstration purposes. Ideally, your data should be already loaded in ds-arrays.
[21]:
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
x, y = load_breast_cancer(return_X_y=True)
x_train, x_test, y_train, y_test = train_test_split(x, y)
x_train = ds.array(x_train, block_size=(100, 10))
y_train = ds.array(y_train.reshape(-1, 1), block_size=(100, 1))
x_test = ds.array(x_test, block_size=(100, 10))
y_test = ds.array(y_test.reshape(-1, 1), block_size=(100, 1))
Next, we can see how support vector machines perform in classifying the data. We first fit the model (ignore any warnings in this step):
[22]:
from dislib.classification import CascadeSVM
csvm = CascadeSVM()
csvm.fit(x_train, y_train)
/usr/lib/python3.6/site-packages/dislib-0.4.0-py3.6.egg/dislib/classification/csvm/base.py:374: RuntimeWarning: overflow encountered in exp
k = np.exp(k)
/usr/lib/python3.6/site-packages/dislib-0.4.0-py3.6.egg/dislib/classification/csvm/base.py:342: RuntimeWarning: invalid value encountered in double_scalars
delta = np.abs((w - self._last_w) / self._last_w)
[22]:
CascadeSVM(c=1, cascade_arity=2, check_convergence=True, gamma='auto',
kernel='rbf', max_iter=5, random_state=None, tol=0.001,
verbose=False)
and now we can make predictions on new data using csvm.predict(), or we can get the model accuracy on the test set with:
[23]:
score = csvm.score(x_test, y_test)
score represents the classifier accuracy; however, it is returned as a Future. We need to synchronize to get the actual value:
[24]:
from pycompss.api.api import compss_wait_on
print(compss_wait_on(score))
0.5944055944055944
The accuracy should be around 0.6, which is not very good. We can scale the data before classification to improve accuracy. This can be achieved using dislib’s StandardScaler.
The StandardScaler provides the same API as other estimators. In this case, however, instead of making predictions on new data, we transform it:
[25]:
from dislib.preprocessing import StandardScaler
sc = StandardScaler()
# fit the scaler with train data and transform it
scaled_train = sc.fit_transform(x_train)
# transform test data
scaled_test = sc.transform(x_test)
Now scaled_train and scaled_test are the scaled samples. Let’s see how the SVM performs now.
[26]:
csvm.fit(scaled_train, y_train)
score = csvm.score(scaled_test, y_test)
print(compss_wait_on(score))
0.972027972027972
The new accuracy should be around 0.9, which is a great improvement!
Close the session
To finish the session, we need to stop PyCOMPSs:
[27]:
ipycompss.stop()
****************************************************
*************** STOPPING PyCOMPSs ******************
****************************************************
Warning: some of the variables used with PyCOMPSs may
have not been brought to the master.
****************************************************
Further information
Contact: support-compss@bsc.es.