Reputation: 389
I have already deployed Spark on Kubernetes; below is the deployment.yaml:
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: pyspark-pi
namespace: default
spec:
type: Python
pythonVersion: "3"
mode: cluster
image: "user/pyspark-app:1.0"
imagePullPolicy: Always
mainApplicationFile: local:///app/pyspark-app.py
sparkVersion: "3.1.1"
restartPolicy:
type: OnFailure
onFailureRetries: 3
onFailureRetryInterval: 10
onSubmissionFailureRetries: 5
onSubmissionFailureRetryInterval: 20
driver:
cores: 1
coreLimit: "1200m"
memory: "512m"
labels:
version: 3.1.1
serviceAccount: spark
executor:
cores: 1
instances: 2
memory: "512m"
labels:
version: 3.1.1
Below is the service.yaml:
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: spark-operator-role
  namespace: default
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: edit
subjects:
- kind: ServiceAccount
  name: spark
  namespace: default
The GCP Spark operator is also installed on Kubernetes.
Below are the services running:
pyspark-pi-84dad9839f7f5f43-driver-svc ClusterIP None <none> 7078/TCP,7079/TCP,4040/TCP 2d14h
Now I want to run a Beam application on this driver. Please find the sample code for the Beam application below:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=http://127.0.0.1:4040/",
    "--environment_type=DOCKER",
    "--environment_config=docker.io/apache/beam_python3.7_sdk:2.33.0"
])

# let's have some sample strings
data = ["this is sample data", "this is yet another sample data"]

# create a pipeline
pipeline = beam.Pipeline(options=options)
counts = (pipeline
          | "create" >> beam.Create(data)
          | "split" >> beam.ParDo(lambda row: row.split(" "))
          | "pair" >> beam.Map(lambda w: (w, 1))
          | "group" >> beam.CombinePerKey(sum))

# let's collect our result with a map transformation into an output list
output = []

def collect(row):
    output.append(row)
    return True

counts | "print" >> beam.Map(collect)

# run the pipeline
result = pipeline.run()
# let's wait until the result is available
result.wait_until_finish()
# print the output
print(output)
When I try to run the above Apache Beam application, it throws the error below:
$ python beam2.py
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
Traceback (most recent call last):
File "beam2.py", line 31, in <module>
result = pipeline.run()
File "C:\Users\eapasnr\Anaconda3\envs\oden2\lib\site-packages\apache_beam\pipeline.py", line 565, in run
return self.runner.run_pipeline(self, self._options)
File "C:\Users\eapasnr\Anaconda3\envs\oden2\lib\site-packages\apache_beam\runners\portability\portable_runner.py", line 438, in run_pipeline
job_service_handle = self.create_job_service(options)
File "C:\Users\eapasnr\Anaconda3\envs\oden2\lib\site-packages\apache_beam\runners\portability\portable_runner.py", line 317, in create_job_service
return self.create_job_service_handle(server.start(), options)
File "C:\Users\eapasnr\Anaconda3\envs\oden2\lib\site-packages\apache_beam\runners\portability\job_server.py", line 54, in start
grpc.channel_ready_future(channel).result(timeout=self._timeout)
File "C:\Users\eapasnr\Anaconda3\envs\oden2\lib\site-packages\grpc\_utilities.py", line 139, in result
self._block(timeout)
File "C:\Users\eapasnr\Anaconda3\envs\oden2\lib\site-packages\grpc\_utilities.py", line 85, in _block
raise grpc.FutureTimeoutError()
grpc.FutureTimeoutError
I think the problem is with the pipeline options, mainly with job_endpoint. Without the pipeline options the application runs fine and prints the output to the console.
Which host and IP address should I provide as the job endpoint to make it work on Spark?
Upvotes: 1
Views: 324
Reputation: 935
There are a couple of requirements to be met to use the PortableRunner with Spark:
- job_endpoint and artifact_endpoint have to point to the job server (using the respective ports) and not to Spark itself.
- With environment_type=DOCKER, you have to make sure Docker is installed on your Spark workers.
Unfortunately Beam's documentation isn't great in that area, but I suggest you have a look at https://beam.apache.org/documentation/runners/spark/#running-on-a-pre-deployed-spark-cluster
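A minimal sketch of what the pipeline options could look like once a Beam Spark job server is running, assuming a placeholder host <job-server-host> (for example started as described in the linked docs with docker run --net=host apache/beam_spark_job_server:2.33.0 --spark-master-url=spark://<spark-master-host>:7077); the ports 8099 and 8098 are the job server's default job and artifact ports, not values taken from your cluster:

from apache_beam.options.pipeline_options import PipelineOptions

# Sketch only: <job-server-host> is a placeholder for wherever the
# Beam Spark job server is reachable; 8099/8098 are its default
# job-service and artifact-service ports.
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=<job-server-host>:8099",
    "--artifact_endpoint=<job-server-host>:8098",
    "--environment_type=DOCKER",
    "--environment_config=docker.io/apache/beam_python3.7_sdk:2.33.0",
])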
Upvotes: 1
Reputation: 6582
You have the runner environment that instantiates the Beam job: Kubernetes.
In the execution phase, your Beam job uses the Docker image given by environment_config=docker.io/apache/beam_python3.7_sdk:2.33.0.
To work correctly, the runner needs the same versions as those used by the image. In this case, you need to install the Beam Python 2.33.0 package and Python 3.7 on Kubernetes.
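As a rough sanity check (a sketch, not a required step), you can assert in the environment that submits the pipeline that its interpreter and Beam versions match the SDK image named in environment_config:

import sys
import apache_beam as beam

# The SDK container image is beam_python3.7_sdk:2.33.0, so the
# environment that builds and submits the pipeline should match it.
assert sys.version_info[:2] == (3, 7), \
    "expected Python 3.7 to match the SDK image, got %d.%d" % sys.version_info[:2]
assert beam.__version__ == "2.33.0", \
    "expected Beam 2.33.0 to match the SDK image, got %s" % beam.__version__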
Upvotes: 1