読者です 読者をやめる 読者になる 読者になる

ぐるっとぐりっど

日曜プログラマがいろいろ試してみたことを、後の自分のためにまとめておく場所

ElasticSearch+Fluentd+kibanaでEFKスタックを作る その2

の続きです。

今回は、SSHのアクセス元をマップ上に表現してみました。

主に、を全面的に参考にしました。

今回は、/var/log/secureに出力される3つのログ(パスワード間違い、存在しないユーザ、ログイン成功)をすべてマッピングしてみることにしました。それぞれ以下のような感じでバラバラのフォーマットで出力されるので、一筋縄ではいかないような気がします。

Nov  3 06:05:16 hostname sshd[12893]: Failed password for userA from xxx.xxx.xxx.xxx port 34436 ssh2
Nov  3 12:45:29 hostname sshd[15166]: Accepted password for userB from xxx.xxx.xxx.xxx port 54758 ssh2
Nov  3 12:28:16 hostname sshd[15065]: Invalid user userC from xxx.xxx.xxx.xxx

完成図は以下のような感じ。ちょっとサンプル数が少ないのでつまらないかも。
f:id:grugrut:20151103164846p:plain

fluent-plugin-geoipをインストールする

まずは、IPアドレスを座標に変換しなくてはならないので、fluent-plugin-geoipを導入

# /opt/td-agent/usr/sbin/td-agent-gem install fluent-plugin-geoip

以上。

最初間違って、td-agent-gem install geoipとしてしまったら、別のものが入ってしまったのだけれど、あれは何者だったのだろう。

なお、gccがインストールされていないと、

# /opt/td-agent/usr/sbin/td-agent-gem install fluent-plugin-geoip
WARN: Unresolved specs during Gem::Specification.reset:
      json (>= 1.4.3)
WARN: Clearing out unresolved specs.
Please report a bug if this causes problems.
Fetching: fluent-mixin-rewrite-tag-name-0.1.0.gem (100%)
Successfully installed fluent-mixin-rewrite-tag-name-0.1.0
Fetching: geoip-c-0.9.1.gem (100%)
Building native extensions.  This could take a while...
ERROR:  Error installing fluent-plugin-geoip:
        ERROR: Failed to build gem native extension.

    /opt/td-agent/embedded/bin/ruby extconf.rb
checking for iconv_open() in iconv.h... *** extconf.rb failed ***
Could not create Makefile due to some reason, probably lack of necessary
libraries and/or headers.  Check the mkmf.log file for more details.  You may
need configuration options.

Provided configuration options:
        --with-opt-dir
        --with-opt-include
        --without-opt-include=${opt-dir}/include
        --with-opt-lib
        --without-opt-lib=${opt-dir}/lib
        --with-make-prog
        --without-make-prog
        --srcdir=.
        --curdir
        --ruby=/opt/td-agent/embedded/bin/ruby
        --with-geoip-dir
        --without-geoip-dir
        --with-geoip-include
        --without-geoip-include=${geoip-dir}/include
        --with-geoip-lib
        --without-geoip-lib=${geoip-dir}/lib
/opt/td-agent/embedded/lib/ruby/2.1.0/mkmf.rb:467:in `try_do': The compiler failed to generate an executable file. (RuntimeError)
You have to install development tools first.
        from /opt/td-agent/embedded/lib/ruby/2.1.0/mkmf.rb:552:in `try_link0'
        from /opt/td-agent/embedded/lib/ruby/2.1.0/mkmf.rb:567:in `try_link'
        from /opt/td-agent/embedded/lib/ruby/2.1.0/mkmf.rb:753:in `try_func'
        from /opt/td-agent/embedded/lib/ruby/2.1.0/mkmf.rb:1038:in `block in have_func'
        from /opt/td-agent/embedded/lib/ruby/2.1.0/mkmf.rb:929:in `block in checking_for'
        from /opt/td-agent/embedded/lib/ruby/2.1.0/mkmf.rb:351:in `block (2 levels) in postpone'
        from /opt/td-agent/embedded/lib/ruby/2.1.0/mkmf.rb:321:in `open'
        from /opt/td-agent/embedded/lib/ruby/2.1.0/mkmf.rb:351:in `block in postpone'
        from /opt/td-agent/embedded/lib/ruby/2.1.0/mkmf.rb:321:in `open'
        from /opt/td-agent/embedded/lib/ruby/2.1.0/mkmf.rb:347:in `postpone'
        from /opt/td-agent/embedded/lib/ruby/2.1.0/mkmf.rb:928:in `checking_for'
        from /opt/td-agent/embedded/lib/ruby/2.1.0/mkmf.rb:1037:in `have_func'
        from extconf.rb:8:in `<main>'

extconf failed, exit code 1

Gem files will remain installed in /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/geoip-c-0.9.1 for inspection.
Results logged to /opt/td-agent/embedded/lib/ruby/gems/2.1.0/extensions/x86_64-linux/2.1.0/geoip-c-0.9.1/gem_make.out

な感じのエラーが出て、ハマった。

sudo yum install gcc

で、さっくり解決。

fluentdの設定をする

これで、IPアドレスからアクセス元を検知できるようになったので、fluentdの設定をした
今回は、/var/log/secureに書かれるフォーマットが若干違うログを一括で処理したかったので、それを実現できるように以下の正規表現を書いた。

(?<time>[^0-9]+ [^ ]+ [^ ]+) (?<hostname>[^ ]+) sshd\[.+\]: (?<reason>Failed password for|Accepted password for|Invalid user) (?<user>[^ ]+) from (?<ip_address>[^ ]+).*

これで、

Nov  3 06:05:16 hostname sshd[12893]: Failed password for userA from xxx.xxx.xxx.xxx port 34436 ssh2
Nov  3 12:45:29 hostname sshd[15166]: Accepted password for userB from xxx.xxx.xxx.xxx port 54758 ssh2
Nov  3 12:28:16 hostname sshd[15065]: Invalid user userC from xxx.xxx.xxx.xxx

のログ三種の、タイムスタンプ、ホスト名、成否、ユーザ名、IPアドレスを取得することができる(はず)。

ごり押し感ぱないので、もっとスマートなやり方ご存じの方は教えてください!

まとめると、td-agent.confに以下の設定をいれた。

<source>
  type tail
  path /var/log/secure
  pos_file /var/log/td-agent/secure.log.pos
  tag geo.ssh.access
  format /^(?<time>[^0-9]+ [^ ]+ [^ ]+) (?<hostname>[^ ]+) sshd\[.+\]: (?<reason>Failed password for|Accepted password for|Invalid user) (?<user>[^ ]+) from (?<ip_address>[^ ]+).*$/
</source>

<match geo.ssh.access>
  type geoip
  geoip_lookup_key ip_address
  <record>
    country ${country_code['ip_address']}
    location ${latitude['ip_address']},${longitude['ip_address']}
  </record>
  remove_tag_prefix geo.
  add_tag_prefix es.

</match>

<match es.ssh.access>
  type elasticsearch
  logstash_format true
  type_name ssh-access
  include_tag_key true
  tag_key @log_name
  flush_interval 20s
</match>

たぶんkibana3のときは、国コードで地図に色を塗れたようなんだけど、kibana4では座標が必要なようなので、

location ${latitude['ip_address']},${longitude['ip_address']}

と、fluent-plugin-geoipの機能で、IPアドレスをもとにした座標をログに追加している。

Elasticsearchにデータ型の設定を追加する

kibana4で、地図上にマッピングするためには、fluentdに追加したデータを地図上の座標情報ですよ、と教えてあげる必要があるらしい。
そのため、Elasticsearchのインデックスに、型のマッピング用の設定情報を追加する必要がある。

ただし、fluentd経由で、elasticsearchに情報を投入していく場合(logstash_format trueにしている場合)には、インデックスが日々変わっていき、デフォルト設定では、logstash-(日付)というインデックスに書き込まれていく。

そのため、インデックス自体に型のマッピング情報を登録しようとすると、毎日その情報を登録しなくてはいけない。超めんどくさい。

そういう場合には、テンプレートに登録するやり方がある。登録方法は簡単で、以下な感じでHTTP経由で設定情報を放り込んでやればOK

# curl -X PUT http://localhost:9200/_template/ssh-access-template -d'{
  "template": "logstash-*",
  "mappings": {
    "ssh-access": {
      "properties": {
        "location": {
          "type": "geo_point"
        }
      }
    }
  }
}'

これで、インデックス名が、「logstash-*」なものの、type_nameが「ssh-access」なものの「location」の型を「geo_point」型にできる。

ただし、適用されるのはインデックスが作成されるときなので、もうすでに今日の分のインデックスがある場合には、次の日まで待つか削除する必要がある。

  • before
# curl -X GET http://localhost:9200/logstash-2015.11.03/_mapping/?pretty
{
  "logstash-2015.11.03" : {
    "mappings" : {
      "ssh-access" : {
        "properties" : {
          "@log_name" : {
            "type" : "string"
          },
          "@timestamp" : {
            "type" : "date",
            "format" : "dateOptionalTime"
          },
          "country" : {
            "type" : "string"
          },
          "hostname" : {
            "type" : "string"
          },
          "ip_address" : {
            "type" : "string"
          },
          "location" : {
            "type" : "string"
          },
          "reason" : {
            "type" : "string"
          },
          "user" : {
            "type" : "string"
          }
        }
      }
    }
  }
}
  • after
# curl -X GET http://localhost:9200/logstash-2015.11.03/_mapping/?pretty
{
  "logstash-2015.11.03" : {
    "mappings" : {
      "ssh-access" : {
        "properties" : {
          "@log_name" : {
            "type" : "string"
          },
          "@timestamp" : {
            "type" : "date",
            "format" : "dateOptionalTime"
          },
          "country" : {
            "type" : "string"
          },
          "hostname" : {
            "type" : "string"
          },
          "ip_address" : {
            "type" : "string"
          },
          "location" : {
            "type" : "geo_point"
          },
          "reason" : {
            "type" : "string"
          },
          "user" : {
            "type" : "string"
          }
        }
      }
    }
  }
}

と、locationの型がstringからgeo_pointに変わっていることが確認できた。

kibanaで可視化する

ここまでできて設定データが投入されれば、あとはkibanaでVisualize→Tile Mapを選ぶだけ

Geo CoordinatesのField欄に、geo_pointにしたデータ名が表示されているはず。

表示されず、型がgeo_pointなデータが存在しないよ!と怒られている場合は、kibanaが持っている型データが古くなっているので、「Setting」→「Indices」→使っているインデックスと進み、黄色い更新ボタン「Reload field list」をクリックすれば、型情報が更新され、型がgeo_pointなデータがとれるようになるはず。

感想

3種類のログに対応した正規表現を作るのが大変だった。それさえできてしまえばあとは流れでお願いできたので簡単だった。

もっと取得するログの種類を増やして面白そうな可視化を試してみたい。

を見て日々勉強中。