【es笔记七】Document APIs(下)
2.2.6 Bulk APIBulk API用于提供批处理的功能。注意:Java High Level REST Client提供了一个Bulk Processor(批处理对象)来帮助执行批处理,简化了Bulk API的操作。下面先来了解一下Bulk API的内容。2.2.6.1 Bulk Request一个BulkRequest对象可以被用来在一次请求中执行批量的索引、更新、删除...
2.2.6 Bulk API
Bulk API
用于提供批处理的功能。
注意:
Java High Level REST Client
提供了一个Bulk Processor
(批处理对象)来帮助执行批处理,简化了Bulk API
的操作。
下面先来了解一下Bulk API
的内容。
2.2.6.1 Bulk Request
一个BulkRequest
对象可以被用来在一次请求中执行批量的索引、更新、删除操作。需要在BulkRequest
中添加至少一个操作:
BulkRequest bulkRequest = new BulkRequest(); //
bulkRequest.add(new IndexRequest("posts", "doc", "1")
.source(XContentType.JSON,"field", "foo"));
bulkRequest.add(new IndexRequest("posts", "doc", "2")
.source(XContentType.JSON,"field", "bar"));
bulkRequest.add(new IndexRequest("posts", "doc", "3")
.source(XContentType.JSON,"field", "baz"));
上边的操作中,在一个BulkRequest
对象中添加了三个IndexRequest
对象。
同一个BulkRequest
对象中添加不同类型的操作对象也是可以的:
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(new DeleteRequest("posts", "doc", "3"));
bulkRequest.add(new UpdateRequest("posts", "doc", "2")
.doc(XContentType.JSON,"other", "test"));
bulkRequest.add(new IndexRequest("posts", "doc", "4")
.source(XContentType.JSON,"field", "baz"));
2.2.6.2 可选参数
下面是可选参数:
bulkRequest.timeout(TimeValue.timeValueMinutes(2)); // 设置等待批量操作完成的超时时间
bulkRequest.timeout("2m"); // 设置等待批量操作完成的超时时间
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); // 设置刷新策略
bulkRequest.setRefreshPolicy("wait_for"); // 设置刷新策略
bulkRequest.waitForActiveShards(2); // 设置执行操作前必须有多少活动的分片
bulkRequest.waitForActiveShards(ActiveShardCount.ALL); // 设置执行操作前必须有多少活动的分片
2.2.6.3 同步执行
BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
2.2.6.4 异步执行
异步执行需要提供监听器:
client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener);
异步方法不会阻塞,会立即返回。一旦返回结果,如果成功则会回调ActionListener
的onResponse
方法,如果调用失败,则会回调onFailure
方法。
典型的listener
如下:
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
// 成功
}
@Override
public void onFailure(Exception e) {
// 失败
}
};
2.2.6.5 Bulk Response
可以通过返回的BulkResponse
对象获取执行的信息:
for (BulkItemResponse bulkItemResponse : bulkResponse) {
DocWriteResponse itemResponse = bulkItemResponse.getResponse();
if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
|| bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
IndexResponse indexResponse = (IndexResponse) itemResponse;
} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
UpdateResponse updateResponse = (UpdateResponse) itemResponse;
} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
}
}
BulkResponse
对象提供了一个方法,可以帮助快速确定是否有操作执行失败:
if (bulkResponse.hasFailures()) {
// 有操作执行失败
}
如果有失败的情况,需要遍历所有的BulkItemResponse
对象,找到失败的那些:
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
}
}
2.2.6.6 Bulk Processor
BulkProcessor
通过提供一个实用程序类来简化Bulk API
的使用,它允许索引/更新/删除操作。
为了执行请求,BulkProcessor
需要以下组件:
-
RestHighLevelClient
这个客户端是用来执行
BulkRequest
,并获得BulkResponse
的。 -
BulkProcessor.Listener
这个监听器会在
BulkRequest
执行前后或者失败时触发。
BulkProcessor.builder
可以用来创建一个新的BulkProcessor
对象:
BulkProcessor.Listener listener = new BulkProcessor.Listener() { // (1)
@Override
public void beforeBulk(long executionId, BulkRequest request) {
// (2)
}
@Override
public void afterBulk(long executionId, BulkRequest request,
BulkResponse response) {
// (3)
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
// (4)
}
};
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
(request, bulkListener) -> highLevelClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
BulkProcessor bulkProcessor = BulkProcessor.builder(bulkConsumer, listener).build(); // (5)
(1)
创建一个BulkProcessor.Listener
对象。
(2)
该方法在每个BulkRequest
对象执行前调用。
(3)
该方法在每个BulkRequest
对象执行后调用。
(4)
该方法在执行BulkRequest
失败后调用。
(5)
通过调用build()
方法创建一个新的BulkProcessor
对象。将在幕后通过RestHighLevelClient.bulkAsync()
执行BulkRequest
。
BulkProcessor.Builder
提供了方法用于配置BulkProcessor
将如何执行请求:
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
(request, bulkListener) -> highLevelClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener);
builder.setBulkActions(500); // (1)
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); // (2)
builder.setConcurrentRequests(0); // (3)
builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); // (4)
builder.setBackoffPolicy(BackoffPolicy
.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); // (5)
(1)
设置达到多少次请求后触发一次批量请求(默认是1000,可以通过设置为-1来禁用)。
(2)
设置到达多少数据量是批量处理一次(默认是5MB,可以设置为-1来禁用)。
(3)
设置单次批量处理允许的并发请求个数(默认是1,设置为0只允许执行单个请求)。
(4)
设置刷新间隔,如果到达时间点,所有等待的BulkRequest
都会被flush
(默认未开启)。
(5)
设置一个初始化为等待一秒,尝试3次的后退策略。
一旦BulkProcessor
被创建,就可以添加请求:
IndexRequest one = new IndexRequest("posts", "doc", "1").
source(XContentType.JSON, "title",
"In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts", "doc", "2")
.source(XContentType.JSON, "title",
"Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts", "doc", "3")
.source(XContentType.JSON, "title",
"The Future of Federated Search in Elasticsearch");
// 添加3个请求
bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);
请求将会被BulkProcessor
执行,在BulkRequest
被调用时会触发BulkProcessor.Listener
的方法:
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
int numberOfActions = request.numberOfActions();
logger.debug("Executing bulk [{}] with {} requests",
executionId, numberOfActions);
}
@Override
public void afterBulk(long executionId, BulkRequest request,
BulkResponse response) {
if (response.hasFailures()) {
logger.warn("Bulk [{}] executed with failures", executionId);
} else {
logger.debug("Bulk [{}] completed in {} milliseconds",
executionId, response.getTook().getMillis());
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
logger.error("Failed to execute bulk", failure);
}
};
当所有请求都被添加到BulkProcessor
中之后,我们怎么关闭它呢?
可以通过调用awaitClose()
方法设定等待所有请求处理完毕或者等待指定时间后关闭:
boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS); // (1)
(1)
当所有请求都处理完毕,该方法返回true
,当等待了指定时间而请求还没处理完毕,该方法返回false
。
还可以通过调用close()
方法,立即关闭BulkProcessor
:
bulkProcessor.close();
这两个方法都会在关闭BulkProcessor
之前flush
所有请求,并且禁止添加新的请求。
更多推荐
所有评论(0)