Python: Spark Learning Notes
May 28, 2015
Spark, the rising star among distributed computing frameworks, is (reportedly) better than Hadoop in every respect: it has in-memory caching, native support for the trendy Scala, and its Python support is strong enough that you can use IPython Notebook directly as the interactive interface. There are also several companion frameworks under development for machine learning and social network analysis. Simply gorgeous.
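The in-memory caching mentioned above is worth a quick look. Below is a minimal PySpark sketch (my own illustration, not from the original post; the app name and input file are just placeholders) showing how cache() keeps an RDD in memory between actions:

from pyspark import SparkContext

sc = SparkContext(appName="CacheDemo")   # placeholder app name
lines = sc.textFile("README.md")         # placeholder input file
# cache() marks the RDD to be kept in memory once it has been computed
errors = lines.filter(lambda l: "ERROR" in l).cache()
print(errors.count())  # first action: computes the RDD and caches it
print(errors.count())  # second action: served from the cache, no recomputation
sc.stop()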
Installation
As is typical of Apache Foundation projects, there is no real installation step: just download, unpack, and it is ready to use. Of course, don't forget to set the environment variables; I won't go into the details here.
Usage
Spark fully supports Scala and Java and partially supports Python. Fortunately Scala can also be used as a scripting language, which makes it much more convenient than Hadoop. Let's first look at the two interactive modes.
pyspark
Command-line usage
Use the following command to launch IPython Notebook as the interactive frontend:

PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --pylab inline" pyspark
Let's jump in and try a sample program; WordCount will do.
file = sc.textFile("hdfs://localhost:9000/user/huangsizhe/input/README.md")
count = file.flatMap(lambda line: line.split()) \
            .map(lambda word: (word, 1)) \
            .reduceByKey(lambda a, b: a + b)
count.saveAsTextFile("hdfs://localhost:9000/user/huangsizhe/wc")
The output is written successfully~
By the way, to avoid the Hadoop warnings, I recommend silencing the native lib.
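The post doesn't spell out how to do that; one common approach (my assumption, not something the author states) is to raise the log level for Hadoop's native code loader in conf/log4j.properties:

# Assumed tweak: silences the "Unable to load native-hadoop library" warning
log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR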
spark-shell
This is the most commonly used tool. The development language is Scala, which can either be run as a script or compiled; I find scripting more convenient.
To run a script you have written:

$ spark-shell -i XXX.scala

This executes the script statement by statement.
WordCount again:
val file = sc.textFile("hdfs://localhost:9000/user/huangsizhe/input/README.md")
val counts = file.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://localhost:9000/user/huangsizhe/wc")
Run:
$ spark-shell -i WordCount.scala
Success!
spark-submit
This works more like Hadoop and is generally used for writing apps. Programs can be written in Python, Java, or Scala: Python needs no compilation, Java has to be compiled into a jar with Maven, and Scala has to be compiled into a jar with sbt.
Python example:
The file layout is as follows:
WordCountpy
|-input
| |-TheMostDistantWayInTheWorld.txt
|-script
| |-WordCount.py
WordCount.py is as follows:
import sys
from operator import add

from pyspark import SparkContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print >> sys.stderr, "Usage: wordcount <input> <output>"
        exit(-1)
    sc = SparkContext(appName="PythonWordCount")
    lines = sc.textFile(sys.argv[1], 1)
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect()
    for (word, count) in output:
        print "%s: %i" % (word, count)

    counts.saveAsTextFile(sys.argv[2])
    sc.stop()
cd to the WordCount project root and run:

spark-submit --master local[4] script/WordCount.py input/TheMostDistantWayInTheWorld.txt output

An output folder will then appear with the results inside~
Scala example
I'll skip the Java one...
The file layout of the Scala example is as follows:

WordCountScala
|-input
| |-TheMostDistantWayInTheWorld.txt
|-src
| |-wordcountApp.scala
|-wordcountAPP.sbt
wordcountApp.scala is as follows:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object WordCountApp {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: WordCountApp <input> <output>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("WordCountAPP")
    val sc = new SparkContext(conf)
    val file = sc.textFile(args(0))
    val counts = file.flatMap(line => line.split(" "))
                     .map(word => (word, 1))
                     .reduceByKey(_ + _)
    println(counts)
    counts.saveAsTextFile(args(1))
  }
}
wordcountAPP.sbt is as follows:
name := "wordcount"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0"
Build:
cd to the project root, then:

$ find .
$ sbt package

Once the build succeeds, the compiled jar ends up at target/scala-2.10/wordcount_2.10-1.0.jar.
Run:
spark-submit --class "WordCountApp" \
  --master local[4] \
  target/scala-2.10/wordcount_2.10-1.0.jar input/TheMostDistantWayInTheWorld.txt output
and you will see the output folder.
Streaming: Spark's stream processing system
Spark Streaming is impressive: real-time computation, and it looks like a replacement for Storm. Let's try the official example.
import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print >> sys.stderr, "Usage: network_wordcount.py <hostname> <port>"
        exit(-1)
    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    ssc = StreamingContext(sc, 1)

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" "))\
                  .map(lambda word: (word, 1))\
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
To run it:

- Run nc -lk 9999 to open a listener on port 9999.
- Open another terminal, cd to the directory containing the script, and run spark-submit network_wordcount.py localhost 9999.
- Type a few sentences in the nc terminal; after each newline, the word counts for that line show up in the other terminal~