Pari Margu
Pari Margu

Reputation: 209

Permission denied: user=zeppelin while using %spark.pyspark interpreter in AWS EMR cluster

I have created pyspark structured streaming program and trying to execute in the Zeppelin notebook:

%spark.pyspark 

query_window = windowedCounts \ 
                 .writeStream \ 
                 .outputMode("complete") \ 
                 .format("memory") \ 
                 .queryName("myTable_window") \ 
                 .start() 

I am getting the following error:

Py4JJavaError: An error occurred while calling o191.start.
: org.apache.hadoop.security.AccessControlException: Permission denied: user=zeppelin, access=WRITE, inode="/mnt/tmp/temporary-e0cf0f09-a6f4-44d6-9a72-324660085608/metadata":hdfs:hadoop:drwxr-xr-x

I am using the Zeppelin Notebook Version 0.7.1 in AWS EMR cluster.

Help would be much appreciated.

Full stacktrace:

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-8165971491474576109.py", line 349, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-8165971491474576109.py", line 342, in <module>
    exec(code)
  File "<stdin>", line 5, in <module>
  File "/usr/lib/spark/python/pyspark/sql/streaming.py", line 816, in start
    return self._sq(self._jwrite.start())
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o191.start.
: org.apache.hadoop.security.AccessControlException: Permission denied: user=zeppelin, access=WRITE, inode="/mnt/tmp/temporary-e0cf0f09-a6f4-44d6-9a72-324660085608/metadata":hdfs:hadoop:drwxr-xr-x
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:320)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:292)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:213)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1728)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1712)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1695)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2515)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2450)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2334)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:624)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:397)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
    at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1653)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1689)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1624)
    at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:448)
    at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:444)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:459)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:387)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:915)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:896)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:793)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:782)
    at org.apache.spark.sql.execution.streaming.StreamMetadata$.write(StreamMetadata.scala:76)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$5.apply(StreamExecution.scala:102)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$5.apply(StreamExecution.scala:100)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:100)
    at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
    at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:227)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=zeppelin, access=WRITE, inode="/mnt/tmp/temporary-e0cf0f09-a6f4-44d6-9a72-324660085608/metadata":hdfs:hadoop:drwxr-xr-x
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:320)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:292)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:213)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1728)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1712)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1695)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2515)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2450)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2334)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:624)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:397)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045)
    at org.apache.hadoop.ipc.Client.call(Client.java:1475)
    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy12.create(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:296)
    at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy13.create(Unknown Source)
    at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1648)
    ... 30 more

Upvotes: 8

Views: 3026

Answers (2)

Nagesh Singh Chauhan
Nagesh Singh Chauhan

Reputation: 794

I was also facing the issue, your file system does not allowing the writeStream to write the data in the specified file locatioon that is why "Permission denied". Add checkpoint location either HDFS or S3.

query = lines \
.writeStream \
.format("console") \
.option("checkpointLocation","s3://S3_bucket/test") \
.start()

Hope this helps.

Upvotes: 1

ArunK
ArunK

Reputation: 1781

it appears that the writeStream query is attempting the write the checkpointLocation to the cloud HDFS and zeppelin has issues with writing to the cloud HDFS.

by adding the checkpoint location to somewhere on S3 solves the issue.

query_window = windowedCounts \ 
            .writeStream \
            .outputMode("complete") \
            .format("memory") \
            .queryName("myTable_window") \
            .option("checkpointLocation","path_on_s3") \
            .start() 

Upvotes: 5

Related Questions