Ramdev Sharma

Reputation: 1014

How to pull client logs for Spark jobs submitted via the Apache Livy batches POST method into Airflow

I am working on submitting a Spark job using the Apache Livy batches POST method.

This HTTP request is sent using Airflow. After submitting the job, I am tracking its status using the batch ID.

I want to show the driver (client) logs in the Airflow logs, to avoid having to go to multiple places (Airflow and Apache Livy/Resource Manager).

Is this possible to do using the Apache Livy REST API?
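For context, the flow looks roughly like this (a minimal sketch using the requests library; the Livy URL, jar path, and main class below are placeholders, and in the real DAG the POST is made from an Airflow task):

import requests

LIVY_URL = "http://livy-server-IP:8998"  # placeholder Livy server address

# Submit the Spark job as a Livy batch (POST /batches).
payload = {
    "file": "/path/to/spark-app.jar",       # placeholder application jar
    "className": "com.example.MainClass",   # placeholder main class
}
resp = requests.post(LIVY_URL + "/batches", json=payload)
resp.raise_for_status()
batch_id = resp.json()["id"]

# Track the batch by its id (GET /batches/{batchId} returns the current state).
state = requests.get("{}/batches/{}".format(LIVY_URL, batch_id)).json()["state"]
print(batch_id, state)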

Upvotes: 4

Views: 7391

Answers (2)

Lightning-Analytics

Reputation: 11

Livy exposes its REST API in two ways: sessions and batches. In your case, since you are not using a session, you are submitting using batches. You can post your batch with a curl command like the following (the jar path in the payload is a placeholder for your application):

curl -X POST -H "Content-Type: application/json" -d '{"file": "/path/to/your-spark-app.jar"}' http://livy-server-IP:8998/batches

Once you have submitted the job, you will get the batch ID in the response. You can then fetch the logs with:

curl http://livy-server-IP:8998/batches/{batchId}/log
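If you want to pull the same log lines from Python (for example inside an Airflow task) instead of shelling out to curl, a minimal sketch with the requests library could look like this; the Livy URL is a placeholder, and from/size are the paging parameters the log endpoint accepts:

import requests

LIVY_URL = "http://livy-server-IP:8998"  # placeholder


def fetch_batch_logs(batch_id, from_line=0, size=100):
    # GET /batches/{batchId}/log returns JSON with a "log" list of lines.
    resp = requests.get(
        "{}/batches/{}/log".format(LIVY_URL, batch_id),
        params={"from": from_line, "size": size},
    )
    resp.raise_for_status()
    return resp.json().get("log", [])


for line in fetch_batch_logs(batch_id=0):  # batch id 0 is just an example
    print(line)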

You can find the documentation at: https://livy.incubator.apache.org/docs/latest/rest-api.html

If you want to avoid the above steps, you can use a ready-made AMI (namely, LightningFlow) from the AWS Marketplace, which provides Airflow with a custom Livy operator. The Livy operator submits the job and tracks its status every 30 seconds (configurable), and it also surfaces the Spark logs at the end of the Spark job in the Airflow UI logs.

Note: LightningFlow comes pre-integrated with all required libraries, Livy, custom operators, and a local Spark cluster.

Link for AWS Marketplace: https://aws.amazon.com/marketplace/pp/Lightning-Analytics-Inc-LightningFlow-Integrated-o/B084BSD66V

This will enable you to view consolidated logs in one place, instead of shuffling between Airflow and EMR/Spark logs (Ambari/Resource Manager).

Upvotes: 1

kaxil

Reputation: 18904

Livy has endpoints to get logs: /sessions/{sessionId}/log and /batches/{batchId}/log.

Documentation: https://livy.incubator.apache.org/docs/latest/rest-api.html

You can create Python functions like the ones shown below to get logs:

import json

from airflow.hooks.http_hook import HttpHook  # Airflow 1.x import path

# Hook for the Airflow HTTP connection that points at the Livy server
# (e.g. http://livy-server-IP:8998); "livy_http" is a placeholder connection id.
http = HttpHook("GET", http_conn_id="livy_http")


def _http_rest_call(method, endpoint, data=None, headers=None, extra_options=None):
    if not extra_options:
        extra_options = {}

    # Override the hook's default HTTP method for this call
    http.method = method
    response = http.run(endpoint, json.dumps(data) if data else None,
                        headers, extra_options=extra_options)

    return response


def _get_batch_session_logs(batch_id):
    # GET /batches/{batchId}/log returns the driver (client) log lines for the batch
    method = "GET"
    endpoint = "batches/" + str(batch_id) + "/log"
    response = _http_rest_call(method=method, endpoint=endpoint)
    # return response.json()
    return response
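To surface those lines in the Airflow task log, which is what the question asks for, you can call the helper above from a task and write each line through the logger; anything logged this way appears in that task's log in the Airflow UI. A rough sketch reusing _get_batch_session_logs, with illustrative DAG and task ids, and the batch id assumed to come from an upstream submit task via XCom:

import logging
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator  # Airflow 1.x import path


def push_livy_logs_to_airflow(**context):
    # Batch id pushed to XCom by the task that submitted the Livy batch
    # ("submit_livy_batch" is an illustrative task id).
    batch_id = context["ti"].xcom_pull(task_ids="submit_livy_batch")

    response = _get_batch_session_logs(batch_id)
    # Each line logged here shows up in this task's log in the Airflow UI.
    for line in response.json().get("log", []):
        logging.info(line)


with DAG("livy_batch_logs_example", start_date=datetime(2020, 1, 1),
         schedule_interval=None) as dag:
    fetch_livy_logs = PythonOperator(
        task_id="fetch_livy_logs",
        python_callable=push_livy_logs_to_airflow,
        provide_context=True,  # needed on Airflow 1.x to receive **context
    )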

Upvotes: 10
