将 Kafka 中的数据通过 Flume 收集并存储到 Hive 里。
这里是使用 Apache 社区版部署的环境,而非 CDH、HDP 等方式,可以先参考官方文档搭建好环境。
hive.txn.manager
org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
Set to org.apache.hadoop.hive.ql.lockmgr.DbTxnManager as part of turning on Hive
transactions, which also requires appropriate settings for hive.compactor.initiator.on,
hive.compactor.worker.threads, hive.support.concurrency (true),
and hive.exec.dynamic.partition.mode (nonstrict).
The default DummyTxnManager replicates pre-Hive-0.13 behavior and provides
no transactions.
hive.support.concurrency
true
Whether Hive supports concurrency control or not.
A ZooKeeper instance must be up and running when using zookeeper Hive lock manager
hive.metastore.uris
thrift://localhost:9083
Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.
-- Create the target database and the transactional table the Flume Hive
-- sink writes into. Hive ACID streaming requires: ORC storage,
-- TBLPROPERTIES('transactional'='true'), and a bucketed table
-- (plus hive.txn.manager=DbTxnManager / hive.support.concurrency=true,
-- configured earlier in this article).
CREATE DATABASE IF NOT EXISTS test_db;

-- Without USE, the table would be created in the `default` database,
-- but the Flume sink is configured with hive.database=test_db — the
-- sink would then fail to find test_db.test_kafka_to_hive.
USE test_db;

CREATE TABLE IF NOT EXISTS `test_kafka_to_hive` (
    `time`  string,
    `type`  string,
    `value` string,
    `code`  string)
PARTITIONED BY (`partition_time` string)   -- filled by the sink's hive.partition=%Y-%m-%d
CLUSTERED BY (`time`) INTO 2 BUCKETS       -- bucketing is mandatory for ACID tables
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
# Declare the agent's source, channel, and sink component names
a.sources=kafka_source
a.channels=memory_channel
a.sinks=hive_sink
# Kafka source configuration
a.sources.kafka_source.type=org.apache.flume.source.kafka.KafkaSource
# NOTE(review): zookeeperConnect is the legacy (pre-Flume-1.7) Kafka source
# property, while bootstrap.servers belongs to the newer consumer config
# (where it is spelled kafka.bootstrap.servers, with kafka.topics instead
# of topic). Mixing both styles here — confirm which keys the deployed
# Flume version actually reads.
a.sources.kafka_source.zookeeperConnect=localhost:2181
a.sources.kafka_source.bootstrap.servers=localhost:9092
a.sources.kafka_source.topic=processed-realtimeData
a.sources.kafka_source.channels=memory_channel
a.sources.kafka_source.consumer.timeout.ms=1000
a.sources.kafka_source.batchSize=10
# Hive sink configuration
a.sinks.hive_sink.type=hive
# NOTE(review): this hostname (host_hive_metadatastore) differs from the
# thrift://localhost:9083 metastore URI shown earlier in the article —
# make sure it resolves to the real Hive metastore host.
a.sinks.hive_sink.hive.metastore=thrift://host_hive_metadatastore:9083
a.sinks.hive_sink.hive.database=test_db
a.sinks.hive_sink.hive.table=test_kafka_to_hive
# Partition value (a %Y-%m-%d formatted date) fills the table's
# partition_time partition column; presumably derived from the event
# timestamp header — verify, or set useLocalTimeStamp if events lack one.
a.sinks.hive_sink.hive.partition=%Y-%m-%d
a.sinks.hive_sink.hive.txnsPerBatchAsk=2
# batchSize=1 commits one event per Hive transaction — fine for a demo,
# very slow in production; raise once the pipeline is verified.
a.sinks.hive_sink.batchSize=1
# JSON serializer maps JSON fields in the event body onto table columns,
# in the order listed below (must match the DDL column order).
a.sinks.hive_sink.serializer=JSON
a.sinks.hive_sink.serializer.fieldnames=time,type,value,code
# Channel configuration (custom, non-stock channel class that parses JSON)
a.channels.memory_channel.type=com.my.flume.channel.JsonParsedMemoryChannel
a.channels.memory_channel.capacity=1000
a.channels.memory_channel.transactionCapacity=100
# Wire the components together (the source->channel binding below repeats
# the identical line already set above — harmless but redundant)
a.sources.kafka_source.channels=memory_channel
a.sinks.hive_sink.channel=memory_channel
fs.hdfs.impl
org.apache.hadoop.hdfs.DistributedFileSystem
./bin/flume-ng agent -c ./conf -f ./conf/kafka2hive.conf -n a -Dflume.root.logger=INFO,console
#Hive
hcatalog/share/hcatalog/hive-hcatalog-streaming-3.1.0.jar
hcatalog/share/hcatalog/hive-hcatalog-core-3.1.0.jar
hcatalog/share/hcatalog/hive-hcatalog-server-extensions-3.1.0.jar
hcatalog/share/hcatalog/hive-hcatalog-pig-adapter-3.1.0.jar
lib/hive-jdbc-3.1.0.jar
lib/log4j-1.2-api-2.10.0.jar
lib/log4j-api-2.10.0.jar
lib/log4j-core-2.10.0.jar
lib/log4j-slf4j-impl-2.10.0.jar
lib/log4j-web-2.10.0.jar
lib/hive-standalone-metastore-3.1.0.jar
lib/hive-contrib-3.1.0.jar
lib/hive-exec-3.1.0.jar
lib/libfb303-0.9.3.jar
lib/calcite-core-1.16.0.jar
jdbc/hive-jdbc-3.1.0-standalone.jar
#Hadoop
share/hadoop/common/hadoop-common-2.7.7.jar
share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.7.7.jar
share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.7.7.jar
share/hadoop/common/lib/commons-configuration-1.6.jar
share/hadoop/common/lib/hadoop-auth-2.7.7.jar
share/hadoop/hdfs/hadoop-hdfs-2.7.7.jar
share/hadoop/hdfs/lib/htrace-core-3.1.0-incubating.jar
页面更新:2024-04-25
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号