资讯专栏INFORMATION COLUMN

Elasticsearch Java API 6.2(文档API)

lykops / 1516人阅读

摘要:注意当一个文档在快照的时间和索引请求过程之间发生变化时,会发生版本冲突。当版本匹配时,更新文档并增加版本号。在正在运行的更新中,使用更改的值使用查找的值。值加快进程立即生效,减慢查询的值在完成当前批处理后生效,以防止滚动超时。

文档API

本节描述以下CRUD API:

单文档的API

Index API

Get API

Delete API

Update API

多文档API

Multi Get API

Bulk API

Reindex API

Update By Query API

Delete By Query API

注意
所有CRUD API都是单索引API,索引参数接受单个索引名,或指向单个索引的别名
Index API

index API允许将类型化的JSON文档索引到特定的索引中,并使其可搜索。

生成JSON文档

生成JSON文档有几种不同的方法:

手动的(也就是你自己)使用原生byte[]或作为String

使用一个Map,该Map将自动转换为它的JSON等效项

使用第三方库对bean(如Jackson)进行序列化

使用内置的助手XContentFactory.jsonBuilder()

在内部,每个类型被转换为byte[](像String被转换为byte[]),因此,如果对象已经以这种形式存在,那么就使用它,jsonBuilder是高度优化的JSON生成器,它直接构造一个byte[]

自己动手

这里没有什么困难,但是请注意,您必须根据日期格式对日期进行编码。

String json = "{" +
        ""user":"kimchy"," +
        ""postDate":"2013-01-30"," +
        ""message":"trying out Elasticsearch"" +
    "}";
使用Map

Map是一个键:值对集合,它表示一个JSON结构:

Map json = new HashMap();
json.put("user","kimchy");
json.put("postDate",new Date());
json.put("message","trying out Elasticsearch");
bean序列化

可以使用Jacksonbean序列化为JSON,请将Jackson Databind添加到您的项目中,然后,您可以使用ObjectMapper来序列化您的bean:

import com.fasterxml.jackson.databind.*;

// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse

// generate json
byte[] json = mapper.writeValueAsBytes(yourbeaninstance);
使用Elasticsearch助手

Elasticsearch提供了内置的助手来生成JSON内容。

import static org.elasticsearch.common.xcontent.XContentFactory.*;

XContentBuilder builder = jsonBuilder()
    .startObject()
        .field("user", "kimchy")
        .field("postDate", new Date())
        .field("message", "trying out Elasticsearch")
    .endObject()

注意,您还可以使用startArray(String)endArray()方法添加数组,顺便说一下,field方法接受许多对象类型,您可以直接传递数字、日期甚至其他XContentBuilder对象。

如果需要查看生成的JSON内容,可以使用string()方法。

String json = builder.string();
索引文档

下面的示例将JSON文档索引为一个名为twitter的索引,其类型为tweet, id值为1:

import static org.elasticsearch.common.xcontent.XContentFactory.*;

IndexResponse response = client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        .get();

注意,您还可以将文档索引为JSON字符串,并且不需要提供ID:

String json = "{" +
        ""user":"kimchy"," +
        ""postDate":"2013-01-30"," +
        ""message":"trying out Elasticsearch"" +
    "}";

IndexResponse response = client.prepareIndex("twitter", "tweet")
        .setSource(json, XContentType.JSON)
        .get();

IndexResponse对象会给你一个响应:

// Index name
String _index = response.getIndex();
// Type name
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (if it"s the first time you index this document, you will get: 1)
long _version = response.getVersion();
// status has stored current instance statement.
RestStatus status = response.status();

有关索引操作的更多信息,请查看REST索引文档

Get API

get API允许根据索引的id从索引中获取类型化的JSON文档,下面的示例从一个名为twitter的索引中获取JSON文档,该索引的类型名为tweet, id值为1:

GetResponse response = client.prepareGet("twitter", "tweet", "1").get();

有关get操作的更多信息,请查看REST get文档。

Delete API

delete API允许基于id从特定索引中删除类型化的JSON文档,下面的示例从名为twitter的索引中删除JSON文档,该索引的类型名为tweet, id值为1:

DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();
Delete By Query API

通过查询删除的API可以根据查询结果删除给定的一组文档:

BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
    .filter(QueryBuilders.matchQuery("gender", "male")) 
    .source("persons")                                  
    .get();                                             
long deleted = response.getDeleted();

QueryBuilders.matchQuery("gender", "male")(查询)

source("persons") (索引)

get()(执行操作)

response.getDeleted()(被删除的文档数)

由于这是一个长时间运行的操作,如果您希望异步执行,可以调用execute而不是get,并提供如下监听器:

DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
    .filter(QueryBuilders.matchQuery("gender", "male"))     
    .source("persons")                                      
    .execute(new ActionListener() {   
        @Override
        public void onResponse(BulkByScrollResponse response) {
            long deleted = response.getDeleted();           
        }
        @Override
        public void onFailure(Exception e) {
            // Handle the exception
        }
    });
Update API

您可以创建一个UpdateRequest并将其发送给客户端:

UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
        .startObject()
            .field("gender", "male")
        .endObject());
client.update(updateRequest).get();

也可以使用prepareUpdate()方法:

client.prepareUpdate("ttl", "doc", "1")
        .setScript(new Script("ctx._source.gender = "male""  , ScriptService.ScriptType.INLINE, null, null))
        .get();

client.prepareUpdate("ttl", "doc", "1")
        .setDoc(jsonBuilder()               
            .startObject()
                .field("gender", "male")
            .endObject())
        .get();

Script()(你的脚本,它也可以是本地存储的脚本名)

setDoc()(将合并到现有的文档)

注意,您不能同时提供脚本和doc

使用脚本更新

update API允许基于提供的脚本更新文档:

UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1")
        .script(new Script("ctx._source.gender = "male""));
client.update(updateRequest).get();
通过合并文档更新

update API还支持传递一个部分文档合并到现有文档中(简单的递归合并,内部合并对象,取代核心的“键/值”和数组),例如:

UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject());
client.update(updateRequest).get();
Upsert

也有对Upsert的支持,如果文档不存在,则使用upsert元素的内容索引新的doc:

IndexRequest indexRequest = new IndexRequest("index", "type", "1")
        .source(jsonBuilder()
            .startObject()
                .field("name", "Joe Smith")
                .field("gender", "male")
            .endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject())
        .upsert(indexRequest);              
client.update(updateRequest).get();

如果文档不存在,将添加indexRequest中的文档。

如果文件index/type/1已经存在,我们将在此操作后获得如下文件:

{
    "name"  : "Joe Dalton",
    "gender": "male"        
}

"gender": "male"(此字段由更新请求添加)

如果不存在,我们将有一份新文件:

{
    "name" : "Joe Smith",
    "gender": "male"
}
Multi Get API

multi get API允许根据文档的indextypeid获取文档列表:

MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
    .add("twitter", "tweet", "1")           
    .add("twitter", "tweet", "2", "3", "4") 
    .add("another", "type", "foo")          
    .get();

for (MultiGetItemResponse itemResponse : multiGetItemResponses) { 
    GetResponse response = itemResponse.getResponse();
    if (response.isExists()) {                      
        String json = response.getSourceAsString(); 
    }
}

add("twitter", "tweet", "1")(通过单一id)

add("twitter", "tweet", "2", "3", "4")(或以相同index/type的id列表)

add("another", "type", "foo")(你也可以从另一个索引中得到)

MultiGetItemResponse itemResponse : multiGetItemResponses(迭代结果集)

response.isExists()(您可以检查文档是否存在)

response.getSourceAsString()(访问_source字段)

有关multi get操作的更多信息,请查看剩余的multi get文档

Bulk API

bulk API允许在一个请求中索引和删除多个文档,这里有一个示例用法:

import static org.elasticsearch.common.xcontent.XContentFactory.*;

BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        );

bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "another post")
                    .endObject()
                  )
        );

BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
}
使用Bulk处理器

BulkProcessor类提供了一个简单的接口,可以根据请求的数量或大小自动刷新bulk操作,或者在给定的时间之后。

要使用它,首先创建一个BulkProcessor实例:

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,  
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) { ... } 

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { ... } 

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { ... } 
        })
        .setBulkActions(10000) 
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) 
        .setFlushInterval(TimeValue.timeValueSeconds(5)) 
        .setConcurrentRequests(1) 
        .setBackoffPolicy(
            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) 
        .build();

beforeBulk()

此方法在执行bulk之前被调用,例如,您可以通过request.numberOfActions()查看numberOfActions

afterBulk(...BulkResponse response)

此方法在执行bulk之后被调用,例如,您可以通过response.hasFailures()检查是否存在失败请求

afterBulk(...Throwable failure)

bulk失败并引发一个可抛出对象时,将调用此方法

setBulkActions(10000)

我们希望每10,000个请求就执行一次bulk

setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))

我们希望每5MB就flush一次

setFlushInterval(TimeValue.timeValueSeconds(5))

无论请求的数量是多少,我们都希望每5秒flush一次

setConcurrentRequests(1)

设置并发请求的数量,值为0意味着只允许执行一个请求,在积累新的bulk请求时,允许执行一个值为1的并发请求

setBackoffPolicy()

设置一个自定义的备份策略,该策略最初将等待100ms,以指数形式增加并重试三次,当一个或多个bulk项目请求以EsRejectedExecutionException失败时,将尝试重试,该异常表明用于处理请求的计算资源太少,要禁用backoff,请传递BackoffPolicy.noBackoff()

默认情况下,BulkProcessor:

bulkActions设置为1000

bulkSize设置为5mb

不设置flushInterval

concurrentrequest设置为1,这意味着flush操作的异步执行

backoffPolicy设置为一个指数备份,8次重试,启动延时为50ms,总等待时间约为5.1秒

添加请求

然后您可以简单地将您的请求添加到BulkProcessor:

bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
关闭Bulk Processor

当所有的文档都被加载到BulkProcessor,可以使用awaitCloseclose方法进行关闭:

bulkProcessor.awaitClose(10, TimeUnit.MINUTES);

bulkProcessor.close();

如果通过设置flushInterval来调度其他计划的flush,这两种方法都将flush所有剩余的文档,并禁用所有其他计划flush。如果并发请求被启用,那么awaitClose方法等待指定的超时以完成所有bulk请求,然后返回true,如果在所有bulk请求完成之前指定的等待时间已经过去,则返回falseclose方法不等待任何剩余的批量请求完成并立即退出。

在测试中使用Bulk Processor

如果您正在使用Elasticsearch运行测试,并且正在使用BulkProcessor来填充数据集,那么您最好将并发请求的数量设置为0,以便以同步方式执行批量的flush操作:

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
        .setBulkActions(10000)
        .setConcurrentRequests(0)
        .build();

// Add your requests
bulkProcessor.add(/* Your requests */);

// Flush any remaining requests
bulkProcessor.flush();

// Or close the bulkProcessor if you don"t need it anymore
bulkProcessor.close();

// Refresh your indices
client.admin().indices().prepareRefresh().get();

// Now you can start searching!
client.prepareSearch().get();
Update By Query API

updateByQuery最简单的用法是在不更改源的情况下更新索引中的每个文档,这种用法允许获取一个新属性或另一个在线映射更改。

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index").abortOnVersionConflict(false);
BulkByScrollResponse response = updateByQuery.get();

updateByQuery API的调用从获取索引快照开始,索引使用内部版本控制找到任何文档。

注意
当一个文档在快照的时间和索引请求过程之间发生变化时,会发生版本冲突。

当版本匹配时,updateByQuery更新文档并增加版本号。

所有更新和查询失败都会导致updateByQuery中止,这些故障可以从BulkByScrollResponse#getIndexingFailures方法中获得,任何成功的更新仍然存在,并且不会回滚,当第一次失败导致中止时,响应包含由失败的bulk请求生成的所有失败。

为了防止版本冲突导致updateByQuery中止,请设置abortOnVersionConflict(false),第一个示例之所以这样做,是因为它试图获取在线映射更改,而版本冲突意味着在相同时间开始updateByQuery和试图更新文档的冲突文档。这很好,因为该更新将获取在线映射更新。

UpdateByQueryRequestBuilder API支持过滤更新的文档,限制要更新的文档总数,并使用脚本更新文档:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
    .filter(QueryBuilders.termQuery("level", "awesome"))
    .size(1000)
    .script(new Script(ScriptType.INLINE, "ctx._source.awesome = "absolutely"", "painless", Collections.emptyMap()));
BulkByScrollResponse response = updateByQuery.get();

UpdateByQueryRequestBuilder还允许直接访问用于选择文档的查询,您可以使用此访问来更改默认的滚动大小,或者以其他方式修改对匹配文档的请求。

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
    .source().setSize(500);
BulkByScrollResponse response = updateByQuery.get();

您还可以将大小与排序相结合以限制文档的更新:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index").size(100)
    .source().addSort("cat", SortOrder.DESC);
BulkByScrollResponse response = updateByQuery.get();

除了更改文档的_source字段外,还可以使用脚本更改操作,类似于Update API:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
    .script(new Script(
        ScriptType.INLINE,
        "if (ctx._source.awesome == "absolutely) {"
            + "  ctx.op="noop""
            + "} else if (ctx._source.awesome == "lame") {"
            + "  ctx.op="delete""
            + "} else {"
            + "ctx._source.awesome = "absolutely"}",
        "painless",
        Collections.emptyMap()));
BulkByScrollResponse response = updateByQuery.get();

Update API中,可以设置ctx.op的值来更改执行的操作:

noop

如果您的脚本没有做任何更改,设置ctx.op = "noop"updateByQuery操作将从更新中省略该文档,这种行为增加了响应主体中的noop计数器。

delete

如果您的脚本决定必须删除该文档,设置ctx.op = "delete",删除将在响应主体中已删除的计数器中报告。

ctx.op设置为任何其他值都会产生错误,在ctx中设置任何其他字段都会产生错误。

这个API不允许您移动它所接触的文档,只是修改它们的源,这是故意的!我们没有规定要把文件从原来的位置移走。

您也可以同时对多个索引和类型执行这些操作,类似于search API:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("foo", "bar").source().setTypes("a", "b");
BulkByScrollResponse response = updateByQuery.get();

如果提供路由值,则进程将路由值复制到滚动查询,将进程限制为与路由值匹配的碎片:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source().setRouting("cat");
BulkByScrollResponse response = updateByQuery.get();

updateByQuery也可以通过指定这样的pipeline来使用ingest节点:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.setPipeline("hurray");
BulkByScrollResponse response = updateByQuery.get();
使用Task API

您可以使用Task API获取所有正在运行的update-by-query请求的状态:

ListTasksResponse tasksList = client.admin().cluster().prepareListTasks()
    .setActions(UpdateByQueryAction.NAME).setDetailed(true).get();
for (TaskInfo info: tasksList.getTasks()) {
    TaskId taskId = info.getTaskId();
    BulkByScrollTask.Status status = (BulkByScrollTask.Status) info.getStatus();
    // do stuff
}

使用上面所示的TaskId,您可以直接查找任务:

GetTaskResponse get = client.admin().cluster().prepareGetTask(taskId).get();

使用Cancel Task API

任何查询更新都可以使用Task Cancel API取消:

// Cancel all update-by-query requests
client.admin().cluster().prepareCancelTasks().setActions(UpdateByQueryAction.NAME).get().getTasks();
// Cancel a specific update-by-query request
client.admin().cluster().prepareCancelTasks().setTaskId(taskId).get().getTasks();

使用list tasks API查找taskId的值。

取消请求通常是一个非常快速的过程,但可能要花费几秒钟的时间,task status API继续列出任务,直到取消完成。

Rethrottling

在正在运行的更新中,使用_rethrottle API更改requests_per_second的值:

RethrottleAction.INSTANCE.newRequestBuilder(client)
    .setTaskId(taskId)
    .setRequestsPerSecond(2.0f)
    .get();

使用list tasks API查找taskId的值。

updateByQuery API一样,requests_per_second的值可以是任何正值的浮点值来设置节流的级别,或者Float.POSITIVE_INFINITY禁用节流。requests_per_second值加快进程立即生效,减慢查询的requests_per_second值在完成当前批处理后生效,以防止滚动超时。

Reindex API

详情见reindex API

BulkByScrollResponse response = ReindexAction.INSTANCE.newRequestBuilder(client)
    .destination("target_index")
    .filter(QueryBuilders.matchQuery("category", "xzy")) 
    .get();

还可以提供查询来筛选应该从源索引到目标索引的哪些文档。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/69697.html

相关文章

  • Elasticsearch Java API 6.2java client)

    摘要:高级客户端目前支持更常用的,但还有很多东西需要补充,您可以通过告诉我们您的应用程序需要哪些缺失的来帮助我们优化优先级,通过向这个添加注释高级客户端完整性。传输客户端排除非数据节点的原因是为了避免将搜索流量发送给主节点。 前言 本节描述了Elasticsearch提供的Java API,所有的Elasticsearch操作都使用客户端对象执行,所有操作本质上都是完全异步的(要么接收监听器...

    Gu_Yan 评论0 收藏0
  • Elasticsearch Java High Level REST Client(入门)

    摘要:入门本节描述从获取工件到在应用程序中使用它如何开始使用高级别客户端。保证能够与运行在相同主版本和大于或等于的次要版本上的任何节点通信。与具有相同的发布周期,将版本替换为想要的客户端版本。 Java High Level REST Client 入门 本节描述从获取工件到在应用程序中使用它如何开始使用高级别REST客户端。 兼容性 Java High Level REST Client需...

    honmaple 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<