需求

将数据从 Kafka 同步到 Hive,并且目标格式希望是 lzo。我们通过 Spark Streaming 做这件事,将文件写成 lzo 格式,并且添加索引。

实现

要实现将数据从 Kafka 同步到 Hive 的功能,我们通过将数据直接写到 HDFS 路径来解决,由于担心小文件太多的问题(一个 batch 一个文件的话,可能造成小文件太多,对 HDFS 造成非常大的压力),所以我们通过追加的方式写 HDFS 文件。

往 HDFS 追加写文件的方式,我们在前面一篇文章中描述了具体的方案。但是对于格式为 LZO 的文件,我们发现一个现象:通过 Hive 查询,只能查到第一个 batch 的数据(也就是说所有 append 的数据都不能被查询到)。这是因为 LZO 文件会在关闭的时候在文件末尾添加一个块结束标记符,导致解析的时候只能读取到块结束符之前的数据(Linux 自带的 lzop 文件可以解析包含块结束符的文件)。到这里我们有两个思路:

1. 在 Hive 层面进行修改,将 Hive 使用的 InputFormat 重新实现,从而可以解析 multipart 的文件;
2. 通过某种方式将文件进行追加,但是文件的中间不会出现结束块的标记符。

由于第一种方式影响较大,实现起来周期较长,所以这里采用第二种方法。

我们考虑如何做到往 HDFS 写完数据之后,文件流不进行关闭,在我们需要关闭的时候再手动关闭。也就是说同一个 Executor 上的多 batch 公用同一个文件流。

查看官方文档我们可以得到这是可以实现的,也就是文档中的 ConnectionPool 实现的方式,这可以做到在同一个 Executor 上执行的多个 batch 公用同一个文件流(个人觉得这里也可以从 JVM 的层面来考虑,就是利用了 static 变量的声明周期以及可访问范围)。

当我们手动关闭某个文件的时候,再考虑将这个文件 move 到特定的地方(Hive 表对应的 HDFS 路径),然后添加索引,大致框架就完成了。当然这也仅仅是一个框架,需要处理的细节问题还有很多。

Comments