Ali Ait-Bachir

Reputation: 730

Elasticsearch Connector as Source in Flink

I used the Elasticsearch Connector as a sink to insert data into Elasticsearch (see: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/elasticsearch.html).

However, I did not find any connector to get data from Elasticsearch as a source.

Is there any connector or example for using Elasticsearch documents as a source in a Flink pipeline?

Regards,

Ali

Upvotes: 3

Views: 2281

Answers (4)

RKNida

Reputation: 1

Add the dependency below:

<dependency>
    <groupId>co.elastic.clients</groupId>
    <artifactId>elasticsearch-java</artifactId>
    <version>7.17.18</version>
</dependency>

Use the code below to connect to Elasticsearch:

ElasticsearchTransport transport = null;
try {
    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
    HttpHost httpHost = new HttpHost("elasticsearch.com", 443, "https"); // your ES host

    // add ES username & password
    credentialsProvider.setCredentials(AuthScope.ANY,
            new UsernamePasswordCredentials("username", "password"));

    RestClientBuilder restClientBuilder = RestClient.builder(httpHost)
            .setHttpClientConfigCallback(httpClientBuilder -> {
                httpClientBuilder.disableAuthCaching();
                httpClientBuilder.setDefaultHeaders(List.of(
                        new BasicHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON)));
                httpClientBuilder.addInterceptorLast((HttpResponseInterceptor)
                        (response, context) ->
                                response.addHeader("X-Elastic-Product", "Elasticsearch"));
                return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            });

    RestClient restClient = restClientBuilder.build();
    transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
} catch (Exception e) {
    log.error("Unable to connect with Elasticsearch", e);
}

Get/read all data from the Elasticsearch index:

// The search API lives on ElasticsearchClient, not on the low-level RestClient
ElasticsearchClient esClient = new ElasticsearchClient(transport);

SearchResponse<ObjectNode> response = esClient.search(s -> s
                .index("elastic_index"), // name of the ES index
        ObjectNode.class // or your own class, e.g. Employee.class
);

List<Employee> empList = new ArrayList<>();
List<Hit<ObjectNode>> hits = response.hits().hits();
for (Hit<ObjectNode> hit : hits) {
    ObjectNode json = hit.source();
    log.info("Found employee " + json + ", score " + hit.score());
    Employee emp = new Employee();
    emp.setName(json.get("name").asText()); // asText() avoids the quotes that toString() keeps
    empList.add(emp);
}

Finally, use the list as a source (note this is a one-time bounded snapshot of the index, not a continuous stream):

DataStream<Employee> stream = env.fromCollection(empList);
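The `Employee` class referenced above is not shown in the answer; a minimal POJO matching the setters used here (hypothetical shape, with the no-arg constructor and getters/setters Flink's POJO serializer expects) would be:

```java
// Hypothetical minimal POJO matching the setters used in the snippet above.
public class Employee {
    private String name;

    public Employee() {} // no-arg constructor required for Flink POJO serialization

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
```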

Upvotes: 0

cclient

Reputation: 845

Hadoop Compatibility + Elasticsearch Hadoop

https://github.com/cclient/flink-connector-elasticsearch-source

Upvotes: 0

Ali Ait-Bachir

Reputation: 730

I finally defined a simple read-from-Elasticsearch function:

public static class ElasticsearchFunction
        extends ProcessFunction<MetricMeasurement, MetricPrediction> {

    // TransportClient is not serializable, so mark the field transient and
    // create the client in open() rather than in the constructor.
    private transient TransportClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        Settings settings = Settings.builder()
                .put("cluster.name", "YOUR_CLUSTER_NAME")
                .build();
        client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("YOUR_IP"), PORT_NUMBER));
    }

    @Override
    public void close() {
        if (client != null) {
            client.close();
        }
    }

    @Override
    public void processElement(MetricMeasurement in, Context context, Collector<MetricPrediction> out) throws Exception {
        MetricPrediction metricPrediction = new MetricPrediction();

        metricPrediction.setMetricId(in.getMetricId());
        metricPrediction.setGroupId(in.getGroupId());
        metricPrediction.setBucket(in.getBucket());

        // Get the metric measurement from Elasticsearch
        SearchResponse response = client.prepareSearch("YOUR_INDEX_NAME")
                .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
                .setQuery(QueryBuilders.termQuery("YOUR_TERM", in.getMetricId()))  // Query
                .setPostFilter(QueryBuilders.rangeQuery("value").from(0L).to(50L)) // Filter
                .setFrom(0).setSize(1)
                .get();

        SearchHit[] results = response.getHits().getHits();
        for (SearchHit hit : results) {
            String sourceAsString = hit.getSourceAsString();
            if (sourceAsString != null) {
                ObjectMapper mapper = new ObjectMapper();
                MetricMeasurement obj = mapper.readValue(sourceAsString, MetricMeasurement.class);
                metricPrediction.setPredictionValue(obj.getValue());
            }
        }
        out.collect(metricPrediction);
    }
}
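To wire this into a job, the function is applied to an existing stream. A hypothetical sketch (the `MetricMeasurement` source here is a placeholder; in practice the measurements would come from Kafka or another connector):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical wiring for the ProcessFunction above. The fromElements() call is
// only a stand-in for a real upstream source of MetricMeasurement records.
public class PredictionJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<MetricMeasurement> measurements =
                env.fromElements(new MetricMeasurement()); // placeholder source

        // Each incoming measurement triggers an Elasticsearch lookup
        DataStream<MetricPrediction> predictions =
                measurements.process(new ElasticsearchFunction());

        predictions.print();
        env.execute("metric-prediction-job");
    }
}
```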

Upvotes: 1

kkrugler

Reputation: 9245

I don't know of an explicit ES source for Flink. I did see one user talking about using elasticsearch-hadoop as a HadoopInputFormat with Flink, but I don't know if that worked for them (see their code).
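For reference, the elasticsearch-hadoop route can be sketched roughly as below (untested; class and property names follow the elasticsearch-hadoop Map/Reduce documentation and Flink's Hadoop compatibility layer, and `localhost:9200` / `my_index` are placeholders):

```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.LinkedMapWritable;

// Sketch: read an Elasticsearch index via elasticsearch-hadoop's EsInputFormat
// wrapped in Flink's Hadoop compatibility layer (flink-hadoop-compatibility).
public class EsHadoopSourceJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        JobConf conf = new JobConf();
        conf.set("es.nodes", "localhost:9200"); // placeholder ES node
        conf.set("es.resource", "my_index");    // placeholder index to read

        // EsInputFormat yields (document id, document fields) pairs
        HadoopInputFormat<Text, LinkedMapWritable> input =
                new HadoopInputFormat<>(new EsInputFormat<Text, LinkedMapWritable>(),
                        Text.class, LinkedMapWritable.class, conf);

        env.createInput(input).print();
    }
}
```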

Upvotes: 1
