ELK日志系统之Logstash服务

 

Logstash的工作

Logstash是一个开源的、服务端的数据处理pipeline(管道),它可以接收多个源的数据、然后对它们进行转换、最终将它们发送到指定类型的目的地。

Logstash是通过插件机制实现各种功能的,读者可以在https://github.com/logstash-plugins 下载各种功能的插件,也可以自行编写插件。

Logstash实现的功能主要分为接收数据、解析过滤并转换数据、输出数据三个部分,对应的插件依次是input插件、filter插件、output插件,其中,filter插件是可选的,其它两个是必须插件。也就是说在一个完整的Logstash配置文件中,必须有input插件和output插件。

 

 

 

第一步

机器

我这边拿一台虚拟机,centos7的

主节点IP地址:192.168.0.154

 

下载

我这边是先下载好的

版本:Logstash6.5.2

官方下载地址:https://www.elastic.co/cn/downloads/logstash

 

JDK8

Logstash是需要jdk8的

JDK8安装文章:http://www.linux91.cn/c/Liunx__centos7__JDK8%E5%AE%89%E8%A3%85

 

 

 

第二步

安装 

解压:tar -zxf logstash-6.5.2.tar.gz

移动:mv logstash-6.5.2 /usr/local/logstash

 

 

 

第三步

配置文件

 

这里我们将logstash安装到/usr/local目录下,因此,logstash的配置文件目录为/usr/local/logstash/config/。

其中,jvm.options是设置JVM内存资源的配置文件,

logstash.yml是logstash全局属性配置文件,一般无需修改。

另外还需要自己创建一个logstash事件配置文件,这里重点介绍下logstash事件配置文件的编写方法和使用方式。

 

在介绍Logstash配置之前,先来认识一下logstash是如何实现输入和输出的。

Logstash提供了一个shell脚本/usr/local/logstash/bin/logstash,可以方便快速的启动一个logstash进程,在Linux命令行下,运行如下命令启动Logstash进程:

cd /usr/local/logstash/
bin/logstash -e 'input{stdin{}} output{stdout{codec=>rubydebug}}'
解释下这条命令的含义:
-e代表执行的意思。
input即输入的意思,input里面即是输入的方式,这里选择了stdin,就是标准输入(从终端输入)。
output即输出的意思,output里面是输出的方式,这里选择了stdout,就是标准输出(输出到终端)。
这里的codec是个插件,表明格式。这里放在stdout中,表示输出的格式,rubydebug是专门用来做测试的格式,一般用来在终端输出JSON格式。

 

接着,在终端输入信息。这里我们输入"Hello World",按回车,马上就会有返回结果,内容如下:

Hello World

{
      "@version" => "1",
       "message" => "Hello World",
          "host" => "localhost.localdomain",
    "@timestamp" => 2019-01-15T09:52:43.336Z
}

这就是logstash的输出格式。
Logstash在输出内容中会给事件添加一些额外信息。
比如"@version"、"host"、"@timestamp"都是新增的字段,
而最重要的是@timestamp,用来标记事件的发生时间。
由于这个字段涉及到Logstash内部流转,如果给一个字符串字段重命名为@timestamp的话,Logstash就会直接报错。另外,也不能删除这个字段

在logstash的输出中,常见的字段还有type,表示事件的唯一类型、tags,表示事件的某方面属性,我们可以随意给事件添加字段或者从事件里删除字段。

在执行上面的命令后,可以看到,我们输入什么内容,logstash就会按照上面的格式输出什么内容。使用CTRL-C命令可以退出运行的Logstash事件。


 

 

 创建配置一个简单的过滤配置文件

使用-e参数在命令行中指定配置是很常用的方式,但是如果logstash需要配置更多规则的话,就必须把配置固化到文件里,这就是logstash事件配置文件。

如果把上面在命令行执行的logstash命令,写到一个配置文件logstash-simple.conf中,就变成如下内容:

input {
        stdin {}
}

output {
        stdout {
             codec => rubydebug
        }
}

这就是最简单的Logstash事件配置文件。此时,可以使用logstash的-f参数来读取配置文件,然后启动logstash进程,操作如下:

bin/logstash -f config/logstash-simple.conf 

nohup bin/logstash -f config/logstash-simple.conf &


第一种启动方法:通过这种方式也可以启动logstash进程,不过这种方式启动的进程是在前台运行的

第二种启动方法:logstash进程就放到了后台运行了,在当前目录会生成一个nohup.out文件,可通过此文件查看logstash进程的启动状态

 

 

扩展

logstash过滤文件配置

 

简单的logstash过滤配置文件

下面是文件的内容

input {
        file {
                path => "/var/log/messages"
        }
}

output {
        stdout {
                codec => rubydebug
        }
}

 input插件

这里定义了input的输入源为file,然后指定了文件的路径为/var/log/messages,也就是将此文件的内容作为输入源,这里的path属性是必填配置,后面的路径必须是绝对路径,不能是相对路径。

如果需要监控多个文件,可以通过逗号分隔即可,例如:

path => ["/var/log/messages","/log/*.log"]

 

output插件

这里仍然采用rubydebug的JSON输出格式,这对于调试logstash输出信息是否正常非常有用。

 

这里我们来启动一下,以上面这个内容的配置文件启动,我测试的时候习惯在前台运行

bin/logstash -f config/logstash-sooomple.conf 

 

启动后,等程序启动后, 如果没有输出什么日志,开一个新的终端,因为我们关联的日志文件是系统登录日志文件,以下就是输出的信息:

{
      "@version" => "1",
          "host" => "localhost.localdomain",
       "message" => "Jan 16 11:31:33 localhost chronyd[6337]: Selected source 119.28.206.193",
    "@timestamp" => 2019-01-16T03:31:33.893Z,
          "path" => "/var/log/messages"
}

这就是json格式的输出内容

 

 

将输出到kafka集群上的logstash过滤配置文件

下面是配置文件的内容

input {
        file {
                path => "/var/log/messages"
        }
}

output {
        kafka {
                bootstrap_servers => "192.168.0.153:9092,192.168.0.154:9092"
                topic_id => "osmessages"
        }
}

这个配置文件中,输入input仍然是file,

重点看输出插件,这里定义了output的输出源为kafka,通过bootstrap_servers选项指定了kafka集群的IP地址和端口。

特别注意这里IP地址的写法,每个IP地址之间通过逗号分隔。另外,output输出中的topic_id选项,是指定输出到kafka中的哪个topic下,这里是osmessages,如果无此topic,会自动重建topic。

此配置文件所表达的含义是:将系统中/var/log/messages文件的内容实时的同步到kafka集群名为osmessages的topic下

 

请任选一下kafka的主机,执行以下的命令

bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.153:2181 --topic osmessages

这个命令就是在kafka端消费信息,可以看出,输入的信息在kafka消费端输出了,只不过在消息最前面增加了一个时间字段和一个主机字段

 

启动Logstash

bin/logstash -f config/logstash-sooo.conf 

 

 

配置logstash作为转发点

上面对logstash的使用做了一个基础的介绍,现在回到本节介绍的这个案例中,在这个部署架构中,logstash是作为一个二级转发节点使用的,

也就是它将kafka作为数据接收源,然后将数据发送到elasticsearch集群中,按照这个需求,新建logstash事件配置文件kafka_os.conf,内容如下:

input {
        kafka {
                bootstrap_servers => "192.168.0.153:9092,192.168.0.154:9092"
                topics => ["osmessages"]
                codec => "json"
        }       
}       

output {
        elasticsearch{
                hosts => ["192.168.0.191:9200","192.168.0.192:9200"]
                index => "osmessageslog-%{+YYYY-MM-dd}"
        }
}

从配置文件可以看到,input接收源变成了kafka,通过bootstrap_servers和topics两个选项指定了接收源kafka的属性信息,

因为logstash从kafka获取到的数据内容为json格式,所以还需要在input字段加入codec=>json来进行解析,

接着,output输出类型配置为elasticsearch,并通过hosts选项指定了elasticsearch集群的地址,最后通过index指定了索引的名称,

也就是下面要用到的Index pattern

 

 

转载请注明原文链接:ELK日志系统之Logstash服务

发表评论:

共有 0 条评论

 Top