ElasticSearch学习笔记(八)Java AP实现增删改查
ElasticSearchJAVA API增删改查
·
ElasticSearch虽然很多时候可以直接通过schema获取数据,但是有的时候也需要自己手写API来实现自定义的功能。本篇主要是elasticsearch提供的API的学习。
这个是官网的教程链接https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-api.html
下面的代码直接拷贝到本地编辑器应该是可以直接运行的。由于代码量较多,而且比较简单,这里不做过多的解释,不理解的地方可以去看看官网的教程,还是很好理解的。
稍稍评价一下,elasticsearch的API基本上把所有的情况都考虑到了,是一个很全的API,但是由于太多了,感觉不是很能研究的过来,就算全部研究了,也不一定能全部记得住,经常用的大概就那么几个API吧。
一、构建项目
新建一个maven java项目。
pom.xml文件中添加下面这些东西
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>5.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
然后在项目根目录下添加log4j2.properties文件,复制下面的东西进去,当然也可以使用slf的日志文件,这里就不配置了。
appender.console.type = Console
appender.console.name = console
appender.console.layout.type = PatternLayout
rootLogger.level = info
rootLogger.appenderRef.console.ref = console
配置很简单,到这基本上就算是配置完成了。
二、声明client
这个还是很好理解的,感觉看代码应该就能懂,不做过多的介绍。
package test;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequestBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
@SuppressWarnings("resource")
public class testClient {
public static void main(String[] args) throws Exception {
method1();
method2();
method3();
}
public static void method1() throws Exception{
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("106.14.112.215"), 9300));
}
/**
* 集群名字,默认是elasticsearch,可以在elasticsearch.yml中进行修改。
*/
public static void method2() throws Exception{
Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build();
TransportClient client = new PreBuiltTransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("106.14.112.215"), 9300));
}
/**
* put("client.transport.sniff", true)这东西在自己的服务器报错,在公司的服务器不会报错。
*/
public static void method3() throws Exception{
Settings settings = Settings.builder().put("client.transport.ignore_cluster_name", "true").put("client.transport.sniff", true).build();
TransportClient client = new PreBuiltTransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("106.14.112.215"), 9300));
new DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE).source("movies").execute().actionGet();
client.close();
}
}
三、添加索引信息
package test;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
@SuppressWarnings({ "deprecation", "resource" })
public class TestIndex {
public static void main(String[] args) throws IOException {
indexmethod1();
indexmethod2();
indexmethod3();
indexmethod4();
indexmethod5();
}
/**
* 方法一
* @throws IOException
*/
public static void indexmethod1() throws IOException{
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("106.14.112.215"), 9300));
IndexResponse response = client.prepareIndex("movies", "movie", "1")
.setSource(jsonBuilder().startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "trying out Elasticsearch").endObject())
.get();
println(response);
}
/**
* 方法二
* @throws IOException
*/
public static void indexmethod2() throws IOException{
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("106.14.112.215"), 9300));
String json = "{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2017-07-30\"," +
"\"message\":\"trying out Elasticsearch\"" +
"}";
IndexResponse response = client.prepareIndex("movies", "movie","2")
.setSource(json)
.get();
println(response);
}
/**
* 方法三
* @throws IOException
*/
public static void indexmethod3() throws IOException{
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("106.14.112.215"), 9300));
Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate",new Date());
json.put("message","trying out Elasticsearch");
IndexResponse response = client.prepareIndex("movies", "movie","3")
.setSource(json)
.get();
println(response);
}
/**
* 方法四
* @throws IOException
*/
public static void indexmethod4()throws IOException{
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("106.14.112.215"), 9300));
IndexRequest indexRequest = new IndexRequest();
indexRequest.index("movies");
indexRequest.type("movie");
indexRequest.id("10");
indexRequest.source(jsonBuilder().startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "trying out Elasticsearch").endObject());
client.index(indexRequest);
}
/**
* 方法五
* @throws IOException
*/
public static void indexmethod5() throws IOException{
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("106.14.112.215"), 9300));
IndexRequest indexRequest = new IndexRequest("movies","movie","20")
.source(jsonBuilder().startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "trying out Elasticsearch").endObject());
client.index(indexRequest);
}
/**
* 输出状态
* @param response
*/
public static void println(IndexResponse response){
System.err.println(
"status:"+response.status()+"\n"
+"ID:"+response.getId()+"\n"
+"Index:"+response.getIndex()+"\n"
+"Type:"+response.getType()+"\n"
+"Version:"+response.getVersion()+"\n"
+"Result:"+response.getResult()+"\n"
+"ShardID:"+response.getShardId()+"\n"
+"ShardInfo:"+response.getShardInfo()+"\n"
);
}
}
四、更新索引
package test;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import java.net.InetAddress;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.script.Script;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
@SuppressWarnings({ "resource" })
public class testUpdate {
public static void main(String[] args) throws Exception {
updatemethod5();
}
/**
* 方法一
* @throws Exception
*/
public static void updatemethod1() throws Exception{
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("106.14.112.215"), 9300));
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("movies");
updateRequest.type("movie");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
.startObject()
.field("user", "1")
.endObject());
client.update(updateRequest).get();
}
/**
* 方法二
* @throws Exception
*/
public static void updatemethod2() throws Exception{
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("106.14.112.215"), 9300));
UpdateResponse response =client.prepareUpdate("movies", "movie","2")
.setDoc(jsonBuilder().startObject()
.field("user", "2")
.endObject())
.get();
println(response);
}
/**
* 方法三
* @throws Exception
*/
public static void updatemethod3() throws Exception{
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("106.14.112.215"), 9300));
UpdateRequest updateRequest = new UpdateRequest("movies", "movie", "3")
.script(new Script("ctx._source.user = \"3\""));
client.update(updateRequest).get();
}
/**
* 方法四
* @throws Exception
*/
public static void updatemethod4() throws Exception{
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("106.14.112.215"), 9300));
client.prepareUpdate("movies", "movie", "10")
.setScript(new Script("ctx._source.user = \"10\""))
.get();
}
/**
* 方法五
* @throws Exception
* 上面的任一方法,只要索引中没有该字段,就自动添加该字段,若有,则修改该字段的值
* 创建索引的办法基本上都可以用来修改索引的信息。
*/
public static void updatemethod5() throws Exception{
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("106.14.112.215"), 9300));
client.prepareUpdate("movies", "movie", "10")
.setScript(new Script("ctx._source.waaaa = \"10\""))
.get();
}
/**
* 输出状态
* @param response
*/
public static void println(UpdateResponse response){
System.err.println(
"status:"+response.status()+"\n"
+"ID:"+response.getId()+"\n"
+"Index:"+response.getIndex()+"\n"
+"Type:"+response.getType()+"\n"
+"Version:"+response.getVersion()+"\n"
+"Result:"+response.getResult()+"\n"
+"ShardID:"+response.getShardId()+"\n"
+"ShardInfo:"+response.getShardInfo()+"\n"
);
}
}
五、获取索引
package test;
import java.net.InetAddress;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
@SuppressWarnings({ "resource" })
public class testGet {
public static void main(String[] args) throws Exception {
getmethod2();
}
/**
* 方法一
* @throws Exception
*/
public static void getmethod1() throws Exception{
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("106.14.112.215"), 9300));
//.get()和.action().actionGet得到的结果是一样的,区别是单线程的还是多线程的
//GetResponse response = client.prepareGet("movies", "movie", "1").get();
GetResponse response = client.prepareGet("movies", "movie", "1").execute().actionGet();
println(response);
}
/**
* 方法二
* @throws Exception
*/
public static void getmethod2() throws Exception{
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("106.14.112.215"), 9300));
MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
.add("movies", "movie", "1")
.add("movies", "movie", "2", "3", "10")
.get();
for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
GetResponse response = itemResponse.getResponse();
if(response.isExists())
println(response);
}
}
/**
* 输出结果GetResponse
* @param response
*/
public static void println(GetResponse response){
System.err.println("*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-");
System.err.println("SourceAsString : ");
System.err.println(response.getSourceAsString());
System.err.println("Source : ");
System.err.println(response.getSource());
System.err.println("SourceAsMap : ");
System.err.println(response.getSourceAsMap());
System.err.println( "\n" +
"isExists : " + response.isExists() + "\n" +
"type : " + response.getType() + "\n" +
"id : " + response.getId() + "\n" +
"version : " + response.getVersion() + "\n" +
"fields : " + response.getFields() + "\n" +
"remoteAddress : " + response.remoteAddress() + "\n" +
"sourceEmpty : " + response.isSourceEmpty()
);
}
}
六、删除索引
package test;
import java.net.InetAddress;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
@SuppressWarnings("resource")
public class testDelete {
public static void main(String[] args) throws Exception {
deletemethod6();
}
/**
* 方法一
* @throws Exception
*/
public static void deletemethod1()throws Exception {
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("106.14.112.215"), 9300));
DeleteResponse response = client.prepareDelete("movies", "movie", "1").get();
println(response);
}
/**
* 方法二
* @throws Exception
*/
public static void deletemethod2() throws Exception{
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("106.14.112.215"), 9300));
BulkByScrollResponse response =
DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
.filter(QueryBuilders.matchQuery("year", "1962"))
.source("movies")
.get();
println(response);
}
/**
* 方法三
* 这个不能删除,很奇怪,感觉和下面那个方法相同,但是下面那个方法可以删除成功,不知道为什么
* @throws Exception
*/
public static void deletemethod3() throws Exception{
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("106.14.112.215"), 9300));
DeleteRequest deleteRequest = new DeleteRequest();
deleteRequest.index("movies");
deleteRequest.type("movie");
deleteRequest.id("10");
client.delete(deleteRequest);
}
/**
* 方法四
* @throws Exception
*/
public static void deletemethod4() throws Exception{
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("106.14.112.215"), 9300));
DeleteRequest deleteRequest = new DeleteRequest("movies","movie","10");
client.delete(deleteRequest);
}
/**
* 方法五
* @throws Exception
* 清楚指定索引
*/
public static void deletemethod5() throws Exception{
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("106.14.112.215"), 9300));
DeleteIndexResponse response = client.admin().indices()
.prepareDelete("movies")
.execute().actionGet();
println(response);
}
/**
* 方法六
* 清空索引
* @throws Exception
*/
public static void deletemethod6() throws Exception{
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("106.14.112.215"), 9300));
ClusterStateResponse response = client.admin().cluster()
.prepareState()
.execute().actionGet();
//获取所有索引
String[] indexs=response.getState().getMetaData().getConcreteAllIndices();
for (String index : indexs) {
//清空所有索引。
DeleteIndexResponse deleteIndexResponse = client.admin().indices()
.prepareDelete(index)
.execute().actionGet();
println(deleteIndexResponse);
}
}
/**
* 输出DeleteResponse
* @param response
*/
public static void println(DeleteResponse response){
System.err.println(
"status:"+response.status()+"\n"
+"ID:"+response.getId()+"\n"
+"Type:"+response.getType()+"\n"
+"Index:"+response.getIndex()+"\n"
+"Result:"+response.getResult()+"\n"
+"Version:"+response.getVersion()+"\n"
+"ShardID:"+response.getShardId()+"\n"
+"ShardInfo:"+response.getShardInfo()+"\n"
);
}
/**
* 输出BulkByScrollResponse
* @param response
*/
public static void println(BulkByScrollResponse response){
System.out.println(
"BulkRetries = " + response.getBulkRetries() + "\n"
+ "SearchRetries = " + response.getSearchRetries() + "\n"
+ "Created = " + response.getCreated() + "\n"
+ "Deleted = " + response.getDeleted() + "\n"
+ "Updated = " + response.getUpdated() + "\n"
+ "ReasonCancelled = " + response.getReasonCancelled() + "\n"
+ "VersionConflicts = " + response.getVersionConflicts() + "\n"
+ "BulkFailures = " + response.getBulkFailures() + "\n"
+ "SearchFailures = " + response.getSearchFailures() + "\n"
+ "Batches = " + response.getBatches() + "\n"
+ "Noops = " + response.getNoops() + "\n"
+ "Status = " + response.getStatus() + "\n"
+ "Took = " + response.getTook() + "\n"
);
}
/**
* 输出BulkByScrollResponse
* @param response
*/
public static void println(DeleteIndexResponse response){
System.out.println(
"isAcknowledged = " + response.isAcknowledged() + "\n" +
"remoteAddress = " + response.remoteAddress() + "\n"
);
}
}
七、批量操作索引
package test;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import java.net.InetAddress;
import java.util.Date;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
@SuppressWarnings("resource")
public class testBulk {
public static void main(String[] args)throws Exception {
bulkmethod2();
}
/**
* 方法一
* @throws Exception
*/
public static void bulkmethod1() throws Exception{
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("106.14.112.215"), 9300));
BulkRequestBuilder bulkRequest = client.prepareBulk();
bulkRequest.add(client.prepareIndex("movies", "movie", "5")
.setSource(jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "trying out Elasticsearch")
.endObject()
)
);
bulkRequest.add(client.prepareIndex("movies", "movie", "6")
.setSource(jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "another post")
.endObject()
)
);
BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
System.out.println("处理添加失败的地方");
}
}
/**
* 方法二
* @throws Exception
*/
public static void bulkmethod2() throws Exception {
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("106.14.112.215"), 9300));
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
System.err.println("批量操作之前");
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
System.err.println("批量操作之后,且不抛出异常");
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
System.err.println("批量操作之后,但不抛出异常");
}
}).setBulkActions(10000)
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(5))
.setConcurrentRequests(1)
.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
.build();
bulkProcessor.add(new IndexRequest("movies", "movie", "4")
.source(jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "trying out Elasticsearch")
.endObject()
)
);
bulkProcessor.add(new DeleteRequest("movies", "movie", "1"));
bulkProcessor.flush();
bulkProcessor.close();
}
}
源码我放在http://download.csdn.net/detail/q15150676766/9920331这里了,有需要的可以去下载看看
更多推荐
已为社区贡献1条内容
所有评论(0)