Why a long-running Kafka + Spark Streaming job exits abnormally: a root-cause analysis

2018/3/1 posted in Spark, Kafka

Error messages

Couldn't find leader offsets for set([xxxxxxxxxxx,xx])

# or
numRecords must not be negative

Analysis

No blog post I could find offered a direct solution, so the only option left was to dig into the source code and see whether the problem could be located there.
Start tracing from this line of code:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, myTopic)

It turns out that getFromOffsets(kc, kafkaParams, topics) is called first to obtain the starting offsets, and then a DirectKafkaInputDStream object is created. Stepping into it:

  def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag] (
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Set[String]
  ): InputDStream[(K, V)] = {
    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
    val kc = new KafkaCluster(kafkaParams)
    val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
    new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
      ssc, kafkaParams, fromOffsets, messageHandler)
  }
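
For context, the getFromOffsets call above resolves the topic's partitions and then reads either the earliest or the latest leader offsets, depending on auto.offset.reset. Roughly, it looks like the following; this is a paraphrased sketch, and the exact code differs between Spark versions:

  // Paraphrased sketch of KafkaUtils.getFromOffsets (kafka-0-8 connector).
  private[kafka] def getFromOffsets(
      kc: KafkaCluster,
      kafkaParams: Map[String, String],
      topics: Set[String]): Map[TopicAndPartition, Long] = {
    val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
    val result = for {
      topicPartitions <- kc.getPartitions(topics).right
      leaderOffsets <- (if (reset == Some("smallest")) {
        kc.getEarliestLeaderOffsets(topicPartitions)  // start from the beginning of each partition
      } else {
        kc.getLatestLeaderOffsets(topicPartitions)    // default: start from the end
      }).right
    } yield {
      leaderOffsets.map { case (tp, lo) => (tp, lo.offset) }
    }
    KafkaCluster.checkErrors(result)  // throws a SparkException if the lookup failed
  }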

The key is the compute method of this class; the analysis focuses on its logic.

  override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
    val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
    val rdd = KafkaRDD[K, V, U, T, R](
      context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

    // Report the record number and metadata of this batch interval to InputInfoTracker.
    val offsetRanges = currentOffsets.map { case (tp, fo) =>
      val uo = untilOffsets(tp)
      OffsetRange(tp.topic, tp.partition, fo, uo.offset)
    }
    val description = offsetRanges.filter { offsetRange =>
      // Don't display empty ranges.
      offsetRange.fromOffset != offsetRange.untilOffset
    }.map { offsetRange =>
      s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
        s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
    }.mkString("\n")
    // Copy offsetRanges to immutable.List to prevent from being modified by the user
    val metadata = Map(
      "offsets" -> offsetRanges.toList,
      StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
    val inputInfo = StreamInputInfo(id, rdd.count, metadata)
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

    currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
    Some(rdd)
  }
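
The very first line of compute is where the first error message comes from. latestLeaderOffsets(maxRetries) asks the partition leaders for their latest offsets and retries on failure; maxRetries is read from spark.streaming.kafka.maxRetries and defaults to 1. A paraphrased sketch of the method (the exact code varies a little between Spark versions):

  // Paraphrased sketch of DirectKafkaInputDStream.latestLeaderOffsets.
  protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
    val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
    if (o.isLeft) {
      val err = o.left.get.toString
      if (retries <= 0) {
        // Retries exhausted: the job dies with
        // "Couldn't find leader offsets for Set(...)".
        throw new SparkException(err)
      } else {
        logError(err)
        Thread.sleep(kc.config.refreshLeaderBackoffMs)
        latestLeaderOffsets(retries - 1)
      }
    } else {
      o.right.get
    }
  }

While a partition leader is being re-elected, for example after a broker restart, the offset lookup keeps failing; with the default of a single retry the stream gives up almost immediately, which is why raising spark.streaming.kafka.maxRetries helps.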

I won't reproduce every step of the debugging session here; if you are interested, set a breakpoint in compute and watch the relevant variables, and the overall picture should emerge. The second error message comes from a few lines further down in the same method:
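
Concretely, StreamInputInfo(id, rdd.count, metadata) requires a non-negative record count, and a KafkaRDD's count is simply the sum of untilOffset - fromOffset over its offset ranges. If a leader switch loses data, the new leader's latest offset can be smaller than the offset the stream has already consumed, and the count goes negative. Made-up numbers to illustrate the failure mode:

// Hypothetical offsets, only to show how the count turns negative.
val alreadyConsumed = 1200L // currentOffsets(tp): what the stream has already read
val newLeaderLatest = 1000L // untilOffsets(tp).offset: latest offset on the new leader after data loss
val numRecords = newLeaderLatest - alreadyConsumed // -200
// StreamInputInfo then fails its precondition with exactly the message we saw:
// require(numRecords >= 0, "numRecords must not be negative")

So both error messages trace back to a broker going down or a partition leader switching, which is what the parameters below are meant to survive.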

Solution

Putting this together, the fix is to add the following parameters:

--conf spark.streaming.kafka.maxRetries=50 \
--conf spark.yarn.maxAppAttempts=10 \
--conf spark.yarn.am.attemptFailuresValidityInterval=1h \

Notes:
1. spark.yarn.maxAppAttempts must not exceed yarn.resourcemanager.am.max-attempts on the Hadoop cluster; if it does, the smaller of the two values is used.
2. When a Kafka leader switch loses data and the Spark Streaming job errors out and exits abnormally, my usual routine so far has been to restart Kafka, delete the checkpoint, and restart the Spark Streaming job.
3. Going forward we will stop relying on the built-in checkpoint to store offsets and maintain them ourselves (see the sketch below); then future releases no longer require deleting the checkpoint, and the impact on the business is minimal.
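
A minimal sketch of what note 3 could look like with the same kafka-0-8 direct API. loadOffsets and saveOffsets are hypothetical helpers backed by whatever external store you choose (ZooKeeper, Redis, MySQL, ...); they are not part of Spark or Kafka:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

// Hypothetical: load the last committed offsets / persist the processed ranges.
def loadOffsets(): Map[TopicAndPartition, Long] = ???
def saveOffsets(ranges: Array[OffsetRange]): Unit = ???

def createStream(ssc: StreamingContext, kafkaParams: Map[String, String]) = {
  val fromOffsets = loadOffsets()
  val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
  // The createDirectStream overload that starts from explicitly supplied offsets
  // instead of whatever the checkpoint or auto.offset.reset would decide.
  val stream = KafkaUtils.createDirectStream[
    String, String, StringDecoder, StringDecoder, (String, String)](
    ssc, kafkaParams, fromOffsets, messageHandler)

  stream.foreachRDD { rdd =>
    val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // ... process the batch here ...
    saveOffsets(ranges) // commit offsets only after the batch has been processed
  }
  stream
}

With offsets kept outside the checkpoint, the application jar can be upgraded and redeployed without throwing away the checkpoint, and after a Kafka incident the stored offsets can be reset by hand instead of restarting everything from scratch.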