博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
九、sparkStream的scala示例
阅读量:7080 次
发布时间:2019-06-28

本文共 2125 字,大约阅读时间需要 7 分钟。

简介

sparkStream官网:

sparkStream是构建在spark core之上的实时流处理框架,它支持很多的数据源,如:

你可以从kafka等各种数据源中实时获取数据流,然后经过spark计算,持久化或者实时的dashBoard展示。

sparkStream的实时计算其实也可以称为微批处理计算,它将数据流按照一定的时间段分割成小批的数据,然后将对数据流的操作转换为对RDD的操作,整个流计算的中间结果进行叠加存储到内存或者外部设备,如图:

代码示例

下面将使用tcp socket作为数据源,每隔1秒钟发送字符数据。sparkstream将在启动以后,将收集10秒的数据作为一个批数据进行统计处理,代码如下:

import java.net.ServerSocketimport org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.{SparkConf, SparkContext}/**  * @Description sparkStream demo  * @Author lay  * @Date 2018/12/08 21:43  */object SparkStreamDemo {  var conf: SparkConf = _  var sc: SparkContext = _  var ssc: StreamingContext = _  def init(): Unit = {    conf = new SparkConf().setAppName("spark stream demo").setMaster("local[2]")    sc = new SparkContext(conf)    sc.setLogLevel("warn")    // 时间片为10秒钟    ssc = new StreamingContext(sc, Seconds(10))  }  def main(args: Array[String]): Unit = {    // 初始化socket流    initSocketStream()    // 初始化SparkStream    init()    // 从socket获取DStream    val lines = ssc.socketTextStream("localhost", 8888)    // 统计字数    val wordCount = lines.flatMap(x => x.split(" ")).map(x => (x, 1)).reduceByKey(_+_)    // 打印结果    wordCount.print()    // 启动    ssc.start()    println("spark stream started")  }  def initSocketStream(): Unit = {    new Thread(new Runnable {      override def run(): Unit = {        val serverSocket = new ServerSocket(8888)        val socket = serverSocket.accept()        println("accepted")        for (i <- 1 to 10) {          val text = "what is this\n"          socket.getOutputStream.write(text.getBytes("utf-8"))          Thread.sleep(1000)        }        println("waiting")        Thread.sleep(50000)        socket.close()        serverSocket.close()        println("closed")      }    }).start()    println("thread started")  }}

注意:

1)这里的master设置为"local[2]",是因为spark起码需要两个线程,一个线程用来接收数据,另一个线程用来处理数据。

 2)"what is this\n"这里加了一个'\n'字符,是因为字节流的接收将会以这个字符作为分隔符。

你会看到类似如下的打印:

-------------------------------------------Time: 1544281700000 ms-------------------------------------------(this,10)(is,10)(what,10)

 

转载于:https://www.cnblogs.com/lay2017/p/10089614.html

你可能感兴趣的文章
拥有丰富经验的移动广告聚合平台-KeyMob
查看>>
ActiveMQ(三)消息机制
查看>>
CentOS yum安装mcrypt详细图解教程
查看>>
我的友情链接
查看>>
FastDFS 安装部署
查看>>
我的友情链接
查看>>
查看命令帮助 help 、man
查看>>
Linux下Squid正向/反向代理配置
查看>>
android sdk 更新代理
查看>>
企业快速开发的优点
查看>>
WIN7 64位系统使用SCRT 7.064位、GNS3以及SCRT与GNS3的关联(一)
查看>>
MONGO_URL
查看>>
监控服务器Nagios之三 监控案例
查看>>
最简单的jdbc程序
查看>>
c#索引器
查看>>
C/C++内存管理 笔记
查看>>
对象数组去重合并
查看>>
Ubuntu 安装网络扫描和嗅探工具 Zenmap
查看>>
云计算与openstack学习(七)
查看>>
SpringMVC视图解析器ViewResovlet问题
查看>>