X-Pack Watcherのindex actionではまったメモ
概要
elasticsearchのWatcher/Alertingを使っていて、はまったのでメモ。
具体的には "index action" についてです。
特定の条件を満たしたデータをまとめて別indexにbulkします。
やりたいこと
今回使うデータとして、
・anomaly_score
・time
という2種類の値を持ったドキュメントを用意します。
(3件しか登録してませんが、実際には大量のログがリアルタイムで来るイメージ)
ここから、anomaly_scoreが50を超えた物だけ抜き出して、anomaliesという別のindexに入れたい!
というのが今回の目的です。
では早速やってみます。
手順
1. watchを作成、登録する
バージョン6ではKibanaの画面から閾値ベースの定義が登録できそうですが、
今のバージョンではAPIを頑張って叩く必要があります。
今回はKIbanaのConsoleから登録します。
PUT _xpack/watcher/watch/test { "trigger": { "schedule": { "interval": "1d" } }, "input": { "search": { "request": { "indices": [ "test" ], "body": { "query": { "bool": { "must": [ { "range": { "time": { "gte": "now-1d" } } }, { "range": { "anomaly_score": { "gte": 50 } } } ] } } } } } }, "actions": { "index_to_elasticsearch": { "index": { "index": "anomalies", "doc_type": "anomaly" } } } }
...定義をJSONで書くと長いですね。しかし、内容はシンプルです。
ざっくり設定内容を書くと下記のようになっています。
- trigger : 1日ごとに実行
- input : test indexから「直近1日のデータ」かつ「anomaly_scoreが50以上」のドキュメントをとってくる
- actions : inputから受け取ったペイロードをanomalies indexに登録する
2. watchを実行する
実際には登録したwatchは1日待っていれば実行されますが、
流石に待っているのもアレなので、無理矢理APIを叩いて実行します。
POST _xpack/watcher/watch/test/_execute
3. 結果を確認してみる
上手く動いていれば、2件のドキュメント(anomaly_scoreが50以上)がanomalies indexに登録されているはずです。
確認してみると...
しっかり検知・indexingされています!😊
....しかしちょっと待てよ。
確かに2件のドキュメントが登録されているように見えるけれど、
よく見ると単一のドキュメントとして2件分のデータが登録されてしまっています。
2件を見つけてくれたのは嬉しいけど、2件のドキュメントとしてindexingして欲しかったのです...
4.複数ドキュメントとして登録する
公式のドキュメントを読むと、解決策がしっかり書いてありました。
_docというフィールドにオブジェクト配列(ここでは各ドキュメント)を入れておけば
上手く複数ドキュメントとしてbulkしてくれるようです。
実際にwatchを修正してみましょう。
transformという部分を追加しました。
これでドキュメントの配列が_docというフィールドに入ってくれるはず。
5.再度watchを実行
一旦、anomalies indexを削除して、再度watchを実行してみます。
実行に失敗しました...
右の実行結果を読み進めて行くと...
"message": "MapperParsingException[Field [_index] is a metadata field and cannot be added inside a document. Use the index API request parameters.]"
なるほど。登録しようとしているドキュメント内に_indexなどのmeta fieldがあるとエラーが出るようです。
query投げた時に_sourceという部分だけを取得できれば良いのですが、
どうやらURI Searchではできても、Request Body Searchではできないように見えます。←ここが一番自信ないです。これできるなら誰か教えてください。
そうなってくるとwatcherとしてはinputの後で_sourceを取り出さなければ駄目そう。
6. _sourceだけを取り出してみた
"actions": { "index_to_elasticsearch": { "transform": { "script": "for (int i = 0; i < ctx.payload.hits.hits.length; ++i) {ctx.payload.hits.hits[i] = ctx.payload.hits.hits[i]._source } return [ '_doc' : ctx.payload.hits.hits]" }, "index": { "index": "anomalies", "doc_type": "anomaly" } } }
厳密に言うと、各ドキュメントを_sourceで上書きするかたちになっています。
7. 再度watchを実行する
再度実行、今度こそ!
上手く2ドキュメントに分かれて登録されました。
めでたし、めでたし😊
まとめ
- index actionは1つのドキュメントとしてindexingされるのがデフォルト
- _doc にオブジェクト配列を入れると、複数ドキュメントとして扱ってくれる
- ドキュメント内部にmeta fieldがあるとrejectされる
- もっと良い方法があってもおかしくない気もします。(知っている人いたら教えて欲しい)
以上です。