【09】把 Elasticsearch 当数据库使:HAVING与Pipeline Aggregation

982 查看

使用 https://github.com/taowen/es-monitor 可以用 SQL 进行 elasticsearch 的查询。Elasticsearch 2.0引入的一个重大特性是支持了Pipeline Aggregation。在有这个特性之前,elasticsearch聚合之后可以做的计算仅仅是对Terms Aggregation的结果做一个排序,并取个TOP N。除此之外什么计算都做不了。而SQL里一个重要的特性是HAVING子句,用其过滤我们不关心的桶,以减少结果的数量。今天我们就来看看如何用Pipeline Aggregation实现HAVING。

HAVING ipo_count > 200

SQL

$ cat << EOF | ./es_query.py http://127.0.0.1:9200
    SELECT ipo_year, COUNT(*) AS ipo_count FROM symbol 
    GROUP BY ipo_year HAVING ipo_count > 200
EOF
{"ipo_count": 390, "ipo_year": 2014}
{"ipo_count": 334, "ipo_year": 2015}
{"ipo_count": 253, "ipo_year": 2013}

Elasticsearch

{
  "aggs": {
    "ipo_year": {
      "terms": {
        "field": "ipo_year", 
        "size": 0
      }, 
      "aggs": {
        "having": {
          "bucket_selector": {
            "buckets_path": {
              "ipo_count": "_count"
            }, 
            "script": {
              "lang": "expression", 
              "inline": " ipo_count > 200"
            }
          }
        }
      }
    }
  }, 
  "size": 0
}

这里bucket_selector使用的语法和前面GROUP BY ipo_year % 5的语法是类似的。不同之处在于,之前的script是从document里取值。而这里的script是从当前bucket里取值。

{
  "hits": {
    "hits": [], 
    "total": 6714, 
    "max_score": 0.0
  }, 
  "_shards": {
    "successful": 1, 
    "failed": 0, 
    "total": 1
  }, 
  "took": 3, 
  "aggregations": {
    "ipo_year": {
      "buckets": [
        {
          "key": 2014, 
          "doc_count": 390
        }, 
        {
          "key": 2015, 
          "doc_count": 334
        }, 
        {
          "key": 2013, 
          "doc_count": 253
        }
      ], 
      "sum_other_doc_count": 0, 
      "doc_count_error_upper_bound": 0
    }
  }, 
  "timed_out": false
}

Profile

[
  {
    "query": [
      {
        "query_type": "MatchAllDocsQuery",
        "lucene": "*:*",
        "time": "0.9405890000ms",
        "breakdown": {
          "score": 0,
          "create_weight": 17443,
          "next_doc": 753268,
          "match": 0,
          "build_scorer": 169878,
          "advance": 0
        }
      }
    ],
    "rewrite_time": 7177,
    "collector": [
      {
        "name": "MultiCollector",
        "reason": "search_multi",
        "time": "6.110736000ms",
        "children": [
          {
            "name": "TotalHitCountCollector",
            "reason": "search_count",
            "time": "0.8071620000ms"
          },
          {
            "name": "LongTermsAggregator: [ipo_year]",
            "reason": "aggregation",
            "time": "2.416942000ms"
          }
        ]
      }
    ]
  }
]

HAVING看来是在Lucene计算完之后,由Elasticsearch在内存里自己对bucket做过滤的,所以没有体现在Profile的结果里。

HAVING ipo_count > 100 AND max_last_sale <= 10000

SQL

$ cat << EOF | ./es_query.py http://127.0.0.1:9200
    SELECT ipo_year, COUNT(*) AS ipo_count, MAX(last_sale) AS max_last_sale FROM symbol 
    GROUP BY ipo_year HAVING ipo_count > 100 AND max_last_sale <= 10000
EOF
{"max_last_sale": 6178.0, "ipo_count": 390, "ipo_year": 2014}

Elasticsearch

{
  "aggs": {
    "ipo_year": {
      "terms": {
        "field": "ipo_year", 
        "size": 0
      }, 
      "aggs": {
        "max_last_sale": {
          "max": {
            "field": "last_sale"
          }
        }, 
        "having": {
          "bucket_selector": {
            "buckets_path": {
              "max_last_sale": "max_last_sale", 
              "ipo_count": "_count"
            }, 
            "script": {
              "lang": "expression", 
              "inline": " ipo_count > 100 && max_last_sale <= 10000"
            }
          }
        }
      }
    }
  }, 
  "size": 0
}
{
  "hits": {
    "hits": [], 
    "total": 6714, 
    "max_score": 0.0
  }, 
  "_shards": {
    "successful": 1, 
    "failed": 0, 
    "total": 1
  }, 
  "took": 3, 
  "aggregations": {
    "ipo_year": {
      "buckets": [
        {
          "max_last_sale": {
            "value": 6178.0
          }, 
          "key": 2014, 
          "doc_count": 390
        }
      ], 
      "sum_other_doc_count": 0, 
      "doc_count_error_upper_bound": 0
    }
  }, 
  "timed_out": false
}

Profile

[
  {
    "query": [
      {
        "query_type": "MatchAllDocsQuery",
        "lucene": "*:*",
        "time": "0.2386400000ms",
        "breakdown": {
          "score": 0,
          "create_weight": 7620,
          "next_doc": 204982,
          "match": 0,
          "build_scorer": 26038,
          "advance": 0
        }
      }
    ],
    "rewrite_time": 2379,
    "collector": [
      {
        "name": "MultiCollector",
        "reason": "search_multi",
        "time": "1.955767000ms",
        "children": [
          {
            "name": "TotalHitCountCollector",
            "reason": "search_count",
            "time": "0.2170820000ms"
          },
          {
            "name": "LongTermsAggregator: [ipo_year]",
            "reason": "aggregation",
            "time": "0.9893530000ms"
          }
        ]
      }
    ]
  }
]