在 DataX中,还有一个比较核心的组件,就是 Transformer。本章我们将着重分析 Transformer。
继上一篇文章:DataX源码分析九至Task初始化
简单的 ETL 模型视图
在这段代码中,完成了自定义 Transformer 的加载、初始化等等。
代码入口:com.alibaba.datax.core.taskgroup.TaskGroupContainer.TaskExecutor#TaskExecutor。
// 加载定义的&内置的Transformer
TransformerUtil.buildTransformerInfo(taskConfig);
在 DataX 中,transformer 有两处来源,一个是内置的。一个是 自己实现的。如果是自己实现的话,套路还是类似插件的那种,通过 transformer.json 来加载的;具体请看下一小节。那么内置的有哪些呢?
代码入口:com/alibaba/datax/core/transport/transformer/TransformerRegistry
// 下面的 transformer,看具体业务使用的场景而决定。
static {
registTransformer(new SubstrTransformer());
registTransformer(new PadTransformer());
registTransformer(new ReplaceTransformer());
registTransformer(new FilterTransformer());
registTransformer(new GroovyTransformer());
}
public abstract class Transformer {
/**
* @param record 行记录,UDF进行record的处理后,更新相应的record
* @param paras transformer函数参数
*/
abstract public Record evaluate(Record record, Object... paras);
}
// 这个其实就是一个 Proxy,
public abstract class ComplexTransformer {
/**
* @param record 行记录,UDF进行record的处理后,更新相应的record
* @param tContext transformer运行的配置项
* @param paras transformer函数参数
*/
abstract public Record evaluate(Record record, Map tContext, Object... paras);
}
通过代码分析,两者都没有啥大的区别。ComplexTransformer 通过 ComplexTransformerProxy 最终实现 transformer 调用。复杂的Transformer 中的 tContext 是为了多传一些参数罢了。
// 第一步:
// JOB_TRANSFORMER = "transformer"
List tfConfigs = taskConfig.getListConfiguration(CoreConstant.JOB_TRANSFORMER);
// 第二步:校验 dx_groovy 是否只调用了一次
functionName.equals("dx_groovy") && functionNames.contains("dx_groovy")
// 第三步:获取到每个 transformer 的组件的名称
TransformerRegistry.loadTransformerFromLocalStorage(functionNames);
// 第四步:读取指定 transformer 的路径
// CoreConstant.DATAX_STORAGE_TRANSFORMER_HOME="G:datax_0.24dataxlocal_storagetransformer"
String transformerPath = CoreConstant.DATAX_STORAGE_TRANSFORMER_HOME + File.separator + each;
// 第五步:读取指定的 transformer 的配置
Configuration.from(new File(transformerPath + File.separator + "transformer.json"));
// 第六步:通过 transformer.json 读取 ClassName
String className = transformerConfiguration.getString("class");
// 第七步:通过反射初始化 transformer
Class<?> transformerClass = jarLoader.loadClass(className);
// 第八步:注册 transformer。
registedTransformer.put
// 将反射生成的 Transformer 再次打包。方便调用
private static TransformerInfo buildTransformerInfo(ComplexTransformer complexTransformer, boolean isNative, ClassLoader classLoader) {
TransformerInfo transformerInfo = new TransformerInfo();
transformerInfo.setClassLoader(classLoader);
transformerInfo.setIsNative(isNative);
transformerInfo.setTransformer(complexTransformer);
return transformerInfo;
}
Transformer 封装,统一转换为 Complex 类型的 Transformer
Integer columnIndex = configuration.getInt(CoreConstant.TRANSFORMER_PARAMETER_COLUMNINDEX);
transformerExecutionParas.setColumnIndex(columnIndex);
List paras = configuration.getList(CoreConstant.TRANSFORMER_PARAMETER_PARAS, String.class);
if (paras != null && paras.size() > 0) {
transformerExecutionParas.setParas(paras.toArray(new String[0]));
}
关于我
前 去哪儿网 技术专家!混迹中间件职场8+年!分享各种Java中间件知识!
页面更新:2024-05-15
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号