本文共 4987 字,大约阅读时间需要 16 分钟。
Direct 方式直接从 Kafka 的 broker 读取数据、由 Spark 自行跟踪偏移量;而 Receiver 方式通过 ZooKeeper 保存和获取偏移量信息,多一层中转,性能要差一些。
(1)启动zk
bin/zkServer.sh start
(2) 启动kafka
bin/kafka-server-start.sh -daemon config/server.properties
(3) 创建topic
bin/kafka-topics.sh --create --topic kafka_streaming_topic --zookeeper hadoop:2181/kafka08 --partitions 1 --replication-factor 1
查看:
bin/kafka-topics.sh --list --zookeeper hadoop:2181/kafka08
(1)添加pom依赖
【参考:http://spark.apache.org/docs/2.1.0/streaming-kafka-0-8-integration.html】
Maven 依赖坐标(groupId:artifactId:version):org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0
(2)代码
package Spark

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Streaming word count over Kafka topics using the direct (receiver-less)
 * approach of the 0.8 integration (spark-streaming-kafka-0-8).
 *
 * Usage: KafkaDirectWordCount_product <brokers> <topics>
 *   brokers — comma-separated list of Kafka broker host:port pairs
 *   topics  — comma-separated list of topics to consume
 */
object KafkaDirectWordCount_product {

  def main(args: Array[String]): Unit = {
    // Fail fast when the broker list or topic list is missing.
    if (args.length != 2) {
      // FIX: the original message omitted the expected arguments,
      // so the usage hint was useless to the operator.
      System.err.println("Usage: KafkaDirectWordCount <brokers> <topics>")
      System.exit(1)
    }

    val Array(brokers, topics) = args

    // local[2]: at least two threads are needed locally so that
    // receiving and processing do not starve each other.
    val sparkConf = new SparkConf()
      .setAppName("KafkaDirectWordCount")
      .setMaster("local[2]")
    // 5-second micro-batches.
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val topicsSet = topics.split(",").toSet
    // Direct stream talks to the brokers themselves ("metadata.broker.list");
    // no ZooKeeper-based receiver is involved.
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

    // See KafkaUtils.createDirectStream source for the type parameters:
    // key type, value type, key decoder, value decoder.
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Each record is a (key, value) tuple; only the message body (._2) is counted.
    messages.map(_._2)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
(1)提交spark任务
bin/spark-submit \--class Spark.KafkaDirectWordCount_product \--master local[2] \--name KafkaDirectWordCount_product \--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 \/opt/datas/lib/scalaProjectMaven.jar hadoop:9092 kafka_streaming_topic
Kafka 0.10.2+ 集成方式(spark-streaming-kafka-0-10,同样适用于 Kafka 2.2.0)
Maven 依赖坐标(groupId:artifactId:version):
org.apache.spark:spark-streaming_2.11:2.1.0
org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.0
org.scala-lang:scala-library:${scala.version}
【参考官网:http://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html】
package com.test

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._

/**
 * Streaming word count against Kafka via the 0.10 direct API
 * (spark-streaming-kafka-0-10).
 *
 * Broker list, consumer group and topic list are hard-coded below for this
 * example run; in a real deployment they would come from `args`.
 */
object DirectKafkaWordCount {

  def main(args: Array[String]): Unit = {
    // Hard-coded connection settings for the example.
    val brokers = "broker1:9092,broker2:9092,broker3:9092"
    val topics = "tests"
    val groupId = "test"

    // Local two-thread context with 2-second micro-batches.
    val conf = new SparkConf().setMaster("local[2]").setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Kafka consumer configuration: bootstrap servers, group id, and
    // String deserializers for both key and value.
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])

    // PreferConsistent distributes partitions evenly across executors;
    // Subscribe fixes the topic set up front.
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics.split(",").toSet, kafkaParams))

    // Extract message bodies, split into words, count per batch, and print.
    stream
      .map(record => record.value)
      .flatMap(line => line.split(" "))
      .map(word => (word, 1L))
      .reduceByKey(_ + _)
      .print()

    // Start the computation and block until termination.
    ssc.start()
    ssc.awaitTermination()
  }
}
(1)生产者
# kafka-console-producer --broker-list broker1:9092,broker2:9092,broker3:9092 --topic tests
(2)消费者
# kafka-console-consumer --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --topic tests
转载地址:http://ftygi.baihongyu.com/