
Data Science in Drilling - Episode 29

RDDs in Spark - Part I


written by Zeyu Yan, Ph.D., Head of Data Science from Nvicta AI


Data Science in Drilling is a multi-episode series written by the technical team members at Nvicta AI. Nvicta AI is a startup company that helps drilling service companies increase their value offering by providing them with advanced AI and automation technologies and services. The goal of this Data Science in Drilling series is to give both data engineers and drilling engineers insight into state-of-the-art techniques that combine drilling engineering and data science.


In today's blog post, we will focus on the Resilient Distributed Dataset (RDD) in Spark.


A Resilient Distributed Dataset (RDD), the basic abstraction in Spark, represents an immutable, partitioned collection of elements that can be operated on in parallel. The higher level APIs in Spark, such as the DataFrame API, are built on top of RDDs. To learn RDD programming, the first step is, as always, to import the necessary dependencies:

from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("testApp").setMaster("local[4]")
sc = SparkContext.getOrCreate(conf)

Here local[4] means running Spark locally with 4 worker threads, which is usually set to match the number of available cores. Recall that when using the higher level DataFrame API, we use SparkSession, but to work with the lower level RDD API, we use SparkContext.


Create RDD


The simplest way to create an RDD is to use the parallelize method. We will create a Python list and convert it into an RDD using the parallelize method.

words = sc.parallelize(
    ["scala",
     "java",
     "spark",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"
     ])
print(words)
print(words.collect())

The results are:

ParallelCollectionRDD[150] at parallelize at PythonRDD.scala:166
['scala', 'java', 'spark', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark']

The collect method gathers the distributed data from all partitions into the Spark driver program. If the volume of data is large, the driver may run out of memory. Therefore, in a production environment, we should be careful when using the collect method.


Another way of creating RDD objects is through the textFile method of SparkContext, which creates an RDD directly from a text file, with one element per line. For example:

path = '/somePath/testText.txt'  
rdd = sc.textFile(path)

Change the Default Number of Partitions


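There are two ways to control the number of partitions in PySpark. The first is to override the defaultParallelism attribute on the SparkContext class, which parallelize falls back to when no numSlices argument is given. Note that defaultParallelism is exposed as a read-only property, so assigning to it on the class is a workaround rather than a supported setting. Passing numSlices directly, as in sc.parallelize(data, 5), or setting spark.default.parallelism in SparkConf before the context is created, are the more conventional options. The workaround looks like this: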

SparkContext.defaultParallelism=5
print(sc.parallelize([0, 2, 3, 4, 6]).glom().collect())
SparkContext.defaultParallelism=8
print(sc.parallelize([0, 2, 3, 4, 6]).glom().collect())

The results are:

[[0], [2], [3], [4], [6]]
[[], [0], [], [2], [3], [], [4], [6]]

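The glom method returns an RDD created by coalescing all elements within each partition into a list, which makes the partition layout visible. The second way is the repartition method on the RDD object. It does not change the default; instead it shuffles the data and returns a new RDD with the requested number of partitions: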

rdd = sc.parallelize([0, 2, 3, 4, 6])
rdd.repartition(2).glom().collect()

The result is:

[[2, 4], [0, 3, 6]]
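
To see where the empty partitions in the 8-partition output come from, here is a plain-Python sketch of even slicing. It is only a model, not Spark's actual code: the helper name slice_evenly is hypothetical, and we assume each slice i covers positions i*n//k up to (i+1)*n//k, which reproduces the two layouts printed above for parallelize. It does not model repartition, which involves a shuffle.

```python
def slice_evenly(items, num_slices):
    """Split items into num_slices contiguous slices (hypothetical helper).

    Assumption: slice i covers positions [i*n // k, (i+1)*n // k).
    """
    n = len(items)
    return [
        items[i * n // num_slices:(i + 1) * n // num_slices]
        for i in range(num_slices)
    ]


data = [0, 2, 3, 4, 6]
five_slices = slice_evenly(data, 5)
eight_slices = slice_evenly(data, 8)

print(five_slices)
print(eight_slices)
```

With more slices than elements, some slice boundaries coincide, so the corresponding partitions stay empty.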

Count


Let's take a look at the following example using the words RDD which we created earlier:

counts = words.count()
print("Number of elements in RDD -> %i" % counts)
print("Count by key -> %s" % words.countByKey())
print("Count by value -> %s" % words.countByValue())

The results are:

Number of elements in RDD -> 9
Count by key -> defaultdict(<class 'int'>, {'s': 4, 'j': 1, 'h': 1, 'a': 1, 'p': 2})
Count by value -> defaultdict(<class 'int'>, {'scala': 1, 'java': 1, 'spark': 2, 'hadoop': 1, 'akka': 1, 'spark vs hadoop': 1, 'pyspark': 1, 'pyspark and spark': 1})

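It can be seen that the count method simply counts the total number of elements in the RDD. The countByValue method counts how many times each distinct value occurs. The countByKey method is intended for RDDs of (key, value) pairs and counts the elements per key, where the key is the first item of each element. Our elements are plain strings, so the first item of each element is its first character, which is why the keys here are the first letters of the words.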
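
The difference between the two methods can be mimicked in plain Python with collections.Counter. This is only a sketch of the semantics, not how Spark computes the counts, and the pairs list is a made-up example of the (key, value) data that countByKey is normally used with.

```python
from collections import Counter

words_list = [
    "scala", "java", "spark", "hadoop", "spark", "akka",
    "spark vs hadoop", "pyspark", "pyspark and spark",
]

# countByValue: one counter entry per distinct element.
by_value = Counter(words_list)

# countByKey: the key is element[0]; for a string that is its first character.
by_key = Counter(word[0] for word in words_list)

# With (key, value) pairs, element[0] is the key we would normally expect.
pairs = [("spark", 1), ("hadoop", 1), ("spark", 1)]  # hypothetical data
by_key_pairs = Counter(key for key, _ in pairs)

print(dict(by_key))
print(dict(by_key_pairs))
```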


Filter


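RDD's filter method returns a new RDD containing only the elements for which the given function returns True. The function is applied within each partition, so the partition layout is preserved. Take a look at the following example: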

words_filtered = words.filter(lambda x: "spark" in x)
results = words_filtered.glom().collect()
print("Results -> %s" % (results))

The results are:

Results -> [[], ['spark'], ['spark'], ['spark vs hadoop', 'pyspark', 'pyspark and spark']]

The result is a list of four lists because glom returns one list per partition and the words RDD has 4 partitions.
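
A plain-Python sketch makes the per-partition behaviour explicit. The partitions list below is an assumption: it is the four-partition layout implied by the output above, written out by hand rather than obtained from Spark.

```python
# Assumed layout of the words RDD across 4 partitions (inferred from the output).
partitions = [
    ["scala", "java"],
    ["spark", "hadoop"],
    ["spark", "akka"],
    ["spark vs hadoop", "pyspark", "pyspark and spark"],
]

# filter is applied inside each partition, so the outer structure is unchanged.
filtered = [[w for w in part if "spark" in w] for part in partitions]

# collect() without glom() would return the same elements as one flat list.
flattened = [w for part in filtered for w in part]

print(filtered)
print(flattened)
```

The first partition contains no match, so it survives as an empty list instead of disappearing.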


Map and FlatMap


RDD's map method applies a function to each element in the RDD and returns a new RDD with exactly one output element per input element. The flatMap method also applies a function to every element, but the function is expected to return an iterable, and the results are flattened into a single RDD. Take a look at the following example:

mapped_results = words.map(lambda x: (x, len(x))).collect()
print("Mapped results -> %s" % (mapped_results))
words.flatMap(lambda x: (x, len(x))).collect()

The results are:

Mapped results -> [('scala', 5), ('java', 4), ('spark', 5), ('hadoop', 6), ('spark', 5), ('akka', 4), ('spark vs hadoop', 15), ('pyspark', 7), ('pyspark and spark', 17)]
['scala', 5, 'java', 4, 'spark', 5, 'hadoop', 6, 'spark', 5, 'akka', 4, 'spark vs hadoop', 15, 'pyspark', 7, 'pyspark and spark', 17]
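
The same contrast can be written in plain Python, where map corresponds to a list comprehension and flatMap to a comprehension followed by itertools.chain.from_iterable. This is a sketch of the semantics only. The shortened sample list and the split-by-space example are our own additions, the latter because splitting lines into words is a common use of flatMap.

```python
from itertools import chain

sample = ["scala", "java", "spark vs hadoop"]

# map: exactly one output per input, here a (word, length) tuple.
mapped = [(w, len(w)) for w in sample]

# flatMap: each output is an iterable whose items are spliced into the result.
flat_mapped = list(chain.from_iterable((w, len(w)) for w in sample))

# A typical flatMap use: split every string into its individual tokens.
tokens = list(chain.from_iterable(w.split(" ") for w in sample))

print(mapped)
print(flat_mapped)
print(tokens)
```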

Distinct


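RDD's distinct method returns a new RDD containing only the distinct values from the original RDD. Because distinct shuffles the data, the order of the returned elements is not guaranteed and can change with the number of partitions. Take a look at the following example: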

sc.parallelize([1, 2, 2, 3, 4, 4, 5]).distinct().collect()

The result is:

[1, 2, 3, 4, 5]
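
The result happens to come back sorted here, but that depends on the partitioning. The toy model below is an assumption-laden sketch, not Spark's implementation: the helper name distinct_by_hash is hypothetical, and we assume each value is sent to partition hash(x) % num_partitions and duplicates are dropped inside each partition.

```python
def distinct_by_hash(items, num_partitions):
    """Toy model of distinct: bucket by hash, deduplicate, read buckets in order."""
    buckets = [dict() for _ in range(num_partitions)]
    for x in items:
        # A dict keeps one entry per value and remembers insertion order.
        buckets[hash(x) % num_partitions].setdefault(x, None)
    return [x for bucket in buckets for x in bucket]


values = [1, 2, 2, 3, 4, 4, 5]
with_eight = distinct_by_hash(values, 8)
with_four = distinct_by_hash(values, 4)

print(with_eight)
print(with_four)
```

In this model the same distinct values come back in a different order with 4 partitions than with 8, so it is safer to sort the collected result whenever the order matters.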

Reduce


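RDD's reduce method reduces the elements of this RDD using the specified commutative and associative binary operator. Both properties are required because Spark reduces each partition separately and then merges the partial results, so the order in which elements are paired depends on how the data is partitioned. Let's take a look at the following example: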


def add(a,b):
    c = a + b
    print(str(a) + ' + ' + str(b) + ' = ' + str(c))
    return c
    
nums = sc.parallelize([1, 2, 3, 4, 5])
result = nums.reduce(add)
print("Reduce result -> %i" % (result))

The results are:

1 + 2 = 3
3 + 3 = 6
6 + 4 = 10
10 + 5 = 15
Reduce result -> 15
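
The sequence of intermediate sums printed above depends on how the five numbers are spread across partitions. The plain-Python sketch below illustrates why the operator should be commutative and associative. It is a simplified model: the helper partitioned_reduce and the four-partition layout are hypothetical, chosen only to contrast addition with subtraction.

```python
from functools import reduce


def partitioned_reduce(op, partitions):
    """Reduce each non-empty partition, then combine the partial results."""
    partials = [reduce(op, part) for part in partitions if part]
    return reduce(op, partials)


def add(a, b):
    return a + b


def subtract(a, b):
    return a - b


one_partition = [[1, 2, 3, 4, 5]]
four_partitions = [[1], [2], [3], [4, 5]]  # hypothetical layout

# Addition is commutative and associative: the layout does not matter.
sum_one = partitioned_reduce(add, one_partition)
sum_four = partitioned_reduce(add, four_partitions)

# Subtraction is neither, so the result changes with the layout.
diff_one = partitioned_reduce(subtract, one_partition)
diff_four = partitioned_reduce(subtract, four_partitions)

print(sum_one, sum_four)
print(diff_one, diff_four)
```

Addition gives the same total for both layouts, while subtraction does not, which is the kind of silent inconsistency the requirement on the operator is meant to prevent.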

Conclusions


In this article, we covered the basics of Spark RDDs. We will cover Spark in greater detail in the upcoming episodes of this Spark tutorial series. Stay tuned!


Get in Touch


Thank you for reading! Please let us know if you like this series or if you have critiques. If this series was helpful to you, please follow us and share this series with your friends.


If you or your company needs any help with projects related to drilling automation and optimization, AI, and data science, please get in touch with us at Nvicta AI. We are here to help. Cheers!

