使用Apache Flink对物联网实时数据分析

1、前言
最近项目涉及物联网设备实时数据、采集、传输、展示,以及大数据预警报警等功能(thingsBoard+kafka+MongoDB)。统计分析模块采用的传统的基于数据库的统计分析,对于海量实时数据统计分析性能和响应速度压力巨大,要实现低延迟的实时计算和秒级多维实时查询有技术挑战。
Apache Flink 在数据分析领域中应用广泛,其实时处理能力以及运算速度都能满足大规模数据处理的需求,可以与物联网技术结合,实现对海量传感器数据的实时分析和管理。而且社区活跃技术方面也不断的创新和优化,支持许多流行数据源,如Kafka、Hadoop HDFS、ES等,具有丰富的生态系统,Flink社区也提供了丰富的工具和库。这里主要介绍基本概念,和一个实际的示例。

2、基本概念

(1)、定义:

Apache Flink 是一个在有界数据流和无界数据流上进行有状态计算分布式处理引擎和框架。Flink 设计旨在所有常见的集群环境中运行,以任意规模和内存级速度执行计算。

(2)、Flink架构:

Flink 是一个分布式系统,需要有效分配和管理计算资源才能执行流应用程序。Flink 运行时由两种类型的进程组成:一个 JobManager 和一个或者多个 TaskManager。

对于分布式执行,Flink 将算子的 subtasks 链接tasks。每个 task 由一个线程执行。将算子链接成 task 是个有用的优化:它减少线程间切换、缓冲的开销,并且减少延迟的同时增加整体吞吐量。


(3)流处理:

在自然环境中,数据的产生原本就是流式的。无论是来自 Web 服务器的事件数据,证券交易所的交易数据,还是来自工厂车间机器上的传感器数据,其数据都是流式的。但是当你分析数据时,可以围绕 有界流bounded)或 无界流unbounded)两种模型来组织处理数据,当然,选择不同的模型,程序的执行和处理方式也都会不同。


批处理是有界数据流处理的范例。在这种模式下,你可以选择在计算结果输出之前输入整个数据集,这也就意味着你可以对整个数据集的数据进行排序、统计或汇总计算后再输出结果。

流处理正相反,其涉及无界数据流。至少理论上来说,它的数据输入永远不会结束,因此程序必须持续不断地对到达的数据进行处理。

在 Flink 中,应用程序由用户自定义算子转换而来的流式 dataflows 所组成。这些流式 dataflows 形成了有向图,以一个或多个源(source)开始,并以一个或多个汇(sink)结束。

核心的API,从开发步骤的角度来讲,主要分为四大部分Environment、Source、Transform、Sink,flink执行过程(env -> source -> transform -> sink)。

Environment,Flink Job在提交执行计算时,需要首先建立和Flink框架之间的联系,也就指的是当前的flink运行环境,只有获取了环境信息,才能将task调度到不同的taskManager执行。

流式处理环境获取:

        StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);

批处理环境:



val env = ExecutionEnvironment.getExecutionEnvironment

Source, Flink框架可以从不同的来源获取数据,将数据提交给框架进行处理, 我们将获取数据的来源称之为数据源。数据源类型很多常用的包括集合类型(数据临时存储到内存中,形成特殊的数据结构后),文件中读取数据,Kafka中读取数据,用户可以自定义数据源。

Transform 算子,转换算子,把当前的DataStream转化为另一个DataStream。数据处理的核心,有很多种算子如下图:

最常用的map、flatMap、Filter、keyBy、aggregation、reduce等算子,这些算子具体作用基本上看英文名字就知道了,使用方式很灵活支持拉姆达表达式。

KeyBy

KeyBy 算子将DataStream转换成一个KeyedStream。KeyedStream是一种特殊的DataStream,事实上,KeyedStream继承了DataStream,DataStream的各元素随机分布在各Task Slot(任务槽)中,KeyedStream的各元素按照Key分组,分配到各Task Slot中。我们需要向keyBy算子传递一个参数,以告知Flink以什么字段作为Key进行分组。

stream.keyBy(0)
或者
stream.keyBy(new KeySelector() {
                     @Override
                     public String getKey(String x) throws Exception {
                         return x.*****();
                     }
                 })

aggregation,常见的聚合操作有sum、max、min等,这些聚合操作统称为aggregation。aggregation需要一个参数来指定按照哪个字段进行聚合。跟keyBy相似,我们可以使用数字位置来指定对哪个字段进行聚合,也可以使用字段名。与批处理不同,这些聚合函数是对流数据进行聚合,流数据是依次进入Flink的,聚合操作是对之前流入的数据进行统计聚合。sum算子的功能对该字段进行加和,并将结果保存在该字段上。min操作无法确定其他字段的数值。

stream.keyBy(0).sum(1).print() //0字段分组,1字段求和

reduce 在按照同一个Key分组的数据流上生效,它接受两个输入,生成一个输出,即两两合一地进行汇总操作,生成一个同类型的新元素。

Sink其实可以表示为将处理完成数据进行存储,或者将处理完的数据发送到指定的存储系统(比如Oracle、Kafka等)(官方提供了一部分的框架的sink,用户可自定义实现自己的sink)。

(4)、有状态的数据操作(Stateful Operations)

在流处理中,有些操作仅仅在某一时间针对单一事件(如事件转换map),有些操作需要记住多个事件的信息并进行处理(window operators),后者的这些操作称为有状态的操作。有状态的操作一般被维护在内置的key/value存储中。这些状态信息会跟数据流一起分区并且分布存储,并且可以通过有状态的数据操作来访问。因此这些key/value的状态信息仅在带key的数据流(通过keyBy() 函数处理过)中才能访问到。数据流按照key排列能保证所有的状态更新都是本地操作,保证一致性且无事务问题。同时这种排列方式使Flink能够透明的再分发状态信息和调整数据流分区。

(5)、窗口

我们在操作无界数据流时,经常需要应对以下问题,我们经常把无界数据流分解成有界数据流聚合分析。

Flink 有一些内置的窗口分配器,如下:

Fixed windows(固定窗口):在 Flink 中被也称为 Tumbling windows(滚动窗口),将时间切割成具有固定时间长度的段。滚动窗口之间不会重叠。

Sliding windows(滑动窗口):滑动窗口是滚动窗口更一般化的表现的形式,由窗口大小和滑动间隔这两个属性来定义。如果滑动间隔小于窗口大小,那么不同的窗口之间就会存在重叠;如果滑动间隔大于窗口大小,不同窗口之间就会存在间隔;如果滑动间隔等于窗口大小,就相当于滚动窗口。

Session Windows(会话窗口):和滚动窗口与滑动窗口不同的是,会话窗口并没有固定的窗口大小;它是一种动态窗口,通常由超时间隔(timeout gap)来定义。当超过一段时间没有新的事件到达,则可以认为窗口关闭了。

有三种最基本的操作窗口内的事件的选项: 像批量处理,ProcessWindowFunction 会缓存 Iterable 和窗口内容,供接下来全量计算;或者像流处理,每一次有事件被分配到窗口时,都会调用 ReduceFunction 或者 AggregateFunction 来增量计算;或者结合两者,通过 ReduceFunction 或者 AggregateFunction 预聚合的增量计算结果在触发窗口时, 提供给 ProcessWindowFunction 做全量计算。

stream
    .keyBy()
    .window()
    .reduce|aggregate|process();
示例代码:
DataStream input = ...;


input
    .keyBy(x -> x.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .process(new MyWastefulMax());


public static class MyWastefulMax extends ProcessWindowFunction<
        SensorReading,                  // 输入类型
        Tuple3,  // 输出类型
        String,                         // 键类型
        TimeWindow> {                   // 窗口类型


    @Override
    public void process(
            String key,
            Context context,
            Iterable events,
            Collector> out) {


        int max = 0;
        for (SensorReading event : events) {
            max = Math.max(event.value, max);
        }
        out.collect(Tuple3.of(key, context.window().getEnd(), max));
    }
}

(6)、Watermark

怎么确定一个窗口是否已经结束,这在流式数据处理系统中并非一个很容易解决的问题。如果窗口是基于处理时间的,那么问题确实容易解决,因为处理时间是完全基于本地时钟的;但是如果窗口基于事件时间,由于分布式系统中消息可能存在延迟、乱序到达的问题,即便系统已经接收到窗口边界以外的数据了,也不能确定前面的所有数据都已经到达了。

(7)、Checkpoint

Flink 的 Checkpoint 机制是其可靠性的基石。当一个任务在运行过程中出现故障时,可以根据 Checkpoint 的信息恢复到故障之前的某一状态,然后从该状态恢复任务的运行。 在 Flink 中,Checkpoint 机制采用的是 chandy-lamport(分布式快照)算法,通过 Checkpoint 机制,保证了 Flink 程序内部的 Exactly Once 语义。

3、安装部署

(1)、下载:

官网(archive.apache.org/dist/flink/),flink-1.9.3-bin-scala_2.12.tgz下载后解压到硬盘目录,

源码地址:github.com/apache/flink

(2)、运行(需要安装java运行环境)

只需要进入到解压目录的bin目录下,运行start-cluster.bat


(3)、访问UI


(4)、运行示例:

命令行输入:

flink.bat run D:flinkexamplesbatchWordCount.jar -input D:flinkREADME.txt -output D:f
linkREADME_CountWord_Result.txt

可以在管理界面查看任务运行情况


4、应用示例

创建流处理环境,创建一个Java maven项目,pom文件中引入需要包

pom文件:

<?xml version="1.0" encoding="UTF-8"?>



  4.0.0


  org.example
  mvntest
  1.0-SNAPSHOT


  mvntest
  
  http://www.example.com


  
    UTF-8
    1.7
    1.7
  


  
    
      junit
      junit
      4.11
      test
    
    
      org.apache.flink
      flink-connector-kafka_2.12
      1.9.3
      test
    


    
      org.apache.flink
      flink-java
      1.9.3
    


    
      org.apache.flink
      flink-scala_2.12
      1.9.3
    


    
      org.apache.flink
      flink-clients_2.12
      1.9.3
    
    
    
      org.apache.flink
      flink-streaming-scala_2.12
      1.9.3
      provided
    
    
      org.apache.flink
      flink-connector-kafka_2.12
      1.9.3
      compile
    


  


  
    
      
        
        
          maven-clean-plugin
          3.1.0
        
        
        
          maven-resources-plugin
          3.0.2
        
        
          maven-compiler-plugin
          3.8.0
        
        
          maven-surefire-plugin
          2.22.1
        
        
          maven-jar-plugin
          3.0.2
        
        
          maven-install-plugin
          2.5.2
        
        
          maven-deploy-plugin
          2.8.2
        
        
        
          maven-site-plugin
          3.7.1
        
        
          maven-project-info-reports-plugin
          3.0.0
        
      
    
    
      
        org.apache.maven.plugins
        maven-compiler-plugin
        
          8
          8
        
      
    
  


创建kafaka数据源

 StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);


        Properties properties=new Properties();
        //kafka 连接信息
        properties.setProperty("bootstrap.servers","11.11.160.158:9092");
        properties.setProperty("group.id", "kafka-group");
        DataStream stream = env
                .addSource(new FlinkKafkaConsumer<>("kafkatopic", new SimpleStringSchema(), properties));


数据处理:

 //分组
 stream.keyBy(new KeySelector() {
                     @Override
                     public String getKey(String x) throws Exception {
                         return *******;
                     }
                 })
                 //窗口
                 //.window(TumblingEventTimeWindows.of(Time.minutes(1)))
                 .timeWindow(Time.seconds(30))
                 //数据处理
                 .process(new MyProcessHandler())
                 //自定义数据输出
                 .addSink(new MySinkFunction()).name("sinktest");


public static class MyProcessHandler extends ProcessWindowFunction<
            String,                  // input type
            Tuple3,  // output type
            String,                         // key type
            TimeWindow> {                   // window type




        


        @Override
        public void process(String key, Context context, Iterable events, Collector> out) throws Exception {


            ********
            out.collect(Tuple3.of(key.toString(), context.window().getEnd(), max));
        }


        @Override
        public void clear(Context context) throws Exception {
            super.clear(context);
        }


    }
 public static class MySinkFunction extends RichSinkFunction> {




        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
        }


        @Override
        public void invoke(Tuple3 value, Context context) throws Exception {
             *****
            System.out.println(value.f1);
            System.out.println(context.currentProcessingTime());
            System.out.println("-------------------end");
        }
    }

上传jar包运行任务

java命令窗口可以查看输出,也可以通过管理UI查看任务运行情况。

5、总结

Flink功能强大,性能优秀,在众多场景下都能发挥出巨大价值,尤其是在处理大规模数据、实时分析、机器学习等领域具有广泛的应用和前景。

展开阅读全文

页面更新:2024-04-23

标签:实时   数据   算子   数据流   数据源   字段   窗口   状态   事件   操作

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号

Top