Logstash Pattern 簡單教學

簡單對存入進去es的資料做一些講解

  • index => DB
  • type => Table
  • document => 剩下的檔案

基礎 由 三塊組合而成

  • input
  • filter
  • output

參考圖為最下方網址 基本的程式碼會長這樣由剛剛講的三個部分組成

基本上logstash 支援將各個檔案合併的方法,但是需要透過編號的方法 讓他區分開

logstash 在啟動的時候會在去將這些檔案合併,所以請依照數字排序 不要讓config出問題

有問題可以從log 看

為什麼需要這樣呢? 因為logstash 可以針對不同的index 執行不同的parsing,因為這塊非常消耗資源所以在寫的時候要非常小心

input

mutiline 如果沒有放在這個區域他沒辦法啟動 多執行緒去判別 port => 進入的port mutiline 判斷ISO 8601 或是 ISO 8601後面時間區碼是7碼 將他多行合併成一行

  input {

     beats {
          port => 5044
          codec => multiline {
          pattern => "^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}[\.,][0-9]{3,7} "
          negate => true
          what => "previous"
          }
      }
  }

filter

測試自己的pattern 可以先從下面兩個網址去查詢

https://grokdebug.herokuapp.com/
http://grokconstructor.appspot.com/do/match

[@metadata][beat] 這個是從你的filebeat 設定檔案的index 來的 他可以取不同的名子 切記要小寫
grok 代表他會match什麼樣的條件 會依照match的順序
break_on_match => true 代表只要符合他就會跳掉
%{TIMESTAMP_ISO8601:logDate} TIMESTAMP_ISO8601 代表他內建的條件 : 後面是你想要的欄位名稱
%{POSINT:processId:int} 最後可以指定他的型態
(?<logger>[A-Z-a-z .]+) 如果想自定pattern 格式 可以透過 [內的regular expression]去撰寫
mutate => 可以將你的欄位移除 或是轉換型態 ex: remove_field ,remove_tag GREEDYDATA => 剩餘資料全部變成此欄位

  • tag:可以用來搜尋 : 可以透過 動態的方法 加入tag

    mutate {
      add_tag => [ "%{httpMethod}" ,"%{apiPath}" ]
    }    
    
  • geoip 可以將ip 轉換成地圖所需的格式 經緯度

  • database => "/etc/logstash/GeoLiteCity.dat 此在五版已經改為內建 請勿在加入
  • 撰寫的時候請先判斷欄位 是否存在 在寫
  • 如果Server 有多台logstash 可以透過加tag 區分來源server
    filter {
        mutate {
            add_tag => [ "dev-logstash-01" ]
        }
    }
    

完整範例如下:

  filter {
      if  [@metadata][beat] =~ ""   {

          grok {        
              break_on_match => true
              match => {"message" => "%{TIMESTAMP_ISO8601:logDate} (\[%{POSINT:processId:int}\]) \[(?<status>(INFO|DEBUG|WARNING|ERROR))\] \[(?<logger>[A-Z\-a-z .]+)\] \[(?<PERFORMANCE>[A-Z\-a-z]+)\]\[(?<describe>[A-Z\-a-z  ]+)\]\[(?<Method>[A-Z\-a-z . /=?0-9\[\]:()',]+)\]\[%{NUMBER:performanceTime:int}ms\]%{GREEDYDATA:messages}"}
              match => {"message" => "%{TIMESTAMP_ISO8601:logDate} (\[%{POSINT:processId:int}\]) \[(?<status>(INFO|DEBUG|WARNING|ERROR))\] \[(?<logger>[A-Z\-a-z .]+)\]\[(?<webapi>[A-Z\-a-z]+)\]\[(?<apiPath>[A-Z\-a-z . /=?0-9\[\]:()]+)\]\[(?<httpMethod>[A-Z\-a-z /.\-]+)\]\[%{IP:clientIP}\]%{GREEDYDATA:messages}"}
              match => {"message" => "%{TIMESTAMP_ISO8601:logDate} (\[%{POSINT:processId:int}\]) \[(?<status>(INFO|DEBUG|WARNING|ERROR))\] \[(?<logger>[A-Z\-a-z .]+)\] \[(?<Feature>(FEATURE))\]\[(?<Method>[A-Z\-a-z . /=?0-9:()]+)\]\[(?<featureResult>[A-Z\-a-z . /=?0-9:()]+)\]%{GREEDYDATA:messages}"}
              match => {"message" => "%{TIMESTAMP_ISO8601:logDate} (\[%{POSINT:processId:int}\]) \[(?<status>(INFO|DEBUG|WARNING|ERROR))\] \[(?<logger>[A-Z\-a-z .]+)\] %{GREEDYDATA:messages}"}
              match => {"message" => "%{TIMESTAMP_ISO8601:logDate} (\[%{POSINT:processId:int}\]) (?<status>(INFO|DEBUG|WARN|ERROR))"}
          }
          # customize timestamp
          date {
              timezone => "America/Toronto"
              match => ["logDate", "ISO8601"]
              target => "@timestamp"
          }    
          mutate {
                  convert => [ "[geoip][coordinates]", "float"]
                  remove_tag => [ "beats_input_codec_plain_applied", "beats_input_codec_multiline_applied", "_grokparsefailure" ]
                  remove_field => [ "PERFORMANCE","logDate","Feature" ,"webapi" ]
          }



          if [httpMethod]{
              mutate {
                  add_tag => [ "%{httpMethod}" ,"%{apiPath}" ]
              }    

          }
          if [Method]{
              mutate {
                  add_tag => [ "%{Method}"  ]
              }    
          }
          if [featureResult]{
              mutate {
                  add_tag => [ "%{featureResult}"  ]
              }    
          }

          if [clientIP] {
              geoip {
                    source => "clientIP"
                    target => "geoip"
                    # database => "/etc/logstash/GeoLiteCity.dat"
                    add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
                    add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
              }
          }

      }
  }

Output

host =>如果是cluster 可以打所有SERVER 他會自動sync index => ww 是BY week host => 可以傳入單一data node or 全部的data node 他會自動作sync up

    output {
      elasticsearch {
        hosts => ["http://xxx.xx.xxx.xx:9200"]
        index => "%{[@metadata][beat]}-%{+xxxx.ww}"
        document_type => "%{[@metadata][type]}"
       }
    }

Reference:

Geo IP查詢:http://stackoverflow.com/questions/33673510/logstash-kibana-geoip-not-working

Parsing JSON:http://stackoverflow.com/questions/33937936/how-to-parse-json-in-logstash-grok-from-a-text-file-line

替換@timestam:http://stackoverflow.com/questions/20401663/logstash-replace-timestamp-with-syslog-date

https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html 基礎介紹:https://qbox.io/blog/parsing-logs-using-logstash?utm_source=qbox.io&utm_medium=article&utm_campaign=logstash-5-0-monitoring-api-node-plugin-stats-hot-threads 效能介紹https://www.elastic.co/blog/do-you-grok-grok

results matching ""

    No results matching ""