环境准备(8.x 版本)

<properties>
	<maven.compiler.source>8</maven.compiler.source>
	<maven.compiler.target>8</maven.compiler.target>
	<!-- Single source of truth for the version of every Elastic artifact below -->
	<elastic.version>8.1.0</elastic.version>
</properties>
<dependencies>
   <!-- Elasticsearch SQL JDBC driver; uses elastic.version so it cannot drift from the client -->
   <dependency>
       <groupId>org.elasticsearch.plugin</groupId>
       <artifactId>x-pack-sql-jdbc</artifactId>
       <version>${elastic.version}</version>
   </dependency>
   <!-- Elasticsearch Java API client -->
   <dependency>
       <groupId>co.elastic.clients</groupId>
       <artifactId>elasticsearch-java</artifactId>
       <version>${elastic.version}</version>
   </dependency>
   <!-- Jackson, used by JacksonJsonpMapper for JSON (de)serialization -->
   <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
       <version>2.12.3</version>
   </dependency>
   <dependency>
       <groupId>jakarta.json</groupId>
       <artifactId>jakarta.json-api</artifactId>
       <version>2.0.1</version>
   </dependency>
</dependencies>

现在使用的是基于 HTTPS 的安全 Elasticsearch 服务,因此首先需要将之前生成的 CA 证书(elastic-stack-ca.p12)转换为 Java 客户端可以直接加载的证书文件(java-ca.crt):

openssl pkcs12 -in elastic-stack-ca.p12 -clcerts -nokeys -out java-ca.crt

Java 客户端连接

// 导入的类
import co.elastic.clients.elasticsearch.*;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport; import org.apache.http.HttpHost;
import org.apache.http.auth.*;
import org.apache.http.client.*;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.*;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.http.ssl.*;
import org.elasticsearch.client.*;
import javax.net.ssl.SSLContext;
import java.io.InputStream;
import java.nio.file.*;
import java.security.KeyStore;
import java.security.cert.*;
 
 
// Basic-auth credentials sent with every request.
// NOTE(review): the password is hard-coded for demonstration only; real code should
// load it from configuration or the environment instead of committing it to source.
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
        new UsernamePasswordCredentials("elastic", "O3x0hfu7i=ZbQvlktCnd"));

// Load the CA certificate. The file name matches the output of the openssl
// conversion command shown earlier in this document (java-ca.crt).
Path caCertificatePath = Paths.get("java-ca.crt");
CertificateFactory factory = CertificateFactory.getInstance("X.509");
Certificate trustedCa;
try (InputStream is = Files.newInputStream(caCertificatePath)) {
    trustedCa = factory.generateCertificate(is);
}

// Build an in-memory trust store that contains only that CA.
KeyStore trustStore = KeyStore.getInstance("pkcs12");
trustStore.load(null, null);
trustStore.setCertificateEntry("ca", trustedCa);

SSLContextBuilder sslContextBuilder = SSLContexts.custom().loadTrustMaterial(trustStore, null);
final SSLContext sslContext = sslContextBuilder.build();

// Low-level REST client: HTTPS to linux1:9200 with the custom SSL context and credentials.
// NOTE(review): NoopHostnameVerifier disables host name verification, so the server
// certificate is not checked against "linux1". Keep this for local testing only.
RestClientBuilder builder = RestClient.builder(
        new HttpHost("linux1", 9200, "https"))
        .setHttpClientConfigCallback(httpClientBuilder ->
                httpClientBuilder.setSSLContext(sslContext)
                        .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
                        .setDefaultCredentialsProvider(credentialsProvider));
RestClient restClient = builder.build();

// Typed API clients (blocking and asynchronous) that share one transport.
ElasticsearchTransport transport = new RestClientTransport(
        restClient, new JacksonJsonpMapper());
ElasticsearchClient client = new ElasticsearchClient(transport);
ElasticsearchAsyncClient asyncClient = new ElasticsearchAsyncClient(transport);
...
transport.close();

操作数据(普通操作)

索引操作

// Create the index and report whether the request was acknowledged
final CreateIndexResponse createIndexResponse = client.indices().create(
        new CreateIndexRequest.Builder().index("myindex").build());
System.out.println("创建索引成功:" + createIndexResponse.acknowledged());

// Look the index up and print what the response contains
final GetIndexResponse getIndexResponse = client.indices().get(
        new GetIndexRequest.Builder().index("myindex").build());
System.out.println("索引查询成功:" + getIndexResponse.result());

// Delete the index and report whether the request was acknowledged
final DeleteIndexResponse delete = client.indices().delete(
        new DeleteIndexRequest.Builder().index("myindex").build());
final boolean acknowledged = delete.acknowledged();
System.out.println("删除索引成功:" + acknowledged);

文档操作

// Index a single document, using the user's id as the document id.
// NOTE(review): `user` is defined outside this snippet; it is assumed to be a User.
IndexRequest<User> indexRequest = new IndexRequest.Builder<User>()
        .index("myindex")
        .id(user.getId().toString())
        .document(user)
        .build();
final IndexResponse index = client.index(indexRequest);
System.out.println("文档操作结果:" + index.result());

// Bulk-create five documents with ids "2001".."2005"
final List<BulkOperation> operations = new ArrayList<>();
for (int i = 1; i <= 5; i++) {
    // Typed builder (no raw types) so the document type is checked at compile time
    final CreateOperation<User> createOperation = new CreateOperation.Builder<User>()
            .index("myindex")
            .id("200" + i)
            .document(new User(2000 + i, 30 + i * 10, "zhangsan" + i, "beijing", 1000 + i * 1000))
            .build();
    operations.add(new BulkOperation.Builder().create(createOperation).build());
}
BulkRequest bulkRequest = new BulkRequest.Builder().operations(operations).build();
final BulkResponse bulkResponse = client.bulk(bulkRequest);
// A bulk call can succeed as a whole while individual items fail, so check each item
if (bulkResponse.errors()) {
    bulkResponse.items().forEach(item -> {
        if (item.error() != null) {
            System.err.println("文档 " + item.id() + " 操作失败:" + item.error().reason());
        }
    });
} else {
    System.out.println("数据操作成功:" + bulkResponse);
}

// Delete the document with id "1001"
DeleteRequest deleteRequest = new DeleteRequest.Builder().index("myindex").id("1001").build();
client.delete(deleteRequest);

文档查询

// Match query: documents whose "city" field matches "beijing"
final Query query = new Query.Builder()
        .match(new MatchQuery.Builder()
                .field("city")
                .query(FieldValue.of("beijing"))
                .build())
        .build();

// Run the query against index "myindex1" and print the raw response
final SearchRequest searchRequest = new SearchRequest.Builder()
        .index("myindex1")
        .query(query)
        .build();
final SearchResponse<Object> search = client.search(searchRequest, Object.class);
System.out.println(search);

操作数据(函数操作)

索引操作

// Create the index. The name must be non-empty; "myindex" matches the builder-style
// example earlier in this document. Print the acknowledgement instead of discarding it.
final Boolean acknowledged =
	client.indices().create(p -> p.index("myindex")).acknowledged();
System.out.println("创建索引成功:" + acknowledged);

// Fetch the index that was just created and print what the response contains
System.out.println(
   client.indices().get(req -> req.index("myindex")
).result());

// Delete the index
client.indices().delete(
	reqbuilder -> reqbuilder.index("myindex")
).acknowledged();

文档操作

// 创建文档
System.out.println(
   client.index(
       req ->
          req.index("myindex")
             .id(user.getId().toString())
             .document(user)
   ).result()
);
// 批量创建文档
client.bulk(
   req -> {
       users.forEach(
          u -> {
             req.operations(
					b -> {
	                    b.create(
							d -> d.id(u.getId().toString()).index("myindex").document(u)
						);
						return b;
					}
			 );
		   }
		);
		return req;
	}
);
 
// 删除文档
client.delete(
   req -> req.index("myindex").id("1001")
);

文档查询

// Match query on "city" = "beijing"; hits are deserialized as plain Object
client.search(
    req -> req.query(q -> q.match(m -> m.field("city").query("beijing"))),
    Object.class
);

客户端异步操作

ES Java API 提供了同步和异步两种客户端。之前演示的都是同步处理;异步客户端的 API 用法与同步客户端基本相同,不同之处在于请求不会阻塞当前线程,返回结果需要通过回调函数(如 whenComplete、thenApply)进行异步处理。

// Create the index asynchronously; the callback runs once the request finishes
asyncClient.indices()
    .create(req -> req.index("newindex"))
    .whenComplete((resp, error) -> {
        System.out.println("回调函数");
        if (resp == null) {
            // No response means the call failed
            error.printStackTrace();
            return;
        }
        System.out.println(resp.acknowledged());
    });
// Executed without waiting for the callback above
System.out.println("主线程操作...");
---
asyncClient.indices().create(
	req -> {
		req.index("newindex");
		return req;
	}
).thenApply(
	resp -> {
		return resp.acknowledged();
	}
).whenComplete(
	(resp, error) -> {
		System.out.println("回调函数");
		if ( !resp ) {
			System.out.println();
		} else {
			error.printStackTrace();
		}
	}
);

环境准备(7.x 版本)

Maven 依赖

<dependencies>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>7.8.0</version>
    </dependency>
    <!-- Elasticsearch high-level REST client -->
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.8.0</version>
    </dependency>
    <!-- Elasticsearch depends on log4j 2.x. Use a release that contains the fixes
         for CVE-2021-44228 (Log4Shell) and its follow-ups; 2.17.1 still runs on Java 8. -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>2.17.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.17.1</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.9.9</version>
    </dependency>
</dependencies>
 

客户端基础使用

// Create the client in try-with-resources so the connection is closed
// even if the code using it throws.
try (RestHighLevelClient client = new RestHighLevelClient(
		RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
	//		...
	System.out.println(client);
}

代码重构 - 文档操作开始

import org.elasticsearch.client.RestHighLevelClient;
/**
 * A unit of work that runs against an open {@link RestHighLevelClient}.
 *
 * <p>It has a single abstract method, so a lambda expression can implement it.
 */
@FunctionalInterface
public interface ElasticsearchTask {
    /**
     * Performs the task using the supplied client.
     *
     * @param client the client to issue requests with
     * @throws Exception if the task fails
     */
    void doSomething(RestHighLevelClient client) throws Exception;
}
---
/**
 * Helper that opens a client to the local Elasticsearch node, runs a task with it,
 * and closes the client afterwards.
 */
public class ConnectElasticsearch{
    /**
     * Runs {@code task} with a freshly created client connected to
     * {@code http://localhost:9200}.
     *
     * <p>The client is closed whether the task returns normally or throws. Any
     * exception from the task, or from closing the client, is printed and not rethrown.
     *
     * @param task the work to run against the client
     */
    public static void connect(ElasticsearchTask task){
        // try-with-resources closes the client even when the task throws; the
        // original skipped close() in that case and leaked the connection.
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
            task.doSomething(client);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
---
/** Example entry point showing how to hand a task to {@code ConnectElasticsearch}. */
public class SomeClass {
    public static void main(String[] args) {
        // The task body is where the Elasticsearch calls go
        ElasticsearchTask task = client -> {
            //do something
        };
        ConnectElasticsearch.connect(task);
    }
}

索引 - 增删改查

创建索引

// Client connected to the local node over plain HTTP
HttpHost localNode = new HttpHost("localhost", 9200, "http");
RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(localNode));

// Request object naming the index to create
CreateIndexRequest request = new CreateIndexRequest("user2");

// Send the request and read whether it was acknowledged
CreateIndexResponse response =
		client.indices().create(request, RequestOptions.DEFAULT);
boolean acknowledged = response.isAcknowledged();

查询索引

// Look up index "user2"
GetIndexRequest request = new GetIndexRequest("user2");
GetIndexResponse response =
		client.indices().get(request, RequestOptions.DEFAULT);

// Print the aliases, mappings and settings from the response
System.out.println("aliases:" + response.getAliases());
System.out.println("mappings:" + response.getMappings());
System.out.println("settings:" + response.getSettings());
aliases:{user2=[]}
mappings:{user2= org.elasticsearch.cluster.metadata.MappingMetadata@ad700514 }
settings:{user2={"index.creation_date":"1617948726976","index.number_of_replicas":"1","index.number_of_shards":"1","index.provided_name":"user2","index.uuid":"UGZ1ntcySnK6hWyP2qoVpQ","index.version.created":"7080099"}}

删除索引

// Delete index "user2" and print whether the request was acknowledged
DeleteIndexRequest request = new DeleteIndexRequest("user2");
AcknowledgedResponse response =
		client.indices().delete(request, RequestOptions.DEFAULT);
boolean deleted = response.isAcknowledged();
System.out.println("操作结果 : " + deleted);

文档 - 增删改查

新增文档

ConnectElasticsearch.connect(client -> {
	// Populate the object that will become the document body
	User user = new User();
	user.setName("zhangsan");
	user.setAge(30);
	user.setSex("男");

	// Serialize it to a JSON string with Jackson
	String userJson = new ObjectMapper().writeValueAsString(user);

	// Index request: index "user", document id "1001", JSON source
	IndexRequest request = new IndexRequest()
			.index("user")
			.id("1001")
			.source(userJson, XContentType.JSON);

	// Send the request and print the key fields of the response
	IndexResponse response = client.index(request, RequestOptions.DEFAULT);
	System.out.println("_index:" + response.getIndex());
	System.out.println("_id:" + response.getId());
	System.out.println("_result:" + response.getResult());
});

修改文档

ConnectElasticsearch.connect(client -> {
	// Partial update of document "1001" in index "user": set field "sex"
	UpdateRequest request = new UpdateRequest()
			.index("user")
			.id("1001")
			.doc(XContentType.JSON, "sex", "女");

	// Send the request and print the key fields of the response
	UpdateResponse response = client.update(request, RequestOptions.DEFAULT);
	System.out.println("_index:" + response.getIndex());
	System.out.println("_id:" + response.getId());
	System.out.println("_result:" + response.getResult());
});

基于 ID 查询

ConnectElasticsearch.connect(client -> {
	// 1. Build the request: document "1001" in index "user"
	GetRequest request = new GetRequest().index("user").id("1001");
	// 2. Send the request and obtain the response
	GetResponse response = client.get(request, RequestOptions.DEFAULT);
	// 3. Print the result (this line lacked the comment marker and did not compile)
	System.out.println("_index:" + response.getIndex());          // user
	System.out.println("_type:" + response.getType());            // doc
	System.out.println("_id:" + response.getId());                // 1001
	System.out.println("source:" + response.getSourceAsString()); // {"name":"zhangsan","age":30,"sex":"男"}
});

基于 ID 删除

ConnectElasticsearch.connect(client -> {
	// Delete document "1001" from index "user"
	DeleteRequest request = new DeleteRequest();
	request.index("user");
	request.id("1001");
	DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);
	// Print the response's string form
	String summary = response.toString();
	System.out.println(summary);
});
---
DeleteResponse[index=user,type=_doc,id=1001,version=16,result=deleted,shards=ShardInfo{total=2, successful=1, failures=[]}]

批量新增文档

ConnectElasticsearch.connect(client -> {
	// (id, name) pairs for the documents to index
	String[][] idsAndNames = {
			{"1001", "zhangsan"},
			{"1002", "lisi"},
			{"1003", "wangwu"},
	};

	// One bulk request with an index operation per pair
	BulkRequest request = new BulkRequest();
	for (String[] entry : idsAndNames) {
		request.add(new IndexRequest()
				.index("user")
				.id(entry[0])
				.source(XContentType.JSON, "name", entry[1]));
	}

	// Send everything in a single round trip
	BulkResponse responses = client.bulk(request, RequestOptions.DEFAULT);
});

批量删除文档

ConnectElasticsearch.connect(client -> {
	// One bulk request with a delete operation per document id
	BulkRequest request = new BulkRequest();
	for (String id : new String[] {"1001", "1002", "1003"}) {
		request.add(new DeleteRequest().index("user").id(id));
	}
	// Send everything in a single round trip
	BulkResponse responses = client.bulk(request, RequestOptions.DEFAULT);
});

全量查询

ConnectElasticsearch.connect(client -> {
	// Request body: match every document
	SearchSourceBuilder sourceBuilder =
			new SearchSourceBuilder().query(QueryBuilders.matchAllQuery());
	// Search request against index "user"
	SearchRequest request = new SearchRequest().indices("user").source(sourceBuilder);

	SearchResponse response = client.search(request, RequestOptions.DEFAULT);
	SearchHits hits = response.getHits();

	// Summary values from the response, then the source of each hit
	System.out.println("took:" + response.getTook());
	System.out.println("timeout:" + response.isTimedOut());
	System.out.println("total:" + hits.getTotalHits());
	System.out.println("MaxScore:" + hits.getMaxScore());
	System.out.println("hits========>>");
	hits.forEach(hit -> System.out.println(hit.getSourceAsString()));
	System.out.println("<<========");
});

条件查询

// Wrapped in ConnectElasticsearch.connect(...) like the preceding examples:
// a bare lambda is not a valid statement on its own.
ConnectElasticsearch.connect(client -> {
	// Search request against index "user"
	SearchRequest request = new SearchRequest();
	request.indices("user");
	// Request body: term query on field "age" with value "30"
	SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
	sourceBuilder.query(QueryBuilders.termQuery("age", "30"));
	request.source(sourceBuilder);
	SearchResponse response = client.search(request, RequestOptions.DEFAULT);
	// Summary values from the response
	SearchHits hits = response.getHits();
	System.out.println("took:" + response.getTook());
	System.out.println("timeout:" + response.isTimedOut());
	System.out.println("total:" + hits.getTotalHits());
	System.out.println("MaxScore:" + hits.getMaxScore());
	System.out.println("hits========>>");
	for (SearchHit hit : hits) {
		// Print the source of each hit
		System.out.println(hit.getSourceAsString());
	}
	System.out.println("<<========");
});

分页查询

// Wrapped in ConnectElasticsearch.connect(...) like the preceding examples:
// a bare lambda is not a valid statement on its own.
ConnectElasticsearch.connect(client -> {
	// Search request against index "user"
	SearchRequest request = new SearchRequest();
	request.indices("user");
	// Request body: match every document
	SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
	sourceBuilder.query(QueryBuilders.matchAllQuery());
	// Paging: `from` is the starting offset of the page (index of its first hit)
	sourceBuilder.from(0);
	// Paging: `size` is the number of hits per page
	sourceBuilder.size(2);
	request.source(sourceBuilder);
	SearchResponse response = client.search(request, RequestOptions.DEFAULT);
});

查询排序

// Wrapped in ConnectElasticsearch.connect(...) like the preceding examples:
// a bare lambda is not a valid statement on its own.
ConnectElasticsearch.connect(client -> {
	// Search request against index "user"
	SearchRequest request = new SearchRequest();
	request.indices("user");
	// Request body: match every document
	SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
	sourceBuilder.query(QueryBuilders.matchAllQuery());
	// Sort by field "age", ascending
	sourceBuilder.sort("age", SortOrder.ASC);
	request.source(sourceBuilder);
	SearchResponse response = client.search(request, RequestOptions.DEFAULT);
});

组合查询

// Wrapped in ConnectElasticsearch.connect(...) like the preceding examples:
// a bare lambda is not a valid statement on its own.
ConnectElasticsearch.connect(client -> {
	// Search request against index "user"
	SearchRequest request = new SearchRequest();
	request.indices("user");
	// Request body: bool query combining three clauses
	SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
	BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
	// must: "age" matches "30"
	boolQueryBuilder.must(QueryBuilders.matchQuery("age", "30"));
	// must_not: "name" matches "zhangsan"
	boolQueryBuilder.mustNot(QueryBuilders.matchQuery("name", "zhangsan"));
	// should: "sex" matches "男"
	boolQueryBuilder.should(QueryBuilders.matchQuery("sex", "男"));
	sourceBuilder.query(boolQueryBuilder);
	request.source(sourceBuilder);
	SearchResponse response = client.search(request, RequestOptions.DEFAULT);
});

范围查询

// Wrapped in ConnectElasticsearch.connect(...) like the preceding examples:
// a bare lambda is not a valid statement on its own.
ConnectElasticsearch.connect(client -> {
	// Search request against index "user"
	SearchRequest request = new SearchRequest();
	request.indices("user");
	// Request body: range query on field "age"
	SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
	RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("age");
	// Lower bound (>=), left disabled as in the original example
	//rangeQuery.gte("30");
	// Upper bound (<=)
	rangeQuery.lte("40");
	sourceBuilder.query(rangeQuery);
	request.source(sourceBuilder);
	SearchResponse response = client.search(request, RequestOptions.DEFAULT);
});

模糊查询

// Wrapped in ConnectElasticsearch.connect(...) like the preceding examples:
// a bare lambda is not a valid statement on its own.
ConnectElasticsearch.connect(client -> {
	// Search request against index "user"
	SearchRequest request = new SearchRequest();
	request.indices("user");
	// Request body: fuzzy query on "name" for "wangwu" with fuzziness ONE
	SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
	sourceBuilder.query(QueryBuilders.fuzzyQuery("name","wangwu").fuzziness(Fuzziness.ONE));
	request.source(sourceBuilder);
	SearchResponse response = client.search(request, RequestOptions.DEFAULT);
});

高亮查询

// Wrapped in ConnectElasticsearch.connect(...) like the preceding examples:
// a bare lambda is not a valid statement on its own.
ConnectElasticsearch.connect(client -> {
	// 1. Search request against index "user"
	SearchRequest request = new SearchRequest().indices("user");
	// 2. Request body builder
	SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
	// Query whose hits will be highlighted: terms query on "name"
	TermsQueryBuilder termsQueryBuilder =
			QueryBuilders.termsQuery("name","zhangsan");
	sourceBuilder.query(termsQueryBuilder);
	// Highlight settings: tags placed around each highlighted fragment, and the field to highlight
	HighlightBuilder highlightBuilder = new HighlightBuilder();
	highlightBuilder.preTags("<font color='red'>"); // opening tag
	highlightBuilder.postTags("</font>");           // closing tag
	highlightBuilder.field("name");                 // field to highlight
	// Attach the highlighter to the request body
	sourceBuilder.highlighter(highlightBuilder);
	request.source(sourceBuilder);
	// 3. Send the request and obtain the response
	SearchResponse response = client.search(request, RequestOptions.DEFAULT);
});

最大值查询

// Max aggregation named "maxAge" over field "age"
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.aggregation(
		AggregationBuilders.max("maxAge").field("age"));

// Search index "user" with that request body
SearchRequest request = new SearchRequest();
request.indices("user");
request.source(sourceBuilder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);

分组查询

// Terms aggregation named "age_groupby" over field "age"
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.aggregation(
		AggregationBuilders.terms("age_groupby").field("age"));

// Search index "user" with that request body
SearchRequest request = new SearchRequest();
request.indices("user");
request.source(sourceBuilder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);