截至1.13.1,官方文档所提供的方式已经废弃
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
HttpHost httpHost = new HttpHost(esHost, esPort, esScheme);
List<HttpHost> httpPosts = new ArrayList<>();
httpPosts.add(httpHost);

RestClientFactory restClientFactory = new RestClientFactory() {
@Override
public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(esUsername, esPassword));
restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
}
};

ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(httpPosts, new ElasticsearchSinkFunction<String>() {
@Override
public void process(String s, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
requestIndexer.add(createIndexRequest(s));
}

public IndexRequest createIndexRequest(String element) {
Map<String, String> json = new HashMap<>();
//将需要写入ES的字段依次添加到Map当中
json.put("data", element);

return Requests.indexRequest()
.index("my-index")
.source(json);
}
});
esSinkBuilder.setRestClientFactory(restClientFactory);
ElasticsearchSink<Map> sinkFunction = esSinkBuilder.build();