SparkStreaming (13): Advanced Data Source - Kafka Direct Mode (Production)
Published: 2019-05-27


I. What this implements

Direct mode reads data straight from the Kafka brokers and lets Spark track the offsets itself, whereas Receiver mode goes through the high-level consumer and keeps offset information in ZooKeeper, so its performance is worse.
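As a rough comparison, here is a minimal sketch of how the two kinds of streams are created with the 0.8 integration, assuming an existing StreamingContext `ssc`, a hypothetical consumer group name, and the host/topic names used later in this post; the Receiver-based call takes the ZooKeeper quorum, while the Direct call takes the broker list:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Receiver mode: the high-level consumer tracks offsets in ZooKeeper
val receiverStream = KafkaUtils.createStream(
  ssc, "hadoop:2181/kafka08", "test_group", Map("kafka_streaming_topic" -> 1))

// Direct mode: read straight from the brokers; Spark tracks offsets per batch
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, Map("metadata.broker.list" -> "hadoop:9092"), Set("kafka_streaming_topic"))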

II. For Kafka 0.8

1. Test environment

(1) Start ZooKeeper

bin/zkServer.sh start

(2) Start Kafka

bin/kafka-server-start.sh -daemon config/server.properties

(3) Create the topic

bin/kafka-topics.sh --create --topic kafka_streaming_topic --zookeeper hadoop:2181/kafka08 --partitions 1 --replication-factor 1

List the topics to verify:

bin/kafka-topics.sh --list --zookeeper hadoop:2181/kafka08

2. Code development

(1) Add the POM dependency

[Reference: http://spark.apache.org/docs/2.1.0/streaming-kafka-0-8-integration.html]

    
   
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.0</version>
</dependency>

(2) Code

package Spark

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaDirectWordCount_product {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: KafkaDirectWordCount <brokers> <topics>")
      System.exit(1)
    }
    val Array(brokers, topics) = args

    val sparkConf = new SparkConf().setAppName("KafkaDirectWordCount")
      .setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

    // Connect Spark Streaming to Kafka in Direct mode (see the createDirectStream source)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Take the second element of the (key, value) tuple, i.e. the message value
    messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
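For production use you typically also want to see which offsets each batch covered, for example to persist them yourself. A minimal sketch, assuming the `messages` stream created above; the println is only a placeholder for real offset bookkeeping:

import org.apache.spark.streaming.kafka.HasOffsetRanges

// Must be applied to the stream returned by createDirectStream itself,
// since only those RDDs carry offset range information.
messages.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    // placeholder: store these offsets in ZooKeeper or a database instead of printing
    println(s"${o.topic} partition ${o.partition}: ${o.fromOffset} -> ${o.untilOffset}")
  }
}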

3. Test

(1) Submit the Spark job

bin/spark-submit \
--class Spark.KafkaDirectWordCount_product \
--master local[2] \
--name KafkaDirectWordCount_product \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 \
/opt/datas/lib/scalaProjectMaven.jar hadoop:9092 kafka_streaming_topic

(Tested successfully.)

III. Consuming Kafka messages on CDH

1. Environment

Kafka 0.10.2 (CDH Kafka parcel 2.2.0)

2. Code dependencies

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
</dependency>

3. Scala consumer code

[Reference: the official Spark documentation]

package com.test

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: DirectKafkaWordCount <brokers> <groupId> <topics>
 *   <brokers> is a list of one or more Kafka brokers
 *   <groupId> is a consumer group name to consume from topics
 *   <topics> is a list of one or more kafka topics to consume from
 *
 * Example:
 *   $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port \
 *     consumer-group topic1,topic2
 */
object DirectKafkaWordCount {
  def main(args: Array[String]): Unit = {
    // Broker list, topics and group id are hard-coded here instead of being read from args
    val brokers = "broker1:9092,broker2:9092,broker3:9092"
    val topics = "tests"
    val groupId = "test"

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_.value)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
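The 0.10 integration can also commit offsets back to Kafka itself once each batch has been processed, instead of relying on auto-commit. A minimal sketch, assuming the `messages` stream and `kafkaParams` from the code above, and additionally assuming auto-commit has been disabled in `kafkaParams`:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// assumption: kafkaParams also contains
//   ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
messages.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // commit asynchronously after the batch's output has been handled
  messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}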

4. Start a console producer and consumer from the shell

(1) Producer

# kafka-console-producer --broker-list broker1:9092,broker2:9092,broker3:9092 --topic tests

(2) Consumer

# kafka-console-consumer --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --topic tests

5. Running the consumer program works perfectly.

Reposted from: http://ftygi.baihongyu.com/
