一句话总结:InputStream#read 对于当次能够读取多少字节不做保证,必须以该函数的返回值做为实际读取字节数的事实。

#1. 问题
由于某些功能的需要,实现如下函数,从某个 InputStream copy 特定长度的数据到 OutputStream,其中 InputStream 为打开的某个 HDFS 文件流

1
2
3
4
5
6
7
8
9
10
11
12
13
private void copySpecifiedLengthBytes(final InputStream in, final OutputStream out, final long size) {
long current = 0;
byte[] bytes = new byte[BLOCKSIZE]; //BLOCKSIZE = 4096;
int bytesRead;
while (current + BLOCKSIZE < size) {
bytesRead = in.read(bytes);
current += bytesRead;
out.write(bytes);
}
int byteLeftToRead = (int) (size - current);
bytesRead = in.read(bytes, 0, byteLeftToRead);
out.write(bytes, 0, bytesRead);
}

看到上面的函数,可以先思考下会不会出现问题,如果可能出现问题,则会在哪个地方,以及问题的展现形式会是什么样的。
|
|
|
|
|
|
|
|
|
|
上面的代码在某些场景下会有问题,大致如下:OutputStream 的中间部分会多出来一些无关字节,导致整个 OutputStream 的内容是错误的。

2. 定位分析

当我们知道所有的来龙去脉之后,回过头来看发现其实并没有那么难,但是当我们只看到现象的时候,可能会有无数种解释,这个时候需要能够验证哪种解释才是合理的。

看到 OutputStream 中的数据不对,怀疑如下两点 1) InputStream 中的数据是否准确; 2) 读取的起止位置是否准确;

通过添加相关日志重现问题,初步确定 InputStream 中的数据是可信的,从 InputStream 中读取的起止位置也是准确的,但是 OutputStream 中得到的数据是非预期的。暂时不知道对不上的数据是怎么来的,但是基本能够定位问题在于上面的 copySpecifiedLengthBytes 函数。

为了验证确实是这里的问题,将该函数进行了修改,改成如下所示

1
2
3
4
5
6
7
8
9
10
private void copySpecifiedLengthBytes(final InputStream in, final OutputStream out, final long size) {
byte[] bytes = new byte[BLOCKSIZE]; //BLOCKSIZE = 4096;
int bytesRead;
long byteLeft = size;
while (byteLeft > 0) {
bytesRead = in.read(bytes, 0, (int) Math.min(byteLeft, bytes.length));
byteLeft -= bytesRead;
out.write(bytes, 0, bytesRead);
}
}

然后尝试进行复现该问题,发现没有再次复现(之前基本每次都能复现),基本确定问题在这里。但是留下几个问题不能完全解释清楚:

  1. 把 HDFS 文件 copy 到本地,尝试复现的时候,发现无法复现,且得到的 OutputStream 是预期中的(正确的数据)

  2. 为什么从同一个 HDFS 多次复现的时候,得到的 OutputStream 结果是一致的(错的一致)

3. 原因

上面遗留的第一个问题,从本地文件读取时不能复现,与实现有关,找了下源码,没有发现 read 函数有中间中断的情况,因此没有出现问题。

第二个问题,看到的现象是从同一台机器上读取同一个文件,得到的 OutputStream 一致(错的一致),也就是说 InputStream#read 的行为在多次复现过程中完全一致,所以导致多次复现得到的 OutputStream 也错的一致。

然后我们尝试在错误的 copySpecifiedLengthBytes 函数中添加日志,查看哪些地方出错,看到一个现象,在大致读取来 128K 的地方,InputStream#read 真正读取到的字节数少于 BLOCKSIZE,但是我们写出的数据量为 BLOCKSIZE,从而导致后面的 out.write(bytes) 多写出一些无关字节,至此我们基本能够解释上面的遗留问题来,但是又引发了另一个问题

为什么每次都在 128K 左右的地方出错呢?而且每次执行 copySpecifiedLengthBytes 仅仅在第一个 128K 附近出错,后面的地方都没有出错呢?

然后我们打开 hadoop-hdfs-client 的代码,从 DFSInputStream 开始跟踪,发现在 BlockReaderRemote.java 中有一段这样的代码

1
2
3
4
5
6
// First packet will include some data prior to the first byte
// the user requested. Skip it.
if (curHeader.getOffsetInBlock() < startOffset) {
int newPos = (int) (startOffset - curHeader.getOffsetInBlock());
curDataSlice.position(newPos);
}

这段代码表示读取的第一个 packet(HDFS 读写的最小单元) 头部会包含一些非法数据,需要跳过,但是后续的 packet 则不需要跳过,这个和我们看到的现象基本吻合。那到底是不是这里呢?

通过准备环境,然后在 curDataSlice.position(newPos) 处添加断点,发现如预期的停在了断点处,也就是说问题的根源来自这里。至此所有的遗留问题都解决了。

接下来梳理一下整个流程:

  1. 使用 InputStream#read 进行读取

  2. DFSInputStream 会读取 packet

  3. DFSInputStream 读取 packet 的时候会对第一个 packet 进行部分字节的跳过, 这里是因为 HDFS 的读写最小单元是 packet,seek 的 offset 可能不是 packet 的开头,那么就会从 packet 的开头进行读取,然后实际读取的时候需要把前面一部分进行跳过。

  4. 导致第一步中的 read 读取不充分(实际读取的字节数比预期的少),导致 out.write(bytes) 的行为不符合预期

最后附上可以进行直观查看效果的 UT
当使用错误的 copySpecifiedLengthBytes 的时候,下面的测试会挂掉,注意挂掉的原因,当使用正确的测试的使用,则会通过

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Test
public void testCopySpecifiedLengthBytes() throws IOException {
int length = 4 * 1024 * 1024;
byte[] from = new byte[length];
Random rnd = new Random();
rnd.nextBytes(from);
InputStream fromStream = spy(new ByteArrayInputStream(from));
doAnswer(new Answer() {
private int idx = 0;
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
byte[] first = invocation.getArgumentAt(0, byte[].class);
int off = invocation.getArgumentAt(1, Integer.class);
int len = invocation.getArgumentAt(2, Integer.class);
int bytesReadThisTime = Math.min(100, len);
System.arraycopy(from, idx, first, off, bytesReadThisTime);
idx += bytesReadThisTime;
return bytesReadThisTime;
}
}).when(fromStream).read(any(byte[].class), anyInt(), anyInt());
ByteArrayOutputStream toStream = new ByteArrayOutputStream(length);
copySpecifiedLengthBytes(fromStream, toStream, length);
assertArrayEquals(from toStream.toByteArray());
}

Comments