Premiers pas avec apache-spark

Introduction

Prototype:

agrégat(zeroValue, seqOp, combOp)

La description:

aggregate() vous permet de prendre un RDD et de générer une valeur unique d’un type différent de celui qui était stocké dans le RDD d’origine.

Paramètres:

  1. zeroValue : la valeur d’initialisation, pour votre résultat, dans la format.
  2. seqOp : l’opération que vous souhaitez appliquer aux enregistrements RDD. Fonctionne une fois pour every record in a partition.
  3. combOp : Définit comment les objets résultants (un pour chaque partition), gets combined.

Exemple:

Calculer la somme d’une liste et la longueur de cette liste. Renvoie le résultat dans une paire de (sum, length).

Dans un shell Spark, créez une liste de 4 éléments, avec 2 partitions :

listRDD = sc.parallelize([1,2,3,4], 2)

Définissez ensuite seqOp :

seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )

Définissez ensuite combOp :

combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )

Puis agrégé :

listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)

La première partition a la sous-liste [1, 2]. Cela applique le seqOp à chaque élément de cette liste, ce qui produit un résultat local - Une paire de “(somme, longueur)” qui reflétera le résultat localement, uniquement dans cette première partition.

local_result est initialisé au paramètre zeroValue avec lequel aggregate() a été fourni. Par exemple, (0, 0) et list_element est le premier élément de la liste :

0 + 1 = 1
0 + 1 = 1

Le résultat local est (1, 1), ce qui signifie que la somme est 1 et la longueur 1 pour la 1ère partition après avoir traité uniquement le premier élément. local_result est mis à jour de (0, 0) à (1, 1).

1 + 2 = 3
1 + 1 = 2

Le résultat local est maintenant (3, 2), qui sera le résultat final de la 1ère partition, puisqu’il n’y a pas d’autres éléments dans la sous-liste de la 1ère partition. Faire de même pour la 2ème partition revient (7, 2).

Appliquez combOp à chaque résultat local pour former le résultat global final :

(3,2) + (7,2) = (10, 4)

Exemple décrit dans ‘figure’ :

            (0, 0) <-- zeroValue

[1, 2]                  [3, 4]

0 + 1 = 1               0 + 3 = 3
0 + 1 = 1               0 + 1 = 1

1 + 2 = 3               3 + 4 = 7
1 + 1 = 2               1 + 1 = 2       
    |                       |
    v                       v
  (3, 2)                  (7, 2)
      \                    / 
       \                  /
        \                /
         \              /
          \            /
           \          / 
           ------------
           |  combOp  |
           ------------
                |
                v
             (10, 4)

[1] : https://spark.apache.org/docs/1.2.0/api/python/pyspark.html?highlight=aggregate#pyspark.RDD.aggregate [2] : http://atlantageek.com/2015/05/30/python-aggregate-rdd/

Transformation vs Action

Spark utilise une évaluation paresseuse ; cela signifie qu’il ne fera aucun travail, à moins qu’il n’y soit vraiment obligé. Cette approche nous permet d’éviter l’utilisation inutile de la mémoire, nous permettant ainsi de travailler avec des mégadonnées.

Une transformation est évaluée paresseusement et le travail réel se produit, lorsqu’une action se produit.

Exemple:

In [1]: lines = sc.textFile(file)        // will run instantly, regardless file's size
In [2]: errors = lines.filter(lambda line: line.startsWith("error")) // run instantly
In [3]: errorCount = errors.count()    // an action occurred, let the party start!
Out[3]: 0                              // no line with 'error', in this example

Ainsi, dans [1], nous avons dit à Spark de lire un fichier dans un RDD, nommé lines. Spark nous a entendus et nous a dit : “Oui je vais le faire”, mais en fait il n’a pas encore lu le fichier.

Dans [2], nous filtrons les lignes du fichier, en supposant que son contenu contient des lignes avec des erreurs marquées d’une “erreur” au début. Nous disons donc à Spark de créer un nouveau RDD, appelé errors, qui contiendra les éléments des lines RDD, qui avaient le mot error au début.

Maintenant, dans [3], nous demandons à Spark de compter les erreurs, c’est-à-dire de compter le nombre d’éléments que contient le RDD appelé errors. count() est une action, qui ne laisse pas d’autre choix à Spark que d’effectuer l’opération, afin qu’il puisse trouver le résultat de count(), qui sera un entier.

Par conséquent, lorsque [3] est atteint, [1] et [2] seront effectivement exécutés, c’est-à-dire que lorsque nous atteignons [3], alors et alors seulement :

  1. le fichier va être lu dans textFile() (à cause de [1])

  2. lines sera filter()‘ed (à cause de [2])

  3. count() s’exécutera, à cause de [3]


Astuce de débogage : étant donné que Spark ne fera aucun travail réel tant que [3] ne sera pas atteint, il est important de comprendre que si une erreur existe dans [1] et/ou [2 ], il n’apparaîtra pas tant que l’action dans [3] ne déclenchera pas le travail réel de Spark. Par exemple, si vos données dans le fichier ne prennent pas en charge le startsWith() que j’ai utilisé, alors [2] sera correctement accepté par Spark et il ne générera aucune erreur, mais quand [3] est soumis, et Spark évalue réellement à la fois [1] et [2], alors et seulement alors, il comprendra que quelque chose n’est pas correct avec [2] et produira une erreur descriptive.

Par conséquent, une erreur peut être déclenchée lors de l’exécution de [3], mais cela ne signifie pas que l’erreur doit se trouver dans l’instruction de [3] !

Notez que ni lines ni errors ne seront stockés en mémoire après [3]. Ils continueront d’exister uniquement en tant qu’ensemble d’instructions de traitement. Si plusieurs actions sont effectuées sur l’un de ces RDD, Spark lira et filtrera les données plusieurs fois. Pour éviter la duplication des opérations lors de l’exécution de plusieurs actions sur un seul RDD, il est souvent utile de stocker les données en mémoire à l’aide du “cache”.


Vous pouvez voir plus de transformations/actions dans [Spark docs][1].

[1] : http://spark.apache.org/docs/latest/programming-guide.html#transformations

Vérifier la version Spark

Dans spark-shell :

sc.version

Généralement dans un programme :

SparkContext.version

Utilisation de spark-submit :

 spark-submit --version