Reputation: 389
I am trying to run a Beam application on Spark on Kubernetes.
beam-deployment.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: spark-beam-jobserver
spec:
  serviceName: spark-headless
  selector:
    matchLabels:
      app: spark-beam-jobserver
  template:
    metadata:
      labels:
        app: spark-beam-jobserver
        app.kubernetes.io/instance: custom_spark
        app.kubernetes.io/name: spark
    spec:
      containers:
        - name: spark-beam-jobserver
          image: apache/beam_spark_job_server:2.33.0
          imagePullPolicy: Always
          ports:
            - containerPort: 8099
              name: jobservice
            - containerPort: 8098
              name: artifact
            - containerPort: 8097
              name: expansion
          volumeMounts:
            - name: beam-artifact-staging
              mountPath: "/tmp/beam-artifact-staging"
          command: [
            "/bin/bash", "-c", "./spark-job-server.sh --job-port=8099 --spark-master-url=spark://spark-primary:7077"
          ]
      volumes:
        - name: beam-artifact-staging
          persistentVolumeClaim:
            claimName: spark-beam-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: spark-beam-jobserver
  labels:
    app: spark-beam-jobserver
spec:
  selector:
    app: spark-beam-jobserver
  type: NodePort
  ports:
    - port: 8099
      nodePort: 32090
      name: job-service
    - port: 8098
      nodePort: 32091
      name: artifacts
#  type: ClusterIP
#  ports:
#    - port: 8099
#      name: job-service
#    - port: 8098
#      name: artifacts
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: spark-primary
spec:
  serviceName: spark-headless
  replicas: 1
  selector:
    matchLabels:
      app: spark
  template:
    metadata:
      labels:
        app: spark
        component: primary
        app.kubernetes.io/instance: custom_spark
        app.kubernetes.io/name: spark
    spec:
      containers:
        - name: primary
          image: docker.io/secondcomet/spark-custom-2.4.6
          env:
            - name: SPARK_MODE
              value: "master"
            - name: SPARK_RPC_AUTHENTICATION_ENABLED
              value: "no"
            - name: SPARK_RPC_ENCRYPTION_ENABLED
              value: "no"
            - name: SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED
              value: "no"
            - name: SPARK_SSL_ENABLED
              value: "no"
          ports:
            - containerPort: 7077
              name: masterendpoint
            - containerPort: 8080
              name: ui
            - containerPort: 7078
              name: driver-rpc-port
            - containerPort: 7079
              name: blockmanager
          livenessProbe:
            httpGet:
              path: /
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
          resources:
            limits:
              cpu: 1.0
              memory: 1Gi
            requests:
              cpu: 0.5
              memory: 0.5Gi
---
apiVersion: v1
kind: Service
metadata:
  name: spark-primary
  labels:
    app: spark
    component: primary
spec:
  type: ClusterIP
  ports:
    - name: masterendpoint
      port: 7077
      targetPort: 7077
    - name: rest
      port: 6066
      targetPort: 6066
    - name: ui
      port: 8080
      targetPort: 8080
    - name: driver-rpc-port
      protocol: TCP
      port: 7078
      targetPort: 7078
    - name: blockmanager
      protocol: TCP
      port: 7079
      targetPort: 7079
  selector:
    app: spark
    component: primary
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: spark-children
  labels:
    app: spark
spec:
  serviceName: spark-headless
  replicas: 1
  selector:
    matchLabels:
      app: spark
  template:
    metadata:
      labels:
        app: spark
        component: children
        app.kubernetes.io/instance: custom_spark
        app.kubernetes.io/name: spark
    spec:
      containers:
        - name: docker
          image: docker:19.03.5-dind
          securityContext:
            privileged: true
          volumeMounts:
            - name: dind-storage
              mountPath: /var/lib/docker
          env:
            - name: DOCKER_TLS_CERTDIR
              value: ""
          resources:
            limits:
              cpu: 1.0
              memory: 1Gi
            requests:
              cpu: 0.5
              memory: 100Mi
        - name: children
          image: docker.io/secondcomet/spark-custom-2.4.6
          env:
            - name: DOCKER_HOST
              value: "tcp://localhost:2375"
            - name: SPARK_MODE
              value: "worker"
            - name: SPARK_MASTER_URL
              value: "spark://spark-primary:7077"
            - name: SPARK_WORKER_MEMORY
              value: "1G"
            - name: SPARK_WORKER_CORES
              value: "1"
            - name: SPARK_RPC_AUTHENTICATION_ENABLED
              value: "no"
            - name: SPARK_RPC_ENCRYPTION_ENABLED
              value: "no"
            - name: SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED
              value: "no"
            - name: SPARK_SSL_ENABLED
              value: "no"
          ports:
            - containerPort: 8081
              name: ui
          volumeMounts:
            - name: beam-artifact-staging
              mountPath: "/tmp/beam-artifact-staging"
          resources:
            limits:
              cpu: 1
              memory: 2Gi
            requests:
              cpu: 0.5
              memory: 1Gi
      volumes:
        - name: dind-storage
          emptyDir: {}
        - name: beam-artifact-staging
          persistentVolumeClaim:
            claimName: spark-beam-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: spark-children
  labels:
    app: spark
    component: children
spec:
  type: ClusterIP
  ports:
    - name: ui
      port: 8081
      targetPort: 8081
  selector:
    app: spark
    component: children
---
apiVersion: v1
kind: Service
metadata:
  name: spark-headless
spec:
  clusterIP: None
  selector:
    app.kubernetes.io/instance: custom_spark
    app.kubernetes.io/name: spark
  type: ClusterIP
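Both the job server and the worker container mount the same claim (spark-beam-pvc) at /tmp/beam-artifact-staging, so staged artifacts are shared between them. The claim itself is created separately, roughly along these lines (the access mode and size here are placeholders, not the exact values from my setup):

# Sketch of the claim backing the beam-artifact-staging volume above.
# Access mode and storage size are placeholders; adjust for your cluster.
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: spark-beam-pvc
  namespace: spark-beam
spec:
  accessModes:
    - ReadWriteMany   # mounted by both the job server and the worker pods
  resources:
    requests:
      storage: 1Gi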
$ kubectl get all --namespace spark-beam
NAME                         READY   STATUS    RESTARTS   AGE
pod/spark-beam-jobserver-0   1/1     Running   0          58m
pod/spark-children-0         2/2     Running   0          58m
pod/spark-primary-0          1/1     Running   0          58m

NAME                           TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                                        AGE
service/spark-beam-jobserver   NodePort    10.97.173.68    <none>        8099:32090/TCP,8098:32091/TCP                  58m
service/spark-children         ClusterIP   10.105.209.30   <none>        8081/TCP                                       58m
service/spark-headless         ClusterIP   None            <none>        <none>                                         58m
service/spark-primary          ClusterIP   10.109.32.126   <none>        7077/TCP,6066/TCP,8080/TCP,7078/TCP,7079/TCP   58m

NAME                                    READY   AGE
statefulset.apps/spark-beam-jobserver   1/1     58m
statefulset.apps/spark-children         1/1     58m
statefulset.apps/spark-primary          1/1     58m
beam-application.py
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class ConvertToByteArray(beam.DoFn):
    def __init__(self):
        pass

    def setup(self):
        pass

    def process(self, row):
        try:
            yield bytearray(row + '\n', 'utf-8')
        except Exception as e:
            raise e


def run():
    options = PipelineOptions([
        "--runner=PortableRunner",
        "--job_endpoint=localhost:32090",
        "--save_main_session",
        "--environment_type=DOCKER",
        "--environment_config=docker.io/apache/beam_python3.7_sdk:2.33.0"
    ])
    with beam.Pipeline(options=options) as p:
        lines = (p
                 | 'Create words' >> beam.Create(['this is working'])
                 | 'Split words' >> beam.FlatMap(lambda words: words.split(' '))
                 | 'Build byte array' >> beam.ParDo(ConvertToByteArray())
                 | 'Group' >> beam.GroupBy()  # Do future batching here
                 | 'print output' >> beam.Map(print)
                 )


if __name__ == "__main__":
    run()
When I run the Python application in my conda environment with python beam-application.py, I get the following error:
File "beam.py", line 39, in <module>
run()
File "beam.py", line 35, in run
| 'print output' >> beam.Map(print)
File "C:\Users\eapasnr\Anaconda3\envs\oden2\lib\site-packages\apache_beam\pipeline.py", line 586, in __exit__
self.result = self.run()
File "C:\Users\eapasnr\Anaconda3\envs\oden2\lib\site-packages\apache_beam\pipeline.py", line 565, in run
return self.runner.run_pipeline(self, self._options)
File "C:\Users\eapasnr\Anaconda3\envs\oden2\lib\site-packages\apache_beam\runners\portability\portable_runner.py", line 440, in run_pipeline
job_service_handle.submit(proto_pipeline)
File "C:\Users\eapasnr\Anaconda3\envs\oden2\lib\site-packages\apache_beam\runners\portability\portable_runner.py", line 114, in submit
prepare_response.staging_session_token)
File "C:\Users\eapasnr\Anaconda3\envs\oden2\lib\site-packages\apache_beam\runners\portability\portable_runner.py", line 218, in stage
staging_session_token)
File "C:\Users\eapasnr\Anaconda3\envs\oden2\lib\site-packages\apache_beam\runners\portability\artifact_service.py", line 237, in offer_artifacts
for request in requests:
File "C:\Users\eapasnr\Anaconda3\envs\oden2\lib\site-packages\grpc\_channel.py", line 426, in __next__
return self._next()
File "C:\Users\eapasnr\Anaconda3\envs\oden2\lib\site-packages\grpc\_channel.py", line 826, in _next
raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.UNAVAILABLE
details = "failed to connect to all addresses; last error: UNAVAILABLE: WSA Error"
debug_error_string = "UNKNOWN:Failed to pick subchannel {created_time:"2022-10-10T14:38:39.520460502+00:00", children:[UNKNOWN:failed to connect to all addresses; last error: UNAVAILABLE: WSA Error {grpc_status:14, created_time:"2022-10-10T14:38:39.520457024+00:00"}]}"
>
I am not sure where exactly the problem is.
What should I pass for job_endpoint and artifact_endpoint?
I also tried port-forwarding:
kubectl port-forward service/spark-beam-jobserver 32090:8099 --namespace spark-beam
kubectl port-forward service/spark-primary 8080:8080 --namespace spark-beam
kubectl port-forward service/spark-children 8081:8081 --namespace spark-beam
Upvotes: 1
Views: 411
Reputation: 935
I suppose this is based on https://github.com/cometta/python-apache-beam-spark?
spark-beam-jobserver is using service type NodePort. So, if running in a local (minikube) cluster, you won't need any port forwarding to reach the job server.
You should be able to submit a Python job from your local shell using the following pipeline options:
--job_endpoint=localhost:32090
--artifact_endpoint=localhost:32091
Note, your Python code above is missing the artifact_endpoint. You have to provide both endpoints.
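For example, a minimal options list would look roughly like this (a sketch; localhost:32090 and localhost:32091 assume the NodePort values from your spark-beam-jobserver service and a cluster reachable from your machine):

from apache_beam.options.pipeline_options import PipelineOptions

# Same options as in your script, plus the artifact endpoint.
# The localhost ports assume the NodePorts 32090/32091 defined on the
# spark-beam-jobserver service.
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:32090",
    "--artifact_endpoint=localhost:32091",
    "--save_main_session",
    "--environment_type=DOCKER",
    "--environment_config=docker.io/apache/beam_python3.7_sdk:2.33.0",
])

With both endpoints set, the artifact staging RPC that currently fails with StatusCode.UNAVAILABLE should reach the job server's artifact port (8098) instead of a closed local port.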
Upvotes: 2