I have deployed a Kubernetes cluster with 3 nodes and deployed HDFS on it. I've written a simple PySpark script and want to deploy it on the k8s cluster, but I don't know how to initialize the Spark context correctly: what do I need to pass as the master to SparkConf().setMaster()?
(When I set the master to k8s://https://172.20.234.174:6443 I'm getting errors.)
The command I'm using to deploy on k8s:
bin/spark-submit \
--name spark_k8s_hello_world_0 \
--master k8s://https://172.20.234.174:6443 \
--deploy-mode cluster \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.container.image=semenchukou/pyspark-k8s-example:0.1 \
--conf spark.kubernetes.pyspark.pythonVersion=3 \
local:///app/HelloWorldSpark.py
UPD: current script:
#!/usr/bin/env python
from pyspark import SparkContext
from pyspark import SparkConf
if __name__ == '__main__':
conf = SparkConf()
sc = SparkContext(conf = conf)
txt = sc.textFile('hdfs://172.20.234.174:1515/testing/testFile.txt')
first = txt.first()
sc.parallelize(first).saveAsTextFile('hdfs://172.20.234.174:9000/testing/result.txt')
I'm running the following command from the master machine in the cluster:
bin/spark-submit \
--name spark_k8s_hello_world_0 \
--master k8s://https://172.20.234.174:6443 \
--deploy-mode cluster \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.container.image=semenchukou/pyspark-k8s-example:conf \
--conf spark.kubernetes.pyspark.pythonVersion=3 \
local:///app/HelloWorldSpark.py
and get the following error stack trace:
File "/app/HelloWorldSpark.py", line 8, in <module>
sc = SparkContext(conf = conf)
File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 136, in __init__
File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 198, in _do_init
File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 306, in _initialize_context
File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1525, in __call__
File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: org.apache.spark.SparkException: External scheduler cannot be instantiated
at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2794)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:493)
at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:238)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get] for kind: [Pod] with name: [sparkk8shelloworld0-1580920409707-driver] in namespace: [default] failed.
at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64)
at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:72)
at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:229)
at io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:185)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator$$anonfun$1.apply(ExecutorPodsAllocator.scala:57)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator$$anonfun$1.apply(ExecutorPodsAllocator.scala:55)
at scala.Option.map(Option.scala:146)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.<init>(ExecutorPodsAllocator.scala:55)
at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:89)
at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2788)
... 13 more
Caused by: java.net.UnknownHostException: kubernetes.default.svc: Try again
at java.net.Inet4AddressImpl.lookupAllHostAddr(Native Method)
at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:929)
at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1324)
at java.net.InetAddress.getAllByName0(InetAddress.java:1277)
at java.net.InetAddress.getAllByName(InetAddress.java:1193)
at java.net.InetAddress.getAllByName(InetAddress.java:1127)
at okhttp3.Dns$1.lookup(Dns.java:39)
at okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.java:171)
at okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.java:137)
at okhttp3.internal.connection.RouteSelector.next(RouteSelector.java:82)
at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:171)
at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121)
at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)
at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at io.fabric8.kubernetes.client.utils.BackwardsCompatibilityInterceptor.intercept(BackwardsCompatibilityInterceptor.java:119)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at io.fabric8.kubernetes.client.utils.ImpersonatorInterceptor.intercept(ImpersonatorInterceptor.java:68)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at io.fabric8.kubernetes.client.utils.HttpClientUtils$2.intercept(HttpClientUtils.java:107)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185)
at okhttp3.RealCall.execute(RealCall.java:69)
at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:379)
at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:344)
at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleGet(OperationSupport.java:313)
at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleGet(OperationSupport.java:296)
at io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleGet(BaseOperation.java:801)
at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:218)
... 20 more
20/02/05 16:33:55 INFO ShutdownHookManager: Shutdown hook called
20/02/05 16:33:55 INFO ShutdownHookManager: Deleting directory /tmp/spark-1399e509-6729-436d-9355-eecec6e58113
20/02/05 16:33:55 INFO ShutdownHookManager: Deleting directory /var/data/spark-3355ab9d-38f1-4083-b7af-5fd03dc1ae2f/spark-2a479391-3a9d-4f5b-93a0-707132c802cc
You shouldn't set anything with SparkConf().setMaster() in the code; the master config value is propagated automatically from spark-submit. (Your updated script already does this correctly: it builds a bare SparkConf() without calling setMaster().)
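For illustration, a minimal sketch using the address from your own command: --master is just shorthand for setting the spark.master property at submit time, so the driver picks it up with no code changes.
bin/spark-submit --master k8s://https://172.20.234.174:6443 ...
# equivalent:
bin/spark-submit --conf spark.master=k8s://https://172.20.234.174:6443 ...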
While executing spark-submit --master ... you can pass either:
- the API server address reported by kubectl cluster-info, with a leading k8s:// prepended, e.g. k8s://https://xxx.xxx.xxx.xxx:443, or
- k8s://kubernetes.default.svc.cluster.local:443 if you are running spark-submit from within the Kubernetes cluster network.
Also, please refer to the official docs and try to run the Spark Pi example first.
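For example, a first smoke test along the lines of the official docs (a sketch: the image name and the examples jar path are placeholders, so substitute the ones from your own Spark build):
# The API server address comes from cluster-info; prepend k8s:// to it.
kubectl cluster-info
# Kubernetes master is running at https://172.20.234.174:6443

bin/spark-submit \
  --master k8s://https://172.20.234.174:6443 \
  --deploy-mode cluster \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.executor.instances=2 \
  --conf spark.kubernetes.container.image=<your-spark-image> \
  local:///opt/spark/examples/jars/spark-examples_<version>.jar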
Hope it helps.
UPDATE 1:
The Spark driver uses another property, spark.kubernetes.driver.master, for setting the Kubernetes client URL when running in cluster mode. It defaults to https://kubernetes.default.svc, the default internal Kubernetes API endpoint (a Kubernetes cluster by default has a Service named kubernetes in the default namespace): ref1, ref2.
In your case you may try to set an additional --conf spark.kubernetes.driver.master=https://172.20.234.174:6443.
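That is, your original command with one extra --conf line (a sketch, assuming your Spark build supports this option; see UPDATE 2 below):
bin/spark-submit \
  --name spark_k8s_hello_world_0 \
  --master k8s://https://172.20.234.174:6443 \
  --deploy-mode cluster \
  --conf spark.kubernetes.driver.master=https://172.20.234.174:6443 \
  --conf spark.executor.instances=2 \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.container.image=semenchukou/pyspark-k8s-example:conf \
  --conf spark.kubernetes.pyspark.pythonVersion=3 \
  local:///app/HelloWorldSpark.py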
Also, I would advise you to check that the Service named kubernetes exists in the default namespace of your cluster and that it exposes port 443. If it does, there may be a problem with DNS resolution in your cluster, which is another topic, I guess.
UPDATE 2:
The config option described above is not yet available; it is being introduced in PR [SPARK-30371]. Until then, the Spark driver always uses https://kubernetes.default.svc:443 to call the Kubernetes APIs. If you cannot resolve this address within the cluster, you probably have a problem with DNS or cluster setup.
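One way to test DNS resolution from inside the cluster is a throwaway pod (busybox is just an example image; any image with nslookup works):
kubectl run dns-test --rm -it --image=busybox --restart=Never -- nslookup kubernetes.default.svc
# If this fails to resolve, the driver pod will hit the same
# java.net.UnknownHostException: kubernetes.default.svc seen in the stack trace above.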