在很多的情况下,我们我们想更新我们所有的文档:
- 添加一个新的field或者是一个字段变成一个multi-field
- 用一个值更新所有的文档,或者更新复合查询条件的所有文档
在今天的文章中,我们来讲一下_update_by_query的这几个用法。
准备数据
我们来创建一个叫做twitter的索引:
1 | PUT twitter |
我们使用如下的bulk API来把数据导入:
1 | POST _bulk |
把一个字段变为multi-field
在上面,我们有意识地把city字段设置为text,但是在实际的应用中city一般来说是keyword类型。比如我们想对city这个字段来进行aggregation。那么我们该如何纠正这个错误呢?我们需要把我们之前的index删除,并使用新的mapping再次重建吗?这在我们的实际的是使用中可能并不现实。这是因为你的数据可能是非常大的,而且这种改动可能会造成很多的问题。那么我们该如何解决这个问题呢?
一种办法是在不删除之前索引的情况下,我们把city变成为一个mulit-field的字段,这样它既可以是一个keyword的类型,也可以同样是一个text类型的字段。为此,我们来修改twitter的mapping:
1 | PUT twitter/_mapping |
请注意在上面,我们把message的字段变为一个mult-field的字段。即便我们已经把mapping修改了,但是我们的索引并没有把我们的message字段进行分词。为了达到这个目的,我们可以进行如下的操作:
POST twitter/_update_by_query
经过上面的操作后,message字段将会被重新被索引,并可以被我们搜索。
1 | GET twitter/_search |
上面显示的结果为:
1 | "hits" : { |
当然由于这个字段变为multi-field的字段,它含有city.keyword,我们可以对它进行聚合搜索:
1 | GET twitter/_search |
上面我们对city进行统计,上面显示结果为:
1 | "aggregations" : { |
如果我们不修改city为multi-field,我们将不能对这个字段进行统计了。
增加一个新的字段
同样我们可以通过script的方法来为我们的twitter增加一个新的字段,比如:
1 | POST twitter/_update_by_query |
通过上面的方法,我们把所有的文档都添加一个新的字段contact,并赋予它一个同样的值:
1 | GET twitter/_search |
上面的命令显示结果:
1 | "hits" : { |
从上面我们可以看出来,有增加一个新的字段contact。
修改已有的字段
假如我们想对所有在北京的文档里的uid都加1,那么我么有通过如下的方法:
1 | POST twitter/_update_by_query |
在执行上面的命令后,我们进行查询:
1 | GET twitter/_search |
显示结果:
1 | "hits" : { |
上面显示city为北京的所有的文档的uid的数值都被加1了。上面_id为1的原来的uid值为2,现在变为3。
没有动态mapping时,reindex索引
假设您创建了一个没有动态mapping的索引,将其填充了数据,然后添加了一个mapping值以从数据中获取更多字段:
1 | PUT test |
在上面我们创建一个叫做test的索引。首先它的动态mapping被禁止了,也就是在索引时凡是不在mapping定义的字段将被自动识别,它们仅仅存在于source里,我们不能对它进行搜索。为了纠正这个错误,我们在上面的最后一步尝试来修改它的mapping来解决这个问题。那么在新的mapping下,我们之前导入的文档能进行搜索吗?我们尝试如下的命令:
1 | POST test/_search?filter_path=hits.total |
我们尝试搜索所有flag中含有foo的文档,但是上面的返回结果是:
1 | { |
那么问题出现在哪里呢?其实在我们修改完mapping以后,我们没有更新我们之前已经导入的文档。我们需要使用_update_by_query来做类似reindex的工作。我们使用如下的命令:
POST test/_update_by_query?refresh&conflicts=proceed
我们重新来搜索我们的文档:
1 | POST test/_search?filter_path=hits.total |
上面的查询显示的结果是:
1 | { |
显然,在运行完_update_by_query后,我们可以找到我们的文档了。
针对大量数据的reindex
上面所有的_update_by_query针对少量的数据还是很不错的。但是在我们的实际应用中,我们可能遇到很大的数据量,那么万一在reindex的过程中发生意外,那我们还需要从头开始吗?或者我们已经处理过的数据还需要再做一遍吗?一种通用的解决办法就是在我们的mapping中定义一个字段,比如叫做reindexBatch,那么我们可以通过添加这个字段来跟踪我们的进度:
1 | POST blogs_fixed/_update_by_query |
即使在reindex的过程已经失败了,我们再次运行上面的_update_by_query时,之前已经处理过的文件将不再被处理了。
_update_by_query 除了上面的用法之外,我们也可以结合pipepline来对我们的索引数据进行加工。详细的用法请参阅我之前的文章“运用Elastic Stack分析COVID-19数据并进行可视化分析”。