(1)通过FlinkSQL将数据写入mysql demo

FlinkSQL的出现,极大程度上降低了Flink的编程门槛,更加容易理解和掌握使用。今天将自己的笔记分享出来,希望能帮助在这方面有需要的朋友。

(1)首先引入POM依赖:

    1.13.1    2.12    1.7.30            org.apache.flink        flink-java        ${flink.version}                org.apache.flink        flink-streaming-java_${scala.binary.version}        ${flink.version}            org.apache.flink        flink-clients_${scala.binary.version}        ${flink.version}                org.apache.flink        flink-table-api-java-bridge_${scala.binary.version}        ${flink.version}                    org.apache.flink        flink-connector-jdbc_${scala.binary.version}        ${flink.version}                        org.apache.flink        flink-table-planner-blink_${scala.binary.version}        ${flink.version}                org.apache.flink        flink-streaming-scala_${scala.binary.version}        ${flink.version}                org.apache.flink        flink-table-common        ${flink.version}                org.apache.flink        flink-json        ${flink.version}                    com.fasterxml.jackson.core        jackson-databind        2.12.0                    mysql        mysql-connector-java        8.0.16                com.alibaba        fastjson        1.2.66    

(2)编写代码

public static void main(String[] args) throws Exception {    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();    EnvironmentSettings settings = EnvironmentSettings.newInstance()            .inStreamingMode()            //.useOldPlanner() // flink            .useBlinkPlanner() // blink            .build();    StreamTableEnvironment ste = StreamTableEnvironment.create(env, settings);    String ddl = "CREATE TABLE flinksinksds(r
" +            "componentname STRING,r
" +            "componentcount INT,r
" +            "componentsum INTr
" +            ") WITH(r
" +            "'connector.type'='jdbc',r
" +            "'connector.driver' =  'com.mysql.cj.jdbc.Driver'," +            "'connector.url'='jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai',r
" +            "'connector.table'='flinksink',r
" +            "'connector.username'='root',r
" +            "'connector.password'='root',r
" +            "'connector.write.flush.max-rows'='1'r
" +            ")";    System.err.println(ddl);    ste.executeSql(ddl);    String insert = "insert into flinksinksds(componentname,componentcount,componentsum)" +            "values('1024', 1 , 2 )";    ste.executeSql(insert);    env.execute();    System.exit(0);}

(3)执行结果:

展开阅读全文

页面更新:2024-05-04

标签:门槛   代码   笔记   朋友   数据

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号

Top