资讯专栏INFORMATION COLUMN

Elasticsearch 学习: Java API (一)

taohonghui / 2072人阅读

摘要:最近在学习,这是一个分布式的大数据搜索引擎,其实也可以看作是一个分布式的数据库。多查找可以一次返回多个要查找的值。第二个会在批量失败后执行。在例子中,当请求超过个或者总大小超过时,触发批量提交动作。

最近在学习 Elasticsearch,这是一个分布式的大数据搜索引擎,其实也可以看作是一个分布式的数据库。我使用的 Elasticsearch 的版本是 2.4.1,鉴于网上相关的中文资料较少,所以自己看官方文档学习一下。

使用 Maven 工程,我的 pom 文件如下所示:

    
        
            org.elasticsearch
            elasticsearch
            2.4.1
        
        
            org.apache.logging.log4j
            log4j-api
            2.6.2
        
        
            org.apache.logging.log4j
            log4j-core
            2.6.2
        
    
    
连接机器
TransportClient client = TransportClient.builder()
    .build()
    .addTransportAddress(new InetSocketTransportAddress(InetAddress
    .getByName("localhost"), 9300));       
    
Index API 创建 Index 并且插入 Document

创建索引有很多种方法,这里列举常用的 2 种:

HashMap json = new HashMap();
json.put("first_name","Shuang");
json.put("last_name", "Peng");
json.put("age", 24);
json.put("about", "I love coding");
IndexResponse response = client
    .prepareIndex("tseg","students","1")
    .setSource(json).get();

IndexResponse response = client.prepareIndex("tseg","students","1")
   .setSource(jsonBuilder()
   .startObject()
   .field("first_name", "Shuang")
   .field("first_name", "Peng")
   .field("age", 24)
   .field("about", "I love coding")
   .endObject())
   .get();
 

注意:Index API 只能用于创建 index,类似于关系型数据库里面的 create table,他不能对已有的数据库进行添加。追加操作可以用后面会提到的 Update 或者 Bulk 来完成。

Get API 获取 Document
GetResponse response2 = client.prepareGet("tseg", "students", "1").get();
Map res = response2.getSource();
for (Map.Entry entry: res.entrySet()){
     System.out.println(entry.getKey() + " : " + entry.getValue());
     }
Delete API 删除 Index 或者 Document
// 用来删除对应的 document 
DeleteResponse response3 = 
    client.prepareDelete("tesg","students","1").get();
// 用来删除对应的 index
DeleteIndexResponse response4 = 
    client.admin().indices().prepareDelete("facebook").execute().actionGet();
Update API 更新操作

更新操作也有两种方法。建议使用第一种,第二种太复杂了。。。看看就好。

第一种

client.prepareUpdate("tseg", "students", "1")
    .setDoc(jsonBuilder()
    .startObject().field("age", 32)
    .endObject())
    .get();

第二种

IndexRequest indexRequest = new IndexRequest("tseg", "students", "1")
    .source(jsonBuilder()
    .startObject()
    .field("first_name", "Shuang")
    .field("last_name", "Peng")
    .field("age", 32)
    .field("about", "I loving coding")
    .endObject());

UpdateRequest updateRequest = new UpdateRequest("tseg","students", "1")
    .doc(jsonBuilder()
    .startObject().field("age", 32)
    .endObject())
    .upsert(indexRequest);
 client.update(updateRequest).get();

不过这里提一下第二种方法,如果对应的 field 不存在的话,则更新操作自动变为插入操作,否则,就是正常的修改操作。

Multi Get API 多查找

MultiGetResponse API 可以一次返回多个要查找的值。下面介绍了两种方法,一种是返回一个 Map,我们可以按照不同的 field 取值;第二种方法是直接返回一个字符串(Json格式)。

MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
    .add("tseg", "students", "1", "2", "3").get();

for (MultiGetItemResponse itemResponses : multiGetItemResponses) {
    GetResponse response5 = itemResponses.getResponse();
    if (response5.isExists()) {
    
// 第一种用法
    Map fields = response5.getSource();
    System.out.println(fields.get("first_name"));

// 第二种用法
    String json2 = response5.getSourceAsString();
    System.out.println(json2);
}
Bulk API 批量操作

Bulk API允许批量提交index和delete请求, 如下:

BulkRequestBuilder bulkRequest = client.prepareBulk();
bulkRequest.add(client.prepareIndex("tseg", "students", "1")
           .setSource(jsonBuilder()
           .startObject()
           .field("first_name", "Allen")
           .field("last_name", "Peng")
           .field("age", "22")
           .endObject()))
           .get();
           
bulkRequest.add(client.prepareIndex("tseg", "students", "2"))
            .setSource(jsonBuilder()
            .startObject()
            .field("first_name", "Hou")
            .field("last_name", "Xue")
            .field("age", "30")
            .endObject()))
            .get();

HashMap json2 = new HashMap();
List list = new ArrayList();
list.add("music");
list.add("football");
json2.put("first_name", "Peng");
json2.put("last_name", "Peng");
json2.put("interests", list);
BulkRequestBuilder bulkRequest2 = client.prepareBulk();

// 两种执行方法,个人倾向于第一种
bulkRequest2.add(client.prepareIndex("facebook", "info", 
    "3").setSource(json2)).get();
// 第二种方法
bulkRequest2.add(client.prepareIndex("facebook", 
    "info","1").setSource(json2)).execute().actionGet();

还可以这样做:

BulkRequestBuilder bulkRequest = client.prepareBulk();
bulkRequest.add(client.prepareIndex("index1", "type1", "id1")
    .setSource(source);
bulkRequest.add(client.prepareIndex("index2", "type2", "id2")
    .setSource(source);
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
Bulk Processor API 可在批量操作完成之前和之后进行相应的操作
BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,  
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                  BulkRequest request) { ... } 

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { ... } 

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { ... } 
        })
        .setBulkActions(10000) 
        .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) 
        .setFlushInterval(TimeValue.timeValueSeconds(5)) 
        .setConcurrentRequests(1) 
         .build();
         
bulkProcessor.add(new IndexRequest("index1", "type1", "id1").source(source1));  
bulkProcessor.add(new DeleteRequest("index2", "type2", "id2");        

beforeBulk 会在批量提交之前执行,可以从 BulkRequest 中获取请求信息request.requests() 或者请求数量 request.numberOfActions()。

第一个 afterBulk 会在批量成功后执行,可以跟 beforeBulk 配合计算批量所需时间。

第二个 afterBulk 会在批量失败后执行。

在例子中,当请求超过 10000 个(default=1000)或者总大小超过1GB(default=5MB)时,触发批量提交动作。

后记

项目代码已经共享至 GitHub。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/69834.html

相关文章

  • Elasticsearch学习上手(

    摘要:是一个基于的开源搜索引擎。的目的是通过简单的来隐藏的复杂性,从而让全文搜索变得简单。它提供了许多合理的缺省值,并对初学者隐藏了复杂的搜索引擎理论。它开箱即用安装即可使用,只需很少的学习既可在生产环境中使用。 Elasticsearch是一个基于Apache Lucene(TM)的开源搜索引擎。一开始公司里一位同事是直接采用Luncene进行开发的,整体开发下来,代码量大,比较复杂,我就...

    PumpkinDylan 评论0 收藏0

发表评论

0条评论

taohonghui

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<