Spark学习笔记Spark Streaming的使用
什么是Spark Streaming?
Spark Streaming是Apache Spark的组成部分之一,是一个流处理引擎,可用于处理实时数据流。它可以从各种源头(如Kafka、Flume、Twitter、Socket等)获取数据,并以可扩展的、高容错的方式对数据进行处理和分析。
Spark Streaming的基本概念
- DStream(Discretized Stream):离散化的流,即连续的数据流被切分成一个个小的批次(batch),由DStream来表示。
- Input DStream:输入流,从数据源中获取数据流。
- Recevier:接收器,从输入流中接收数据。
- Transformation:转换,对数据流进行转换操作,如map、filter、reduce等操作。
- Output DStream:输出流,将转换后的结果输出到目标存储器或终端设备。
Spark Streaming的使用步骤
- 创建StreamingContext对象
```
from pyspark.streaming import StreamingContext
sc = SparkContext(appName="SparkStreamingExample")
ssc = StreamingContext(sc, batchDuration=5) #每5秒钟处理一批数据
```
- 定义输入流
可以从多种源头获取数据流,如Kafka、Flume、Twitter、Socket等。
input_stream = ssc.socketTextStream(hostname="localhost", port=9999)
- 对数据流进行转换操作
word_counts = input_stream.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
上述操作将输入流按空格切分成单词,并计算每个单词出现的次数。
- 定义输出流
word_counts.pprint()
输出结果可以在控制台或文件中打印出来。
- 启动程序
ssc.start()
ssc.awaitTermination()
程序启动后,每隔5秒钟会处理一批输入数据,并将处理结果输出。如果没有输入数据,程序会一直等待。
示例1:在控制台中显示输入数据
下面的示例读取来自9999端口的输入数据并在控制台中显示。
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext(appName="StreamingTest")
ssc = StreamingContext(sc, batchDuration=5)
input_stream = ssc.socketTextStream(hostname="localhost", port=9999)
input_stream.pprint()
ssc.start()
ssc.awaitTermination()
在另一个终端窗口中输入数据(按enter键时数据被发送):
$ nc -lk 9999
hello spark streaming!
控制台输出结果:
-------------------------------------------
Time: 2021-12-31 12:56:15
-------------------------------------------
hello spark streaming!
-------------------------------------------
Time: 2021-12-31 12:56:20
-------------------------------------------
示例2:计算输入数据中单词的出现次数
下面的示例读取来自9999端口的输入数据,将文本按空格切分成单词,并计算每个单词出现的次数,并在控制台中输出。
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext(appName="WordCount")
ssc = StreamingContext(sc, batchDuration=5)
input_stream = ssc.socketTextStream(hostname="localhost", port=9999)
word_counts = input_stream.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
word_counts.pprint()
ssc.start()
ssc.awaitTermination()
在另一个终端窗口中输入数据(按enter键时数据被发送):
$ nc -lk 9999
hello world
hello scala
world python
控制台输出结果:
-------------------------------------------
Time: 2021-12-31 12:56:15
-------------------------------------------
('hello', 1)
('world', 1)
-------------------------------------------
Time: 2021-12-31 12:56:20
-------------------------------------------
('hello', 1)
('scala', 1)
('world', 2)
('python', 1)
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Spark学习笔记Spark Streaming的使用 - Python技术站