跳到主要内容

基于触发器实现 Sap Hana 实时数据同步

简介

SAP HANA(全称 SAP High-performance Analytic Appliance)是由 SAP 开发的一款内置列式数据库的系统平台,除内置数据库以外,还具有高级分析功能(例如预测分析、空间数据处理、文本分析、文本搜索、流分析、图形数据处理),ETL 功能,并内置了应用程序服务器,本文提到的 SAP HANA 特指数据平台内置的数据库管理系统。

SAP HANA 是内存数据库系统,可以把系统所有的数据都载入内存中,因此,与传统的将数据存储在硬盘上的数据库相比,HANA 的性能可以提升 10~10,000 倍。一般 SAP HANA 内置在 SAP ERP 系统中作为整体提供服务,在制造业应用广泛。

现如今企业都会建立内部的统一数据分析平台,SAP HANA 保存了ERP相关数据,如何实时同步 SAP HANA 的数据到数据平台,一直是困扰企业用户的问题。

整体流程

实现触发器同步的整体流程如下:

  1. 安装触发器,通过触发器捕获增量变更数据
  2. 记录位点,记录增量数据数据同步的起点
  3. 执行全量数据同步
  4. 执行增量数据同步

触发器安装

触发器是一种自动触发执行的存储过程,它可以在数据变更前执行也可以在数据变更后执行,因为本质也是存储过程,所以存储过程支持的操作触发器均支持。

不同数据库对触发器的支持程度不同,Hana 的触发器支持监听 I(新增)、U(更新)、(删除) 三种事件,因此数据的所有变更都可以通过触发器捕获。

安装触发器的方式与创建存储过程类似,即通过执行 SQL 创建触发器。

通过触发器实现增量数据同步,需要触发器捕获数据的变更事件并写入 增量CDC数据表, 下面是触发器执行的整体流程:

hana_sync_01.png

触发器安装示例

下面是安装触发器的示例脚本

CREATE OR REPLACE TRIGGER "MY_SCHEMA"."SYM_ON_I_FOR_TST1_CRP" AFTER INSERT ON "MY_SCHEMA"."TEST1" 
REFERENCING NEW ROW NEW FOR EACH ROW
BEGIN
DECLARE NEW_KEY_SUMMARY CLOB := '"COL1":' || CASE WHEN :NEW."COL1" IS NULL THEN 'null' ELSE CONCAT(CONCAT('"',REPLACE(REPLACE(:NEW."COL1",'\','\\'),'"','\"')),'"') END;
DECLARE NEW_COL_SUMMARY CLOB := '"COL1":' || CASE WHEN :NEW."COL1" IS NULL THEN 'null' ELSE CONCAT(CONCAT('"',REPLACE(REPLACE(:NEW."COL1",'\','\\'),'"','\"')),'"') END || ',' || '"COL0":' || CASE WHEN :NEW."COL0" IS NULL THEN 'null' ELSE CONCAT(CONCAT('"',REPLACE(REPLACE(:NEW."COL0",'\','\\'),'"','\"')),'"') END;
DECLARE EXIT HANDLER FOR SQLEXCEPTION BEGIN END;

IF 1=1 THEN
INSERT INTO "SYSTEM"."CLOUD_CANAL_TRIGGER_DATA" (CATALOG_NAME, SCHEMA_NAME, TABLE_NAME, EVENT_TYPE, TRIGGER_ID, PK_DATA, ROW_DATA, TRANSACTION_ID, EXTERNAL_DATA, CREATE_TIME)
VALUES(
'SYSTEMDB',
'MY_SCHEMA',
'TEST1',
'I',
1,
'{' || NEW_KEY_SUMMARY || '}',
'{' || NEW_COL_SUMMARY || '}',
CURRENT_UPDATE_TRANSACTION(),
'',
CURRENT_UTCTIMESTAMP
);
END IF;

END;

安装好触发器,下面介绍 增量CDC数据表 的具体结构设计

CDC 增量表

增量CDC数据表结构如下:

CREATE COLUMN TABLE "SYSTEM"."CLOUD_CANAL_TRIGGER_DATA" ("DATA_ID" BIGINT GENERATED BY DEFAULT AS IDENTITY (
START WITH 1 INCREMENT BY 1) NOT NULL ,
"CATALOG_NAME" VARCHAR(255) NULL ,
"SCHEMA_NAME" VARCHAR(255) NOT NULL ,
"TABLE_NAME" VARCHAR(255) NOT NULL ,
"EVENT_TYPE" VARCHAR(20) NOT NULL ,
"ROW_DATA" CLOB,
"PK_DATA" CLOB,
"OLD_DATA" CLOB,
"TRIGGER_ID" INTEGER NOT NULL ,
"EXTERNAL_DATA" VARCHAR(50),
"CREATE_TIME" LONGDATE
);
CREATE UNIQUE CPBTREE INDEX "CLOUD_CANAL_IDX_D_ID" ON
"SYSTEM"."CLOUD_CANAL_TRIGGER_DATA" ( "DATA_ID" ASC);
CREATE INDEX CLOUD_CANAL_TRIGGER_DATA_CREATE_TIME_IDX ON "SYSTEM".CLOUD_CANAL_TRIGGER_DATA (CREATE_TIME);

增量CDC数据表记录了每次变更数据的

  • 数据库(CATALOG_NAME)
  • 模式名称(SCHEMA_NAME)
  • 表名称(TABLE_NAME)
  • 事件类型(EVENT_TYPE)
  • 变更前的数据镜像(ROW_DATA)
  • 变更前主键数据(PK_DATA)
  • 变更后的数据镜像(OLD_DATA)
  • 触发器ID(TRIGGER_ID)
  • 变更时间(CREATE_TIME)

另外,需要注意的是使用 自增ID 作为主键,同时记录 创建时间

自增ID(DATA_ID)可以唯一标识数据变更事件并确保有序,创建时间(CREATE_TIME)作为数据变更事件的时间戳,记录数据变更发生时间。

扫描 CDC 增量表

扫描 CDC 增量表 需要做到不重复扫、不漏数据、顺序扫描。

保证顺序的方式是通过自增ID排序,即 ORDER BY DATA_ID ASC,通过这个方式相当于对全局的变更事件进行编号,基于编号进行扫描和消费,可确保不重。但只做到这一点还不够,会出现丢数据情况,数据丢失的原因如图所示:

hana_sync_02.png

当查询的语句执行时,可能有部分事务没有 COMMIT,导致漏扫,这种问题如何解决?

首先想到的可能是等待一段时间,但是等待多久合适也是问题,时间长了延迟高,时间短了丢数据,而且当事务出现回滚时,自增序列会出现缺失,缺失的原因是事务没有回滚前占用了自增 ID 生成的序号,事务回滚后占用的序号也不会被重复使用。遇到自增 ID 序号缺失的情况,通过等待一段时间方式,只能每次都等待最大超时时间,会导致同步延迟增大。

这个问题的关键点是确定占用自增 ID 的事务是否还在活跃状态。

所以在扫描 增量CDC数据表 时遇到某个数据 ID 缺失的情况,会尝试插入一条相同 ID 的数据,通过唯一键来判断这个 ID 的数据是否被占用,如果出现异常,则重新查询;如没有异常,则会写入数据占用这个 ID,因为这个 ID 的数据已经被填充,因此也不用担心这个 ID 的数据被漏扫,可以继续读取大于这个 ID 数据。

位点管理

位点用于管理数据同步的进度,记录哪些变更事件已经同步、哪些变更事件没有同步。

基于触发器的数据同步方案选择 CDC 增量表的自增 ID 及时间戳作为位点,自增 ID 可以精确定位到每个数据变更事件,时间戳可以方便以用户视角感知同步任务的延迟情况。

且当捕获的变更事件成功写入目标端后,才去更新位点。

FQA

如何判断同步延迟?

Hana 源端增量同步使用位点(增量表自增 ID)来判断延迟,当位点向前推进时可准确获取延迟,但若无变更事件导致位点不更新,延迟会持续增大,实际上并未发生延迟。

可以通过查询增量表来判断是否存在延迟,具体逻辑为:

  • 若存在数据,系统根据增量数据的时间戳计算延迟。
  • 若无数据,任务获取当前时间发送心跳事件,并根据心跳上的时间戳计算延迟。