弱いエンジニアの備忘録

自分的に気になった技術に関するメモや備忘録です。Elasticsearchに関する記事が多くなりそうです。

ElasticsearchのAggregationをpagingする

概要

elasticsearchにはaggregationという便利な機能がありますが、
paginationを提供していません。

aggregateの仕組み上単純な話ではないようです。
そのあたりの詳細は(https://github.com/elastic/elasticsearch/issues/4915)をご覧ください。

今回はaggregateした結果を擬似的にページングします。
(ちょっと反則技が入ります。途中まで読んで憤慨しないでください。。。)
(しかも大きな制約があります。。。)

バージョン情報など

elasticsearch-6.0.0-alpha2
kibana-6.0.0-alpha2

手順

1. データの準備

今回はmetricbeatのデータを使いました。
インストールして起動するだけです。
metricbeatの使い方は本記事では説明しません。

2. データの確認

データが入っていることを確認してみます。
f:id:shin0higuchi:20170620014757p:plain
無事indexが作成されていることがわかります。

3. aggregationの適用

aggregationの結果をページングするということで、
まず普通にaggregationを使ってみましょう。

今回、aggregationで行うのは、
pidごとにbucketを分割して、その中の1件ずつをtop_hits aggregationで取得するというものです。

まずはterms aggregationでpid毎のbucketに分割して...
f:id:shin0higuchi:20170620015222p:plain

top_hits aggregationで、bucketから1件ずつ取得します。
f:id:shin0higuchi:20170620015235p:plain

4. ページングできるか?

ここで問題になってくるのが、
「bucketの数が膨大になった時にページングしたい」ということです。

通常のqueryであれば、fromとsizeを使って実現できますが、下の画像の通り、aggregationには効果がありません。
f:id:shin0higuchi:20170620015806p:plain
f:id:shin0higuchi:20170620015600p:plain

かといってaggregationの中に記述するとこの通りエラーが。
fromはサポートしていないようです...
f:id:shin0higuchi:20170620015652p:plain

5. 解決策

結局のところ、どうしてもページングしたければ別indexにaggregation結果を持つのが妥当だという結論に...
(読んでくれた方の半数がタブを閉じた気がします。)

metricbeatは常に情報が更新されるので、aggregation結果も随時updateする必要があります。
今回はwatcherを使って集計結果をindexingします。

watcherというと、メール通知やslack通知などができる便利なアラートと思われがちですが、
定期実行できる上にscriptが使える(webhookだって使える)というのは割と万能なやつです。
私はわりと好きです。

こんな感じでConsoleから登録できます。(今は一応UIも出ていますね。あまり使いやすくないですが。)
f:id:shin0higuchi:20170620020314p:plain


画像だとコピペできないと思うので
一応jsonを貼っておきます。

PUT _xpack/watcher/watch/aggregation
{
  "input": {
    "search": {
      "request": {
        "indices": [
          "metricbeat-6.0.0-alpha1-2017.06.*"
        ],
        "types": [
          "doc"
        ],
        "body": {
          "size": 0,
          "aggs": {
            "num": {
              "terms": {
                "field": "system.process.pid",
                "size": 10000
              },
              "aggs": {
                "top": {
                  "top_hits": {
                    "size": 1
                  }
                }
              }
            }
          }
        }
      }
    }
  },
  "trigger": {
    "schedule": {
      "interval": "1m"
    }
  },
  "actions": {
    "aggregate": {
      "transform": {
        "script": "return ['_doc' : ctx.payload.aggregations.num.buckets]"
      },
      "index": {
        "index": "aggregated",
        "doc_type": "aggregated"
      }
    }
  }
}

この定義で何をしているかというと、
毎分aggregationを行って、その結果をaggregatedというindexに登録しています。

動いているか確認

aggregationの結果がindexingされていることがわかります。
f:id:shin0higuchi:20170620021014p:plain

ページングする

あとはこのindexに対してfromおよびsizeを使えばOKです
f:id:shin0higuchi:20170620021614p:plain
f:id:shin0higuchi:20170620021620p:plain

こんな感じです。

まとめ

  • bucketをページングするのは現状難しい。
  • watcherを使ってaggregation結果をindexingしておくと楽ちん。
  • ただし、一段目のterms aggregationでsizeを指定するので、term(pid)のパターンが膨大だと不可。
  • ブラウザ上で100件ずつページ分けたい時などには使えそう。
  • 本当はpid毎にdocument_idを指定しないとだめです。今回は省略です。
  • (watchを止め忘れないようにしましょう)