Collecting Kafka Data into Hive with Flume

Background

Use Flume to collect data from Kafka and store it in Hive.

Environment Setup

This environment is deployed with the Apache community editions rather than CDH, HDP, or similar distributions; refer to the official documentation to set it up first.

Configure Hive

  1. Edit the hive-site.xml file and add the following content.

<property>
    <name>hive.txn.manager</name>
    <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
    <description>
        Set to org.apache.hadoop.hive.ql.lockmgr.DbTxnManager as part of turning on Hive
        transactions, which also requires appropriate settings for hive.compactor.initiator.on,
        hive.compactor.worker.threads, hive.support.concurrency (true),
        and hive.exec.dynamic.partition.mode (nonstrict).
        The default DummyTxnManager replicates pre-Hive-0.13 behavior and provides
        no transactions.
    </description>
</property>

<property>
    <name>hive.support.concurrency</name>
    <value>true</value>
    <description>
        Whether Hive supports concurrency control or not.
        A ZooKeeper instance must be up and running when using zookeeper Hive lock manager
    </description>
</property>

<property>
    <name>hive.metastore.uris</name>
    <value>thrift://localhost:9083</value>
    <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
  2. Create the Hive database and table. Note that the table must be partitioned, bucketed, and transactional.
CREATE DATABASE test_db;

CREATE TABLE `test_kafka_to_hive`(
  `time` string,
  `type` string, 
  `value` string,
  `code` string)
PARTITIONED BY (`partition_time` string)
CLUSTERED BY (`time`)
INTO 2 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
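If you want to sanity-check the table manually before wiring up Flume, a hedged sketch of a transactional insert follows. The SET statements mirror the settings described in the hive-site.xml snippet above, and the row values are illustrative:

```sql
-- Session settings for transactional writes (see hive-site.xml description above).
SET hive.support.concurrency=true;
SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.exec.dynamic.partition.mode=nonstrict;

-- Illustrative row; partition value matches the sink's %Y-%m-%d format.
INSERT INTO test_db.test_kafka_to_hive PARTITION (partition_time='2024-04-25')
VALUES ('2024-04-25 10:00:00', 'sensor', '23.5', 'A001');
```

If this insert fails with a lock- or transaction-manager error, recheck the hive-site.xml settings before debugging Flume itself.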

Configure Flume

  1. Create a configuration file kafka2hive.conf with the content below. For more options, see the official documentation for kafka-source and hive-sink.
# Define the source, channel, and sink
a.sources=kafka_source
a.channels=memory_channel
a.sinks=hive_sink

# Kafka source configuration
a.sources.kafka_source.type=org.apache.flume.source.kafka.KafkaSource
a.sources.kafka_source.zookeeperConnect=localhost:2181
a.sources.kafka_source.bootstrap.servers=localhost:9092
a.sources.kafka_source.topic=processed-realtimeData
a.sources.kafka_source.channels=memory_channel
a.sources.kafka_source.consumer.timeout.ms=1000
a.sources.kafka_source.batchSize=10

# Hive sink configuration
a.sinks.hive_sink.type=hive
a.sinks.hive_sink.hive.metastore=thrift://host_hive_metadatastore:9083
a.sinks.hive_sink.hive.database=test_db
a.sinks.hive_sink.hive.table=test_kafka_to_hive
a.sinks.hive_sink.hive.partition=%Y-%m-%d
a.sinks.hive_sink.hive.txnsPerBatchAsk=2
a.sinks.hive_sink.batchSize=1
a.sinks.hive_sink.serializer=JSON
a.sinks.hive_sink.serializer.fieldnames=time,type,value,code

# Channel configuration
a.channels.memory_channel.type=com.my.flume.channel.JsonParsedMemoryChannel
a.channels.memory_channel.capacity=1000
a.channels.memory_channel.transactionCapacity=100

# Bind the source and sink to the channel
a.sources.kafka_source.channels=memory_channel
a.sinks.hive_sink.channel=memory_channel
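The Hive sink's JSON serializer maps top-level JSON keys to the column names listed in serializer.fieldnames, so messages on the Kafka topic must carry those keys. A minimal Python sketch of the expected message shape (the event values are hypothetical):

```python
import json

# Hypothetical event matching serializer.fieldnames=time,type,value,code;
# the Hive sink's JSON serializer extracts these top-level keys by column name.
event = {
    "time": "2024-04-25 10:00:00",
    "type": "sensor",
    "value": "23.5",
    "code": "A001",
}

fieldnames = "time,type,value,code".split(",")

# This string is what each Kafka message body should look like.
payload = json.dumps(event)

# Every configured field name must appear as a top-level key.
assert all(name in json.loads(payload) for name in fieldnames)
```

Events missing one of these keys will not map cleanly onto the table columns, so it is worth validating producers against this shape.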
  2. Copy the Hadoop configuration files core-site.xml and hdfs-site.xml into Flume's conf directory, then edit core-site.xml and add the following content.

<property>
    <name>fs.hdfs.impl</name>
    <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
</property>
  3. Run Flume.
./bin/flume-ng agent -c ./conf -f ./conf/kafka2hive.conf -n a -Dflume.root.logger=INFO,console
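Once the agent is running, you can confirm from the Hive CLI or Beeline that events are landing in the table. The partition value below is illustrative; actual values follow the %Y-%m-%d format configured on the sink:

```sql
-- List the partitions the sink has created so far.
SHOW PARTITIONS test_db.test_kafka_to_hive;

-- Inspect a few rows from one partition (illustrative date).
SELECT `time`, `type`, `value`, `code`
FROM test_db.test_kafka_to_hive
WHERE partition_time = '2024-04-25'
LIMIT 10;
```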

Notes

  1. During the run you may hit many class-not-found or missing-class exceptions. The fix is to copy the relevant Hive and Hadoop jars into Flume's lib directory. The jars below are the ones encountered during this deployment; adjust according to your own situation.
#Hive
hcatalog/share/hcatalog/hive-hcatalog-streaming-3.1.0.jar
hcatalog/share/hcatalog/hive-hcatalog-core-3.1.0.jar
hcatalog/share/hcatalog/hive-hcatalog-server-extensions-3.1.0.jar
hcatalog/share/hcatalog/hive-hcatalog-pig-adapter-3.1.0.jar
lib/hive-jdbc-3.1.0.jar
lib/log4j-1.2-api-2.10.0.jar
lib/log4j-api-2.10.0.jar
lib/log4j-core-2.10.0.jar
lib/log4j-slf4j-impl-2.10.0.jar
lib/log4j-web-2.10.0.jar
lib/hive-standalone-metastore-3.1.0.jar
lib/hive-contrib-3.1.0.jar
lib/hive-exec-3.1.0.jar
lib/libfb303-0.9.3.jar
lib/calcite-core-1.16.0.jar
jdbc/hive-jdbc-3.1.0-standalone.jar

#Hadoop
share/hadoop/common/hadoop-common-2.7.7.jar
share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.7.7.jar
share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.7.7.jar
share/hadoop/common/lib/commons-configuration-1.6.jar
share/hadoop/common/lib/hadoop-auth-2.7.7.jar
share/hadoop/hdfs/hadoop-hdfs-2.7.7.jar
share/hadoop/hdfs/lib/htrace-core-3.1.0-incubating.jar
Page updated: 2024-04-25
