侧边栏壁纸
博主头像
Kom

完成而非完美

  • 累计撰写 9 篇文章
  • 累计创建 13 个标签
  • 累计收到 0 条评论

Flink ClickHouse

Kom
Kom
2023-07-31 / 0 评论 / 0 点赞 / 180 阅读 / 10,066 字
温馨提示:
本文最后更新于 2023-10-06,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

环境配置

  1. Ubuntu 18
  2. JDK 11
  3. Maven 3.9.3
  4. VSCode 最新版
  5. Flink 1.17.1
  6. Clickhouse 21.4.5.46
  7. Flink CDC 2.4.1

Flink 本地开发快速上手

JDK 11 和 JDK17

JDK17主要是为了VSCode java配置要求

  1. sudo apt install openjdk-11-jdk
  2. wget https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz 下载
  3. tar -zxvf jdk-17_linux-x64_bin.tar.gz -C /usr/local/jdk/ 解压
  4. 环境配置,或之后一起配置
    查看java位置

    whereis java

    ll -a /usr/bin/java

    ll -a /etc/alternatives/java

Maven

  1. 解压在 /usr/local/maven
  2. 修改settings.xml
<localRepository>/home/bigdata/maven_repos</localRepository>

<mirror> 
<id>aliyun-maven</id> 
<mirrorOf>*</mirrorOf> 
<name>aliyun maven</name> 
<url>http://maven.aliyun.com/nexus/content/groups/public</url> 
</mirror>

注意:/home/bigdata/maven_repos不要先创建,不然之后运行命令会构建失败

也可以直接sudo apt install maven安装

  1. 配置环境变量
    在/etc/proflie中配置
export JDK17_HOME=/usr/local/jdk/jdk-17.0.7
export JDK11_HOME=/usr/lib/jvm/java-11-openjdk-amd64
export JAVA_HOME=${JDK11_HOME}
export MAVEN_HOME=/usr/local/maven/apache-maven-3.9.3
export PATH=${MAVEN_HOME}/bin:${JAVA_HOME}/bin:$PATH

source /etc/profile 使命令生效

  1. wget https://dlcdn.apache.org/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz --no-check-certificate
  2. tar -xvzf flink-1.17.1-bin-scala_2.12.tgz -C /usr/local/flink/
  3. cd flink-1.17.1/
  4. 启动集群 ./bin/start-cluster.sh
  5. 测试示例 ./bin/flink run examples/streaming/WordCount.jar
  6. 查看效果 tail log/flink-*-taskexecutor-*.out
  7. 停止集群 ./bin/stop-cluster.sh

远程开发

配置Frp

  • frps:
[common]
bind_port = 7000
vhost_http_port = 50070
token = xxxxx

dashboard_port = xxxx
dashboard_user = xxxx
dashboard_pwd = xxxx
enable_prometheus = true

log_file = /var/log/frps.log
log_level = info
log_max_days = 3
  • frpc
[common]
server_addr = 域名
server_port = 7000
tls_enable = true

token = xxxxx

[ssh]
type = tcp
local_ip = 127.0.0.1
local_port = 22
remote_port = xxxxx

[flink]
type = http
local_ip = 127.0.0.1
local_port = xxxx

custom_domains = xxxx
  • 后台启动 ./frpc -c frpc.ini &
  • 停止 ps -aux|grep frp| grep -v grep kill 进程

配置VSCode

然后这里我选择使用VSCode远程开发

下载Remote Development 拓展,然后连接即可进入远程环境,记得下载相应拓展,因为远程是没有本地的拓展的,需要重新下载

Flink初探

基于官方命令构建反欺诈检测系统

官方文档:

https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/

  1. 生成工程文件
    在用户目录下创建一个workspace,然后在workspace下运行以下命令
mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-walkthrough-datastream-java \
    -DarchetypeVersion=1.17.1 \
    -DgroupId=frauddetection \
    -DartifactId=frauddetection \
    -Dversion=0.1 \
    -Dpackage=spendreport \
    -DinteractiveMode=false

会自动生成frauddetection工程文件,用tree生成项目层级目录

.
├── pom.xml   // 依赖管理文件
└── src
    └── main
        ├── java    // 相关java代码
        │   └── spendreport
        │       ├── FraudDetectionJob.java
        │       └── FraudDetector.java
        └── resources    // 相关配置
            └── log4j2.properties
  1. Flink编程模型
    大数据通用:对接数据源 --> 使用引擎进行业务逻辑处理 --> 结果写到某个地方去

    Flink: Source --> Transformation --> Sink
  2. 在 IDE 中运行项目可能会导致 java.lang.NoClassDefFoundError 异常。这可能是因为您没有将所有必需的 Flink 依赖项隐式加载到类路径中。
    解决办法:
  • IntelliJ IDEA: Go to Run > Edit Configurations > Modify options > Select include dependencies with "Provided" scope.
  • VSCode:将所有"Provided" scope注释掉。让他们能在运行时被编译。
    注: "Provided" scope意味着打包的时候不会包含。
  1. 启用webUI
    vim /flink-1.17.1/conf/flink-conf.yaml

    修改这行为rest.bind-address: 0.0.0.0
  2. 额外知识点:
  • CDC:实时抓取数据库日志文件,用于异构数据库同步。
  • binlog:MySQL集群同步
  • OLAP:侧重于查询
  • OLTP:侧重于事务,两者很难兼顾

flinkCDC快速上手

Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: One or more required options are missing.

Missing required options are:

slot.name

官方文档有问题,正在修正。应该在pg建表的时候增加'slot.name' = 'flink'

CREATE TABLE shipments (
   shipment_id INT,
   order_id INT,
   origin STRING,
   destination STRING,
   is_arrived BOOLEAN,
   PRIMARY KEY (shipment_id) NOT ENFORCED
 ) WITH (
   'connector' = 'postgres-cdc',
   'hostname' = 'localhost',
   'port' = '5432',
   'username' = 'postgres',
   'password' = 'postgres',
   'database-name' = 'postgres',
   'schema-name' = 'public',
   'table-name' = 'shipments',
   'slot.name' = 'flink'
 );

修改之后运行仍然报错:

[ERROR] Could not execute SQL statement. Reason:
java.io.StreamCorruptedException: unexpected block data

暂时用这个方法解决:(后注:不需要改,应该还是时间的原因)

类加载顺序问题,flink默认是child-first
在flink的flink-conf.yaml文件中添加
classloader.resolve-order: parent-first 
重启集群即可。

sql确实执行成功了,但是job执行未成功,kibana里面也没有找到数据

问题已解决,咨询了官方群的大佬,原来是MySQL时间和ip地址?的问题,排查过程:

建表之后运行select * from products;报错:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: The MySQL server has a timezone offset (0 seconds ahead of UTC) which does not match the configured timezone Asia/Shanghai. Specify the right server-time-zone to avoid inconsistencies for time-related fields.

确定是时间问题:

修改时间vim /etc/timezone

将内容改为UTC

备注: 校准时间用timedatectl

pom文件

主pom文件

// 依赖管理
<dependencyManagement>
	<dependencies>
		<dependency>
		
		</dependency>
	</dependencies>
</dependencyManagement>
// 插件管理
// 打包实际上需要插件来进行,不然jar包里只会包含代码,不包含依赖
<build>
	<plugins>
		<plugin>
			
		</plugin>
	</plugins>
	<pluginManagement>
		<plugins>
			<plugin>
			
			</plugin>
		</plugins>
 	</pluginManagement>
</build>

次级pom文件(不需要版本)

<dependencies>
	<dependency>
			<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
        </dependency>
	</dependency>
</dependencies>
<build>
	<plugins>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-compiler-plugin</artifactId>
		</plugin>

		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-shade-plugin</artifactId>
		</plugin>

	</plugins>
</build>

注意点:

  • versions-maven-plugin来管理jar版本,将他放到父模块。
    • 统一设置版本mvn versions:set -DnewVersion=1.2
    • 退回版本mvn versions:revert
    • 提交修改mvn versions:commit
  • 两种CDC包的使用
    • 使用flink-sql-connector-mysql-cdc更好,可以直接放在flink lib目录下,而不用打包。
    • flink-connector-mysql-cdc包务必要打包,不要使用provided scope

测试

nc -lk 9527

这个命令是在本地启动一个TCP服务器,监听9627端口,并将接收到的数据输出到控制台。nc是netcat的缩写,-l选项表示监听模式,-k选项表示保持监听,不断开连接。

流批区别

批处理与流处理代码不同:

  • groupBykeyBy
  • 批处理不需要env.execute

大数据结构

基本都是主从结构Master-Slave结构

JobManager(HA高可用性)-TaskManager

注意事项

  1. 连接MySQL必须导入flink连接依赖
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.17.1</version>
</dependency>
  1. 找不到主类错误:
Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: Error in serialization.

缺少依赖,需要table api依赖:

<dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-table-runtime</artifactId>
     <version>${flink.version}</version>
</dependency>

注意: 记得在模块的pom文件里也加上

Checkpoint

Flink中的Checkpoint用于实现作业的故障恢复和状态持久化,非常重要。关于Checkpoint可以总结如下几点:

  1. Checkpoint用于保存作业运行状态,以便失败后可以恢复。它会定期保存状态快照。

  2. 触发Checkpoint可以是定时触发,也可以是处理某些记录数触发。需要设置checkpoint间隔参数来控制。

  3. Checkpoint会产生一定的性能开销,需要平衡故障恢复需求和性能开销。

  4. 保存checkpoint的状态backend可以是内存级的,也可以是持久化的,如保存到HDFS等。

  5. 开启checkpoint后,需要设置state.backend参数来配置状态存储位置。

  6. 对于实时流作业,可以启用incremental checkpoint来优化性能。

  7. exactly-once语义需要开启checkpoint并将sink设置为支持幂等写入。

  8. 设置checkpoint超时时间,如果checkpoint长时间未完成,会强制触发超时退出。

  9. 重启作业时从最新的checkpoint恢复状态。

exactly-once:

Flink 中的 exactly-once 表示 end-to-end 的精确一次语义,是 Flink 提供的最强的数据一致性保证。

它的工作原理可以概括为:

  1. 基于 checkpoint 实现atleast-once(至少一次)语义
    Flink 的 checkpoint 机制可以在失败时恢复状态和数据,确保每条数据至少被处理一次。

  2. 幂等sink来避免重复写入
    通过让 sink 更新支持幂等操作,可以避免从 checkpoint 恢复时重复写入导致的数据重复。

  3. 两者结合实现 exactly-once
    checkpoint + 幂等sink 结合可以保证每条数据既不会丢失,也不会重复处理,即实现了精确一次语义。

  4. 对不同源的支持
    Flink 为 Kafka、RabbitMQ 等源提供了支持 exactly-once 的内置连接器。

  5. 总结:
    通过 checkpoint、幂等 sink、idempotent source 三者的配合,Flink 作业可以实现端到端精确一次的语义,使得流式处理结果和批处理一样精确。这是 Flink 的重要优势之一。

时区错误

Caused by: org.apache.flink.table.api.ValidationException: The MySQL server has a timezone offset (28800 seconds ahead of UTC) which does not match the configured timezone UTC. Specify the right server-time-zone to avoid inconsistencies for time-related fields.

检查当前时间是否同步timedatectl

Local time: Tue 2023-07-18 11:25:51 CST
Universal time: Tue 2023-07-18 03:25:51 UTC
RTC time: Tue 2023-07-18 03:25:51
Time zone: Asia/Shanghai (CST, +0800)
System clock synchronized: yes
systemd-timesyncd.service active: yes
RTC in local TZ: no

解释:

  • 系统本地时间为CST 中国标准时间
  • 硬件RTC时间为UTC时间
  • 系统时钟已与网络时间同步
  • RTC时钟未使用本地时区

用这个命令修改:timedatectl set-timezone UTC

注: 尽量使用timedatectl来修改时间,而不是修改/etc/timezone文件,因为timedatectl会自动修改多个配置文件。

额外知识点

  1. 可以选择自定义反序列化器
  2. 可以设置savepoint实现断点续传

Flink CDC采集MySQL binlog日志实时写入ClickHouse

读取从库binlog

报错

Caused by: java.net.ConnectException: Connection refused: connect

clickhouse 远程配置没开启

vim /etc/clickhouse-server/config.xml

把注释掉的<listen_host>::</listen_host>取消注释,然后重启服务:

service clickhouse-server restart

Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot read the binlog filename and position via 'SHOW MASTER STATUS'. Make sure your server is correctly configured
Caused by: java.sql.SQLSyntaxErrorException: Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation

缺少权限,赋予权限 grant replication slave,replication client on *.* to 'bigdata'@'%';

java.lang.AbstractMethodError: Receiver class com.example.demo.CustomerDebeziumDeserializationSchema does not define or inherit an implementation of the resolved method 'abstract void deserialize(com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord, org.apache.flink.util.Collector)' of interface com.ververica.cdc.debezium.DebeziumDeserializationSchema.

知识点

  1. MySQL行设置为ROW模式,现在貌似都是默认ROW模式了
  2. 建表:
CREATE TABLE t_demo (
  id Int32, 
  name String,
  birthday Date32,
  ts DateTime64
) ENGINE = ReplacingMergeTree(ts)
PARTITION BY toYYYYMM(birthday)
ORDER BY id
CREATE TABLE t_demo (
  id Int32,
  name String,
  birthday String
) ENGINE = ReplacingMergeTree(id) 
order by id;
  1. 表引擎合并是延时的
    optimize table t_demo立刻合并排序键值相同的重复项

    注: 要在查询时就看合并效果可以在查询语句后面加final

  2. 官网中文版缺乏大量资料,包括clickhouse-jdbc的使用教程,资料还是需要优先英文。

  3. Nullable(String)类型 可传入空字符串null

  4. 反序列器

  • StringDebeziumDeserializationSchema()最简单
  • JsonDebeziumDeserializationSchema教程中使用,简单的封装了一下,信息还是比较多
  • 还是要自己自定义反序列器
  1. ClickHouseSink:继承 RichSinkFunction,重写sink。因为是异构数据库,且官方未提供clickhouse sink。

  2. hikaricp连接池,官网示例里建议使用这个来管理连接(暂未使用)

  3. sink建议并行度:1。技术主管说是因为要避免乱序,但是根据官方群里的回复,一开始全量阶段,多并行度会增加同步效率,基本不需要考虑顺序;增量阶段,都是一个并行度,结合flink的ckp,有序输出。所以设置为多并行度也没关系。

  4. 更新操作,有多种方案:

  • 直接用插入语句,然后等待表引擎自动合并。不行,因为clickhouse要实时数据分析展示,这样旧数据会影响展示。
  • 直接用更新语句。在table api中,读取更新是两条日志-U +U,不方便 (另外,UPDATE 在 MergeTree 引擎中是有限制的,例如无法更新数据表中的主键列。万一遇到有人修改主键的情况,就出问题了。)
  • 先删除、再插入。 现在使用的是这个,代码实现也相当简单。
  1. Mutation操作执行是一个异步的过程,ALTER语句的变种。clickhouse无法实现真正意义上的更新和删除。

未解决的问题

  1. 无法传递时间类型LocalDateTime到clickhouse的datatime64里
  2. 在重写的invoke方法里死活写不了Integer id = data.getJSONObject("after").getInteger("birthday");这样获取嵌套json里的数据的语句,birthday一直获取不了,显示为null,但是after是可以获取的。原因不明,现在在batchInsert里获取,也算勉强能用
  3. 有时候表引擎手动合并莫名没反应,且通过jdbc写入的和在client里面写入的数据貌似没有合并;
0

评论区