A Comprehensive Guide to PySpark RDD Operations

The Resilient Distributed Dataset (RDD) is the core data structure of PySpark. An RDD is a low-level object that is highly efficient at performing distributed tasks. This article does not cover the basics of PySpark, such as creating RDDs and DataFrames. If you are not familiar with these terms, I highly recommend reading my previous article on PySpark here.

PySpark RDDs support two types of operations:

1. Transformations

2. Actions

A transformation is an operation that takes an RDD as input and produces another RDD as output. Applying a transformation returns a new RDD and leaves the original unchanged, because RDDs are immutable. A transformation does not compute anything right away; Spark only records it in a Directed Acyclic Graph (DAG) of computations, which runs when an action is called. This is why transformations are said to be lazily evaluated.

An action is an operation applied to an RDD that produces a non-RDD value, such as a number or a Python list. Calling an action triggers the execution of all the pending transformations recorded in the DAG.

In layman’s terms, transformations are applied to an RDD to give another RDD, while actions are performed on an RDD to give a non-RDD value.
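The lazy behaviour of transformations can be illustrated without Spark. The following is a plain-Python analogy, not PySpark code: Python's built-in map() is also lazy, so it stands in for a transformation, and list() stands in for an action. The names add_ten and calls are hypothetical and exist only for this sketch.

```python
# Plain-Python analogy for lazy evaluation (no Spark involved).
calls = []  # records every time the mapped function actually runs

def add_ten(x):
    calls.append(x)
    return x + 10

numbers = [1, 2, 3, 4]

# "Transformation": map() only describes the work, nothing runs yet.
pipeline = map(add_ten, numbers)
calls_before = list(calls)  # snapshot taken before any "action"

# "Action": list() forces the pipeline to run and returns plain values.
result = list(pipeline)

print(calls_before)
print(result)
```

In PySpark the same pattern appears as rdd.map(...) followed by an action such as .collect().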

For this guide, we will perform all the following operations in Google Colab. Before running the PySpark RDD operations, we need to install PySpark. If you are practicing on your local machine, the same prerequisite applies.

!pip install pyspark

Next, we will initialize a SparkContext to perform the operations:

from pyspark import SparkContext
sc = SparkContext.getOrCreate()

If you are unable to complete the prerequisites for any reason, please refer to the PySpark Installation Guide for local machines.

Now that we have a SparkContext ready, we can perform all the Actions and Transformations that follow.

Actions in PySpark RDDs

In PySpark RDDs, Actions are a kind of operation that returns a value on being applied to an RDD. To learn more about Actions, refer to the Spark Documentation here.

Following are some of the essential and widely used PySpark RDD Actions.

1. The .collect() Action

The .collect() action on an RDD returns a list of all the elements of the RDD. It’s a great asset for displaying all the contents of our RDD. Let’s understand this with an example:

collect_rdd = sc.parallelize([1,2,3,4,5])
print(collect_rdd.collect())

On executing this code, we get:

[1, 2, 3, 4, 5]

Here we first created an RDD, collect_rdd, using the .parallelize() method of SparkContext. Then we used the .collect() method on our RDD which returns the list of all the elements from collect_rdd.

2. The .count() Action

The .count() action on an RDD returns the number of elements in the RDD. This helps verify that the expected number of elements has been loaded into the RDD. Let’s understand this with an example:

count_rdd = sc.parallelize([1,2,3,4,5,5,6,7,8,9])
print(count_rdd.count())

On executing this code, we get:

10

Here, we first created an RDD, count_rdd, using the .parallelize() method of SparkContext. Then we applied the .count() method on our RDD which returned the number of elements present in our RDD.

3. The .first() Action

The .first() action on an RDD returns the first element of the RDD. This can be helpful when we want to verify that the right kind of data has been loaded into our RDD. For example, if we wanted an RDD with the first 10 natural numbers, we could verify this by checking that the first element of the RDD is 1. Let’s understand this with an example:

first_rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
print(first_rdd.first())

On executing this code, we get:

1

Here, we first created an RDD, first_rdd using the .parallelize() method of SparkContext having the first ten natural numbers. Then, we applied the .first() operation on first_rdd. This returned the first element from first_rdd, i.e. 1.

4. The .take() Action

The .take(n) action on an RDD returns the first n elements of the RDD as a list. The argument n is an integer giving the number of elements we want to extract. Let’s understand this with an example:

take_rdd = sc.parallelize([1,2,3,4,5])
print(take_rdd.take(3))

On executing this code, we get:

[1, 2, 3]

Here, we first created an RDD, take_rdd, using the .parallelize() method of SparkContext. Then we applied the .take(3) method on our RDD take_rdd. This returned the first 3 elements in a list from the RDD.

5. The .reduce() Action

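The .reduce() action repeatedly takes two elements from the given RDD, combines them with the supplied function, and continues until a single value remains. The function is typically an anonymous function or lambda, and it should be commutative and associative so that the result does not depend on how the data is partitioned. For example, if we want to add all the elements of the given RDD, we can use the .reduce() action.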

reduce_rdd = sc.parallelize([1,3,4,6])
print(reduce_rdd.reduce(lambda x, y : x + y))

On executing this code, we get:

14

Here, we created an RDD, reduce_rdd using .parallelize() method of SparkContext. We used the .reduce action on reduce_rdd with an enclosed anonymous function or lambda. Here, the lambda adds all the elements of the given RDD and prints the sum.
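Spark documents that the function passed to .reduce() should be commutative and associative, because each partition is reduced on its own before the partial results are merged. The sketch below models that in plain Python under simplifying assumptions: the helper reduce_in_partitions and the hand-written partition layouts are hypothetical, and real Spark decides the partitioning itself.

```python
from functools import reduce

def reduce_in_partitions(partitions, func):
    """Reduce each partition separately, then reduce the partial results."""
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)

data = [1, 3, 4, 6]
one_partition = [data]              # all elements together
two_partitions = [[1, 3], [4, 6]]   # same elements, split in two

add = lambda x, y: x + y   # commutative and associative
sub = lambda x, y: x - y   # neither commutative nor associative

sum_one = reduce_in_partitions(one_partition, add)
sum_two = reduce_in_partitions(two_partitions, add)
sub_one = reduce_in_partitions(one_partition, sub)
sub_two = reduce_in_partitions(two_partitions, sub)

print(sum_one, sum_two)  # addition agrees across layouts
print(sub_one, sub_two)  # subtraction does not
```

Addition gives the same answer for both layouts, while subtraction depends on where the data happens to be split, which is why it is a poor choice for .reduce().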

 

6. The .saveAsTextFile() Action

The .saveAsTextFile() action saves the resultant RDD as text files. We pass the path to which the output should be saved. This helps in saving our results, especially when we are working with a large amount of data.

save_rdd = sc.parallelize([1,2,3,4,5,6])
save_rdd.saveAsTextFile('file.txt')

On executing this code, we get:

Output

Here, we created an RDD, save_rdd, using the .parallelize() method of SparkContext. We used the .saveAsTextFile() action on save_rdd to save it under the name passed as a string argument. The .saveAsTextFile() action creates a directory with the given name. Inside the directory, one part file is written for each partition of the RDD.

Transformations in PySpark RDDs

Transformations are operations that are performed on an RDD and return a new RDD. A few of these methods work much like functions already present in Python. To learn more about Transformations, refer to the Spark Documentation here.

Now, let’s look at some of the essential Transformations in PySpark RDD:

1. The .map() Transformation

As the name suggests, the .map() transformation maps each element of an RDD to a new value. It takes in a function, usually an anonymous one, and applies it to each of the elements in the RDD. For example, if we want to add 10 to each of the elements present in an RDD, the .map() transformation comes in handy. This saves time and follows the DRY (Don't Repeat Yourself) principle. Let’s understand this with an example:

my_rdd = sc.parallelize([1,2,3,4])
print(my_rdd.map(lambda x: x+ 10).collect())

On executing this, we get:

[11, 12, 13, 14]

Here, we created an RDD, my_rdd using the .parallelize() method of SparkContext. Since the .map() is a transformation, it returns a new RDD, thus we used the .collect() action to extract all the resultant elements in a Python list. Here, the anonymous function or lambda performs the same as it works in Python.

2. The .filter() Transformation

The .filter() transformation filters the elements of a PySpark RDD. It takes in an anonymous function that expresses a condition. Again, since it’s a transformation, it returns an RDD containing only the elements that satisfy the condition. For example, if we want to keep only the even elements, we can use the .filter() transformation.

Let’s understand this with an example:

filter_rdd = sc.parallelize([2, 3, 4, 5, 6, 7])
print(filter_rdd.filter(lambda x: x%2 == 0).collect())

On executing this code, we get:

[2, 4, 6]

Here, we first created an RDD, filter_rdd using the .parallelize() method of SparkContext. Then we used the anonymous function lambda to filter the even numbers from our RDD filter_rdd. Since .filter() transformation returns a new RDD, we used the .collect() action to extract all the resultant elements in a list.

We can also filter strings in an RDD. For example, if we want to find the names in a guest list that start with a certain letter, we can use .filter() for this operation as well. Let’s understand this with an example:

filter_rdd_2 = sc.parallelize(['Rahul', 'Swati', 'Rohan', 'Shreya', 'Priya'])
print(filter_rdd_2.filter(lambda x: x.startswith('R')).collect())

On executing this code, we get:

['Rahul', 'Rohan']

Here, we first created an RDD, filter_rdd_2 using the .parallelize() method of SparkContext. Then we used the .filter() transformation on it to filter the elements of our RDD that start with ‘R‘. We used the .collect() action on the resultant RDD to get all the desired elements in a list.

3. The .union() Transformation

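The .union() transformation combines two RDDs and returns a single RDD containing the elements of both. Duplicates are not removed, so an element that appears in both inputs appears twice in the result. This can be helpful for gathering elements with similar characteristics from two RDDs into a single RDD.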

Let’s understand this with an example:

union_inp = sc.parallelize([2,4,5,6,7,8,9])
union_rdd_1 = union_inp.filter(lambda x: x % 2 == 0)
union_rdd_2 = union_inp.filter(lambda x: x % 3 == 0)
print(union_rdd_1.union(union_rdd_2).collect())

On executing this code, we get:

[2, 4, 6, 8, 6, 9]

Here, we first created an RDD, union_inp using the .parallelize() method of SparkContext. Then we created two more RDDs union_rdd_1 and union_rdd_2 using the .filter() method on input RDD. At last, we created the union of the two filtered RDDs using the .union() transformation.
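Because .union() does not remove duplicates, an element that passes both filters shows up twice. The plain-Python sketch below models this with list concatenation; it is an illustration rather than Spark code, and the variable names are hypothetical. In PySpark, duplicates can be dropped afterwards with the .distinct() transformation, which does not promise any particular ordering.

```python
source = [2, 4, 5, 6, 7, 8, 9]

evens = [x for x in source if x % 2 == 0]          # multiples of 2
multiples_of_3 = [x for x in source if x % 3 == 0]  # multiples of 3

# Models rdd_1.union(rdd_2): plain concatenation, duplicates kept.
union_like = evens + multiples_of_3

# Models a later .distinct(): duplicates removed
# (order is kept here only because this is a plain Python list).
distinct_like = list(dict.fromkeys(union_like))

print(union_like)
print(distinct_like)
```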

4. The .flatMap() Transformation

The .flatMap() transformation works like the .map() transformation, except that the function returns a sequence of values for each element and .flatMap() flattens these sequences, so every returned value becomes a separate element of the new RDD.

Let’s understand this with an example:

flatmap_rdd = sc.parallelize(["Hey there", "This is PySpark RDD Transformations"])
print(flatmap_rdd.flatMap(lambda x: x.split(" ")).collect())

On executing this, we get:

['Hey', 'there', 'This', 'is', 'PySpark', 'RDD', 'Transformations']

Here, we first created an RDD, flatmap_rdd, using the .parallelize() method and added two strings to it. Then, we applied the .flatMap() transformation to split all the strings into single words; the lambda uses Python's string .split() method, and .flatMap() flattens the resulting lists into one RDD of words. Then we used the .collect() action to extract all the resultant elements in a list.
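The difference between .map() and .flatMap() is easiest to see side by side. The following plain-Python sketch mirrors the two behaviours with a list comprehension and itertools.chain; it is an analogy for illustration, not PySpark code.

```python
from itertools import chain

lines = ["Hey there", "This is PySpark RDD Transformations"]

# Like .map(): one output element (a list of words) per input string.
mapped = [line.split(" ") for line in lines]

# Like .flatMap(): the per-string lists are flattened into one sequence.
flat_mapped = list(chain.from_iterable(line.split(" ") for line in lines))

print(mapped)
print(flat_mapped)
```

With .map() the result keeps one nested list per input string, whereas .flatMap() yields the individual words.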

PySpark Pair RDD Operations

PySpark has a dedicated set of operations for Pair RDDs. Pair RDDs are a special kind of RDD whose elements are key-value pairs, which is how they got their name. In practice, Pair RDDs are widely used because most real-world data comes in the form of key-value pairs. Pair RDDs use their own terminology for key and value: the key is known as the identifier, while the value is known as the data.

Now, let’s see how to create Pair RDDs in PySpark.

First, we will create a list of tuples. Each tuple holds a student from a class and their average marks out of 100.

marks = [('Rahul', 88), ('Swati', 92), ('Shreya', 83), ('Abhay', 93), ('Rohan', 78)]
sc.parallelize(marks).collect()

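Here, sc.parallelize(marks) creates a Pair RDD, and .collect() returns its elements as a list of tuples:

[('Rahul', 88), ('Swati', 92), ('Shreya', 83), ('Abhay', 93), ('Rohan', 78)]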

Now, we will look at a set of PySpark operations that are specific to Pair RDDs. The Actions that worked for normal RDDs work for Pair RDDs as well. But Pair RDDs also have their own set of Transformations, which come in handy when we have data in key-value pairs.

Transformations in Pair RDDs

Since Pair RDDs are created from multiple tuples, we need to use operations that make use of keys and values.

Following are the widely used Transformations on a Pair RDD:

1. The .reduceByKey() Transformation

The .reduceByKey() transformation combines the values that share the same key, working in parallel across partitions. It uses an anonymous function or lambda to merge two values at a time. Since it’s a transformation, it returns an RDD as a result.

marks_rdd = sc.parallelize([('Rahul', 25), ('Swati', 26), ('Shreya', 22), ('Abhay', 29), ('Rohan', 22), ('Rahul', 23), ('Swati', 19), ('Shreya', 28), ('Abhay', 26), ('Rohan', 22)])
print(marks_rdd.reduceByKey(lambda x, y: x + y).collect())

On executing this code, we get:

Output after applying .reduceByKey() Transformation

Here, we created an RDD, marks_rdd, using the .parallelize() method of SparkContext and added a list of tuples consisting of marks of students. Then, we applied the .reduceByKey() transformation on marks_rdd with an anonymous function that adds two values. This returns a new RDD, and thus we applied the .collect() action to generate the list of resultant elements.
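To make the per-key merging concrete, here is a plain-Python sketch of what .reduceByKey() computes for this data. It is only a model under simplifying assumptions: the helper reduce_by_key is hypothetical, it runs on a single machine, and it returns a dictionary, whereas Spark returns an RDD of (key, value) pairs in no guaranteed order.

```python
marks = [('Rahul', 25), ('Swati', 26), ('Shreya', 22), ('Abhay', 29), ('Rohan', 22),
         ('Rahul', 23), ('Swati', 19), ('Shreya', 28), ('Abhay', 26), ('Rohan', 22)]

def reduce_by_key(pairs, func):
    """Merge the values of each key two at a time using func."""
    merged = {}
    for key, value in pairs:
        if key in merged:
            merged[key] = func(merged[key], value)
        else:
            merged[key] = value  # first value seen for this key
    return merged

totals = reduce_by_key(marks, lambda x, y: x + y)
print(totals)
```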

2. The .sortByKey() Transformation

The .sortByKey() transformation sorts the key-value pairs by key in either ascending or descending order. It returns a new RDD as a result.

marks_rdd = sc.parallelize([('Rahul', 25), ('Swati', 26), ('Shreya', 22), ('Abhay', 29), ('Rohan', 22), ('Rahul', 23), ('Swati', 19), ('Shreya', 28), ('Abhay', 26), ('Rohan', 22)])
print(marks_rdd.sortByKey(ascending=True).collect())

On executing this code, we get:

Output after applying .sortByKey() Transformation

Here, we created an RDD, marks_rdd, using the .parallelize() method of SparkContext and added a list of tuples consisting of marks of students. We applied the .sortByKey() transformation on this RDD and passed ascending=True, which sorts the keys in ascending order; passing ascending=False sorts them in descending order. Finally, we used the .collect() action on the resultant RDD to get all the result elements as a list.
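The first parameter of .sortByKey() is a boolean named ascending, which defaults to True. A non-empty string is truthy in Python, so a value such as 'descending' would be treated as True rather than reversing the order. The plain-Python sketch below models the sorting with sorted(); the helper sort_by_key is hypothetical and only mirrors the boolean flag.

```python
marks = [('Rahul', 25), ('Swati', 26), ('Shreya', 22), ('Abhay', 29), ('Rohan', 22)]

def sort_by_key(pairs, ascending=True):
    """Sort (key, value) pairs by key; the flag mirrors sortByKey's boolean."""
    return sorted(pairs, key=lambda pair: pair[0], reverse=not ascending)

ascending_pairs = sort_by_key(marks, ascending=True)
descending_pairs = sort_by_key(marks, ascending=False)

# Why a string is a trap: any non-empty string counts as True.
string_is_truthy = bool('descending')

print([name for name, _ in ascending_pairs])
print([name for name, _ in descending_pairs])
print(string_is_truthy)
```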

3. The .groupByKey() Transformation

The .groupByKey() transformation groups all the values in the given data with the same key together. It returns a new RDD as a result. For example, if we want to extract all the Cultural Members from a list of committee members, the .groupByKey() will come in handy to perform the necessary task.

marks_rdd = sc.parallelize([('Rahul', 25), ('Swati', 26), ('Shreya', 22), ('Abhay', 29), ('Rohan', 22), ('Rahul', 23), ('Swati', 19), ('Shreya', 28), ('Abhay', 26), ('Rohan', 22)])
dict_rdd = marks_rdd.groupByKey().collect()
for key, value in dict_rdd:
    print(key, list(value))

On executing this code, we get:

Output after applying .groupByKey() Transformation

Here, we created an RDD, marks_rdd, using the .parallelize() method of SparkContext and added a list of tuples consisting of marks of students. Then, we applied the .groupByKey() transformation on marks_rdd. Then we used the .collect() action to get the results and saved them to dict_rdd. Despite its name, dict_rdd is a list of (key, values) tuples in which each values object is an iterable, so we looped over it and wrapped each group in list() to print the marks of each student on one line.
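The grouping itself can be pictured with a plain-Python sketch that uses collections.defaultdict. This is a single-machine model for illustration, not Spark code: .groupByKey() returns an RDD whose values are iterables, while this sketch builds ordinary lists.

```python
from collections import defaultdict

marks = [('Rahul', 25), ('Swati', 26), ('Shreya', 22), ('Abhay', 29), ('Rohan', 22),
         ('Rahul', 23), ('Swati', 19), ('Shreya', 28), ('Abhay', 26), ('Rohan', 22)]

# Collect every value under its key, keeping all values (no merging).
grouped = defaultdict(list)
for name, mark in marks:
    grouped[name].append(mark)

for name, student_marks in grouped.items():
    print(name, student_marks)
```

Unlike .reduceByKey(), nothing is combined here; every value is kept, which is why grouping tends to move more data around than reducing does.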

Actions in Pair RDDs

Even though all of the RDD Actions can be performed on Pair RDDs, there is a set of Actions that are specifically designed for Pair RDDs. These Actions will not work on normal RDDs and are to be used only on Pair RDDs. Following is an Action that is widely used for key-value Pair RDD data:

1. The .countByKey() Action

The .countByKey() action counts the number of elements for each key in the given data. This action returns a dictionary, and one can extract the keys and values by iterating over it using loops. Since we are getting a dictionary as a result, we can also use dictionary methods such as .keys(), .values() and .items().

marks_rdd = sc.parallelize([('Rahul', 25), ('Swati', 26), ('Rohan', 22), ('Rahul', 23), ('Swati', 19), ('Shreya', 28), ('Abhay', 26), ('Rohan', 22)])
dict_rdd = marks_rdd.countByKey().items()
for key, value in dict_rdd:
    print(key, value)

On executing this code, we get:

Output after applying .countByKey() Action

Here, we created an RDD, marks_rdd, using the .parallelize() method of SparkContext and added a list of tuples consisting of marks of students. The .countByKey() action returns a dictionary, and we saved the dictionary items into the variable dict_rdd. Later we iterated over these items and got the count of values for each key.
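As a rough single-machine picture of what .countByKey() returns for this data, the keys can be tallied with collections.Counter. This is a plain-Python model for illustration only; it ignores the values and counts how often each key occurs.

```python
from collections import Counter

marks = [('Rahul', 25), ('Swati', 26), ('Rohan', 22), ('Rahul', 23),
         ('Swati', 19), ('Shreya', 28), ('Abhay', 26), ('Rohan', 22)]

# Count occurrences of each key; the marks themselves are not used.
counts = Counter(name for name, _ in marks)

for name, count in counts.items():
    print(name, count)
```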

Conclusion

In this guide, we learned about PySpark RDDs and their widely used operations. We looked at the two types of operations, Transformations and Actions, and the different methods each involves. For both RDDs and Pair RDDs, we looked at a set of Actions and Transformations. Some Transformations apply only to Pair RDDs. The Actions discussed for RDDs are versatile and can be used on Pair RDDs as well. We also discussed one Action that is exclusive to Pair RDDs and cannot be used on a normal RDD, as it requires data in key-value pairs.

Each type of Transformation or Action plays an important role, and one can apply them based on the task at hand. A few more actions can be used depending on the requirements of the project. All the operations discussed here are popular and widely used in Big Data analysis.

Read my previous articles on PySpark :

1. Beginner’s Guide To Create PySpark DataFrame

2. Essential PySpark DataFrame Column Operations that Data Engineers Should Know

Source: https://www.analyticsvidhya.com/blog/2021/10/a-comprehensive-guide-to-pyspark-rdd-operations/
