// Demo job: left-joins two generated ("datagen") tables with separate idle-state
// retention values configured for the left and right join inputs, and writes the
// resulting retract stream to a text file.

// Create the streaming execution environment and checkpoint every 1000 ms.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
// ... (further environment configuration elided in the original snippet)

// Build the table environment settings (Blink planner, streaming mode).
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment streamTable = StreamTableEnvironment.create(env, settings);
// Jobs that are sensitive to time values can set the session time zone:
//streamTable.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));

// Simple DDL statements modelled on the official connector documentation.
// Table 1: user_info, 1 row per second, u_id in [12, 15], money in [1, 10].
String userSql = "create table user_info(" +
        "u_id BIGINT," +
        "money BIGINT" +
        ") WITH (" +
        "'connector' = 'datagen'," +
        "'rows-per-second' = '1'," +
        "'fields.u_id.min' = '12'," +
        "'fields.u_id.max' = '15'," +
        "'fields.money.min' = '1'," +
        "'fields.money.max' = '10'" +
        ")";
// Table 2: order_info, 20 rows per second, u_id in [12, 15], o_id in [1, 10],
// code generated as a sequence from 1 to 1000.
String orderSql = "create table order_info(" +
        "u_id BIGINT," +
        "o_id BIGINT," +
        "code BIGINT" +
        ") WITH (" +
        "'connector' = 'datagen'," +
        "'rows-per-second' = '20'," +
        "'fields.u_id.min' = '12'," +
        "'fields.u_id.max' = '15'," +
        "'fields.o_id.min' = '1'," +
        "'fields.o_id.max' = '10'," +
        "'fields.code.kind'='sequence'," +
        "'fields.code.start'='1'," +
        "'fields.code.end'='1000'" +
        ")";

// NOTE(review): setIdleLeftStateRetention / setIdleRightStateRetention do not appear to be
// part of the stock Flink TableConfig (which exposes setIdleStateRetention); presumably
// they come from a patched Flink build -- confirm before running against a vanilla release.
// Idle-state retention for the left join input: 10000 ms.
streamTable.getConfig().setIdleLeftStateRetention(Duration.ofMillis(10000));
// Idle-state retention for the right join input: 5000 ms.
streamTable.getConfig().setIdleRightStateRetention(Duration.ofMillis(5000));

// Register both generated tables.
streamTable.executeSql(userSql);
streamTable.executeSql(orderSql);

// Left join users with orders on u_id.
Table table = streamTable.sqlQuery("select u.u_id,o.u_id,o.code,CURRENT_TIMESTAMP from user_info u left join order_info o on u.u_id=o.u_id");

// Convert to a retract stream: each element pairs a Boolean flag (true = add, false = retract)
// with the Row. The generic type here had been lost from the snippet and is restored.
DataStream<Tuple2<Boolean, Row>> tuple2DataStream = streamTable.toRetractStream(table, Row.class);
tuple2DataStream.writeAsText("e://log.txt", FileSystem.WriteMode.OVERWRITE);

// Submit the job.
env.execute();
定位到参数值。
已经获取到 left、right 对应的 TTL 状态时间。
页面更新:2024-04-25
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号