user19930511

Reputation: 389

Failed to connect to all addresses - Spark Beam on Kubernetes

I am trying to run a Beam application on Spark on Kubernetes.

beam-deployment.yaml

apiVersion: apps/v1
kind: StatefulSet
metadata:
 name: spark-beam-jobserver
spec:
 serviceName: spark-headless
 selector:
   matchLabels:
     app: spark-beam-jobserver
 template:
   metadata:
     labels:
       app: spark-beam-jobserver
       app.kubernetes.io/instance: custom_spark
       app.kubernetes.io/name: spark
   spec:
     containers:
     - name: spark-beam-jobserver
       image: apache/beam_spark_job_server:2.33.0
       imagePullPolicy: Always
       ports:
       - containerPort: 8099
         name: jobservice
       - containerPort: 8098
         name: artifact
       - containerPort: 8097
         name: expansion
       volumeMounts:
         - name: beam-artifact-staging
           mountPath: "/tmp/beam-artifact-staging" 
          
       command: [
           "/bin/bash", "-c", "./spark-job-server.sh --job-port=8099 --spark-master-url=spark://spark-primary:7077"
       ]
     volumes:
     - name: beam-artifact-staging
       persistentVolumeClaim:
         claimName: spark-beam-pvc
---
apiVersion: v1
kind: Service
metadata:
 name: spark-beam-jobserver
 labels:
   app: spark-beam-jobserver
spec:
 selector:
   app: spark-beam-jobserver
 type: NodePort
 ports:
 - port: 8099
   nodePort: 32090
   name: job-service
 - port: 8098
   nodePort: 32091
   name: artifacts
#  type: ClusterIP
#  ports:
#  - port: 8099
#    name: job-service
#  - port: 8098
#    name: artifacts
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: spark-primary
spec:
  serviceName: spark-headless
  replicas: 1
  selector:
    matchLabels:
      app: spark
  template:
    metadata:
      labels:
        app: spark
        component: primary
        app.kubernetes.io/instance: custom_spark
        app.kubernetes.io/name: spark
    spec:
      containers:
      - name: primary
        image: docker.io/secondcomet/spark-custom-2.4.6
        env:
        - name: SPARK_MODE
          value: "master"
        - name: SPARK_RPC_AUTHENTICATION_ENABLED
          value: "no"
        - name: SPARK_RPC_ENCRYPTION_ENABLED
          value: "no"
        - name: SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED
          value: "no"
        - name: SPARK_SSL_ENABLED
          value: "no"
        ports:
        - containerPort: 7077
          name: masterendpoint
        - containerPort: 8080
          name: ui
        - containerPort: 7078
          name: driver-rpc-port
        - containerPort: 7079
          name: blockmanager
        livenessProbe:
          httpGet:
            path: /
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10                  
        resources:
          limits:
            cpu: 1.0
            memory: 1Gi
          requests:
            cpu: 0.5
            memory: 0.5Gi
---
apiVersion: v1
kind: Service
metadata:
  name: spark-primary
  labels:
    app: spark
    component: primary
spec:
  type: ClusterIP
  ports:
  - name: masterendpoint
    port: 7077
    targetPort: 7077
  - name: rest
    port: 6066
    targetPort: 6066
  - name: ui
    port: 8080
    targetPort: 8080

  - name: driver-rpc-port
    protocol: TCP 
    port: 7078
    targetPort: 7078
  - name: blockmanager
    protocol: TCP 
    port: 7079
    targetPort: 7079

  selector:
    app: spark
    component: primary
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: spark-children
  labels:
    app: spark
spec:
  serviceName: spark-headless
  replicas: 1
  selector:
    matchLabels:
      app: spark
  template:
    metadata:
      labels:
        app: spark
        component: children
        app.kubernetes.io/instance: custom_spark
        app.kubernetes.io/name: spark
    spec:
      containers:
      - name: docker
        image: docker:19.03.5-dind
        securityContext:
          privileged: true
        volumeMounts:
          - name: dind-storage
            mountPath: /var/lib/docker
        env:
          - name: DOCKER_TLS_CERTDIR
            value: ""
        resources:
          limits:
            cpu: 1.0
            memory: 1Gi
          requests:
            cpu: 0.5
            memory: 100Mi
      - name: children
        image: docker.io/secondcomet/spark-custom-2.4.6
        env:
        - name: DOCKER_HOST
          value: "tcp://localhost:2375"
        - name: SPARK_MODE
          value: "worker"
        - name: SPARK_MASTER_URL
          value: "spark://spark-primary:7077"
        - name: SPARK_WORKER_MEMORY
          value: "1G"
        - name: SPARK_WORKER_CORES
          value: "1"
        - name: SPARK_RPC_AUTHENTICATION_ENABLED
          value: "no"
        - name: SPARK_RPC_ENCRYPTION_ENABLED
          value: "no"
        - name: SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED
          value: "no"
        - name: SPARK_SSL_ENABLED
          value: "no"
        ports:
          - containerPort: 8081
            name: ui
        volumeMounts:
          - name: beam-artifact-staging
            mountPath: "/tmp/beam-artifact-staging"
        resources:
          limits:
            cpu: 1
            memory: 2Gi
          requests:
            cpu: 0.5
            memory: 1Gi
      volumes:
      - name: dind-storage
        emptyDir: {}
      - name: beam-artifact-staging
        persistentVolumeClaim:
          claimName: spark-beam-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: spark-children
  labels:
    app: spark
    component: children
spec:
  type: ClusterIP
  ports:
  - name: ui
    port: 8081
    targetPort: 8081
  selector:
    app: spark
    component: children
---
apiVersion: v1
kind: Service
metadata:
  name: spark-headless
spec:
  clusterIP: None
  selector:
    app.kubernetes.io/instance: custom_spark
    app.kubernetes.io/name: spark
  type: ClusterIP

$ kubectl get all --namespace spark-beam
NAME                         READY   STATUS    RESTARTS   AGE
pod/spark-beam-jobserver-0   1/1     Running   0          58m
pod/spark-children-0         2/2     Running   0          58m
pod/spark-primary-0          1/1     Running   0          58m

NAME                           TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                                        AGE
service/spark-beam-jobserver   NodePort    10.97.173.68    <none>        8099:32090/TCP,8098:32091/TCP                  58m
service/spark-children         ClusterIP   10.105.209.30   <none>        8081/TCP                                       58m
service/spark-headless         ClusterIP   None            <none>        <none>                                         58m
service/spark-primary          ClusterIP   10.109.32.126   <none>        7077/TCP,6066/TCP,8080/TCP,7078/TCP,7079/TCP   58m

NAME                                    READY   AGE
statefulset.apps/spark-beam-jobserver   1/1     58m
statefulset.apps/spark-children         1/1     58m
statefulset.apps/spark-primary          1/1     58m

beam-application.py

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class ConvertToByteArray(beam.DoFn):
    def __init__(self):
        pass

    def setup(self):
        pass

    def process(self, row):
        try:
            yield bytearray(row + '\n', 'utf-8')

        except Exception as e:
            raise e

def run():
    options = PipelineOptions([
        "--runner=PortableRunner",
        "--job_endpoint=localhost:32090",
        "--save_main_session",
        "--environment_type=DOCKER",
        "--environment_config=docker.io/apache/beam_python3.7_sdk:2.33.0"
    ])

    with beam.Pipeline(options=options) as p:
        lines = (p
        | 'Create words' >> beam.Create(['this is working'])
        | 'Split words' >> beam.FlatMap(lambda words: words.split(' '))
        | 'Build byte array' >> beam.ParDo(ConvertToByteArray())
        | 'Group' >> beam.GroupBy() # Do future batching here
        | 'print output' >> beam.Map(print)
        )

if __name__ == "__main__":
    run()

When I run the Python application in my conda environment (python beam-application.py), I get the error below:

  File "beam.py", line 39, in <module>
    run()
  File "beam.py", line 35, in run
    | 'print output' >> beam.Map(print)
  File "C:\Users\eapasnr\Anaconda3\envs\oden2\lib\site-packages\apache_beam\pipeline.py", line 586, in __exit__   
    self.result = self.run()
  File "C:\Users\eapasnr\Anaconda3\envs\oden2\lib\site-packages\apache_beam\pipeline.py", line 565, in run        
    return self.runner.run_pipeline(self, self._options)
  File "C:\Users\eapasnr\Anaconda3\envs\oden2\lib\site-packages\apache_beam\runners\portability\portable_runner.py", line 440, in run_pipeline
    job_service_handle.submit(proto_pipeline)
  File "C:\Users\eapasnr\Anaconda3\envs\oden2\lib\site-packages\apache_beam\runners\portability\portable_runner.py", line 114, in submit
    prepare_response.staging_session_token)
  File "C:\Users\eapasnr\Anaconda3\envs\oden2\lib\site-packages\apache_beam\runners\portability\portable_runner.py", line 218, in stage
    staging_session_token)
  File "C:\Users\eapasnr\Anaconda3\envs\oden2\lib\site-packages\apache_beam\runners\portability\artifact_service.py", line 237, in offer_artifacts
    for request in requests:
  File "C:\Users\eapasnr\Anaconda3\envs\oden2\lib\site-packages\grpc\_channel.py", line 426, in __next__
    return self._next()
  File "C:\Users\eapasnr\Anaconda3\envs\oden2\lib\site-packages\grpc\_channel.py", line 826, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.UNAVAILABLE
        details = "failed to connect to all addresses; last error: UNAVAILABLE: WSA Error"
        debug_error_string = "UNKNOWN:Failed to pick subchannel {created_time:"2022-10-10T14:38:39.520460502+00:00", children:[UNKNOWN:failed to connect to all addresses; last error: UNAVAILABLE: WSA Error {grpc_status:14, created_time:"2022-10-10T14:38:39.520457024+00:00"}]}"
>

I am not sure where exactly the problem is.

What should I pass for job_endpoint and artifact_endpoint?

I also tried port forwarding:

kubectl port-forward service/spark-beam-jobserver 32090:8099 --namespace spark-beam
kubectl port-forward service/spark-primary 8080:8080 --namespace spark-beam
kubectl port-forward service/spark-children 8081:8081 --namespace spark-beam

Upvotes: 1

Views: 411

Answers (1)

Moritz

Reputation: 935

I suppose this is based on https://github.com/cometta/python-apache-beam-spark?

spark-beam-jobserver uses service type NodePort, so if you are running in a local (minikube) cluster, you won't need any port forwarding to reach the job server.

You should be able to submit a Python job from your local shell using the following pipeline options:

  --job_endpoint=localhost:32090
  --artifact_endpoint=localhost:32091

Note that your Python code above is missing the artifact_endpoint; you have to provide both endpoints.
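
For example, a minimal sketch of the corrected options block, keeping your other settings unchanged and pointing both endpoints at the NodePort mappings of your spark-beam-jobserver service (8099 -> 32090, 8098 -> 32091):

from apache_beam.options.pipeline_options import PipelineOptions

# Job service and artifact service, both exposed via NodePort on the local cluster
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:32090",
    "--artifact_endpoint=localhost:32091",
    "--save_main_session",
    "--environment_type=DOCKER",
    "--environment_config=docker.io/apache/beam_python3.7_sdk:2.33.0",
])

Only the artifact_endpoint line is new; everything else matches the options you already pass.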

Upvotes: 2
