环境准备(8.x 版本)
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<elastic.version>8.1.0</elastic.version>
</properties>
<dependencies>
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>x-pack-sql-jdbc</artifactId>
<version>8.1.0</version>
</dependency>
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>${elastic.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.3</version>
</dependency>
<dependency>
<groupId>jakarta.json</groupId>
<artifactId>jakarta.json-api</artifactId>
<version>2.0.1</version>
</dependency>
</dependencies>现在使用的基于 https 安全的 Elasticsearch 服务,所以首先我们需要将之前的证书进行一个转换
openssl pkcs12 -in elastic-stack-ca.p12 -clcerts -nokeys -out java-ca.crtJava 客户端连接
// 导入的类
import co.elastic.clients.elasticsearch.*;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport; import org.apache.http.HttpHost;
import org.apache.http.auth.*;
import org.apache.http.client.*;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.*;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.http.ssl.*;
import org.elasticsearch.client.*;
import javax.net.ssl.SSLContext;
import java.io.InputStream;
import java.nio.file.*;
import java.security.KeyStore;
import java.security.cert.*;
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials("elastic", "O3x0hfu7i=ZbQvlktCnd"));
Path caCertificatePath = Paths.get("ca.crt");
CertificateFactory factory = CertificateFactory.getInstance("X.509");
Certificate trustedCa;
try (InputStream is = Files.newInputStream(caCertificatePath)) {
trustedCa = factory.generateCertificate(is);
}
KeyStore trustStore = KeyStore.getInstance("pkcs12");
trustStore.load(null, null);
trustStore.setCertificateEntry("ca", trustedCa);
SSLContextBuilder sslContextBuilder = SSLContexts.custom().loadTrustMaterial(trustStore, null);
final SSLContext sslContext = sslContextBuilder.build();
RestClientBuilder builder = RestClient.builder(
new HttpHost("linux1", 9200, "https"))
.setHttpClientConfigCallback(new
RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(
HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setSSLContext(sslContext)
.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
.setDefaultCredentialsProvider(credentialsProvider);
}
});
RestClient restClient = builder.build();
ElasticsearchTransport transport = new RestClientTransport(
restClient, new JacksonJsonpMapper());
ElasticsearchClient client = new ElasticsearchClient(transport);
ElasticsearchAsyncClient asyncClient = new ElasticsearchAsyncClient(transport);
...
transport.close();操作数据(普通操作)
索引操作
// 创建索引
CreateIndexRequest request = new CreateIndexRequest.Builder().index("myindex").build();
final CreateIndexResponse createIndexResponse = client.indices().create(request);
System.out.println("创建索引成功:" + createIndexResponse.acknowledged());
// 查询索引
GetIndexRequest getIndexRequest = new GetIndexRequest.Builder().index("myindex").build();
final GetIndexResponse getIndexResponse = client.indices().get(getIndexRequest);
System.out.println("索引查询成功:" + getIndexResponse.result());
// 删除索引
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest.Builder().index("myindex").build();
final DeleteIndexResponse delete = client.indices().delete(deleteIndexRequest); final boolean acknowledged = delete.acknowledged(); System.out.println("删除索引成功:" + acknowledged);文档操作
// 创建文档
IndexRequest indexRequest = new IndexRequest.Builder()
.index("myindex")
.id(user.getId().toString())
.document(user)
.build();
final IndexResponse index = client.index(indexRequest);
System.out.println("文档操作结果:" + index.result());
// 批量创建文档
final List<BulkOperation> operations = new ArrayList<BulkOperation>();
for ( int i= 1;i <= 5; i++ ) {
final CreateOperation.Builder builder = new CreateOperation.Builder();
builder.index("myindex");
builder.id("200" + i);
builder.document(new User(2000 + i, 30 + i * 10, "zhangsan" + i, "beijing", 1000 + i*1000));
final CreateOperation<Object> objectCreateOperation = builder.build();
final BulkOperation bulk = new BulkOperation.Builder().create(objectCreateOperation).build();
operations.add(bulk);
}
BulkRequest bulkRequest = new BulkRequest.Builder().operations(operations).build();
final BulkResponse bulkResponse = client.bulk(bulkRequest);
System.out.println("数据操作成功:" + bulkResponse);
// 删除文档
DeleteRequest deleteRequest = new DeleteRequest.Builder().index("myindex").id("1001").build();
client.delete(deleteRequest);文档查询
final SearchRequest.Builder searchRequestBuilder = new SearchRequest.Builder().index("myindex1");
MatchQuery matchQuery = new MatchQuery.Builder().field("city").query(FieldValue.of("beijing")).build();
Query query = new Query.Builder().match(matchQuery).build();
searchRequestBuilder.query(query);
SearchRequest searchRequest = searchRequestBuilder.build();
final SearchResponse<Object> search = client.search(searchRequest, Object.class);
System.out.println(search);操作数据(函数操作)
索引操作
// 创建索引
final Boolean acknowledged =
client.indices().create(p ->p.index("")).acknowledged();
System.out.println("创建索引成功");
// 获取索引
System.out.println(
client.indices().get(req -> req.index("myindex1")
).result());
// 删除索引
client.indices().delete(
reqbuilder -> reqbuilder.index("myindex")
).acknowledged();文档操作
// 创建文档
System.out.println(
client.index(
req ->
req.index("myindex")
.id(user.getId().toString())
.document(user)
).result()
);
// 批量创建文档
client.bulk(
req -> {
users.forEach(
u -> {
req.operations(
b -> {
b.create(
d -> d.id(u.getId().toString()).index("myindex").document(u)
);
return b;
}
);
}
);
return req;
}
);
// 删除文档
client.delete(
req -> req.index("myindex").id("1001")
);文档查询
client.search(
req -> {
req.query(
q ->
q.match(
m -> m.field("city").query("beijing")
)
);
return req;
}
, Object.class
);客户端异步操作
ES Java API 提供了同步和异步的两种客户端处理。之前演示的都是同步处理,异步客户端的处理和同步客户端处理的 API 基本原理相同,不同的是需要异步对返回结果进行相应的处理。
// 创建索引
asyncClient.indices().create(
req -> {
req.index("newindex");
return req;
}
).whenComplete(
(resp, error) -> {
System.out.println("回调函数");
if ( resp != null ) {
System.out.println(resp.acknowledged());
} else {
error.printStackTrace();
}
}
);
System.out.println("主线程操作...");
---
asyncClient.indices().create(
req -> {
req.index("newindex");
return req;
}
).thenApply(
resp -> {
return resp.acknowledged();
}
).whenComplete(
(resp, error) -> {
System.out.println("回调函数");
if ( !resp ) {
System.out.println();
} else {
error.printStackTrace();
}
}
);环境准备(7.x 版本)
Maven 依赖
<dependencies>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.8.0</version>
</dependency>
<!-- elasticsearch 的客户端 -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.8.0</version>
</dependency>
<!-- elasticsearch 依赖 2.x 的 log4j -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.9</version>
</dependency>
</dependencies>
客户端基础使用
// 创建客户端对象
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http")));
// ...
System.out.println(client);
// 关闭客户端连接
client.close();代码重构 - 文档操作开始
import org.elasticsearch.client.RestHighLevelClient;
public interface ElasticsearchTask {
void doSomething(RestHighLevelClient client) throws Exception;
}
---
public class ConnectElasticsearch{
public static void connect(ElasticsearchTask task){
// 创建客户端对象
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http")));
try {
task.doSomething(client);
// 关闭客户端连接
client.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
---
public class SomeClass {
public static void main(String[] args) {
ConnectElasticsearch.connect(client -> {
//do something
});
}
}索引 - 增删改查
创建索引:
// 创建客户端对象
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http")));
// 创建索引 - 请求对象
CreateIndexRequest request = new CreateIndexRequest("user2");
// 发送请求,获取响应
CreateIndexResponse response = client.indices().create(request,
RequestOptions.DEFAULT);
// 响应状态
boolean acknowledged = response.isAcknowledged();查询索引
// 查询索引 - 请求对象
GetIndexRequest request = new GetIndexRequest("user2");
// 发送请求,获取响应
GetIndexResponse response = client.indices().get(request,
RequestOptions.DEFAULT);
System.out.println("aliases:"+response.getAliases());
System.out.println("mappings:"+response.getMappings());
System.out.println("settings:"+response.getSettings());aliases:{user2=[]}
mappings:{user2= org.elasticsearch.cluster.metadata.MappingMetadata@ad700514 }
settings:{user2={"index.creation_date":"1617948726976","index.number_of_replicas":"1","index.number_of_shards":"1","index.provided_name":"user2","index.uuid":"UGZ1ntcySnK6hWyP2qoVpQ","index.version.created":"7080099"}}
删除索引
// 删除索引 - 请求对象
DeleteIndexRequest request = new DeleteIndexRequest("user2");
// 发送请求,获取响应
AcknowledgedResponse response = client.indices().delete(request,RequestOptions.DEFAULT);
// 操作结果
System.out.println("操作结果 : " + response.isAcknowledged());文档 - 增删改查
新增文档
ConnectElasticsearch.connect(client -> {
// 新增文档 - 请求对象
IndexRequest request = new IndexRequest();
// 设置索引及唯一性标识
request.index("user").id("1001");
// 创建数据对象
User user = new User();
user.setName("zhangsan");
user.setAge(30);
user.setSex("男");
ObjectMapper objectMapper = new ObjectMapper();
String productJson = objectMapper.writeValueAsString(user);
// 添加文档数据,数据格式为 JSON 格式
request.source(productJson, XContentType.JSON);
// 客户端发送请求,获取响应对象
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
// 打印结果信息
System.out.println("_index:" + response.getIndex());
System.out.println("_id:" + response.getId());
System.out.println("_result:" + response.getResult());
});修改文档
ConnectElasticsearch.connect(client -> {
// 修改文档 - 请求对象
UpdateRequest request = new UpdateRequest();
// 配置修改参数
request.index("user").id("1001");
// 设置请求体,对数据进行修改
request.doc(XContentType.JSON, "sex", "女");
// 客户端发送请求,获取响应对象
UpdateResponse response = client.update(request, RequestOptions.DEFAULT);
System.out.println("_index:" + response.getIndex());
System.out.println("_id:" + response.getId());
System.out.println("_result:" + response.getResult());
});基于 ID 查询
ConnectElasticsearch.connect(client -> {
//1.创建请求对象
GetRequest request = new GetRequest().index("user").id("1001");
//2.客户端发送请求,获取响应对象
GetResponse response = client.get(request, RequestOptions.DEFAULT);
3.打印结果信息
System.out.println("_index:" + response.getIndex()); // user
System.out.println("_type:" + response.getType()); // doc
System.out.println("_id:" + response.getId()); // 1001
System.out.println("source:" + response.getSourceAsString()); // {"name":"zhangsan","age":30,"sex":"男"}
});基于 ID 删除
ConnectElasticsearch.connect(client -> {
//创建请求对象
DeleteRequest request = new DeleteRequest().index("user").id("1001");
//客户端发送请求,获取响应对象
DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);
//打印信息
System.out.println(response.toString());
});
---
DeleteResponse[index=user,type=_doc,id=1001,version=16,result=deleted,shards=ShardInfo{total=2, successful=1, failures=[]}]批量新增文档
ConnectElasticsearch.connect(client -> {
//创建批量新增请求对象
BulkRequest request = new BulkRequest();
request.add(new
IndexRequest().index("user").id("1001").source(XContentType.JSON, "name",
"zhangsan"));
request.add(new
IndexRequest().index("user").id("1002").source(XContentType.JSON, "name",
"lisi"));
request.add(new
IndexRequest().index("user").id("1003").source(XContentType.JSON, "name",
"wangwu"));
//客户端发送请求,获取响应对象
BulkResponse responses = client.bulk(request, RequestOptions.DEFAULT);
});批量删除文档
ConnectElasticsearch.connect(client -> {
//创建批量删除请求对象
BulkRequest request = new BulkRequest();
request.add(new DeleteRequest().index("user").id("1001"));
request.add(new DeleteRequest().index("user").id("1002"));
request.add(new DeleteRequest().index("user").id("1003"));
//客户端发送请求,获取响应对象
BulkResponse responses = client.bulk(request, RequestOptions.DEFAULT);
});全量查询
ConnectElasticsearch.connect(client -> {
// 创建搜索请求对象
SearchRequest request = new SearchRequest();
request.indices("user");
// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
// 查询所有数据
sourceBuilder.query(QueryBuilders.matchAllQuery());
request.source(sourceBuilder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 查询匹配
SearchHits hits = response.getHits();
System.out.println("took:" + response.getTook()); // 查询耗时
System.out.println("timeout:" + response.isTimedOut()); // 是否超时
System.out.println("total:" + hits.getTotalHits()); // 命中个数
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("hits========>>");
for (SearchHit hit : hits) {
System.out.println(hit.getSourceAsString());
}
System.out.println("<<========");
});条件查询
client -> {
// 创建搜索请求对象
SearchRequest request = new SearchRequest();
request.indices("user");
// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.termQuery("age", "30"));
request.source(sourceBuilder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 查询匹配
SearchHits hits = response.getHits();
System.out.println("took:" + response.getTook());
System.out.println("timeout:" + response.isTimedOut());
System.out.println("total:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("hits========>>");
for (SearchHit hit : hits) {
//输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
System.out.println("<<========");
};分页查询
client -> {
// 创建搜索请求对象
SearchRequest request = new SearchRequest();
request.indices("user");
// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchAllQuery());
// 分页查询
// 当前页其实索引(第一条数据的顺序号), from
sourceBuilder.from(0);
// 每页显示多少条 size
sourceBuilder.size(2);
request.source(sourceBuilder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
};查询排序
client -> {
// 创建搜索请求对象
SearchRequest request = new SearchRequest();
request.indices("user");
// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchAllQuery());
// 排序
sourceBuilder.sort("age", SortOrder.ASC);
request.source(sourceBuilder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
};组合查询
client -> {
// 创建搜索请求对象
SearchRequest request = new SearchRequest();
request.indices("user");
// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
// 必须包含
boolQueryBuilder.must(QueryBuilders.matchQuery("age", "30"));
// 一定不含
boolQueryBuilder.mustNot(QueryBuilders.matchQuery("name", "zhangsan"));
// 可能包含
boolQueryBuilder.should(QueryBuilders.matchQuery("sex", "男"));
sourceBuilder.query(boolQueryBuilder);
request.source(sourceBuilder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
};范围查询
client -> {
// 创建搜索请求对象
SearchRequest request = new SearchRequest();
request.indices("user");
// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("age");
// 大于等于
//rangeQuery.gte("30");
// 小于等于
rangeQuery.lte("40");
sourceBuilder.query(rangeQuery);
request.source(sourceBuilder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
};模糊查询
client -> {
// 创建搜索请求对象
SearchRequest request = new SearchRequest();
request.indices("user");
// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.fuzzyQuery("name","wangwu").fuzziness(Fuzziness.ONE));
request.source(sourceBuilder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
};高亮查询
client -> {
// 高亮查询
SearchRequest request = new SearchRequest().indices("user");
//2.创建查询请求体构建器
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
//构建查询方式:高亮查询
TermsQueryBuilder termsQueryBuilder =
QueryBuilders.termsQuery("name","zhangsan");
//设置查询方式
sourceBuilder.query(termsQueryBuilder);
//构建高亮字段
HighlightBuilder highlightBuilder = new HighlightBuilder();
highlightBuilder.preTags("<font color='red'>");//设置标签前缀
highlightBuilder.postTags("</font>");//设置标签后缀
highlightBuilder.field("name");//设置高亮字段
//设置高亮构建对象
sourceBuilder.highlighter(highlightBuilder);
//设置请求体
request.source(sourceBuilder);
//3.客户端发送请求,获取响应对象
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
};最大值查询
SearchRequest request = new SearchRequest().indices("user");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.aggregation(AggregationBuilders.max("maxAge").field("age"));
request.source(sourceBuilder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);分组查询
SearchRequest request = new SearchRequest().indices("user");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.aggregation(AggregationBuilders.terms("age_groupby").field("age"));
request.source(sourceBuilder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);