
Python: Spark Study Notes

Spark, the rising star among distributed computing frameworks, is (reportedly) better than Hadoop in just about every way: it caches data in memory, it natively supports the fashionable Scala, and its Python support is strong enough that you can use an IPython notebook as the interactive front end. There are also several companion projects under development for machine learning and social-network analysis, which is simply fabulous.

Installation

As is typical for Apache Foundation projects, there is no real "installation" to speak of: download, unpack, and it is ready to use. Just don't forget to set the environment variables; I won't go into the details here.

Usage

Spark fully supports Scala and Java, and partially supports Python. Fortunately Scala can also be used as a scripting language, so everything is much more convenient than with Hadoop. Let's look at the two interactive modes first.

pyspark

Running pyspark on the command line brings up a big SPARK banner in the terminal plus the familiar default Python prompt. That's pretty ugly; is there anything nicer?

Run PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark to get an IPython prompt instead, which is a lot better.

Or run

 PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --pylab inline" pyspark
 

to bring up an IPython notebook as the driver.
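Whichever front end you pick, the shell hands you a ready-made SparkContext called sc, so you can try something tiny right away (a throwaway sketch; the numbers are just for illustration):

rdd = sc.parallelize(range(100))               # a toy RDD of the numbers 0..99
print(rdd.filter(lambda x: x % 2 == 0).sum())  # sum of the even ones -> 2450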

Let's go into one of the shells and try a sample program; WordCount as usual.

file = sc.textFile("hdfs://localhost:9000/user/huangsizhe/input/README.md")
count = file.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
count.saveAsTextFile("hdfs://localhost:9000/user/huangsizhe/wc")
 

The output is written successfully~
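If you want to double-check from the same shell, you can simply read the saved result back out of HDFS (a small sketch, assuming the job above finished and wrote the wc directory):

result = sc.textFile("hdfs://localhost:9000/user/huangsizhe/wc")
for line in result.take(5):   # each line is one (word, count) pair rendered as text
    print(line)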

By the way, to avoid the Hadoop warnings, I suggest suppressing the native lib.

spark-shell

This is the most commonly used tool. The language here is Scala, which can be run as a script or compiled and executed; I find the script route more convenient.

To run a script you have written:

$spark-shell -i XXX.scala

and the statements will be executed one by one.

WordCount again.

Put WordCount.scala in the $SPARKPATH/scala directory.

val file = sc.textFile("hdfs://localhost:9000/user/huangsizhe/input/README.md")
val counts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://localhost:9000/user/huangsizhe/wc")
 

Run:

$spark-shell -i WordCount.scala

Success!

spark-submit

This one feels a bit more like Hadoop and is what you normally use for writing apps. Programs can be written in Python, Java, or Scala: Python needs no compilation, Java has to be built into a jar with Maven, and Scala into a jar with sbt.

A Python example:

The file layout is as follows

WordCountpy |
            |-input
            |-script|
                    |-WordCount.py

WordCount.py looks like this

import sys
from operator import add

from pyspark import SparkContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print >> sys.stderr, "Usage: wordcount <input> <output>"
        exit(-1)
    sc = SparkContext(appName="PythonWordCount")
    lines = sc.textFile(sys.argv[1], 1)
    # split each line into words, pair each word with 1, then sum the counts
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect()
    for (word, count) in output:
        print "%s: %i" % (word, count)
    counts.saveAsTextFile(sys.argv[2])
    sc.stop()
 

cd into the project root and run: spark-submit --master local[4] script/WordCount.py input/TheMostDistantWayInTheWorld.txt output. An output folder will then appear, containing the results~
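If you would rather peek at the result without starting Spark again, remember that saveAsTextFile just writes a directory of plain-text part files, so ordinary Python is enough (a minimal sketch, assuming output was written to the local filesystem rather than HDFS):

import glob

for path in sorted(glob.glob("output/part-*")):   # one part-XXXXX file per partition
    with open(path) as f:
        for line in f:
            print(line.strip())                   # each line looks like (u'word', count)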

A Scala example

I'll skip the Java one...

The file layout of the Scala example is as follows:

WordCountScala |
               |-input|
               |      |-TheMostDistantWayInTheWorld.txt
               |-src|
               |    |-wordcountApp.scala
               |-wordcountAPP.sbt

wordcountApp.scala looks like this

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object WordCountApp {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: WordCountApp <input> <output>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("WordCountAPP")
    val sc = new SparkContext(conf)
    val file = sc.textFile(args(0))
    val counts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    println(counts)  // prints the RDD's description, not its contents
    counts.saveAsTextFile(args(1))
  }
}
 

wordcountAPP.sbt looks like this:

name := "wordcount"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0"
 

Compile:

cd into the WordCountScala project folder and run

$ find .
$ sbt package

Once the build succeeds, the compiled program is at target/scala-2.10/wordcount_2.10-1.0.jar

Run:

spark-submit   --class "WordCountApp"  \
 --master local[4]   \
 target/scala-2.10/wordcount_2.10-1.0.jar input/TheMostDistantWayInTheWorld.txt output
 

and you will see the output directory.

Streaming: Spark's stream-processing system

Streaming is impressive: real-time computation, apparently positioned as a replacement for Storm. Let's try the official example, network_wordcount.py

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print >> sys.stderr, "Usage: network_wordcount.py <hostname> <port>"
        exit(-1)
    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    ssc = StreamingContext(sc, 1)
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" "))\
                  .map(lambda word: (word, 1))\
                  .reduceByKey(lambda a, b: a+b)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()
 

Run

  1. Run nc -lk 9999 to start listening on the port (no nc handy? see the sketch after this list).
  2. Open a new terminal, cd to the directory containing the script, and run spark-submit network_wordcount.py localhost 9999.
  3. Type a few sentences in the nc terminal~ After each Enter, the word counts for that line show up in the other terminal~
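If nc is not available, a few lines of Python can play the same role and keep feeding lines into the socket (a rough sketch; the port and the sentence are just placeholders, and it must be started before spark-submit so the stream has something to connect to):

import socket
import time

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("localhost", 9999))
server.listen(1)

conn, addr = server.accept()   # blocks until network_wordcount.py connects
while True:
    conn.sendall(b"hello spark streaming hello world\n")  # one line per second
    time.sleep(1)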

Original article: http://www.ituring.com.cn/article/198895
