DataX源码分析十之Transformer源码分析

在 DataX中,还有一个比较核心的组件,就是 Transformer。本章我们将着重分析 Transformer。

继上一篇文章:DataX源码分析九至Task初始化

为啥需要Transformer

DataX源码分析十之Transformer源码分析

简单的 ETL 模型视图


代码初次入口

在这段代码中,完成了自定义 Transformer 的加载、初始化等等。
代码入口:com.alibaba.datax.core.taskgroup.TaskGroupContainer.TaskExecutor#TaskExecutor。

// 加载定义的&内置的Transformer
TransformerUtil.buildTransformerInfo(taskConfig);


DataX 内置 Transformer

在 DataX 中,transformer 有两处来源,一个是内置的。一个是 自己实现的。如果是自己实现的话,套路还是类似插件的那种,通过 transformer.json 来加载的;具体请看下一小节。那么内置的有哪些呢?
代码入口:com/alibaba/datax/core/transport/transformer/TransformerRegistry

// 下面的 transformer,看具体业务使用的场景而决定。
static {
  registTransformer(new SubstrTransformer());
  registTransformer(new PadTransformer());
  registTransformer(new ReplaceTransformer());
  registTransformer(new FilterTransformer());
  registTransformer(new GroovyTransformer());
}

DataX Transformer 分类

public abstract class Transformer {
    /**
     * @param record 行记录,UDF进行record的处理后,更新相应的record
     * @param paras  transformer函数参数
     */
    abstract public Record evaluate(Record record, Object... paras);
}
// 这个其实就是一个 Proxy,
public abstract class ComplexTransformer {
    /**
     * @param record   行记录,UDF进行record的处理后,更新相应的record
     * @param tContext transformer运行的配置项
     * @param paras    transformer函数参数
     */
    abstract public Record evaluate(Record record, Map tContext, Object... paras);
}

通过代码分析,两者都没有啥大的区别。ComplexTransformer 通过 ComplexTransformerProxy 最终实现 transformer 调用。复杂的Transformer 中的 tContext 是为了多传一些参数罢了。


DataX 自定义Transformer 加载&初始化

// 第一步:
// JOB_TRANSFORMER = "transformer"
List tfConfigs = taskConfig.getListConfiguration(CoreConstant.JOB_TRANSFORMER);

// 第二步:校验 dx_groovy 是否只调用了一次
functionName.equals("dx_groovy") && functionNames.contains("dx_groovy")

// 第三步:获取到每个 transformer 的组件的名称
TransformerRegistry.loadTransformerFromLocalStorage(functionNames);

// 第四步:读取指定 transformer 的路径
// CoreConstant.DATAX_STORAGE_TRANSFORMER_HOME="G:datax_0.24dataxlocal_storagetransformer"
String transformerPath = CoreConstant.DATAX_STORAGE_TRANSFORMER_HOME + File.separator + each;

// 第五步:读取指定的 transformer 的配置
Configuration.from(new File(transformerPath + File.separator + "transformer.json"));

// 第六步:通过 transformer.json 读取 ClassName
String className = transformerConfiguration.getString("class");

// 第七步:通过反射初始化 transformer
Class<?> transformerClass = jarLoader.loadClass(className);

// 第八步:注册 transformer。
registedTransformer.put

Transformer 转换

// 将反射生成的 Transformer 再次打包。方便调用
private static TransformerInfo buildTransformerInfo(ComplexTransformer complexTransformer, boolean isNative, ClassLoader classLoader) {
        TransformerInfo transformerInfo = new TransformerInfo();
        transformerInfo.setClassLoader(classLoader);
        transformerInfo.setIsNative(isNative);
        transformerInfo.setTransformer(complexTransformer);
        return transformerInfo;
    }
DataX源码分析十之Transformer源码分析

Transformer 封装,统一转换为 Complex 类型的 Transformer

Transformer 参数信息保存

Integer columnIndex = configuration.getInt(CoreConstant.TRANSFORMER_PARAMETER_COLUMNINDEX);
transformerExecutionParas.setColumnIndex(columnIndex);
List paras = configuration.getList(CoreConstant.TRANSFORMER_PARAMETER_PARAS, String.class);
if (paras != null && paras.size() > 0) {
    transformerExecutionParas.setParas(paras.toArray(new String[0]));
}


总结

  1. 通过分析自定义 Transformer 的加载过程中。我们有一次认识到了 transformer.json 类似这种文件的作用了。如果你自己开发一套插件系统的话,你可以把 xx.json 放在 Resource 目录下。这种,整体的结构会更加简单。
  2. Transformer 的定制化开发,你可以按照 自己的开发插件。如通过 DataX 内部来加载插件方式或者直接在里面改动 DataX 代码来集成。但是,我不推荐直接改代码的方式。还是得要解耦。



关于我
前 去哪儿网 技术专家!混迹中间件职场8+年!分享各种Java中间件知识!

展开阅读全文

页面更新:2024-05-15

标签:初始化   反射   函数   源码   插件   入口   加载   参数   类型   代码   业务

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号

Top