距离上一次更新该文章已经过了 734 天,文章所描述的內容可能已经发生变化,请留意。
截至1.13.1,官方文档所提供的方式已经废弃
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| HttpHost httpHost = new HttpHost(esHost, esPort, esScheme); List<HttpHost> httpPosts = new ArrayList<>(); httpPosts.add(httpHost);
RestClientFactory restClientFactory = new RestClientFactory() { @Override public void configureRestClientBuilder(RestClientBuilder restClientBuilder) { CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(esUsername, esPassword)); restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); } };
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(httpPosts, new ElasticsearchSinkFunction<String>() { @Override public void process(String s, RuntimeContext runtimeContext, RequestIndexer requestIndexer) { requestIndexer.add(createIndexRequest(s)); }
public IndexRequest createIndexRequest(String element) { Map<String, String> json = new HashMap<>(); json.put("data", element);
return Requests.indexRequest() .index("my-index") .source(json); } }); esSinkBuilder.setRestClientFactory(restClientFactory); ElasticsearchSink<Map> sinkFunction = esSinkBuilder.build();
|