user19930511

Reputation: 389

How to deploy a Beam application on an already deployed Spark application on Kubernetes?

I have already deployed Spark on Kubernetes. Below is the deployment.yaml:

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: pyspark-pi
  namespace: default
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: "user/pyspark-app:1.0"
  imagePullPolicy: Always
  mainApplicationFile: local:///app/pyspark-app.py
  sparkVersion: "3.1.1"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.1.1
    serviceAccount: spark
  executor:
    cores: 1
    instances: 2
    memory: "512m"
    labels:
      version: 3.1.1

Below is the service.yaml:

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: spark-operator-role
  namespace: default
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: edit
subjects:
  - kind: ServiceAccount
    name: spark
    namespace: default

The GCP Spark operator is also installed on Kubernetes.

Below are the services running:

pyspark-pi-84dad9839f7f5f43-driver-svc   ClusterIP   None             <none>        7078/TCP,7079/TCP,4040/TCP   2d14h

Now I want to run a Beam application on this driver. Please find the sample code for the Beam application below:

import apache_beam as beam

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=http://127.0.0.1:4040/",
    "--environment_type=DOCKER",
    "--environment_config=docker.io/apache/beam_python3.7_sdk:2.33.0"
])

# Let's define some sample strings
data = ["this is sample data", "this is yet another sample data"]

# create a pipeline
pipeline = beam.Pipeline(options=options)
counts = (pipeline | "create" >> beam.Create(data)
    | "split" >> beam.ParDo(lambda row: row.split(" "))
    | "pair" >> beam.Map(lambda w: (w, 1))
    | "group" >> beam.CombinePerKey(sum))

# Let's collect our result with a map transformation into the output list
output = []
def collect(row):
    output.append(row)
    return True

counts | "print" >> beam.Map(collect)

# Run the pipeline
result = pipeline.run()

# Let's wait until the result is available
result.wait_until_finish()

# print the output
print(output)

When I try to run the above Apache Beam application, it throws the error below:

$ python beam2.py
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
Traceback (most recent call last):
  File "beam2.py", line 31, in <module>
    result = pipeline.run()
  File "C:\Users\eapasnr\Anaconda3\envs\oden2\lib\site-packages\apache_beam\pipeline.py", line 565, in run        
    return self.runner.run_pipeline(self, self._options)
  File "C:\Users\eapasnr\Anaconda3\envs\oden2\lib\site-packages\apache_beam\runners\portability\portable_runner.py", line 438, in run_pipeline
    job_service_handle = self.create_job_service(options)
  File "C:\Users\eapasnr\Anaconda3\envs\oden2\lib\site-packages\apache_beam\runners\portability\portable_runner.py", line 317, in create_job_service
    return self.create_job_service_handle(server.start(), options)
  File "C:\Users\eapasnr\Anaconda3\envs\oden2\lib\site-packages\apache_beam\runners\portability\job_server.py", line 54, in start
    grpc.channel_ready_future(channel).result(timeout=self._timeout)
  File "C:\Users\eapasnr\Anaconda3\envs\oden2\lib\site-packages\grpc\_utilities.py", line 139, in result
    self._block(timeout)
  File "C:\Users\eapasnr\Anaconda3\envs\oden2\lib\site-packages\grpc\_utilities.py", line 85, in _block
    raise grpc.FutureTimeoutError()
grpc.FutureTimeoutError

I think the problem is with the pipeline options, mainly with job_endpoint. Without the pipeline options, the application runs fine and prints the output to the console.

Which IP address and port should I provide as the job endpoint to make it work on Spark?

Upvotes: 1

Views: 324

Answers (2)

Moritz

Reputation: 935

There are a couple of requirements to be met to use the PortableRunner with Spark:

  • You also have to run a Beam Spark job-server alongside your cluster. It takes care of submitting the application to Spark (and functions as the driver). Note that the Spark version of the job-server should closely match the version of your cluster.
  • Both job_endpoint and artifact_endpoint have to point to the job-server (using their respective ports), not to Spark itself.
  • Finally, to use environment_type=DOCKER, you have to make sure Docker is installed on your Spark workers.
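
As a rough sketch of the second point, the pipeline options could be built like this. The host name `beam-job-server` is hypothetical, and ports 8099 (job) and 8098 (artifact) are what I understand to be the job-server's defaults, so treat both as assumptions and check your own deployment. Note that the endpoints are plain `host:port` values without an `http://` prefix.

```python
def portable_spark_options(job_server_host, job_port=8099, artifact_port=8098,
                           sdk_image="docker.io/apache/beam_python3.7_sdk:2.33.0"):
    """Build PortableRunner flags that target a Beam Spark job-server.

    job_server_host is wherever the job-server is reachable from the machine
    submitting the pipeline (hypothetical here). The port defaults are
    assumptions about the job-server's configuration.
    """
    return [
        "--runner=PortableRunner",
        # Both endpoints point at the job-server, not at the Spark driver or UI.
        f"--job_endpoint={job_server_host}:{job_port}",
        f"--artifact_endpoint={job_server_host}:{artifact_port}",
        "--environment_type=DOCKER",
        f"--environment_config={sdk_image}",
    ]


# Usage with Beam installed (the host name is a placeholder):
#   from apache_beam.options.pipeline_options import PipelineOptions
#   options = PipelineOptions(portable_spark_options("beam-job-server"))
```

Port 4040 from the question is normally the Spark web UI, which is consistent with the gRPC channel never becoming ready.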

Unfortunately, Beam's documentation isn't great in this area, but I suggest you have a look at https://beam.apache.org/documentation/runners/spark/#running-on-a-pre-deployed-spark-cluster
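
Before submitting, it may also help to confirm that the job-server port is reachable from the machine running the pipeline, since an unreachable endpoint is consistent with the grpc.FutureTimeoutError in the question. Below is a minimal sketch using only the standard library; `port_is_open` is a helper name made up for this example, and a successful TCP connection only shows that something is listening, not that it is a Beam job service.

```python
import socket


def port_is_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection raises OSError (including timeouts) on failure.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Example (hypothetical host; 8099 is assumed to be the job-server port):
#   port_is_open("beam-job-server", 8099)
```

If the job-server runs inside the cluster and the pipeline is launched from a workstation, the port has to be exposed first (for example through a Service or port forwarding) for this check, and the submission itself, to succeed.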

Upvotes: 1

Mazlum Tosun

Reputation: 6582

  • The runner environment that instantiates the Beam job is Kubernetes.

  • In the execution phase, your Beam job uses the Docker image given by environment_config=docker.io/apache/beam_python3.7_sdk:2.33.0

To work correctly, the runner needs to have the same versions as those used by the image, in this case:

  • Beam Python 2.33.0
  • Python 3.7

You need to install the Beam Python 2.33.0 package and Python 3.7 on Kubernetes.
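
One way to keep the versions aligned is to derive the image name from the versions in the submitting environment instead of hard-coding it. This is only a sketch: `beam_sdk_image` is a hypothetical helper, and the naming pattern is inferred from the image tag in the question, so whether an image exists for a given combination is an assumption you would need to verify.

```python
import sys


def beam_sdk_image(beam_version, python_version=None):
    """Return an SDK image name matching the given Beam and Python versions.

    The pattern mirrors docker.io/apache/beam_python3.7_sdk:2.33.0 from the
    question; other combinations are assumed to follow the same pattern.
    """
    if python_version is None:
        # Default to the interpreter that submits the pipeline.
        python_version = sys.version_info[:2]
    major, minor = python_version
    return f"docker.io/apache/beam_python{major}.{minor}_sdk:{beam_version}"


# Usage with Beam installed:
#   import apache_beam
#   image = beam_sdk_image(apache_beam.__version__)
```

The resulting string can then be passed as the value of --environment_config.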

Upvotes: 1
