弱いエンジニアの備忘録

自分的に気になった技術に関するメモや備忘録です。Elasticsearchに関する記事が多くなりそうです。

X-Pack Watcherのindex actionではまったメモ

概要

elasticsearchのWatcher/Alertingを使っていて、はまったのでメモ。
具体的には "index action" についてです。

特定の条件を満たしたデータをまとめて別indexにbulkします。

バージョン情報など

  • elasticsearch-5.6.3
  • kibana-5.6.3
  • macOS High Sierra 10.13.1

やりたいこと

今回使うデータとして、
・anomaly_score
・time
という2種類の値を持ったドキュメントを用意します。
(3件しか登録してませんが、実際には大量のログがリアルタイムで来るイメージ)

f:id:shin0higuchi:20171101233740p:plain

ここから、anomaly_scoreが50を超えた物だけ抜き出して、anomaliesという別のindexに入れたい!
というのが今回の目的です。

では早速やってみます。

手順

1. watchを作成、登録する

バージョン6ではKibanaの画面から閾値ベースの定義が登録できそうですが、
今のバージョンではAPIを頑張って叩く必要があります。
今回はKIbanaのConsoleから登録します。

PUT _xpack/watcher/watch/test
{
  "trigger": {
    "schedule": {
      "interval": "1d"
    }
  },
  "input": {
    "search": {
      "request": {
        "indices": [
          "test"
        ],
        "body": {
          "query": {
            "bool": {
              "must": [
                {
                  "range": {
                    "time": {
                      "gte": "now-1d"
                    }
                  }
                },
                {
                  "range": {
                    "anomaly_score": {
                      "gte": 50
                    }
                  }
                }
              ]
            }
          }
        }
      }
    }
  },
  "actions": {
    "index_to_elasticsearch": {
      "index": {
        "index": "anomalies",
        "doc_type": "anomaly"
      }
    }
  }
}

...定義をJSONで書くと長いですね。しかし、内容はシンプルです。
ざっくり設定内容を書くと下記のようになっています。

  • trigger : 1日ごとに実行
  • input : test indexから「直近1日のデータ」かつ「anomaly_scoreが50以上」のドキュメントをとってくる
  • actions : inputから受け取ったペイロードをanomalies indexに登録する

2. watchを実行する

実際には登録したwatchは1日待っていれば実行されますが、
流石に待っているのもアレなので、無理矢理APIを叩いて実行します。

POST _xpack/watcher/watch/test/_execute

3. 結果を確認してみる

上手く動いていれば、2件のドキュメント(anomaly_scoreが50以上)がanomalies indexに登録されているはずです。

確認してみると...
f:id:shin0higuchi:20171101235806p:plain
しっかり検知・indexingされています!😊

....しかしちょっと待てよ。
確かに2件のドキュメントが登録されているように見えるけれど、
よく見ると単一のドキュメントとして2件分のデータが登録されてしまっています。

2件を見つけてくれたのは嬉しいけど、2件のドキュメントとしてindexingして欲しかったのです...

4.複数ドキュメントとして登録する

公式のドキュメントを読むと、解決策がしっかり書いてありました。
_docというフィールドにオブジェクト配列(ここでは各ドキュメント)を入れておけば
上手く複数ドキュメントとしてbulkしてくれるようです。

実際にwatchを修正してみましょう。
f:id:shin0higuchi:20171102000622p:plain

transformという部分を追加しました。
これでドキュメントの配列が_docというフィールドに入ってくれるはず。

5.再度watchを実行

一旦、anomalies indexを削除して、再度watchを実行してみます。
f:id:shin0higuchi:20171102001056p:plain

実行に失敗しました...
右の実行結果を読み進めて行くと...

"message": "MapperParsingException[Field [_index] is a metadata field and cannot be added inside a document. Use the index API request parameters.]"

なるほど。登録しようとしているドキュメント内に_indexなどのmeta fieldがあるとエラーが出るようです。

query投げた時に_sourceという部分だけを取得できれば良いのですが、
どうやらURI Searchではできても、Request Body Searchではできないように見えます。←ここが一番自信ないです。これできるなら誰か教えてください。
そうなってくるとwatcherとしてはinputの後で_sourceを取り出さなければ駄目そう。

6. _sourceだけを取り出してみた

"actions": {
    "index_to_elasticsearch": {
      "transform": {
          "script": "for (int i = 0; i < ctx.payload.hits.hits.length; ++i) {ctx.payload.hits.hits[i] = ctx.payload.hits.hits[i]._source }  return [ '_doc' : ctx.payload.hits.hits]"
        },
      "index": {
        "index": "anomalies",
        "doc_type": "anomaly"
      }
    }
  }

厳密に言うと、各ドキュメントを_sourceで上書きするかたちになっています。

7. 再度watchを実行する

再度実行、今度こそ!
f:id:shin0higuchi:20171102002356p:plain

上手く2ドキュメントに分かれて登録されました。
めでたし、めでたし😊

まとめ

  • index actionは1つのドキュメントとしてindexingされるのがデフォルト
  • _doc にオブジェクト配列を入れると、複数ドキュメントとして扱ってくれる
  • ドキュメント内部にmeta fieldがあるとrejectされる
  • もっと良い方法があってもおかしくない気もします。(知っている人いたら教えて欲しい)

以上です。