在开发 Spark Streaming 的公共组件过程中,需要将 binlog 的数据(Array[Byte])转换为 Json 格式,供用户使用,本文提供一种转换的思路。另外我们会用到几个辅助类,为了行文流畅,我们将辅助类的定义放在文章的最后面。如果

如果本文有讲述不详细,或者错误指出,肯请指出,谢谢

对于 binlog 数据,每一次操作(INSERT/UPDATE/DELETE 等)都会作为一条记录写入 binlog 文件,但是同一条记录可能包含数据库中的几行数据(这里比较绕,可以看一个具体的例子)

在数据库中,有 id, name,age 三个字段,其中 id 为主键,name 随意, age 随意。有两行数据如下

id name age
1 john 30
2 john 40

那么你进行操作

update table set age = 50 where name = “john”

的时候,就会将两行的数据都进行更改,这两行更改的数据会在同一个 binlog 记录中,这一点会在后面的实现中有体现。

下面,我们给出具体的代码,然后对代码进行分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def desirializeByte(b: (String, Array[Byte])) : (String, String) = {
val binlogEntry = BinlogEntryUtil.serializeToBean(b._2) //将 Array[Byte] 数据转换成 com.meituan.data.binlog.BinlogEntry 类,相关类定义参考附录
val pkeys = binlogEntry.getPrimaryKeys.asScala //获取主键,这里的 asScala 将 Java 的 List 转换为 Scala 的 List
val rowDatas : List[BinlogRow] = binlogEntry.getRowDatas.asScala.toList //获取具体的信息
val strRowDatas = rowDatas.map(a => { //将获取到的具体信息进行转换,这里主要是将没一条信息的内容,转换 [(K1:V1,K2:V2...Kn:Vn)] 的形式,方面后面进行 Json 化
val b = a.getBeforeColumns.asScala //获取 beforColumns
val c = a.getAfterColumns.asScala //获取 afterColumns
val mb = b.map(d => (d._1, d._2.getValue)) //去掉所有不需要的信息,只保留每个字段的值
val mc = c.map(c => (c._1, c._2.getValue)) //去掉所有不需要的信息,只保留每个字段的值
(mb, mc) //返回转换后的 beforeColumns 和 afterColumns
})
//下面利用 json4s 进行 Json 化
(binlogEntry.getEventType, compact("rowdata" -> strRowDatas.map{
w => List("row_data" -> ("before" -> w._1.toMap) ~ ("after" -> w._2.toMap)) //这里的两个 toMap 是必要的,不然里层会变成 List,这个地方比较疑惑的是,
//w._1 按理是 Map类型,为什么还需要强制转换成 Map
//而且用 strRowDatas.foreach(x => println(s"${x._1} ${x._2}")打印的结果表名是 Map
}))</pre>
desirializeByte 函数传入 topic 中的一条记录,返回参数自己确定,我这里为了测试,返回一个 (String, String) 的 Tuple,第一个字段表示该条记录的 EventType(Insert/Update/Delete 等),第二个字段为 Json 化后的数据。
BinlogEntryUtil.serilizeToBean 是一个辅助类,将 binlog 数据转化为一个 Java bean 类。
第 4 行,我们得到表对应的主键,第 5 行获得具体的数据
第 6 行到第 12 行是 Json 化之前的辅助工作,将所有不需要的东西给剔除掉,只留下字段,以及字段对应的值。
第 14, 15 行就是具体的 Json 工作了(使用了 json4s 包进行 Json 化)
这个过程中有一点需要注意的是,在 Json 化的时候,记得为 w._1 和 w._2 加 toMap 操作,不然会变成 List(很奇怪,我将 w._1 和 w._2 打印出来看,都是 Map 类型)或者你可以在第 7,8 行的末尾加上 .toMap 操作。这个我查了 API,进行了实验,暂时怀疑是在和 json4s 组合的时候,出现了问题,有待验证。
利用上述代码,我们可以得到下面这样 Json 化之后的字符串(我进行了排版,程序返回的 Json 串是不换行的)
<pre class="font-size:8 lang:default decode:true">{"rowdata":
[{"row_data":
{"before":{"param_name":"creator","param_value":"chenqiang05","horigindb_etl_id":"2532","utime":"2016-07-26 15:07:16","id":"15122","status":"0","ctime":"2016-07-25 17:06:01"},
"after":{"param_name":"creator","param_value":"chendayao","horigindb_etl_id":"2532","utime":"2016-08-01 10:32:01","id":"15122","status":"0","ctime":"2016-07-25 17:06:01"}
}
}]
}

到这里,基本就完成了一种将 binlog 数据 Json 化的代码。

附录代码,由于这些代码是从其他工程里面抠出来的,可能读起来会不顺畅,还请见谅。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
public static BinlogEntry serializeToBean(byte[] input) {
BinlogEntry binlogEntry = null;
Entry entry = deserializeFromProtoBuf(input);//从 protobuf 反序列化
if(entry != null) {
binlogEntry = serializeToBean(entry);
}
return binlogEntry;
}
public static Entry deserializeFromProtoBuf(byte[] input) {
Entry entry = null;
try {
entry = Entry.parseFrom(input);
//com.alibaba.otter.canal.protocol.CanalEntry#Entry 类的方法,由 protobuf 生成
} catch (InvalidProtocolBufferException var3) {
logger.error("Exception:" + var3);
}
return entry;
}
//将 Entry 解析为一个 bean 类
public static BinlogEntry serializeToBean(Entry entry) {
RowChange rowChange = null;
try {
rowChange = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception var8) {
throw new RuntimeException("parse event has an error , data:" + entry.toString(), var8);
}
BinlogEntry binlogEntry = new BinlogEntry();
String[] logFileNames = entry.getHeader().getLogfileName().split("\\.");
String logFileNo = "000000";
if(logFileNames.length > 1) {
logFileNo = logFileNames[1];
}
binlogEntry.setBinlogFileName(logFileNo);
binlogEntry.setBinlogOffset(entry.getHeader().getLogfileOffset());
binlogEntry.setExecuteTime(entry.getHeader().getExecuteTime());
binlogEntry.setTableName(entry.getHeader().getTableName());
binlogEntry.setEventType(entry.getHeader().getEventType().toString());
Iterator primaryKeysList = rowChange.getRowDatasList().iterator();
while(primaryKeysList.hasNext()) {
RowData rowData = (RowData)primaryKeysList.next();
BinlogRow row = new BinlogRow(binlogEntry.getEventType());
row.setBeforeColumns(getColumnInfo(rowData.getBeforeColumnsList()));
row.setAfterColumns(getColumnInfo(rowData.getAfterColumnsList()));
binlogEntry.addRowData(row);
}
if(binlogEntry.getRowDatas().size() >= 1) {
BinlogRow primaryKeysList1 = (BinlogRow)binlogEntry.getRowDatas().get(0);
binlogEntry.setPrimaryKeys(getPrimaryKeys(primaryKeysList1));
} else {
ArrayList primaryKeysList2 = new ArrayList();
binlogEntry.setPrimaryKeys(primaryKeysList2);
}
return binlogEntry;
}
public class BinlogEntry implements Serializable {
private String binlogFileName;
private long binlogOffset;
private long executeTime;
private String tableName;
private String eventType;
private List<String> primaryKeys;
private List<BinlogRow> rowDatas = new ArrayList();
}
public class BinlogRow implements Serializable {
public static final String EVENT_TYPE_INSERT = "INSERT";
public static final String EVENT_TYPE_UPDATE = "UPDATE";
public static final String EVENT_TYPE_DELETE = "DELETE";
private String eventType;
private Map<String, BinlogColumn> beforeColumns;
private Map<String, BinlogColumn> afterColumns;
}
public class BinlogColumn implements Serializable {
private int index;
private String mysqlType;
private String name;
private boolean isKey;
private boolean updated;
private boolean isNull;
private String value;
}

 

Comments