
本文共 12634 字,大约阅读时间需要 42 分钟。
环境
虚拟机:centos7
操作系统:win7 elasticsearch:5.4.3Index API
Index API 允许索引(插入)一个json
类型的文档到指定的索引并使其能被搜索到。
生成json文档
这里有几种不同的方式来生成json
文档:
byte[]
或者是String
。 ②可以使用自动转换为json
的等价物,即Map
。 ③使用第三方库来序列化你的beans
例如:JackSon
。 ④使用内置的助手XContentFactory.jsonBuilder()
。 手动拼接
这没什么难的,唯一注意的地方就是日期格式要拼接正确:
String json = "{" + "\"user\":\"kimchy\"," + "\"postDate\":\"2013-01-30\"," + "\"message\":\"trying out Elasticsearch\"" + "}";
使用Map
Map
是键值对的集合。它代表json
结构:
Mapjson = new HashMap ();json.put("user","kimchy");json.put("postDate",new Date());json.put("message","trying out Elasticsearch");
序列化你的bean
你可以使用Jackson
把你的bean
序列化为json
。请添加Jackson Databind
到你的项目中。接着你可以使用ObjectMapper
序列化你的bean
:
import com.fasterxml.jackson.databind.*;// instance a json mapperObjectMapper mapper = new ObjectMapper(); // create once, reuse// generate jsonbyte[] json = mapper.writeValueAsBytes(yourbeaninstance);//例子:Mapmap = new HashMap ();map.put("user","kimchy");map.put("postDate",new Date());map.put("message","trying out Elasticsearch");ObjectMapper mapper = new ObjectMapper();byte[] json = mapper.writeValueAsBytes(map);//把map转成json序列,并把结果输出成字节数组String asString = mapper.writeValueAsString(json1);//把map转成json序列,并把结果输出成字符串System.out.println(asString);
使用elasticsearch助手
elasticsearch提供内置的助手来生成json
内容。
import static org.elasticsearch.common.xcontent.XContentFactory.*;XContentBuilder builder = jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying out Elasticsearch") .endObject()
注意:你也可以使用startArray(String)
和endArray()
方法来添加数组。顺便提一下,这个field
方法接受许多对象类型。你可以直接输入数字
、日期
和其他的XContentBuilder
对象。(使用数组时,我的没有成功,报错啦)
XContentBuilder array = jsonBuilder().startArray().field("user", "yutao") .field("postDate", new Date()) .field("message", "trying out Elast").endArray();System.out.println(array.string());//或者XContentBuilder array = jsonBuilder().startArray("name").endArray();System.out.println(array.string());
执行时,报:
com.fasterxml.jackson.core.JsonGenerationException: Can not write a field name, expecting a value
如果你想查生成的json
内容,你可以使用toString()
方法。
String json = builder.string();
索引文档
这里的索引
其实就是指新增插入。
下面这里例子是索引一个json
文档到索引名(数据库名)为twitter
,类型名(表名)为tweet
和id
值为1
:
import static org.elasticsearch.common.xcontent.XContentFactory.*;IndexResponse response = client.prepareIndex("twitter", "tweet", "1") .setSource(jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying out Elasticsearch") .endObject() ) .get();
注意:你也可以索引(插入)json
字符串文档并且不指定ID
:
String json = "{" + "\"user\":\"kimchy\"," + "\"postDate\":\"2013-01-30\"," + "\"message\":\"trying out Elasticsearch\"" + "}";IndexResponse response = client.prepareIndex("twitter", "tweet") .setSource(json) .get();
你可以从IndexResponse
对象中得到以下信息:
// Index nameString _index = response.getIndex();// Type nameString _type = response.getType();// Document ID (generated or not)String _id = response.getId();// Version (if it's the first time you index this document, you will get: 1)long _version = response.getVersion();// status has stored current instance statement.RestStatus status = response.status();//这个我本机驱动包没有这个方法
关于操作索引的更多信息,请查看REST文档。
操作线程
index api
允许设置线程模型操作,当在同一个节点上实际要执行的api
被执行时,该操作将会被执行。(这个api
在同一台服务器上分配的分片上执行。)
选项是在不同的线程上执行该操作,或者在调用线程时被执行(注意该api依旧是异步的)。
默认情况下,operationThreaded
设置为true
,这意味着这个操作在不同的线程上执行。 Get API
get api允许通过id来获取索引(数据库)中的json类型文档。
下面的例子就是从索引名为twitter
,类型名为tweet
中获取id
值为1
的json
文档。 GetResponse response = client.prepareGet("twitter", "tweet", "1").get();
关于更多的get
操作,查看REST文档
操作线程
get api允许设置线程模型。该操作是在当实际要执行的api
在同一个节点上执行时将会被执行。(该api是分配在同一台服务器上的一个分片上执行)。
选项是在不同线程上执行该操作或者在调用线程上执行的该操作(请注意,该api仍然是异步的)。默认情况下,operationThreaded
设置为true
,这意味着在不同的线程上执行该操作。下面例子是设置为false
:
GetResponse response = client.prepareGet("twitter", "tweet", "1") .setOperationThreaded(false) .get();
Delete API
你可以指定id
来删除json
类型的文档。
twitter
、类型名为tweet
中,删除id
为1
的json
类型文档。 DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();
关于更多的delete
信息,可以查看文档。
操作线程
delete api
允许设置线程模型。该操作在 实际执行的api在同一节点(api会在分配到同一服务器的分片上执行)上被执行时
将会执行。
选项是在不同的线程上执行该操作,或者在调用线程时执行(这个api仍然是异步的)。
默认情况下,operationThreaded
设置为true
,其意味着在不同线程上执行该操作。 下面是设置为false
的例子: DeleteResponse response = client.prepareDelete("twitter", "tweet", "1") .setOperationThreaded(false) .get();
Delete By Query API
delete by query api
允许删除基于查询结果的一组文档:
BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client) .filter(QueryBuilders.matchQuery("gender", "male")) .source("persons") .get(); long deleted = response.getDeleted();
说明:上面的代码,我实践时,提示错误:BulkByScrollResponse cannot be resolved to a type
,也就是不知道org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse
是哪个jar
包,反正,org.elasticsearch.client
这里面没有。
因为它是可以长时间运行操作的,如果你想异步,你可以调用execute
而不是get
并提供一个监视器。如:
DeleteByQueryAction.INSTANCE.newRequestBuilder(client) .filter(QueryBuilders.matchQuery("gender", "male")) .source("persons") .execute(new ActionListener() { @Override public void onResponse(BulkIndexByScrollResponse response) { long deleted = response.getDeleted(); } @Override public void onFailure(Exception e) { // Handle the exception } });
Update API
你可以创建一个UpdateRequest
,把它发送给client
UpdateRequest updateRequest = new UpdateRequest();updateRequest.index("index");updateRequest.type("type");updateRequest.id("1");updateRequest.doc(jsonBuilder() .startObject() .field("gender", "male") .endObject());client.update(updateRequest).get();
或者你也可以使用prepareUpdate()
方法:
client.prepareUpdate("ttl", "doc", "1") .setScript(new Script("ctx._source.gender = \"male\"" ①, ScriptService.ScriptType.INLINE, null, null)) .get();client.prepareUpdate("ttl", "doc", "1") .setDoc(jsonBuilder() ② .startObject() .field("gender", "male") .endObject()) .get();
①:你自己写的脚本,该脚本可以是你本地的脚本名。这种情况下,你需要使用ScriptService.ScriptType.FILE
。
注意:你不同同时使用script
和doc
。
Update by script
update api 允许提供基于脚本来更新文档:
UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1") .script(new Script("ctx._source.gender = \"male\""));client.update(updateRequest).get();
Update by merging documents
update api 还支持传递一部分api,它将合并到现有的文档中。
(简单的递归合并,对象内部合并,替换的核心就是键值对和数组) 通俗点就是局部更新。UpdateRequest updateRequest = new UpdateRequest("index", "type", "1") .doc(jsonBuilder() .startObject() .field("gender", "male") .endObject());client.update(updateRequest).get();
Upsert
还支持upsert
。如果文档中不存在相应字段,upsert
元素内容,将会创建添加进文档。
IndexRequest indexRequest = new IndexRequest("index", "type", "1") .source(jsonBuilder() .startObject() .field("name", "Joe Smith") .field("gender", "male") .endObject());UpdateRequest updateRequest = new UpdateRequest("index", "type", "1") .doc(jsonBuilder() .startObject() .field("gender", "male") .endObject()) .upsert(indexRequest); ① client.update(updateRequest).get();
①:如果文档不存在,indexRequest
被添加。
如果文档index/type/
已经存在了,我们执行这个操作后,情况如下:
{ "name" : "Joe Dalton", "gender": "male" ①}
①这个字段将会通过更新请求添加进去
如果其不存在,我们将会得到一个新的文档:
{ "name" : "Joe Smith", "gender": "male"}
Multi Get API
Multi Get API 允许根据index
,type
和id
获取文档列表。
MultiGetResponse multiGetItemResponses = client.prepareMultiGet() .add("twitter", "tweet", "1") ① .add("twitter", "tweet", "2", "3", "4") ② .add("another", "type", "foo") ③ .get();for (MultiGetItemResponse itemResponse : multiGetItemResponses) { ④ GetResponse response = itemResponse.getResponse(); if (response.isExists()) { ⑤ String json = response.getSourceAsString(); ⑥ }}
①更具id
得到单个文档
index
(数据库)和type
(表)中通过一组id
来查询。 ③你也可以查询其他的index
(数据库) ④迭代得带的集合 ⑤你可以查询文档是否存在 ⑥访问字段_source
关于multi
查询操作的更多信息,可以查看REST
文档
Bulk API
Bulk API 允许在单个请求中对多个文档进行索引(插入)和删除。例如:
import static org.elasticsearch.common.xcontent.XContentFactory.*;BulkRequestBuilder bulkRequest = client.prepareBulk();// either use client#prepare, or use Requests# to directly build index/delete requestsbulkRequest.add(client.prepareIndex("twitter", "tweet", "1") .setSource(jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying out Elasticsearch") .endObject() ) );bulkRequest.add(client.prepareIndex("twitter", "tweet", "2") .setSource(jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "another post") .endObject() ) );BulkResponse bulkResponse = bulkRequest.get();if (bulkResponse.hasFailures()) { // process failures by iterating through each bulk response item}
Using Bulk Processor
BulkProcessor类提供了一个简单的界面,可以根据请求的数量或大小或一段时间后进行自动刷新批处理操作。
import org.elasticsearch.action.bulk.BackoffPolicy;import org.elasticsearch.action.bulk.BulkProcessor;import org.elasticsearch.common.unit.ByteSizeUnit;import org.elasticsearch.common.unit.ByteSizeValue;import org.elasticsearch.common.unit.TimeValue;BulkProcessor bulkProcessor = BulkProcessor.builder( client, ① new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { ... } ② @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { ... } ③ @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { ... } ④ }) .setBulkActions(10000) ⑤ .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) ⑥ .setFlushInterval(TimeValue.timeValueSeconds(5)) ⑦ .setConcurrentRequests(1) ⑧ .setBackoffPolicy( BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) ⑨ .build();
①:添加你自己的elasticsearch
client
request.numberOfActions()
来看numberOfActions
。 ③:这个方法在执行bulk
之后调用。你可以通过使用response.hasFailures()
来检查是否有请求失败。 ④:当bulk
失败并且扔出Throwable
时,调用该方法。 ⑤:我们想每1000
个请求,执行bulk
。 ⑥:我们想每5Mb
,flush
一次bulk
。 ⑦:我们想无论多少次请求,每5分钟,flush
一次bulk
。 ⑧:设置并发请求的数量。值为0表示:只允许单个请求被执行。值为1表示:在累计新bulk
请求时,允许最大的并发数为1。 ⑨:自定义设置退避规则,初始值等待100ms,指数增长并且重试3次。当一个或多个bulk
请求失败并使用EsRejectedExecutionException
进行重试,可用于处理请求的计算机资源太少。要关闭该退避(backoff
),通过BackoffPolicy.noBackoff()
方法来关闭。 BulkProcessor的默认值:
1、bulkActions
设置为100 2、bulkSize
设置为5Mb 3、flushInterval
不设置 4、concurrentRequests
设置为1,这意味着,异步执行flush
操作。 5、backoffPolicy
设置指数退避,8次重试和50ms的启动延迟。总等待时间为5.1
秒。 Add requests
然后,你只需要添加你的请求到BulkProcessor
:
bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
Closing the Bulk Processor
当所有的文档加载到BulkProcessor
,可以使用awaitClose
或者close
方法进行关闭:
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
或者
bulkProcessor.close();
这两种方法都是刷新剩余的文档,并通过设置flushInterval
来排除所有其他调度的刷新。如果启动了并发请求,awaitClose
方法等待所有的请求,要是所有的请求在指定超时时间内完成,就返回true
,如果所有的bulk
请求完成之前,超过了指定的等待时间,就返回false
。
close
方法不会等待剩余任何bulk
请求,其会立即退出。 Using Bulk Processor in tests
如果你在使用elasticsearch
运行你的测试,并使用BulkProcessor
来填充你的数据集
bulk
的flush操作将会以同步的方式来执行: BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ }) .setBulkActions(10000) .setConcurrentRequests(0) .build();// Add your requestsbulkProcessor.add(/* Your requests */);// Flush any remaining requestsbulkProcessor.flush();// Or close the bulkProcessor if you don't need it anymorebulkProcessor.close();// Refresh your indicesclient.admin().indices().prepareRefresh().get();// Now you can start searching!client.prepareSearch().get();
参考地址:
发表评论
最新留言
关于作者
