前言:通常情况下,我们将Kafka的日志数据通过logstash订阅输出到ES,然后用Kibana来做可视化分析,这就是我们通常用的ELK日志分析模式。但是基于ELK的日志分析,通常比较常用的是实时分析,日志存个十天半个月都会删掉。那么在一些情况下,我需要将日志数据也存一份到我HDFS,积累到比较久的时间做半年、一年甚至更长时间的大数据分析。下面就来说如何最简单的通过logstash将kafka中的数据订阅一份到hdfs。
创新互联是一家专注于网站设计、成都网站建设与策划设计,井冈山网站建设哪家好?创新互联做网站,专注于网站建设十载,网设计领域的专业建站公司;建站业务涵盖:井冈山等地区。井冈山做网站价格咨询:028-86922220一:安装logstash(下载tar包安装也行,我直接yum装了)
#yum install logstash-2.1.1
二:从github上克隆代码
#git clone https://github.com/heqin5136/logstash-output-webhdfs-discontinued.git #ls logstash-output-webhdfs-discontinued
三:安装logstash-output-webhdfs插件
#cd logstash-output-webhdfs-discontinued logstash的bin目录下有个plugin,使用plugin来安装插件 #/opt/logstash/bin/plugin install logstash-output-webhdfs
四:配置logstash
#vim /etc/logstash/conf.d/logstash.conf input { kafka { zk_connect => '10.10.10.1:2181,10.10.10.2:2181,10.10.10.3:2181' #kafka的zk集群地址 group_id => 'hdfs' #消费者组,不要和ELK上的消费者一样 topic_id => 'apiAppWebCms-topic' #topic consumer_id => 'logstash-consumer-10.10.8.8' #消费者id,自定义,我写本机ip。 consumer_threads => 1 queue_size => 200 codec => 'json' } } output { #如果你一个topic中会有好几种日志,可以提取出来分开存储在hdfs上。 if [type] == "apiNginxLog" { webhdfs { workers => 2 host => "10.10.8.1" #hdfs的namenode地址 port => 50070 #webhdfs端口 user => "hdfs" #hdfs运行的用户啊,以这个用户的权限去写hdfs。 path => "/data/logstash/apiNginxLog-%{+YYYY}-%{+MM}-%{+dd}/logstash-%{+HH}.log #按天建目录,按小时建log文件。 flush_size => 500 # compression => "snappy" #压缩格式,可以不压缩 idle_flush_time => 10 retry_interval => 0.5 } } if [type] == "apiAppLog" { webhdfs { workers => 2 host => "10.64.8.1" port => 50070 user => "hdfs" path => "/data/logstash/api/apiAppLog-%{+YYYY}-%{+MM}-%{+dd}.log" flush_size => 500 # compression => "snappy" idle_flush_time => 10 retry_interval => 0.5 } } stdout { codec => rubydebug } }
五:启动logstash
#/etc/init.d/logstash start
已经可以成功写入了。
另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。