Getting started with apache-spark
aggregate(zeroValue, seqOp, combOp)
aggregate() lets you take an RDD and generate a single value that is of a different type than what was stored in the original RDD.
zeroValue: The initialization value, for your result, in the desired format.
seqOp: The operation you want to apply to RDD records. Runs once for every record in a partition.
combOp: Defines how the resulted objects (one for every partition), gets combined.
Compute the sum of a list and the length of that list. Return the result in a pair of
In a Spark shell, create a list with 4 elements, with 2 partitions:
listRDD = sc.parallelize([1,2,3,4], 2)
Then define seqOp:
seqOp = (lambda local_result, list_element: (local_result + list_element, local_result + 1) )
Then define combOp:
combOp = (lambda some_local_result, another_local_result: (some_local_result + another_local_result, some_local_result + another_local_result) )
listRDD.aggregate( (0, 0), seqOp, combOp) Out: (10, 4)
The first partition has the sublist [1, 2]. This applies the seqOp to each element of that list, which produces a local result - A pair of
(sum, length) that will reflect the result locally, only in that first partition.
local_result gets initialized to the
aggregate() was provided with. For example, (0, 0) and
list_element is the first element of the list:
0 + 1 = 1 0 + 1 = 1
The local result is (1, 1), which means the sum is 1 and the length 1 for the 1st partition after processing only the first element.
local_result gets updated from (0, 0), to (1, 1).
1 + 2 = 3 1 + 1 = 2
The local result is now (3, 2), which will be the final result from the 1st partition, since they are no other elements in the sublist of the 1st partition. Doing the same for 2nd partition returns (7, 2).
Apply combOp to each local result to form the final, global result:
(3,2) + (7,2) = (10, 4)
Example described in ‘figure’:
(0, 0) <-- zeroValue [1, 2] [3, 4] 0 + 1 = 1 0 + 3 = 3 0 + 1 = 1 0 + 1 = 1 1 + 2 = 3 3 + 4 = 7 1 + 1 = 2 1 + 1 = 2 | | v v (3, 2) (7, 2) \ / \ / \ / \ / \ / \ / ------------ | combOp | ------------ | v (10, 4)
Transformation vs Action
Spark uses lazy evaluation; that means it will not do any work, unless it really has to. That approach allows us to avoid unnecessary memory usage, thus making us able to work with big data.
A transformation is lazy evaluated and the actual work happens, when an action occurs.
In : lines = sc.textFile(file) // will run instantly, regardless file's size In : errors = lines.filter(lambda line: line.startsWith("error")) // run instantly In : errorCount = errors.count() // an action occurred, let the party start! Out: 0 // no line with 'error', in this example
 we told Spark to read a file into an RDD, named
lines. Spark heard us and told us: “Yes I will do it”, but in fact it didn’t yet read the file.
In 2, we are filtering the lines of the file, assuming that its contents contain lines with errors that are marked with an
error in their start. So we tell Spark to create a new RDD, called
errors, which will have the elements of the RDD
lines, that had the word
error at their start.
, we ask Spark to count the errors, i.e. count the number of elements the RDD called
count() is an action, which leave no choice to Spark, but to actually make the operation, so that it can find the result of
count(), which will be an integer.
As a result, when
 is reached,
 will actually being performed, i.e. that when we reach
, then and only then:
the file is going to be read in
filter()‘ed (because of
count()will execute, because of
Debug tip: Since Spark won’t do any real work until
 is reached, it is important to understand that if an error exist in
, it won’t appear, until the action in
 triggers Spark to do actual work. For example if your data in the file do not support the
startsWith() I used, then
 is going to be properly accepted by Spark and it won’t raise any error, but when
 is submitted, and Spark actually evaluates both
, then and only then it will understand that something is not correct with
 and produce a descriptive error.
As a result, an error may be triggered when
 is executed, but that doesn’t mean that the error must lie in the statement of
errors will be stored in memory after
. They will continue to exist only as a set of processing instructions. If there will be multiple actions performed on either of these RDDs, spark will read and filter the data multiple times. To avoid duplicating operations when performing multiple actions on a single RDD, it is often useful to store data into memory using
You can see more transformations/actions in Spark docs.
Check Spark version
Generally in a program: