Reputation: 730
I used the Elasticsearch Connector as a sink to insert data into Elasticsearch (see: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/elasticsearch.html).
But I could not find any connector to get data from Elasticsearch as a source.
Is there any connector or example for using Elasticsearch documents as a source in a Flink pipeline?
Regards,
Ali
Upvotes: 3
Views: 2281
Reputation: 1
Add the dependency below:
<dependency>
    <groupId>co.elastic.clients</groupId>
    <artifactId>elasticsearch-java</artifactId>
    <version>7.17.18</version>
</dependency>
Use the code below to connect to Elasticsearch. Note that the low-level RestClient is only used to build the transport; searches go through an ElasticsearchClient created on top of it:
ElasticsearchClient esClient = null;
try {
    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
    // add your ES username & password
    credentialsProvider.setCredentials(AuthScope.ANY,
            new UsernamePasswordCredentials("username", "password"));
    HttpHost httpHost = new HttpHost("elasticsearch.com", 443, "https");
    RestClientBuilder restClientBuilder = RestClient.builder(httpHost)
            .setHttpClientConfigCallback(httpClientBuilder -> {
                httpClientBuilder.disableAuthCaching();
                httpClientBuilder.setDefaultHeaders(List.of(
                        new BasicHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON)));
                httpClientBuilder.addInterceptorLast((HttpResponseInterceptor)
                        (response, context) ->
                                response.addHeader("X-Elastic-Product", "Elasticsearch"));
                return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            });
    RestClient restClient = restClientBuilder.build();
    ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
    esClient = new ElasticsearchClient(transport);
} catch (Exception e) {
    log.error("Unable to connect to Elasticsearch", e);
}
Get/read all data from the Elasticsearch index:
SearchResponse<ObjectNode> response = esClient.search(s -> s
        .index("elastic_index"), // add the name of the ES index
        ObjectNode.class // or map hits directly to your own class, e.g. Employee.class
);

List<Employee> empList = new ArrayList<>();
for (Hit<ObjectNode> hit : response.hits().hits()) {
    ObjectNode json = hit.source();
    log.info("Found employee " + json + ", score " + hit.score());
    Employee emp = new Employee();
    emp.setName(json.get("name").asText()); // asText() avoids the quotes that toString() keeps
    empList.add(emp);
}
Finally, use the list as a source:
DataStream<Employee> stream = env.fromCollection(empList);
Upvotes: 0
Reputation: 845
Hadoop Compatibility + Elasticsearch Hadoop
https://github.com/cclient/flink-connector-elasticsearch-source
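A minimal sketch of this approach: elasticsearch-hadoop provides `EsInputFormat`, which Flink can consume through its Hadoop compatibility wrapper. This assumes the `flink-hadoop-compatibility` and `elasticsearch-hadoop` dependencies are on the classpath; the host and index name below are placeholders, and the exact configuration keys should be checked against the elasticsearch-hadoop documentation for your version.

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.elasticsearch.hadoop.mr.EsInputFormat;

public class EsHadoopSourceJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // elasticsearch-hadoop is configured through Hadoop's JobConf;
        // node address and index name here are placeholder values
        JobConf jobConf = new JobConf();
        jobConf.set("es.nodes", "localhost:9200");
        jobConf.set("es.resource", "my_index"); // index to read from

        // Wrap the Hadoop EsInputFormat so Flink can use it as an input
        HadoopInputFormat<Text, MapWritable> esInput = new HadoopInputFormat<>(
                new EsInputFormat<Text, MapWritable>(), Text.class, MapWritable.class, jobConf);

        // Each record is (document id, document fields)
        DataSet<Tuple2<Text, MapWritable>> docs = env.createInput(esInput);
        docs.print();
    }
}
```

This reads the index in parallel (one input split per shard), unlike collecting all documents into a list on the client first.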
Upvotes: 0
Reputation: 730
I finally defined a simple read-from-Elasticsearch function:
public static class ElasticsearchFunction
        extends ProcessFunction<MetricMeasurement, MetricPrediction> {

    private final TransportClient client;

    public ElasticsearchFunction() throws UnknownHostException {
        client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("YOUR_IP"), PORT_NUMBER));
    }

    @Override
    public void processElement(MetricMeasurement in, Context context, Collector<MetricPrediction> out) throws Exception {
        MetricPrediction metricPrediction = new MetricPrediction();
        metricPrediction.setMetricId(in.getMetricId());
        metricPrediction.setGroupId(in.getGroupId());
        metricPrediction.setBucket(in.getBucket());

        // Get the metric measurement from Elasticsearch
        SearchResponse response = client.prepareSearch("YOUR_INDEX_NAME")
                .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
                .setQuery(QueryBuilders.termQuery("YOUR_TERM", in.getMetricId())) // query
                .setPostFilter(QueryBuilders.rangeQuery("value").from(0L).to(50L)) // filter
                .setFrom(0).setSize(1).setExplain(true)
                .get();

        for (SearchHit hit : response.getHits().getHits()) {
            String sourceAsString = hit.getSourceAsString();
            if (sourceAsString != null) {
                ObjectMapper mapper = new ObjectMapper();
                MetricMeasurement obj = mapper.readValue(sourceAsString, MetricMeasurement.class);
                metricPrediction.setPredictionValue(obj.getValue());
            }
        }
        out.collect(metricPrediction);
    }
}
Upvotes: 1
Reputation: 9245
I don't know of an explicit ES source for Flink. I did see one user talking about using elasticsearch-hadoop as a HadoopInputFormat with Flink, but I don't know if that worked for them (see their code).
Upvotes: 1