# Input: pull events from a Redis list.
input {
  redis {
    # Redis server to connect to.
    host      => "redis_1"
    # Read from the list stored under the key "logstash".
    key       => "logstash"
    data_type => "list"
    # Each list entry is decoded as a JSON document.
    codec     => "json"
  }
}
# Filters: per-type parsing of the raw "message" field.
filter {

  # ---- Events with type "postgres" ---------------------------------------
  if [type] == "postgres" {
    # Split the line into:
    #   timestamp     - date, time and timezone (log_timezone is captured
    #                   separately inside it)
    #   keyvaluepairs - everything between the "-" after the timestamp and
    #                   the space before the level
    #   log_level     - run of upper-case letters before the ":"
    #   log_message   - remainder of the line
    grok {
      match => ["message", "(?<timestamp>([0-9]{4})-%{MONTHNUM}-%{MONTHDAY} %{TIME} (?<log_timezone>(?:%{TZ}|CE[S]{0,1}T)))-%{DATA:keyvaluepairs} (?<log_level>[A-Z][A-Z]*): %{GREEDYDATA:log_message}"]
    }

    # Expand the ":"-separated key/value pairs into individual fields,
    # each prefixed with "log_", then discard the intermediate field.
    kv {
      source       => "keyvaluepairs"
      field_split  => ":"
      prefix       => "log_"
      remove_field => ["keyvaluepairs"]
    }

    # Discard events whose message is exactly "incomplete startup packet".
    if [log_message] == "incomplete startup packet" {
      drop {}
    }
  }

  # ---- Events with type "business" ---------------------------------------
  if [type] == "business" {
    # Split the line into:
    #   timestamp     - date and time
    #   log_level     - standard LOGLEVEL token
    #   thread        - text between square brackets
    #   log_loger     - dotted alphabetic name
    #   keyvaluepairs - everything after the " -" separator
    grok {
      match => ["message", "(?<timestamp>([0-9]{4})-%{MONTHNUM}-%{MONTHDAY} %{TIME}) (?<log_level>%{LOGLEVEL}) \[(?<thread>%{DATA})\] (?<log_loger>[A-Za-z\.]+) -(?<keyvaluepairs>%{GREEDYDATA})"]
    }

    # Expand the ","-separated key/value pairs into "log_"-prefixed fields,
    # trimming spaces from both keys and values, then discard the
    # intermediate field.
    kv {
      source       => "keyvaluepairs"
      field_split  => ","
      trimkey      => " "
      trim         => " "
      prefix       => "log_"
      remove_field => ["keyvaluepairs"]
    }

    # Store log_duration as an integer rather than a string.
    mutate {
      convert => ["log_duration", "integer"]
    }
  }
}
# Output: send every surviving event to Elasticsearch via the HTTP output.
output {
  elasticsearch_http {
    host => "elasticsearch_1"
  }
}