需求

从指定时间戳(比如 2 小时)开始消费 Kafka 数据

思路

我们知道通过 Kafka 的 API 可以得到指定时间戳对应数据所在的 segment 的起始 offset。那么就可以通过这个功能来粗略的实现需求。

实现

我们知道 KafkaUitls.createDirectStream 这个接口可以指定起始点的 offset,那么我们需要做的就变成如下三步:

  1. 获取 topic 对应的 TopicAndPartitions,得到当前 topic 有多少 partition
  2. 从 Kafka 获取每个 partition 指定时间戳所在 segment 的起始 offset
  3. 将步骤 2 中的 offset 作为参数传入 createDirectStream 即可
    通过查看源码,我们知道步骤 1 和步骤 2 中的功能在 org.apache.spark.streaming.kafka.KafkaCluster 中都已经有现成的函数了:getPartitionsgetLeaderOffsets,分别表示获取指定 topic 的 partition 以及获取 partition 指定时间戳所在的 segment 的起始 offset,那么我们需要做的就是如何调用这两个函数实现我们的功能了。

我们知道 KafkaCluster 的作用域是 private[spark] 所以我们需要在自己的代码中使用 package org.apache.spark(.xxx ... .yyy)(小括号中表示可以省略)来限定自己的代码,因此我们可以将步骤 1 和步骤 2 中的功能实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package org.apache.spark.streaming.kafka
...... //省略其他不相关的代码
def getPartitions(kafkaParams: Map[String, String], topics: Set[String]): Either[Err, Set[TopicAndPartition]] = {
val kc = new KafkaCluster(kafkaParams)
kc.getPartitions(topics) //我们可以在这里处理错误,也可以将错误继续往上传递
}
def getLeaderOffsets(kafkaParams: Map[String, String], topicAndPartitions: Set[TopicAndPartition], before: Long) : Map[TopicAndPartition, Long] = {
val kc = new KafkaCluster(kafkaParams)
val leaderOffsets = kc.getLeaderOffsets(topicAndPartitions, before)
if (leaderOffsets.isLeft) { //在本函数内部处理错误,如果有错误抛出异常
throw new RuntimeException(s"### Exception when MTKafkaUtils#getLeaderOffsets ${leaderOffsets.left.get} ###")
}
leaderOffsets.right.get.map { case (k, v) => (k, v.offset)} //将 Map[TopicAndPartition, LeaderOffset] 转变为 Map[TopicAndPartition, Long](Long 为对应 partition 的 offset,从 LeaderOffset 中获取)
}

步骤 3 直接传入参数即可,就可以从指定时间戳开始消费 Kafka 数据了

Comments