logstash的部署、整合ELK+Filebeat

官方部署文档: https://raw.githubusercontent.com/elastic/beats/7.12/deploy/kubernetes/filebeat-kubernetes.yaml

阿里云文档: https://help.aliyun.com/document_detail/169257.htm

学习:https://elkguide.elasticsearch.cn/

所有日志由Rsyslog或者Filebeat收集,然后传输给Kafka,Logstash作为Consumer消费Kafka里边的数据,分别写入Elasticsearch和Hadoop,最后使用Kibana输出到web端供相关人员查看,或者是由Spark接手进入更深层次的分析。

在以上整个架构中,核心的几个组件Kafka、Elasticsearch、Hadoop天生支持高可用,唯独Logstash是不支持的,用单个Logstash去处理日志,不仅存在处理瓶颈更重要的是在整个系统中存在单点的问题

原文:https://www.cnblogs.com/caoweixiong/p/12691458.html

一,简介

由于于 logstash是java应用,解析日志是非的消耗cpu和内存,logstash安装在应用部署的机器上显得非常的笨重。最常见的做法是用filebeat部署在应用的机器上,logstash单独部署,然后由 filebeat将日志输出给logstash解析,解析完由logstash再传给elasticsearch。

logstash从https://www.elastic.co/downloads选择要下载的版本链接

stash第一个事件

Logstash管道有两个必需元素,输入和输出,以及一个可选元素filter。 输入插件使用来自源的数据,过滤器插件在您指定时修改数据,输出插件将数据写入目标。

如下:原文

  • Input组件:负责采集日志数据,包括文件、syslog、collectd、kafka、redis等等;

  • Filter:负责解析日志数据,包括解析、加工、转换数据等;

  • Output:负责输出日志数据,对接到redis、kafka、elasticsearch、hdfs等存储组件;

Filebeat(采集数据)+Logstash(过滤)+Elasticsearch(建立索引)+Kibana(展示)

二,Logstash配置文件详解

以6.3.2为例讲解

安装路径下可以看到相关配置文件:

# cd /etc/logstash/
# ls
conf.d  jvm.options  log4j2.properties  logstash.yml  pipelines.yml  startup.options


conf.d:logstash日志解析文件保存在此处;
jvm.options:内存相关设置
log4j2.properties:日志相关配置
logstash.yml:logstash系统相关配置
pipelines.yml:管道配置
startup.options:启动配置
一般测试时需要配置内存,默认内存为1g;

————————————————
。
原文链接:https://blog.csdn.net/u010904188/article/details/81776737

2。1下面主要讲解 过滤filter插件

更多input插件可参考:https://www.elastic.co/guide/en/logstash/current/input-plugins.html

filter 模块:

filter模块下主要插件grok、mutate、ruby、date、json如下:

grok插件配置详解:如下,可酌情选择进行配置,可参考https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html

filter{
    # 解析任意文本并且结构化他们。grok目前是logstash中最好的解析非结构化日志并且结构化他们的工具。这个工具非常适合syslog、apache log、mysql log之类的人们可读日志的解析
    grok{
       # 正则匹配日志,可以筛选分割出需要记录的字段和值
        match => { "message" => "正则表达式"}
       # 删除不需要记录的字段
        remove_field => ["message"]
    }
}

————————————————


原文链接:https://blog.csdn.net/u010904188/article/details/81776737

mutate插件配置详解:如下,可酌情选择进行配置,可参考https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html

filter {  
    #mutate过滤器允许您对字段执行常规突变。您可以重命名,删除,替换和修改事件中的字段。
    mutate {
        #将字段的值转换为其他类型,例如将字符串转换为整数。如果字段值是数组,则将转换所有成员。如果该字段是哈希,则不会采取任何操作。
        convert => [
                    #把request_time的值装换为浮点型
                    "request_time", "float",
                    #costTime的值转换为整型
                    "costTime", "integer"
                    ]
        #将现有字段复制到另一个字段。将覆盖现有目标字段
        copy => { "source_field" => "dest_field" }
        #将正则表达式与字段值匹配,并将所有匹配替换为替换字符串。仅支持字符串或字符串数​​组的字段。对于其他类型的领域,将不采取任何行动。
        gsub => [
         #用下划线替换所有正斜杠
         "fieldname", "/", "_",
         #替换反斜杠,问号,哈希和减少
         #带点“。”
         "fieldname2", "[\\?#-]", "."
         ]
    } 

————————————————

原文链接:https://blog.csdn.net/u010904188/article/details/81776737

ruby插件配置详解:如下,可酌情选择进行配置,可参考https://www.elastic.co/guide/en/logstash/current/plugins-filters-ruby.html

filter {
    #ruby插件可以使用任何的ruby语法,无论是逻辑判断,条件语句,循环语句,还是对字符串的操作,对EVENT对象的操作,都是极其得心应手的。
    ruby {
        #ruby插件有两个属性,一个init 还有一个code
        #init属性是用来初始化字段的,你可以在这里初始化一个字段,无论是什么类型的都可以,这个字段只是在ruby{}作用域里面生效。
        #这里我初始化了一个名为field的hash字段。可以在下面的coed属性里面使用。
        init => [field={}]
        #code属性使用两个冒号进行标识,你的所有ruby语法都可以在里面进行。
        #下面我对一段数据进行处理。
        #首先,我需要在把message字段里面的值拿到,并且对值进行分割按照“|”。这样分割出来的是一个数组(ruby的字符创处理)。
        #第二步,我需要循环数组判断其值是否是我需要的数据(ruby条件语法、循环结构)
        #第三步,我需要吧我需要的字段添加进入EVEVT对象。
        #第四步,选取一个值,进行MD5加密
        #什么是event对象?event就是Logstash对象,你可以在ruby插件的code属性里面操作他,可以添加属性字段,可以删除,可以修改,同样可以进行树脂运算。
        #进行MD5加密的时候,需要引入对应的包。
        #最后把冗余的message字段去除。
        code => "
            array=event。get('message').split('|')
            array.each do |value|
                if value.include? 'MD5_VALUE'
                    then 
                        require 'digest/md5'
                        md5=Digest::MD5.hexdigest(value)
                        event.set('md5',md5)
                end
                if value.include? 'DEFAULT_VALUE'
                    then
                        event.set('value',value)
                end
            end
             remove_field=>"message"
        "
    }
}

————————————————

原文链接:https://blog.csdn.net/u010904188/article/details/81776737

json插件配置详解:如下,可酌情选择进行配置,可参考https://www.elastic.co/guide/en/logstash/current/plugins-filters-json.html


filter {
  #这是一个JSON解析过滤器。它需要一个包含JSON的现有字段,并将其扩展为Logstash事件中的实际数据结构。
  #默认情况下,它会将解析后的JSON放在Logstash事件的根(顶层)中,但是可以使用配置将此过滤器配置为将JSON放入任意任意事件字段中 target。
  json {
    #指定json所在位置如果您在message字段中有JSON数据
    source => "message"
    #如果此过滤器成功,请向此事件添加任意字段。字段名称可以是动态的,并使用包含事件的部分内容%{field}。
    add_field => { "foo_%{somefield}" => "Hello world, from %{host}" }
    #移除字段
    remove_field => [ "foo_%{somefield}" ]
  }
}

————————————————

原文链接:https://blog.csdn.net/u010904188/article/details/81776737

更多可参考:https://www.elastic.co/guide/en/logstash/current/filter-plugins.html

logstash部署

1. jdk安装(略)

2 下载 logsash(需要和es版本对应)

下载
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.3.2.zip

解压压缩包
unzip logstash-6.3.2.zip

将解压的包移到/usr/share目录下
mv logstash-6.3.2 /usr/share/ cd /usr/share/logstash-6.3.2/

3 修改logstash的安装目录的config目录下的logstash-sample.conf文件,配置如下:

input {
  kafka {
    bootstrap_servers => ["10.1.196.24:9092,10.1.196.22:9092,10.1.196.23:9092"]
    group_id => "es-test"
    topics => ["estest"] 
    codec => json
 }
}
 #日志筛选匹配处理
filter {

}
output {
  elasticsearch {
    hosts => "http://es.miz.so:9200"
    index => "kafka‐%{+YYYY.MM.dd}"

 }
}

3. 配置开机启动服务

cd /lib/systemd/system

cat >  logstash.service <<EOF
[Unit]
Description=logstash
Wants=network-online.target
After=network-online.target

[Service]
User=root
ExecStart=/usr/share/logstash-6.3.2/bin/logstash -f /usr/share/logstash-6.3.2/config/logstash-sample.conf
# 设置为掉线自动重启,进程强制杀掉后会自动重新启动
Restart=always

[Install]
WantedBy=multi-user.target

EOF

systemctl daemon-reload
systemctl enable logstash
systemctl status logstash
systemctl start  logstash
systemctl status logstash
journalctl  -f -u logstash

日志筛选匹配处理

日志格式如下:

1.不符合规范的不进行收集,例如,日志规范中没有包含特定字段,interface,那么不进行收集
filter {
  if ![@fields][result] {
    drop{}
  }

}
参考文档:https://www.it1352.com/1579483.html
https://elasticsearch.cn/article/6192
2.或者对于包含特定字段内容的,输出到另外的索引

output {
#字段 errorType等于1, 并且interface等于500时,将内容输出到error_test索引
if [@fields][errorType] == "1" and [@fields][interface] == "500" {
  elasticsearch {
    hosts => "http://es.miz.hk:9200"
    index => "error_test‐%{+YYYY.MM.dd}"
  }
}
#字段interface等于200时,输出内容到kafka-索引
else if [@fields][interface] == "200" {
  elasticsearch {
    hosts => "http://es.miz.hk:9200"
    index => "kafka‐%{+YYYY.MM.dd}"
  }
}
else {
  elasticsearch {
    hosts => "http://es.miz.hk:9200"
    index => "kafka‐%{+YYYY.MM.dd}"
  }
}
3.对于包含特定字段内容的,不进行收集

# 字段result不为0的直接丢弃,不收集
filter {
if [@fields][result] != "0" {
  drop{}
}
}

logstash input多个kafka,ouput多es索引

实现:

1.topics pordlog 输出到es索引 prodlogs-2021-06-30

2.topics stagelog 输出到es索引 stagelogs-2021-06-3

3.如果字段包含soda则 输出到es索引 soda-2021-06-3

linux 下测试写入日志格式如下:

echo '{"@timestamp":"2021-06-30T17:36:55+08:00","@namespace":"maizuo","@service":"bill-service-574f6d59d6-bvwrb","@podIp":"10.20.1.239","@fields":{"traceId":"7","operatorId":"7","userId":"userId=1000","mobile":"这是darrendu12345678","orderId":"0","cardNo":"100","clientIp":"192.168.1.239"}}' >>newmaizuo.log

规则如下

[root@iZbp15rvy1pu807er7crnzZ config]# cat logstash-sample.conf
input {
  kafka {
    bootstrap_servers => ["10.1.196.25:9092"]
    client_id => "kafka_client_1"
    group_id => "es-prod"
    topics => ["pordlog"] 
    codec => json
 }
  kafka {
    bootstrap_servers => ["10.1.196.25:9092"]
    client_id => "kafka_client_2"
    group_id => "es-stage"
    topics => ["stagelog"]
    codec => json
 }
}
 #日志筛选匹配处理
filter {
      

}
output {
  if [@namespace] == "soda"{
  elasticsearch {
    hosts => "http://dmzes.maizuo.com:9200"
    index => "soda‐%{+YYYY.MM.dd}"
  }
} 
 else {
  elasticsearch {
    hosts => "http://dmzes.maizuo.com:9200"
    #index => "prod-new-filebeat-6.0.0‐%{+YYYY.MM.dd}"
    index => "%{[@metadata][topic]}‐%{+YYYY.MM.dd}"
 }
}
}
说明:input 消费kafka时, 分别指定不同的 client_id.见文档 https://liuzhihang.com/2019/03/04/logstash-input-multiple-kafka-exceptions

参考文章

1。原文链接:https://blog.csdn.net/xujiamin0022016/article/details/114819249

2。原文链接:https://blog.csdn.net/u010904188/article/details/81776737

3。filebeat讲解 https://www.cnblogs.com/cjsblog/p/9495024.html

4 学习资料1 https://elasticsearch.cn/question/2818

5 2 http://doc.yonyoucloud.com/doc/logstash-best-practice-cn/filter/grok.html

6.https://www.cnblogs.com/JetpropelledSnake/p/10873582.html

Last updated