Primeros pasos con apache-spark

Introducción

Prototipo:

agregado (valor cero, seqOp, combOp)

Descripción:

aggregate() le permite tomar un RDD y generar un valor único que es de un tipo diferente al que se almacenó en el RDD original.

Parámetros:

  1. zeroValue: El valor de inicialización, para su resultado, en el deseado format.
  2. seqOp: la operación que desea aplicar a los registros RDD. Se ejecuta una vez por every record in a partition.
  3. combOp: Define cómo los objetos resultantes (uno para cada partición), gets combined.

Ejemplo:

Calcular la suma de una lista y la longitud de esa lista. Devuelve el resultado en un par de (sum, length).

En un shell de Spark, cree una lista con 4 elementos, con 2 particiones:

listRDD = sc.parallelize([1,2,3,4], 2)

Luego defina seqOp:

seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )

Luego define combOp:

combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )

Luego agregado:

listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)

La primera partición tiene la sublista [1, 2]. Esto aplica seqOp a cada elemento de esa lista, lo que produce un resultado local: un par de (suma, longitud) que reflejarán el resultado localmente, solo en esa primera partición.

local_result se inicializa en el parámetro zeroValue que se proporcionó con aggregate(). Por ejemplo, (0, 0) y list_element es el primer elemento de la lista:

0 + 1 = 1
0 + 1 = 1

El resultado local es (1, 1), lo que significa que la suma es 1 y la longitud 1 para la primera partición después de procesar solo el primer elemento. local_result se actualiza de (0, 0) a (1, 1).

1 + 2 = 3
1 + 1 = 2

El resultado local ahora es (3, 2), que será el resultado final de la primera partición, ya que no hay otros elementos en la sublista de la primera partición. Haciendo lo mismo para los retornos de la segunda partición (7, 2).

Aplique combOp a cada resultado local para formar el resultado global final:

(3,2) + (7,2) = (10, 4)

Ejemplo descrito en ‘figura’:

            (0, 0) <-- zeroValue

[1, 2]                  [3, 4]

0 + 1 = 1               0 + 3 = 3
0 + 1 = 1               0 + 1 = 1

1 + 2 = 3               3 + 4 = 7
1 + 1 = 2               1 + 1 = 2       
    |                       |
    v                       v
  (3, 2)                  (7, 2)
      \                    / 
       \                  /
        \                /
         \              /
          \            /
           \          / 
           ------------
           |  combOp  |
           ------------
                |
                v
             (10, 4)

Transformación vs Acción

Spark usa evaluación perezosa; eso significa que no hará ningún trabajo, a menos que realmente tenga que hacerlo. Ese enfoque nos permite evitar el uso innecesario de la memoria, lo que nos permite trabajar con big data.

Una transformación se evalúa de forma perezosa y el trabajo real ocurre cuando ocurre una acción.

Ejemplo:

In [1]: lines = sc.textFile(file)        // will run instantly, regardless file's size
In [2]: errors = lines.filter(lambda line: line.startsWith("error")) // run instantly
In [3]: errorCount = errors.count()    // an action occurred, let the party start!
Out[3]: 0                              // no line with 'error', in this example

Entonces, en [1] le dijimos a Spark que leyera un archivo en un RDD, llamado lines. Spark nos escuchó y nos dijo: “Sí, lo haré”, pero de hecho todavía no leyó el archivo.

En 2, estamos filtrando las líneas del archivo, asumiendo que su contenido contiene líneas con errores que están marcados con un error en su inicio. Así que le decimos a Spark que cree un nuevo RDD, llamado errors, que tendrá los elementos de las líneas de RDD, que tenían la palabra error al comienzo.

Ahora, en [3], le pedimos a Spark que cuente los errores, es decir, cuente la cantidad de elementos que tiene el RDD llamado errores. count() es una acción, que no deja opción a Spark, sino realizar la operación, de modo que pueda encontrar el resultado de count(), que será un número entero.

Como resultado, cuando se alcanza [3], [1] y [2] se ejecutarán, es decir, cuando lleguemos a [3], entonces y solo entonces:

  1. el archivo se leerá en textFile() (debido a [1])

  2. Las líneas serán filter()‘ed (debido a [2])

  3. count() se ejecutará, debido a [3]


Consejo de depuración: dado que Spark no hará ningún trabajo real hasta que se alcance el [3], es importante comprender que si existe un error en [1] y/o [2 ], no aparecerá hasta que la acción en [3] haga que Spark realice el trabajo real. Por ejemplo, si sus datos en el archivo no son compatibles con startsWith() que utilicé, entonces Spark aceptará correctamente [2] y no generará ningún error, pero cuando [3] se envía, y Spark realmente evalúa tanto [1] como [2], entonces y solo entonces entenderá que algo no es correcto con [2] y producirá un error descriptivo.

Como resultado, se puede desencadenar un error cuando se ejecuta [3], ¡pero eso no significa que el error deba estar en la declaración de [3]!

Tenga en cuenta que ni las líneas ni los errores se almacenarán en la memoria después de [3]. Continuarán existiendo solo como un conjunto de instrucciones de procesamiento. Si se realizarán varias acciones en cualquiera de estos RDD, Spark leerá y filtrará los datos varias veces. Para evitar la duplicación de operaciones al realizar varias acciones en un solo RDD, a menudo es útil almacenar datos en la memoria usando caché.


Puedes ver más transformaciones/acciones en Spark docs.

Comprobar la versión de Spark

En spark-shell:

sc.version

Generalmente en un programa:

SparkContext.version

Usando spark-submit:

 spark-submit --version