需求

将 kafka 上的数据实时同步到 HDFS,不能有太多小文件

实现过程

Spark Streaming 支持 RDD#saveAsTextFile,将数据以 纯文本 方式写到 HDFS,我们查看 RDD#saveAsTextFile 可以看到

1
2
RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)

从上面这句话我们可以知道,首先将 RDD 转化为 PariRDD,然后再调用 saveAsHadoopFile 函数进行实际的操作。上面的语句中 r 是原始 RDD,nullWritableClassTagtextClassTag 表示所写数据的类型,使用 nullWritableClassTag 是因为 HDFS 不会将这个数据进行实际写入(pariRDD 是 (K,V) 类型, 我们只需要写入 V),从效果上看就只写如后面的一个字段。TextOutputFormat 是一个格式化函数,后面我们再来看这个函数,NullWritable 则表示一个占位符,同样是这个字段不需要实际写入 HDFS,Text 表示我们将写入文本类型的数据。

我们看到 TextOutputFormat 这个类中有一个函数是 RecordWriter 用于操作没一条记录的写入,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException {
boolean isCompressed = getCompressOutput(job);
String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", "\t");
if(!isCompressed) {
Path codecClass1 = FileOutputFormat.getTaskOutputPath(job, name);
FileSystem codec1 = codecClass1.getFileSystem(job);
FSDataOutputStream file1 = codec1.create(codecClass1, progress);
return new TextOutputFormat.LineRecordWriter(file1, keyValueSeparator);
} else {
Class codecClass = getOutputCompressorClass(job, GzipCodec.class);
CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, job);
Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension());
FileSystem fs = file.getFileSystem(job);
FSDataOutputStream fileOut = fs.create(file, progress);
return new TextOutputFormat.LineRecordWriter(new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
}
}

文件中分为两部分:1)压缩文件,2)非压缩文件。然后剩下的事情就是打开文件,往文件中写数据了。

说到压缩文件,就和写 lzo 格式关联起来了,因为 lzo 格式就是压缩的,那么我们从哪拿到这个压缩的格式的呢?实际上 PariRDDFunctions#saveAsHadoopFile 还可以传入压缩格式类,函数原型如下

1
2
3
def saveAsHadoopFile[F <: OutputFormat[K, V]](
path: String,
codec: Class[_ <: CompressionCodec])(implicit fm: ClassTag[F]): Unit = self.withScope {

这里第二个参数表示压缩的类。如果需要我们传入一个压缩类即可,如 classOf[com.hadoop.compression.lzo.LzopCodec] 最终这个参数会传给 TextOutputFormat#RecordWriter.

至此,我们以及可以写 lzo 格式的文件了。但是还没有结束,因为会产生小文件,每个 RDD 的每个 partition 都会在 HDFS 上产生一个文件,而且这些文件大小非常小,就形成了很多小文件,这对 HDFS 的压力会非常大。我们需要解决这个问题

Comments