elasticsearch for java之Document APIs【增删改查】
发布日期:2021-05-07 04:26:45 浏览次数:20 分类:精选文章

本文共 12634 字,大约阅读时间需要 42 分钟。

环境

虚拟机:centos7

操作系统:win7
elasticsearch:5.4.3

Index API

Index API 允许索引(插入)一个json类型的文档到指定的索引并使其能被搜索到。

生成json文档

这里有几种不同的方式来生成json文档:

①手动(自己去拼接)使用本机byte[]或者是String
②可以使用自动转换为json的等价物,即Map
③使用第三方库来序列化你的beans例如:JackSon
④使用内置的助手XContentFactory.jsonBuilder()

手动拼接

这没什么难的,唯一注意的地方就是日期格式要拼接正确:

String json = "{" +        "\"user\":\"kimchy\"," +        "\"postDate\":\"2013-01-30\"," +        "\"message\":\"trying out Elasticsearch\"" +    "}";

使用Map

Map是键值对的集合。它代表json结构:

Map
json = new HashMap
();json.put("user","kimchy");json.put("postDate",new Date());json.put("message","trying out Elasticsearch");

序列化你的bean

你可以使用Jackson把你的bean序列化为json。请添加Jackson Databind到你的项目中。接着你可以使用ObjectMapper序列化你的bean

import com.fasterxml.jackson.databind.*;// instance a json mapperObjectMapper mapper = new ObjectMapper(); // create once, reuse// generate jsonbyte[] json = mapper.writeValueAsBytes(yourbeaninstance);//例子:Map
map = new HashMap
();map.put("user","kimchy");map.put("postDate",new Date());map.put("message","trying out Elasticsearch");ObjectMapper mapper = new ObjectMapper();byte[] json = mapper.writeValueAsBytes(map);//把map转成json序列,并把结果输出成字节数组String asString = mapper.writeValueAsString(json1);//把map转成json序列,并把结果输出成字符串System.out.println(asString);

使用elasticsearch助手

elasticsearch提供内置的助手来生成json内容。

import static org.elasticsearch.common.xcontent.XContentFactory.*;XContentBuilder builder = jsonBuilder()    .startObject()        .field("user", "kimchy")        .field("postDate", new Date())        .field("message", "trying out Elasticsearch")    .endObject()

注意:你也可以使用startArray(String)endArray()方法来添加数组。顺便提一下,这个field方法接受许多对象类型。你可以直接输入数字日期和其他的XContentBuilder对象。(使用数组时,我的没有成功,报错啦)

XContentBuilder array = jsonBuilder().startArray().field("user", "yutao")                .field("postDate", new Date())                .field("message", "trying out Elast").endArray();System.out.println(array.string());//或者XContentBuilder array = jsonBuilder().startArray("name").endArray();System.out.println(array.string());

执行时,报:

com.fasterxml.jackson.core.JsonGenerationException: Can not write a field name, expecting a value

如果你想查生成的json内容,你可以使用toString()方法。

String json = builder.string();

索引文档

这里的索引其实就是指新增插入。

下面这里例子是索引一个json文档到索引名(数据库名)为twitter,类型名(表名)为tweetid值为1

import static org.elasticsearch.common.xcontent.XContentFactory.*;IndexResponse response = client.prepareIndex("twitter", "tweet", "1")        .setSource(jsonBuilder()                    .startObject()                        .field("user", "kimchy")                        .field("postDate", new Date())                        .field("message", "trying out Elasticsearch")                    .endObject()                  )        .get();

注意:你也可以索引(插入)json字符串文档并且不指定ID

String json = "{" +        "\"user\":\"kimchy\"," +        "\"postDate\":\"2013-01-30\"," +        "\"message\":\"trying out Elasticsearch\"" +    "}";IndexResponse response = client.prepareIndex("twitter", "tweet")        .setSource(json)        .get();

你可以从IndexResponse对象中得到以下信息:

// Index nameString _index = response.getIndex();// Type nameString _type = response.getType();// Document ID (generated or not)String _id = response.getId();// Version (if it's the first time you index this document, you will get: 1)long _version = response.getVersion();// status has stored current instance statement.RestStatus status = response.status();//这个我本机驱动包没有这个方法

关于操作索引的更多信息,请查看REST文档。

操作线程

index api允许设置线程模型操作,当在同一个节点上实际要执行的api被执行时,该操作将会被执行。(这个api在同一台服务器上分配的分片上执行。)

选项是在不同的线程上执行该操作,或者在调用线程时被执行(注意该api依旧是异步的)。

默认情况下,operationThreaded设置为true,这意味着这个操作在不同的线程上执行。

Get API

get api允许通过id来获取索引(数据库)中的json类型文档。

下面的例子就是从索引名为twitter,类型名为tweet中获取id值为1json文档。

GetResponse response = client.prepareGet("twitter", "tweet", "1").get();

关于更多的get操作,查看REST文档

操作线程

get api允许设置线程模型。该操作是在当实际要执行的api在同一个节点上执行时将会被执行。(该api是分配在同一台服务器上的一个分片上执行)。

选项是在不同线程上执行该操作或者在调用线程上执行的该操作(请注意,该api仍然是异步的)。默认情况下,operationThreaded设置为true,这意味着在不同的线程上执行该操作。下面例子是设置为false

GetResponse response = client.prepareGet("twitter", "tweet", "1")        .setOperationThreaded(false)        .get();

Delete API

你可以指定id来删除json类型的文档。

下面的例子是从索引名为twitter、类型名为tweet中,删除id1json类型文档。

DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();

关于更多的delete信息,可以查看文档。

操作线程

delete api 允许设置线程模型。该操作在 实际执行的api在同一节点(api会在分配到同一服务器的分片上执行)上被执行时 将会执行。

选项是在不同的线程上执行该操作,或者在调用线程时执行(这个api仍然是异步的)。

默认情况下,operationThreaded设置为true,其意味着在不同线程上执行该操作。
下面是设置为false的例子:

DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")        .setOperationThreaded(false)        .get();

Delete By Query API

delete by query api允许删除基于查询结果的一组文档:

BulkByScrollResponse response =    DeleteByQueryAction.INSTANCE.newRequestBuilder(client)        .filter(QueryBuilders.matchQuery("gender", "male"))         .source("persons")                                          .get();                                             long deleted = response.getDeleted();

说明:上面的代码,我实践时,提示错误:BulkByScrollResponse cannot be resolved to a type,也就是不知道org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse是哪个jar包,反正,org.elasticsearch.client这里面没有。

因为它是可以长时间运行操作的,如果你想异步,你可以调用execute而不是get并提供一个监视器。如:

DeleteByQueryAction.INSTANCE.newRequestBuilder(client)    .filter(QueryBuilders.matchQuery("gender", "male"))                      .source("persons")                                                       .execute(new ActionListener
() { @Override public void onResponse(BulkIndexByScrollResponse response) { long deleted = response.getDeleted(); } @Override public void onFailure(Exception e) { // Handle the exception } });

Update API

你可以创建一个UpdateRequest,把它发送给client

UpdateRequest updateRequest = new UpdateRequest();updateRequest.index("index");updateRequest.type("type");updateRequest.id("1");updateRequest.doc(jsonBuilder()        .startObject()            .field("gender", "male")        .endObject());client.update(updateRequest).get();

或者你也可以使用prepareUpdate()方法:

client.prepareUpdate("ttl", "doc", "1")        .setScript(new Script("ctx._source.gender = \"male\"" ①, ScriptService.ScriptType.INLINE, null, null))        .get();client.prepareUpdate("ttl", "doc", "1")        .setDoc(jsonBuilder()        ②                   .startObject()                .field("gender", "male")            .endObject())        .get();

①:你自己写的脚本,该脚本可以是你本地的脚本名。这种情况下,你需要使用ScriptService.ScriptType.FILE

②:将合并到现有的文档。

注意:你不同同时使用scriptdoc

Update by script

update api 允许提供基于脚本来更新文档:

UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1")        .script(new Script("ctx._source.gender = \"male\""));client.update(updateRequest).get();

Update by merging documents

update api 还支持传递一部分api,它将合并到现有的文档中。

(简单的递归合并,对象内部合并,替换的核心就是键值对和数组)
通俗点就是局部更新。

UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")        .doc(jsonBuilder()            .startObject()                .field("gender", "male")            .endObject());client.update(updateRequest).get();

Upsert

还支持upsert。如果文档中不存在相应字段,upsert元素内容,将会创建添加进文档。

IndexRequest indexRequest = new IndexRequest("index", "type", "1")        .source(jsonBuilder()            .startObject()                .field("name", "Joe Smith")                .field("gender", "male")            .endObject());UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")        .doc(jsonBuilder()            .startObject()                .field("gender", "male")            .endObject())        .upsert(indexRequest);           ①   client.update(updateRequest).get();

①:如果文档不存在,indexRequest被添加。

如果文档index/type/已经存在了,我们执行这个操作后,情况如下:

{    "name"  : "Joe Dalton",    "gender": "male"        ①}

①这个字段将会通过更新请求添加进去

如果其不存在,我们将会得到一个新的文档:

{    "name" : "Joe Smith",    "gender": "male"}

Multi Get API

Multi Get API 允许根据indextypeid获取文档列表。

MultiGetResponse multiGetItemResponses = client.prepareMultiGet()    .add("twitter", "tweet", "1")           ①    .add("twitter", "tweet", "2", "3", "4") ②    .add("another", "type", "foo")          ③    .get();for (MultiGetItemResponse itemResponse : multiGetItemResponses) { ④    GetResponse response = itemResponse.getResponse();    if (response.isExists()) {                      ⑤        String json = response.getSourceAsString(); ⑥    }}

①更具id得到单个文档

②在同一个index(数据库)和type(表)中通过一组id来查询。
③你也可以查询其他的index(数据库)
④迭代得带的集合
⑤你可以查询文档是否存在
⑥访问字段_source

关于multi查询操作的更多信息,可以查看REST文档

Bulk API

Bulk API 允许在单个请求中对多个文档进行索引(插入)和删除。例如:

import static org.elasticsearch.common.xcontent.XContentFactory.*;BulkRequestBuilder bulkRequest = client.prepareBulk();// either use client#prepare, or use Requests# to directly build index/delete requestsbulkRequest.add(client.prepareIndex("twitter", "tweet", "1")        .setSource(jsonBuilder()                    .startObject()                        .field("user", "kimchy")                        .field("postDate", new Date())                        .field("message", "trying out Elasticsearch")                    .endObject()                  )        );bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")        .setSource(jsonBuilder()                    .startObject()                        .field("user", "kimchy")                        .field("postDate", new Date())                        .field("message", "another post")                    .endObject()                  )        );BulkResponse bulkResponse = bulkRequest.get();if (bulkResponse.hasFailures()) {    // process failures by iterating through each bulk response item}

Using Bulk Processor

BulkProcessor类提供了一个简单的界面,可以根据请求的数量或大小或一段时间后进行自动刷新批处理操作。

import org.elasticsearch.action.bulk.BackoffPolicy;import org.elasticsearch.action.bulk.BulkProcessor;import org.elasticsearch.common.unit.ByteSizeUnit;import org.elasticsearch.common.unit.ByteSizeValue;import org.elasticsearch.common.unit.TimeValue;BulkProcessor bulkProcessor = BulkProcessor.builder(        client,  ①        new BulkProcessor.Listener() {            @Override            public void beforeBulk(long executionId,                                   BulkRequest request) { ... } ②            @Override            public void afterBulk(long executionId,                                  BulkRequest request,                                  BulkResponse response) { ... } ③            @Override            public void afterBulk(long executionId,                                  BulkRequest request,                                  Throwable failure) { ... } ④        })        .setBulkActions(10000) ⑤        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) ⑥        .setFlushInterval(TimeValue.timeValueSeconds(5)) ⑦        .setConcurrentRequests(1) ⑧        .setBackoffPolicy(            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) ⑨        .build();

①:添加你自己的elasticsearch client

②:bulk执行之前这个方法将会被调用。你可以使用request.numberOfActions()来看numberOfActions
③:这个方法在执行bulk之后调用。你可以通过使用response.hasFailures()来检查是否有请求失败。
④:当bulk失败并且扔出Throwable时,调用该方法。
⑤:我们想每1000个请求,执行bulk
⑥:我们想每5Mb,flush一次bulk
⑦:我们想无论多少次请求,每5分钟,flush一次bulk
⑧:设置并发请求的数量。值为0表示:只允许单个请求被执行。值为1表示:在累计新bulk请求时,允许最大的并发数为1。
⑨:自定义设置退避规则,初始值等待100ms,指数增长并且重试3次。当一个或多个bulk请求失败并使用EsRejectedExecutionException进行重试,可用于处理请求的计算机资源太少。要关闭该退避(backoff),通过BackoffPolicy.noBackoff()方法来关闭。

BulkProcessor的默认值:

1、bulkActions设置为100
2、bulkSize设置为5Mb
3、flushInterval不设置
4、concurrentRequests设置为1,这意味着,异步执行flush操作。
5、backoffPolicy设置指数退避,8次重试和50ms的启动延迟。总等待时间为5.1秒。

Add requests

然后,你只需要添加你的请求到BulkProcessor

bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));

Closing the Bulk Processor

当所有的文档加载到BulkProcessor,可以使用awaitClose或者close方法进行关闭:

bulkProcessor.awaitClose(10, TimeUnit.MINUTES);

或者

bulkProcessor.close();

这两种方法都是刷新剩余的文档,并通过设置flushInterval来排除所有其他调度的刷新。如果启动了并发请求,awaitClose方法等待所有的请求,要是所有的请求在指定超时时间内完成,就返回true,如果所有的bulk请求完成之前,超过了指定的等待时间,就返回false

close方法不会等待剩余任何bulk请求,其会立即退出。

Using Bulk Processor in tests

如果你在使用elasticsearch运行你的测试,并使用BulkProcessor来填充你的数据集

你最好把并发请求数设置为0,这样bulk的flush操作将会以同步的方式来执行:

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })        .setBulkActions(10000)        .setConcurrentRequests(0)        .build();// Add your requestsbulkProcessor.add(/* Your requests */);// Flush any remaining requestsbulkProcessor.flush();// Or close the bulkProcessor if you don't need it anymorebulkProcessor.close();// Refresh your indicesclient.admin().indices().prepareRefresh().get();// Now you can start searching!client.prepareSearch().get();

参考地址:

上一篇:elasticsearch 批量导入数据
下一篇:elasticsearch之模块【节点】

发表评论

最新留言

哈哈,博客排版真的漂亮呢~
[***.90.31.176]2025年04月11日 18时58分39秒