// One consumer group consuming MULTIPLE Kafka topics via Spark Streaming's
// direct (receiver-less) API, with offsets persisted manually in ZooKeeper.
// (A companion post covered the single-topic case.) After running, the stored
// offsets are visible under /consumers in the ZooKeeper CLI — e.g. topics
// "wc" and "wc1" each have one partition, and wc1 partition 0 showed offset 13.
package day04

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import scala.collection.mutable.ListBuffer
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Duration, StreamingContext}

object OrderDemoYY1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("yy").setMaster("local[*]")
    val ssc  = new StreamingContext(conf, Duration(5000))

    // The three topics this consumer group reads.
    val topic1 = "wc"
    val topic2 = "wc1"
    val topic3 = "wc2"
    // Consumer group name.
    val groupid = "GPMMVV"
    // ZooKeeper quorum (offset storage).
    val zkQuorum = "hadoop01:2181,hadoop02:2181,hadoop03:2181"
    // Kafka broker list.
    val brokerList = "hadoop01:9092,hadoop02:9092,hadoop03:9092"

    // Set of topics: passed to createDirectStream on the very first run
    // (when no offsets exist yet in ZooKeeper).
    val topics = Set(topic1, topic2, topic3)
    // Ordered list of the same topics: its index is used to pair each topic
    // with its ZKGroupTopicDirs entry below.
    val topicsList = ListBuffer[String](topic1, topic2, topic3)

    // Kafka direct-API parameters. SmallestTimeString => start from the
    // earliest offset when nothing has been consumed yet.
    val kafkaParams = Map(
      "metadata.broker.list" -> brokerList,
      "groupid"              -> groupid,
      "auto.offset.reset"    -> kafka.api.OffsetRequest.SmallestTimeString
    )

    // One ZKGroupTopicDirs per topic — each holds the ZooKeeper path where
    // that topic's offsets for this group are stored.
    val zkGTList: ListBuffer[ZKGroupTopicDirs] = new ListBuffer[ZKGroupTopicDirs]()
    for (tp <- topicsList) {
      zkGTList += new ZKGroupTopicDirs(groupid, tp)
    }

    // Client used both to read stored offsets and to write them back.
    val zkClient = new ZkClient(zkQuorum)

    // The stream is built in one of two ways (resume vs. fresh start),
    // hence a var assigned inside the if/else below.
    var kafkaDStream: InputDStream[(String, String)] = null

    // (topic, partition) -> stored offset, filled when resuming.
    var fromOffset = Map[TopicAndPartition, Long]()

    // childrens(i) = number of partition-offset znodes for topic i;
    // flag becomes true if ANY topic has been consumed before.
    val childrens: ListBuffer[Int] = new ListBuffer[Int]()
    var flag = false
    for (topicDir <- zkGTList) {
      // countChildren returns how many partition offset nodes exist for this topic.
      val child: Int = zkClient.countChildren(topicDir.consumerOffsetDir)
      childrens += child
      if (child > 0) {
        flag = true
      }
    }

    if (flag) {
      // Consumed before: rebuild the offset map from ZooKeeper.
      for (z <- 0 until topics.size) {
        // Same index pairs the child count, the ZK dirs and the topic name.
        val child  = childrens(z)
        val gpDirs = zkGTList(z)
        val topicn = topicsList(z)
        for (i <- 0 until child) {
          // Each partition's offset lives at <consumerOffsetDir>/<partition>.
          val offset = zkClient.readData[String](gpDirs.consumerOffsetDir + "/" + i)
          fromOffset += new TopicAndPartition(topicn, i) -> offset.toLong
        }
      }
      // Kafka key (null by default) and message value.
      val messageHandler = (mmd: MessageAndMetadata[String, String]) => {
        (mmd.key(), mmd.message())
      }
      // Resume from the stored offsets.
      kafkaDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
        ssc, kafkaParams, fromOffset, messageHandler
      )
    } else {
      // Never consumed: let the direct API start per auto.offset.reset.
      kafkaDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topics
      )
    }

    // Per-batch offset ranges, captured so they can be written back to ZK.
    var offsetRanges = Array[OffsetRange]()
    kafkaDStream.foreachRDD(kafkaRDD => {
      // The underlying KafkaRDD exposes its offset ranges via HasOffsetRanges.
      offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges
      kafkaRDD.foreach(println) // demo: just print each record
      for (o <- offsetRanges) {
        val topicNN: String = o.topic
        val offset: Long    = o.untilOffset
        val partition: Int  = o.partition
        // Find this topic's ZKGroupTopicDirs via its position in topicsList.
        val gpDir = zkGTList(topicsList.indexOf(topicNN))
        // Persist the new offset for this topic/partition.
        ZkUtils.updatePersistentPath(zkClient, gpDir.consumerOffsetDir + "/" + partition, offset.toString)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}