官方部署文档: https://raw.githubusercontent.com/elastic/beats/7.12/deploy/kubernetes/filebeat-kubernetes.yaml
阿里云文档: https://help.aliyun.com/document_detail/169257.htm
学习:https://elkguide.elasticsearch.cn/
所有日志由Rsyslog或者Filebeat收集,然后传输给Kafka,Logstash作为Consumer消费Kafka里边的数据,分别写入Elasticsearch和Hadoop,最后使用Kibana输出到web端供相关人员查看,或者是由Spark接手进入更深层次的分析。
在以上整个架构中,核心的几个组件Kafka、Elasticsearch、Hadoop天生支持高可用,唯独Logstash是不支持的,用单个Logstash去处理日志,不仅存在处理瓶颈更重要的是在整个系统中存在单点的问题
原文:https://www.cnblogs.com/caoweixiong/p/12691458.html
一,简介
由于于 logstash是java应用,解析日志是非的消耗cpu和内存,logstash安装在应用部署的机器上显得非常的笨重。最常见的做法是用filebeat部署在应用的机器上,logstash单独部署,然后由 filebeat将日志输出给logstash解析,解析完由logstash再传给elasticsearch。
logstash从https://www.elastic.co/downloads选择要下载的版本链接
stash第一个事件
Logstash管道有两个必需元素,输入和输出,以及一个可选元素filter。 输入插件使用来自源的数据,过滤器插件在您指定时修改数据,输出插件将数据写入目标。
如下:原文
Input组件:负责采集日志数据,包括文件、syslog、collectd、kafka、redis等等;
Filter:负责解析日志数据,包括解析、加工、转换数据等;
Output:负责输出日志数据,对接到redis、kafka、elasticsearch、hdfs等存储组件;
Filebeat(采集数据)+Logstash(过滤)+Elasticsearch(建立索引)+Kibana(展示)
二,Logstash配置文件详解
以6.3.2为例讲解
安装路径下可以看到相关配置文件:
# cd /etc/logstash/
# ls
conf.d jvm.options log4j2.properties logstash.yml pipelines.yml startup.options
conf.d:logstash日志解析文件保存在此处;
jvm.options:内存相关设置
log4j2.properties:日志相关配置
logstash.yml:logstash系统相关配置
pipelines.yml:管道配置
startup.options:启动配置
一般测试时需要配置内存,默认内存为1g;
————————————————
。
原文链接:https://blog.csdn.net/u010904188/article/details/81776737
2。1下面主要讲解 过滤filter插件
更多input插件可参考:https://www.elastic.co/guide/en/logstash/current/input-plugins.html
filter 模块:
filter模块下主要插件grok、mutate、ruby、date、json如下:
grok插件配置详解:如下,可酌情选择进行配置,可参考https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html
filter{
# 解析任意文本并且结构化他们。grok目前是logstash中最好的解析非结构化日志并且结构化他们的工具。这个工具非常适合syslog、apache log、mysql log之类的人们可读日志的解析
grok{
# 正则匹配日志,可以筛选分割出需要记录的字段和值
match => { "message" => "正则表达式"}
# 删除不需要记录的字段
remove_field => ["message"]
}
}
————————————————
原文链接:https://blog.csdn.net/u010904188/article/details/81776737
mutate插件配置详解:如下,可酌情选择进行配置,可参考https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html
filter {
#mutate过滤器允许您对字段执行常规突变。您可以重命名,删除,替换和修改事件中的字段。
mutate {
#将字段的值转换为其他类型,例如将字符串转换为整数。如果字段值是数组,则将转换所有成员。如果该字段是哈希,则不会采取任何操作。
convert => [
#把request_time的值装换为浮点型
"request_time", "float",
#costTime的值转换为整型
"costTime", "integer"
]
#将现有字段复制到另一个字段。将覆盖现有目标字段
copy => { "source_field" => "dest_field" }
#将正则表达式与字段值匹配,并将所有匹配替换为替换字符串。仅支持字符串或字符串数组的字段。对于其他类型的领域,将不采取任何行动。
gsub => [
#用下划线替换所有正斜杠
"fieldname", "/", "_",
#替换反斜杠,问号,哈希和减少
#带点“。”
"fieldname2", "[\\?#-]", "."
]
}
————————————————
原文链接:https://blog.csdn.net/u010904188/article/details/81776737
ruby插件配置详解:如下,可酌情选择进行配置,可参考https://www.elastic.co/guide/en/logstash/current/plugins-filters-ruby.html
filter {
#ruby插件可以使用任何的ruby语法,无论是逻辑判断,条件语句,循环语句,还是对字符串的操作,对EVENT对象的操作,都是极其得心应手的。
ruby {
#ruby插件有两个属性,一个init 还有一个code
#init属性是用来初始化字段的,你可以在这里初始化一个字段,无论是什么类型的都可以,这个字段只是在ruby{}作用域里面生效。
#这里我初始化了一个名为field的hash字段。可以在下面的coed属性里面使用。
init => [field={}]
#code属性使用两个冒号进行标识,你的所有ruby语法都可以在里面进行。
#下面我对一段数据进行处理。
#首先,我需要在把message字段里面的值拿到,并且对值进行分割按照“|”。这样分割出来的是一个数组(ruby的字符创处理)。
#第二步,我需要循环数组判断其值是否是我需要的数据(ruby条件语法、循环结构)
#第三步,我需要吧我需要的字段添加进入EVEVT对象。
#第四步,选取一个值,进行MD5加密
#什么是event对象?event就是Logstash对象,你可以在ruby插件的code属性里面操作他,可以添加属性字段,可以删除,可以修改,同样可以进行树脂运算。
#进行MD5加密的时候,需要引入对应的包。
#最后把冗余的message字段去除。
code => "
array=event。get('message').split('|')
array.each do |value|
if value.include? 'MD5_VALUE'
then
require 'digest/md5'
md5=Digest::MD5.hexdigest(value)
event.set('md5',md5)
end
if value.include? 'DEFAULT_VALUE'
then
event.set('value',value)
end
end
remove_field=>"message"
"
}
}
————————————————
原文链接:https://blog.csdn.net/u010904188/article/details/81776737
json插件配置详解:如下,可酌情选择进行配置,可参考https://www.elastic.co/guide/en/logstash/current/plugins-filters-json.html
filter {
#这是一个JSON解析过滤器。它需要一个包含JSON的现有字段,并将其扩展为Logstash事件中的实际数据结构。
#默认情况下,它会将解析后的JSON放在Logstash事件的根(顶层)中,但是可以使用配置将此过滤器配置为将JSON放入任意任意事件字段中 target。
json {
#指定json所在位置如果您在message字段中有JSON数据
source => "message"
#如果此过滤器成功,请向此事件添加任意字段。字段名称可以是动态的,并使用包含事件的部分内容%{field}。
add_field => { "foo_%{somefield}" => "Hello world, from %{host}" }
#移除字段
remove_field => [ "foo_%{somefield}" ]
}
}
————————————————
原文链接:https://blog.csdn.net/u010904188/article/details/81776737
更多可参考:https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
logstash部署
1. jdk安装(略)
2 下载 logsash(需要和es版本对应)
下载
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.3.2.zip
解压压缩包
unzip logstash-6.3.2.zip
将解压的包移到/usr/share目录下
mv logstash-6.3.2 /usr/share/ cd /usr/share/logstash-6.3.2/
3 修改logstash的安装目录的config目录下的logstash-sample.conf文件,配置如下:
input {
kafka {
bootstrap_servers => ["10.1.196.24:9092,10.1.196.22:9092,10.1.196.23:9092"]
group_id => "es-test"
topics => ["estest"]
codec => json
}
}
#日志筛选匹配处理
filter {
}
output {
elasticsearch {
hosts => "http://es.miz.so:9200"
index => "kafka‐%{+YYYY.MM.dd}"
}
}
3. 配置开机启动服务
cd /lib/systemd/system
cat > logstash.service <<EOF
[Unit]
Description=logstash
Wants=network-online.target
After=network-online.target
[Service]
User=root
ExecStart=/usr/share/logstash-6.3.2/bin/logstash -f /usr/share/logstash-6.3.2/config/logstash-sample.conf
# 设置为掉线自动重启,进程强制杀掉后会自动重新启动
Restart=always
[Install]
WantedBy=multi-user.target
EOF
systemctl daemon-reload
systemctl enable logstash
systemctl status logstash
systemctl start logstash
systemctl status logstash
journalctl -f -u logstash
日志筛选匹配处理
日志格式如下:
1.不符合规范的不进行收集,例如,日志规范中没有包含特定字段,interface,那么不进行收集
filter {
if ![@fields][result] {
drop{}
}
}
参考文档:https://www.it1352.com/1579483.html
https://elasticsearch.cn/article/6192
2.或者对于包含特定字段内容的,输出到另外的索引
output {
#字段 errorType等于1, 并且interface等于500时,将内容输出到error_test索引
if [@fields][errorType] == "1" and [@fields][interface] == "500" {
elasticsearch {
hosts => "http://es.miz.hk:9200"
index => "error_test‐%{+YYYY.MM.dd}"
}
}
#字段interface等于200时,输出内容到kafka-索引
else if [@fields][interface] == "200" {
elasticsearch {
hosts => "http://es.miz.hk:9200"
index => "kafka‐%{+YYYY.MM.dd}"
}
}
else {
elasticsearch {
hosts => "http://es.miz.hk:9200"
index => "kafka‐%{+YYYY.MM.dd}"
}
}
3.对于包含特定字段内容的,不进行收集
# 字段result不为0的直接丢弃,不收集
filter {
if [@fields][result] != "0" {
drop{}
}
}
logstash input多个kafka,ouput多es索引
实现:
1.topics pordlog 输出到es索引 prodlogs-2021-06-30
2.topics stagelog 输出到es索引 stagelogs-2021-06-3
3.如果字段包含soda则 输出到es索引 soda-2021-06-3
linux 下测试写入日志格式如下:
echo '{"@timestamp":"2021-06-30T17:36:55+08:00","@namespace":"maizuo","@service":"bill-service-574f6d59d6-bvwrb","@podIp":"10.20.1.239","@fields":{"traceId":"7","operatorId":"7","userId":"userId=1000","mobile":"这是darrendu12345678","orderId":"0","cardNo":"100","clientIp":"192.168.1.239"}}' >>newmaizuo.log
规则如下
[root@iZbp15rvy1pu807er7crnzZ config]# cat logstash-sample.conf
input {
kafka {
bootstrap_servers => ["10.1.196.25:9092"]
client_id => "kafka_client_1"
group_id => "es-prod"
topics => ["pordlog"]
codec => json
}
kafka {
bootstrap_servers => ["10.1.196.25:9092"]
client_id => "kafka_client_2"
group_id => "es-stage"
topics => ["stagelog"]
codec => json
}
}
#日志筛选匹配处理
filter {
}
output {
if [@namespace] == "soda"{
elasticsearch {
hosts => "http://dmzes.maizuo.com:9200"
index => "soda‐%{+YYYY.MM.dd}"
}
}
else {
elasticsearch {
hosts => "http://dmzes.maizuo.com:9200"
#index => "prod-new-filebeat-6.0.0‐%{+YYYY.MM.dd}"
index => "%{[@metadata][topic]}‐%{+YYYY.MM.dd}"
}
}
}
说明:input 消费kafka时, 分别指定不同的 client_id.见文档 https://liuzhihang.com/2019/03/04/logstash-input-multiple-kafka-exceptions
参考文章
1。原文链接:https://blog.csdn.net/xujiamin0022016/article/details/114819249
2。原文链接:https://blog.csdn.net/u010904188/article/details/81776737
3。filebeat讲解 https://www.cnblogs.com/cjsblog/p/9495024.html
4 学习资料1 https://elasticsearch.cn/question/2818
5 2 http://doc.yonyoucloud.com/doc/logstash-best-practice-cn/filter/grok.html
6.https://www.cnblogs.com/JetpropelledSnake/p/10873582.html