数据集成是指将来自不同数据源的数据整合到一起形成一个统一的数据集。这个过程包括从不同的数据源中收集数据,对数据进行清洗、转换、重构和整合,以便能够在一个统一的数据仓库或数据湖中进行存储和管理。
前面的文章其实讲过 ETL,这里只是再次回顾以下,ETL 中的 E 是extract,数据抽取;T 是 Transform,代表数据的转换;L 代表Load,数据加载。
Apache SeaTunnel 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台,每天可稳定高效地同步数百亿数据,并具有 已用于生产近100家公司。
SeaTunnel 工作流程图:
SeaTunnel专注于数据集成和数据同步,主要针对解决数据集成领域的常见问题:
优势
缺点
SeaTunnel 设计的核心是利用设计模式中的“控制翻转”或者叫“依赖注入”,主要概括为以下两点:
再看 SeaTunnel 架构演进的过程,我们现在目前在做的一个事情就是从 v1 到 v2的架构改造和升级。
对于 V1 版本来讲,SeaTunnel 本质上是一个 ETL平台。而 V2 版本则向 ELT 的路线发展。基于整个架构和设计哲学的讨论,我们可以在https://github.com/apache/incubator-seatunnel/issues/1608 看到,如果有兴趣,可以去了解一下 SeaTunnel 架构演进的前世今生。
V1 架构
V2架构
基于这些痛点,我们对 V 2 版本进行了重构。首先,V2 版本有了自己的一套API,也是有了自己的一套数据类型,就可以去开发自己的连接器,而不依赖任何引擎,接入的每一条数据都是 SeaTunnelRow,通过翻译层,把 SeaTunnelRow push 到对应的计算引擎里。
最后做一下总结,进行 V1 和 V2 架构的升级对比,到底我们做了哪些事情。
SeaTunnel Engine性能测试
对比的工具有大家耳熟能详的 DataX,袋鼠云的Chunjun,可能对于Chunjun大家比较陌生,实际上它没改名之前叫 FlinkX,以及最近刚进入 Apache 孵化器的 StreamPark(原名 StreamX)。
下载地址(也可去官网下载):
链接:https://pan.baidu.com/s/1gOFkezOH-OfDcLbUmq6Dhw?pwd=szys
提取码:szys
# jdk包在我下面提供的资源包里,当然你也可以去官网下载。
tar -xf jdk-8u212-linux-x64.tar.gz
# /etc/profile文件中追加如下内容:
echo "export JAVA_HOME=`pwd`/jdk1.8.0_212" >> /etc/profile
echo "export PATH=$JAVA_HOME/bin:$PATH" >> /etc/profile
echo "export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar" >> /etc/profile
# 加载生效
source /etc/profile
export version="2.3.1"
wget "https://archive.apache.org/dist/incubator/seatunnel/${version}/apache-seatunnel-incubating-${version}-bin.tar.gz"
tar -xzvf "apache-seatunnel-incubating-${version}-bin.tar.gz"
从 2.2.0-beta 开始,二进制包默认不提供连接器依赖,所以第一次使用时,我们需要执行以下命令来安装连接器: (当然也可以手动下载连接器 [Apache Maven Repository](https://repo.maven.apache.org/maven2/org/apache/seatunnel/ 下载,然后手动移动到连接器目录下的 Seatunnel 子目录)。
# config/plugin_config ,可以修改这个配置指定下载连接器,会下载到这个目录下connectors/seatunnel/
cd apache-seatunnel-incubating-${version}
sh bin/install-plugin.sh 2.3.1
config/v2.batch.conf.template
env {
execution.parallelism = 1
job.mode = "BATCH"
}
source {
FakeSource {
result_table_name = "fake"
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}
}
sink {
Console {}
}
启动应用程序:
cd "apache-seatunnel-incubating-${version}"
# 连接器:connectors/seatunnel/connector-fake-2.3.1.jar
./bin/seatunnel.sh --config ./config/v2.streaming.conf.template -e local
编辑 config/v2.streaming.conf.template,决定了海隧道启动后数据输入、处理和输出的方式和逻辑。 下面是配置文件的示例,与上面提到的示例应用程序相同。
env {
execution.parallelism = 1
job.mode = "BATCH"
}
source {
FakeSource {
result_table_name = "fake"
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}
}
sink {
Console {}
}
启动应用程序(Flink 版本之间 1.15.x 和 1.16.x):
cd "apache-seatunnel-incubating-${version}"
./bin/start-seatunnel-flink-15-connector-v2.sh --config ./config/v2.streaming.conf.template
这里只是演示了官方文档里的简单示例,有兴趣的话,可以去实验其它的数据转换场景。其实转换的思路跟之前的软件都是一样的,有任何疑问欢迎给我留言,后续会更新相关技术类的文章,请小伙伴耐心等待,可关注我的公众号【大数据与云原生技术分享】加群交流或私信交流~
页面更新:2024-03-03
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号