kafka-sparkstreaming自己维护offset

2018/3/6 posted in  Spark

auto.offset.reset

By default, it will start consuming from the latest offset of each Kafka partition. If you set configuration auto.offset.reset in Kafka parameters to smallest, then it will start consuming from the smallest offset.

OffsetRange

topic主题,分区ID,起始offset,结束offset

重写思路

因为spark源码中KafkaCluster类被限制在[spark]包下,所以我们如果想要在项目中调用这个类,那么只能在项目中也新建包org.apache.spark.streaming.kafka.然后再该包下面写调用的逻辑.这里面就可以引用KafkaCluster类了.这个类里面封装了很多实用的方法,比如:获取主题和分区,获取offset等等...

这些api,spark里面都有现成的,我们现在就是需要组织起来!

offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

简单说一下

  1. 在zookeeper上读取offset前先根据实际情况更新fromOffsets
    1.1 如果是一个新的groupid,那么会从最新的开始读
    1.2 如果是存在的groupid,根据配置auto.offset.reset
    1.2.1 smallest : 那么会从开始读,获取最开始的offset.
    1.2.2 largest : 那么会从最新的位置开始读取,获取最新的offset.

  2. 根据topic获取topic和该topics下所有的partitions

val partitionsE = kc.getPartitions(topics)
  1. 传入上面获取到的topics和该分区所有的partitions
val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
  1. 获取到该topic下所有分区的offset了.最后还是调用spark中封装好了的api
KafkaUtils.createDirectStream[K, V, KD, VD, (K, V, String)](
                ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => ( mmd.key, mmd.message, mmd.topic))
  1. 更新zookeeper中的kafka消息的偏移量
kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))

问题

sparkstreaming-kafka的源码中是自己把offset维护在kafka集群中了?

./kafka-consumer-groups.sh --bootstrap-server 10.10.25.13:9092 --describe  --group heheda

因为用命令行工具可以查到,这个工具可以查到基于javaapi方式的offset,查不到在zookeeper中的

网上的自己维护offset,是把offset维护在zookeeper中了?
用这个方式产生的groupid,在命令行工具中查不到,但是也是调用的源码中的方法呢?
难道spark提供了这个方法,但是自己却没有用是吗?

自己维护和用原生的区别

区别只在于,自己维护offset,会先去zk中获取offset,逻辑处理完成后再更新zk中的offset.
然而,在代码层面,区别在于调用了不同的KafkaUtils.createDirectStream

自己维护

自己维护的offset,这个方法会传入offset.因为在此之前我们已经从zk中获取到了起始offset

KafkaUtils.createDirectStream[K, V, KD, VD, (K, V, String)](
                ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => ( mmd.key, mmd.message, mmd.topic))

原生的

接受的是一个topic,底层会根据topic去获取存储在kafka中的起始offset

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, myTopic)

接下来这个方法里面会调用getFromOffsets来获取起始的offset

val kc = new KafkaCluster(kafkaParams)
val fromOffsets = getFromOffsets(kc, kafkaParams, topics)

代码

这个代码,网上很多,GitHub上也有现成的了.这里我就不贴出来了!
这里主要还是学习代码的实现思路!

如何引用spark源码中限制了包的代码

  1. 新建和源码中同等的包名,如上所述.
  2. 把你需要的源码拷贝一份出来,但是可能源码里面又引用了别的,这个不一定好使.
  3. 在你需要引用的那个类里,把这个类的包名改成与你需要引用的包名一样.最简单的办法了