自己写的Spark入门实战教程,适合于有一定hadoop和数据分析经验的朋友。
Spark简介
Spark是一个开源的计算框架平台,使用该平台,数据分析程序可自动分发到集群中的不同机器中,以解决大规模数据快速计算的问题,同时它还向上提供一个优雅的编程范式,使得数据分析人员通过编写类似于本机的数据分析程序即可实现集群并行计算。
Spark项目由多个紧密集成的组件组成。
核心是Spark Core组件
,它实现了Spark的基本功能,包括:任务调度、内存管理、错误恢复、与存储系统交互等模块,特别的,Spark Core还定义了弹性分布式数据集(RDD)的API,是Spark内存计算与并行计算的主要编程抽象。
在Spark Core上有一系列软件栈,用于满足了各种不同数据分析计算任务需求,包括连接关系型数据库或Hadoop Hive的SQL/HQL的查询组件Spark SQL,对实时数据进行流式计算的组件Spark Steaming,支持常见机器学习算法并行计算组件MLlib,支持并行图计算组件GraphX等。
为了进一步支持在数千个计算节点上的伸缩计算,Spark Core底层支持在各种集群管理器上运行,包括Hadoop YARN、Apache Mesos,或者Spark自带的Standalone独立调度器。
Spark部署
安装Spark比较简单,只要在机器上配置好最新版JAVA环境,下载编译好的Spark软件包后即可在本地运行。当然,也可以根据具体环境,使用Maven编译需要的Spark功能。
Spark部署有两种方式,一是本地部署,二是集群部署。前者只需启动本地的交互式环境spark-shell.sh脚本即可,常用在本机快速程序测试,后者的应用场景更多些,具体根据集群环境不同,可部署在简易的Spark独立调度集群上、部署在Hadoop YARN集群上、或部署在Apache Mesos上等。
其中,Spark自带的独立调度器是最简单实现Spark集群环境的一种方式,只需在多台联网计算机上安装好Spark,然后在其中一台启动集群管理器(通过start-master.sh脚本),然后再在其他计算机上启动工作节点(通过start-slave.sh脚本),并连接到管理器上即可。
Spark编程
使用Spark编程,需要先在本机安装好Spark环境,然后启动Spark上下文管理器连接到本机(本地部署)或是集群上的集群管理器(集群部署),再使用Spark提供的抽象接口编程即可。
支持Spark的原生语言是Scala,一种支持JVM的脚本语言,可以避免其他语言在做数据转化过程的性能或信息丢失。但随着Spark项目的不断完善,使用Python和PySpark包、或者R和SparkR包进行Spark编程也都是不错的选择。
不论使用何种编程语言,使用Spark进行数据分析的关键在于掌握Spark抽象的编程范式,其基本流程包括4步:
初始化SparkContext
。SparkContext即是Spark上下文管理器(也称为驱动器程序),它主要负责向Spark工作节点上发送指令并获得计算结果,但数据分析人员无需关注具体细节,只需使用SparkContext接口编程即可。
创建RDD
。弹性分布数据集RDD是Spark在多机进行并行计算的核心数据结构,因此使用Spark进行数据分析,首先需使用SparkContext将外部数据读入到Spark集群内。
设计数据转化操作
。即操作的结果是返回一个新的RDD,即在图计算中只是一个中间节点。类比于Hadoop的Map()映射算子,但又不仅于此,Spark还支持filter()过滤算子、distinct()去重算子、sample()采样算子,以及多个RDD集合的交差补并等集合操作。
设计数据执行操作
。即操作的结果向SparkContext返回结果,或者将结果写入外部操作系统。类比于Hadoop的Reduce()算子,按某函数操作两个数据并返回一个同类型的数据,此外Spark还支持collect()直接返回结果算子、count()计数算子、take()/top()返回部分数据算子、foreach()迭代计算算子等操作。
Spark编程范式的本质是有向无环图方式的惰性计算
,即当使用上述方式进行编程后,Spark将自动将上述RDD和转化算子转换为有向无环图的数据工作流,只有当触发执行算子时,才按需进行数据工作流的计算。此外,为进一步提高计算效率,Spark默认将在内存中执行,并自动进行内存分配管理,当然分析人员也可根据需求通过persist()算子将中间步骤数据显式的将内存数据持久化到磁盘中,以方便调试或复用。
在R环境下使用Spark实例
最新版的RStudio已经较完整的集成了Spark数据分析功能,可以在SparkR官方扩展接口基础上更方便的使用Spark,主要需要安装两个包,分别是sparklyr和dplyr。其中,sparklyr包提供了更简洁易用的Spark R编程接口,dplyr包提供了一个语法可扩展的数据操作接口,支持与主流SQL/NoSQL数据库连接,同时使数据操作与数据集数据结构解耦合,并且和Spark原生算子可基本对应。
若第一次运行,先在本机安装必要的包和Spark环境:
之后运行下面的小例子,可以发现,除了需要初始化SparkContext、导入RDD数据和导出数据外,其他数据处理操作都与在本机做数据分析是一样的。
此外,除了dplyr接口外,sparklyr还封装了一套特征工程和常用机器学习算法,足以满足80%常见的数据分析与挖掘工作,至于剩余的20%定制算法或是流处理、图计算等任务,便需要了解更多高阶的Spark接口来实现了。
"
哪些spark大数据开发的实例可以用来快速入门?
最好用pyspark的代码来实现。
感请,我也是Java出身,给你分享一点个人经验,希望能帮到你。下面给你分享一个实例!
下面我们基于该文件进行 Spark Shell 操作。
1)利用本地文件系统的一个文本文件创建一个新 RDD。
scala>var textFile = sc.textFile(“file://home/Hadoop/SparkData/WordCount/text1”);
textFile:org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at
:12
2)执行动作操作,计算文档中有多少行。
scala>textFile.count() //RDD中有多少行
17/05/17 22:59:07 INFO spark.SparkContext:Job finished:count at:15, took 5.654325469 s
resl:Long = 2
返回结果表明文档中有“2”行。
3)执行动作操作,获取文档中的第一行内容。
scala>textFile.first() // RDD 第一行的内容
17/05/17 23:01:25 INFO spark.SparkContext:Job finished:first at :15,took
返回结果表明文档的第一行内容是“hello world”。
4)转换操作会将一个 RDD 转换成一个新的 RDD。获取包含“hello”的行的代码如下。
scala>var newRDD = textFile.filter (line => line.contains(“hello”)) //有多少行含有 hello
scala>newRDD.ount() // 有多少行含 hello
17/05/17 23:06:33 INFO spark.SparkContext:Job finished:count at :15,took 0.867975549 s
res4:Long = 2
这段代码首先通过转换操作 filter 形成一个只包括含有“hello”的行的 RDD,然后再通过 count 计算有多少行。
5)Spark Shell 的 WordCount 实现
scala> val file = sc.textFile (“file://home/hendoop/SparkData/WordCount/text1”));
scala> val count = file.flatMap(line=>line.split(“”)).map(word => (word,1)).reduceByKey(_+_)
scala> count.collect()
17/05/17 23:11:46 INFO spark.SparkContext:Job finished: collect at:17,
took 1.624248037 s
res5: Array[(String, Int)] = Array((hello,2),(world,1),(My,1),(is,1),(love,1),(I,1),(John,1),(hadoop,1),(name,1),(programming,1))
使用 sparkContext 类中的 textFile() 读取本地文件,并生成 MappedBJDD。
使用 flatMap() 方法将文件内容按照空格拆分单词,拆分形成 FlatMappedRDD。
使用 map(word=>(word,1)) 将拆分的单词形成 数据对,此时生成 MappedBJDD。
使用 reduceByKey() 方法对单词的频度进行统计,由此生成 ShuffledRDD,并由 collect 运行作业得出结果。
最近在学习pyspark,有入门指南吗?
Spark提供了一个Python_Shell,即pyspark,从而可以以交互的方式使用Python编写Spark程序。
有关Spark的基本架构介绍参考http://blog.csdn.net/cymy001/article/details/78483614;
有关Pyspark的环境配置参考http://blog.csdn.net/cymy001/article/details/78430892。
pyspark里最核心的模块是SparkContext(简称sc),最重要的数据载体是RDD。RDD就像一个NumPy array或者一个Pandas Series,可以视作一个有序的item集合。只不过这些item并不存在driver端的内存里,而是被分割成很多个partitions,每个partition的数据存在集群的executor的内存中。
引入Python中pyspark工作模块
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)
#任何Spark程序都是SparkContext开始的,SparkContext的初始化需要一个SparkConf对象,SparkConf包含了Spark集群配置的各种参数(比如主节点的URL)。初始化后,就可以使用SparkContext对象所包含的各种方法来创建和操作RDD和共享变量。Spark shell会自动初始化一个SparkContext(在Scala和Python下可以,但不支持Java)。
#getOrCreate表明可以视情况新建session或利用已有的session
1
2
3
4
5
6
7
SparkSession是Spark 2.0引入的新概念。SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能。 在spark的早期版本中,SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。对于每个其他的API,我们需要使用不同的context。例如,对于Streming,我们需要使用StreamingContext;对于sql,使用sqlContext;对于hive,使用hiveContext。但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点。SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。
初始化RDD的方法
(1)本地内存中已经有一份序列数据(比如python的list),可以通过sc.parallelize去初始化一个RDD。当执行这个操作以后,list中的元素将被自动分块(partitioned),并且把每一块送到集群上的不同机器上。
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)
#(a)利用list创建一个RDD;使用sc.parallelize可以把Python list,NumPy array或者Pandas Series,Pandas DataFrame转成Spark RDD。
rdd = sc.parallelize([1,2,3,4,5])
rdd
#Output:ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:480
#(b)getNumPartitions()方法查看list被分成了几部分
rdd.getNumPartitions()
#Output:4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
#(c)glom().collect()查看分区状况
rdd.glom().collect()
#Output:[[1], [2], [3], [4, 5]]
1
2
3
在这个例子中,是一个4-core的CPU笔记本;Spark创建了4个executor,然后把数据分成4个块。colloect()方法很危险,数据量上BT文件读入会爆掉内存……
(2)创建RDD的另一个方法是直接把文本读到RDD。文本的每一行都会被当做一个item,不过需要注意的一点是,Spark一般默认给定的路径是指向HDFS的,如果要从本地读取文件的话,给一个file://开头(windows下是以file:开头)的全局路径。
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)
#(a)记录当前pyspark工作环境位置
import os
cwd=os.getcwd()
cwd
#Output:'C:UsersYu0JulyLearn5weekhadoopspark'
#(b)要读入的文件的全路径
rdd=sc.textFile("file:" + cwd + "namesyob1880.txt")
rdd
#Output:file:C:UsersYu0JulyLearn5weekhadoopspark
amesyob1880.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0
#(c)first()方法取读入的rdd数据第一个item
rdd.first()
#Output:'Mary,F,7065'
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
甚至可以sc.wholeTextFiles读入整个文件夹的所有文件。但是要特别注意,这种读法,RDD中的每个item实际上是一个形如(文件名,文件所有内容)的元组。读入整个文件夹的所有文件。
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)
#记录当前pyspark工作环境位置
import os
cwd=os.getcwd()
cwd
#Output:'C:UsersYu0JulyLearn5weekhadoopspark'
rdd = sc.wholeTextFiles("file:" + cwd + "namesyob1880.txt")
rdd
#Output:org.apache.spark.api.java.JavaPairRDD@12bcc15
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
rdd.first()
Output:
('file:/C:/Users/Yu/0JulyLearn/5weekhadoopspark/names/yob1880.txt',
1
2
3
4
5
其余初始化RDD的方法,包括:HDFS上的文件,Hive中的数据库与表,Spark SQL得到的结果。这里暂时不做介绍。
RDD Transformation
(1)RDDs可以进行一系列的变换得到新的RDD,有点类似列表推导式的操作,先给出一些RDD上最常用到的transformation:
map() 对RDD的每一个item都执行同一个操作
flatMap() 对RDD中的item执行同一个操作以后得到一个list,然后以平铺的方式把这些list里所有的结果组成新的list
filter() 筛选出来满足条件的item
distinct() 对RDD中的item去重
sample() 从RDD中的item中采样一部分出来,有放回或者无放回
sortBy() 对RDD中的item进行排序
1
2
3
4
5
6
如果想看操作后的结果,可以用一个叫做collect()的action把所有的item转成一个Python list。数据量大时,collect()很危险……
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)
numbersRDD = sc.parallelize(range(1,10+1))
print(numbersRDD.collect())
#map()对RDD的每一个item都执行同一个操作
squaresRDD = numbersRDD.map(lambda x: x**2) # Square every number
print(squaresRDD.collect())
#filter()筛选出来满足条件的item
filteredRDD = numbersRDD.filter(lambda x: x % 2 == 0) # Only the evens
print(filteredRDD.collect())
#Output:
#[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
#[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
#[2, 4, 6, 8, 10]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)
#flatMap() 对RDD中的item执行同一个操作以后得到一个list,然后以平铺的方式把这些list里所有的结果组成新的list
sentencesRDD=sc.parallelize(['Hello world','My name is Patrick'])
wordsRDD=sentencesRDD.flatMap(lambda sentence: sentence.split(" "))
print(wordsRDD.collect())
print(wordsRDD.count())
#Output:
#['Hello', 'world', 'My', 'name', 'is', 'Patrick']
#6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
对比一下:
这里如果使用map的结果是[[‘Hello’, ‘world’], [‘My’, ‘name’, ‘is’, ‘Patrick’]],
使用flatmap的结果是全部展开[‘Hello’, ‘world’, ‘My’, ‘name’, ‘is’, ‘Patrick’]。
flatmap即对应Python里的如下操作:
l = ['Hello world', 'My name is Patrick']
ll = []
for sentence in l:
ll = ll + sentence.split(" ") #+号作用,two list拼接
ll
1
2
3
4
5
(2)最开始列出的各个Transformation,可以一个接一个地串联使用,比如:
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)
def doubleIfOdd(x):
if x % 2 == 1:
return 2 * x
else:
return x
numbersRDD = sc.parallelize(range(1,10+1))
resultRDD = (numbersRDD
.map(doubleIfOdd) #map,filter,distinct()
.filter(lambda x: x > 6)
.distinct()) #distinct()对RDD中的item去重
resultRDD.collect()
#Output:[8, 10, 18, 14]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
(3)当遇到更复杂的结构,比如被称作“pair RDDs”的以元组形式组织的k-v对(key, value),Spark中针对这种item结构的数据,定义了一些transform和action:
reduceByKey(): 对所有有着相同key的items执行reduce操作
groupByKey(): 返回类似(key, listOfValues)元组的RDD,后面的value List 是同一个key下面的
sortByKey(): 按照key排序
countByKey(): 按照key去对item个数进行统计
collectAsMap(): 和collect有些类似,但是返回的是k-v的字典
1
2
3
4
5
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)
rdd=sc.parallelize(["Hello hello", "Hello New York", "York says hello"])
resultRDD=(rdd
.flatMap(lambda sentence:sentence.split(" "))
.map(lambda word:word.lower())
.map(lambda word:(word, 1)) #将word映射成(word,1)
.reduceByKey(lambda x, y: x + y)) #reduceByKey对所有有着相同key的items执行reduce操作
resultRDD.collect()
#Output:[('hello', 4), ('york', 2), ('says', 1), ('new', 1)]
result = resultRDD.collectAsMap() #collectAsMap类似collect,以k-v字典的形式返回
result
#Output:{'hello': 4, 'new': 1, 'says': 1, 'york': 2}
resultRDD.sortByKey(ascending=True).take(2) #sortByKey按键排序
#Output:[('hello', 4), ('new', 1)]
#取出现频次最高的2个词
print(resultRDD
.sortBy(lambda x: x[1], ascending=False)
.take(2))
#Output:[('hello', 4), ('york', 2)]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
RDD间的操作
(1)如果有2个RDD,可以通过下面这些操作,对它们进行集合运算得到1个新的RDD
rdd1.union(rdd2): 所有rdd1和rdd2中的item组合(并集)
rdd1.intersection(rdd2): rdd1 和 rdd2的交集
rdd1.substract(rdd2): 所有在rdd1中但不在rdd2中的item(差集)
rdd1.cartesian(rdd2): rdd1 和 rdd2中所有的元素笛卡尔乘积(正交和)
1
2
3
4
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)
#初始化两个RDD
numbersRDD = sc.parallelize([1,2,3])
moreNumbersRDD = sc.parallelize([2,3,4])
1
2
3
4
5
6
7
8
9
numbersRDD.union(moreNumbersRDD).collect() #union()取并集
#Output:[1, 2, 3, 2, 3, 4]
numbersRDD.intersection(moreNumbersRDD).collect() #intersection()取交集
#Output:[2, 3]
numbersRDD.subtract(moreNumbersRDD).collect() #substract()取差集
#Output:[1]
numbersRDD.cartesian(moreNumbersRDD).collect() #cartesian()取笛卡尔积
#Output:[(1, 2), (1, 3), (1, 4), (2, 2), (2, 3), (2, 4), (3, 2), (3, 3), (3, 4)]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
(2)在给定2个RDD后,可以通过一个类似SQL的方式去join它们
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)
# Home of different people
homesRDD = sc.parallelize([
('Brussels', 'John'),
('Brussels', 'Jack'),
('Leuven', 'Jane'),
('Antwerp', 'Jill'),
])
# Quality of life index for various cities
lifeQualityRDD = sc.parallelize([
('Brussels', 10),
('Antwerp', 7),
('RestOfFlanders', 5),
])
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
homesRDD.join(lifeQualityRDD).collect() #join
#Output:
#[('Antwerp', ('Jill', 7)),
# ('Brussels', ('John', 10)),
# ('Brussels', ('Jack', 10))]
homesRDD.leftOuterJoin(lifeQualityRDD).collect() #leftOuterJoin
#Output:
#[('Antwerp', ('Jill', 7)),
# ('Leuven', ('Jane', None)),
# ('Brussels', ('John', 10)),
# ('Brussels', ('Jack', 10))]
homesRDD.rightOuterJoin(lifeQualityRDD).collect() #rightOuterJoin
#Output:
#[('Antwerp', ('Jill', 7)),
# ('RestOfFlanders', (None, 5)),
# ('Brussels', ('John', 10)),
# ('Brussels', ('Jack', 10))]
homesRDD.cogroup(lifeQualityRDD).collect() #cogroup
#Output:
#[('Antwerp',
# (,
# )),
# ('RestOfFlanders',
# (,
# )),
# ('Leuven',
# (,
# )),
# ('Brussels',
# (,
# ))]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# Oops! Those s are Spark's way of returning a list
# that we can walk over, without materializing the list.
# Let's materialize the lists to make the above more readable:
(homesRDD
.cogroup(lifeQualityRDD)
.map(lambda x:(x[0], (list(x[1][0]), list(x[1][1]))))
.collect())
#Output:
#[('Antwerp', (['Jill'], [7])),
# ('RestOfFlanders', ([], [5])),
# ('Leuven', (['Jane'], [])),
# ('Brussels', (['John', 'Jack'], [10]))]
1
2
3
4
5
6
7
8
9
10
11
12
13
惰性计算,actions方法
特别注意:Spark的一个核心概念是惰性计算。当你把一个RDD转换成另一个的时候,这个转换不会立即生效执行!!!Spark会把它先记在心里,等到真的有actions需要取转换结果时,才会重新组织transformations(因为可能有一连串的变换)。这样可以避免不必要的中间结果存储和通信。
常见的action如下,当它们出现的时候,表明需要执行上面定义过的transform了:
collect(): 计算所有的items并返回所有的结果到driver端,接着 collect()会以Python list的形式返回结果
first(): 和上面是类似的,不过只返回第1个item
take(n): 类似,但是返回n个item
count(): 计算RDD中item的个数
top(n): 返回头n个items,按照自然结果排序
reduce(): 对RDD中的items做聚合
1
2
3
4
5
6
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)
rdd = sc.parallelize(range(1,10+1))
rdd.reduce(lambda x, y: x + y) #reduce(): 对RDD中的items做聚合
#Output:55
1
2
3
4
5
6
7
8
9
10
reduce的原理:先在每个分区(partition)里完成reduce操作,然后再全局地进行reduce。
有时候需要重复用到某个transform序列得到的RDD结果。但是一遍遍重复计算显然是要开销的,所以我们可以通过一个叫做cache()的操作把它暂时地存储在内存中。缓存RDD结果对于重复迭代的操作非常有用,比如很多机器学习的算法,训练过程需要重复迭代。
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)
import numpy as np
numbersRDD = sc.parallelize(np.linspace(1.0, 10.0, 10))
squaresRDD = numbersRDD.map(lambda x: x**2)
squaresRDD.cache() # Preserve the actual items of this RDD in memory
avg = squaresRDD.reduce(lambda x, y: x + y) / squaresRDD.count()
print(avg)
#Output:38.5