Reputation: 5389
I would like to do some cleanup at the start of my Spark program (PySpark). For example, I would like to delete the HDFS data left over from a previous run. In Pig, this can be done using commands such as
fs -copyFromLocal ....
rmf /path/to-/hdfs
or locally using sh command.
I was wondering how to do the same with Pyspark.
Upvotes: 13
Views: 41732
Reputation: 656
Solution 1 - subprocess
def copy_from_local(local_file, hdfs_file, logger):
    """Copy a local file into HDFS by running ``hdfs dfs -copyFromLocal -f``.

    Args:
        local_file: Path of the source file on the local filesystem.
        hdfs_file: Destination path in HDFS.
        logger: Logger-like object providing ``info`` and ``error``.

    Returns:
        True if the command exited with status 0; False if it exited with a
        non-zero status or the ``hdfs`` executable could not be started.
    """
    import subprocess

    command = ["hdfs", "dfs", "-copyFromLocal", "-f", local_file, hdfs_file]
    try:
        # call() waits for the child process and returns its exit status.
        return_code = subprocess.call(command)
    except OSError as err:
        # Raised when the executable cannot be launched (e.g. `hdfs` is not
        # on PATH). Report it as a failed copy instead of crashing the caller.
        logger.error(err)
        return_code = None

    if return_code != 0:
        logger.error("copyFromLocal {} to {} error".format(local_file, hdfs_file))
        return False

    logger.info("copyFromLocal {} to {} success".format(local_file, hdfs_file))
    return True
Solution 2 - py4j
def copy_from_local_file(sc, logger, local_file, hdfs_file, delSrc=True, overwrite=True):
    """Copy a local file into HDFS through the JVM Hadoop FileSystem (py4j).

    Thin wrapper around the Java method
    ``copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)``.

    Args:
        sc: SparkContext whose py4j gateway (``sc._jvm``) is used.
        logger: Logger-like object providing ``info`` and ``error``.
        local_file: Source path on the local filesystem.
        hdfs_file: Destination path.
        delSrc: Forwarded as the Java ``delSrc`` flag. NOTE: it defaults to
            True, so the local source is removed after the copy unless the
            caller passes False.
        overwrite: Forwarded as the Java ``overwrite`` flag.

    Returns:
        True if the copy completed, False if the JVM call raised.
    """
    Path = sc._jvm.org.apache.hadoop.fs.Path
    try:
        getFileSystem(sc).copyFromLocalFile(delSrc, overwrite, Path(local_file), Path(hdfs_file))
    except Exception as e:
        # Broad on purpose: py4j surfaces Java-side failures as Python
        # exceptions, and this helper reports failure through its return value.
        logger.error(e)
        logger.error("copyFromLocal {} to {} error".format(local_file, hdfs_file))
        return False

    logger.info("copyFromLocal {} to {} success".format(local_file, hdfs_file))
    return True
def getFileSystem(sc):
    """Return the JVM Hadoop ``FileSystem`` built from ``sc``'s Hadoop configuration."""
    # Look up the Java class through the SparkContext's py4j gateway, then ask
    # it for the filesystem described by the context's Hadoop configuration.
    filesystem_cls = sc._jvm.org.apache.hadoop.fs.FileSystem
    return filesystem_cls.get(sc._jsc.hadoopConfiguration())
and you can get the py4j JVM FileSystem object and perform file operations on it, as shown above
getFileSystem(sc) = {JavaObject} DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_15601292_17, [email protected] (auth:KERBEROS)]]
access = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e7d0>
addCacheDirective = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432610>
addCachePool = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435610>
addDelegationTokens = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b44323d0>
allowSnapshot = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432990>
append = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435c90>
areSymlinksEnabled = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442cf10>
cancelDeleteOnExit = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432850>
clearStatistics = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e410>
close = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442eb90>
closeAll = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432c10>
closeAllForUGI = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432150>
completeLocalOutput = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435190>
concat = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e450>
copyFromLocalFile = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e650>
copyToLocalFile = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e990>
create = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432790>
createEncryptionZone = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432310>
createNewFile = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b44321d0>
createNonRecursive = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432f90>
createSnapshot = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e290>
createSymlink = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432a50>
delete = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435390>
deleteOnExit = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442ec10>
deleteSnapshot = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e110>
disallowSnapshot = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442ee50>
enableSymlinks = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442ed90>
equals = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442ef90>
exists = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432050>
finalizeUpgrade = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432890>
get = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442ea10>
getAclStatus = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432a90>
getAllStatistics = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e8d0>
getBlockSize = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e1d0>
getCanonicalServiceName = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442cdd0>
getChildFileSystems = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435410>
getClass = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432290>
getClient = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b44358d0>
getConf = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435750>
getContentSummary = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b44357d0>
getCorruptBlocksCount = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432550>
getDataNodeStats = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432950>
getDefaultBlockSize = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432d50>
getDefaultReplication = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e210>
getDefaultUri = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435490>
getDelegationToken = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435c50>
getDiskStatus = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442ee10>
getEZForPath = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435710>
getFileBlockLocations = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e2d0>
getFileBlockStorageLocations = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e890>
getFileChecksum = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432d10>
getFileLinkStatus = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435a90>
getFileStatus = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e250>
getFileSystemClass = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e850>
getHomeDirectory = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b44326d0>
getInotifyEventStream = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432690>
getLength = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435b90>
getLinkTarget = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442cfd0>
getLocal = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442ef10>
getMissingBlocksCount = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435450>
getName = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432350>
getNamed = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442ea90>
getRawCapacity = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435950>
getRawUsed = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e690>
getReplication = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e150>
getScheme = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442ec90>
getServerDefaults = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432650>
getSnapshotDiffReport = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e0d0>
getSnapshottableDirListing = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435a50>
getStatistics = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432490>
getStatus = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442cf50>
getStoragePolicies = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432e90>
getUnderReplicatedBlocksCount = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442ced0>
getUri = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b44350d0>
getUsed = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432f10>
getWorkingDirectory = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b44329d0>
getXAttr = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432410>
getXAttrs = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432b10>
globStatus = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435810>
hashCode = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435510>
initialize = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b44320d0>
isDirectory = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435150>
isFile = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442ed10>
isFileClosed = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435650>
isInSafeMode = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e4d0>
listCacheDirectives = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442edd0>
listCachePools = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435210>
listCorruptFileBlocks = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432ad0>
listEncryptionZones = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432750>
listFiles = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435690>
listLocatedStatus = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442eb50>
listStatus = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432590>
listXAttrs = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e550>
makeQualified = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435b50>
metaSave = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442cd50>
mkdir = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e310>
mkdirs = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e5d0>
modifyAclEntries = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435b10>
modifyCacheDirective = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435590>
modifyCachePool = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435350>
moveFromLocalFile = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432510>
moveToLocalFile = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e510>
newInstance = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432bd0>
newInstanceLocal = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432710>
notify = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e910>
notifyAll = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435250>
open = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432210>
printStatistics = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e190>
recoverLease = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432fd0>
refreshNodes = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b44355d0>
removeAcl = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442ce50>
removeAclEntries = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e390>
removeCacheDirective = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432a10>
removeCachePool = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e710>
removeDefaultAcl = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442ce10>
removeXAttr = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442ec50>
rename = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442ee90>
renameSnapshot = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432d90>
resolvePath = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e9d0>
restoreFailedStorage = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432810>
rollEdits = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432dd0>
rollingUpgrade = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b44324d0>
saveNamespace = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435c10>
setAcl = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b44352d0>
setBalancerBandwidth = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e3d0>
setConf = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b44359d0>
setDefaultUri = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432ed0>
setOwner = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e050>
setPermission = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442eb10>
setQuota = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432c90>
setReplication = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435990>
setSafeMode = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435890>
setStoragePolicy = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b44354d0>
setTimes = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435050>
setVerifyChecksum = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432b90>
setWorkingDirectory = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432e50>
setWriteChecksum = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b44351d0>
setXAttr = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b44328d0>
startLocalOutput = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4432190>
supportsSymlinks = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b4435ad0>
toString = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4dbcbdc10>
wait = {JavaMember} <py4j.java_gateway.JavaMember object at 0x7ff4b442e750>
Upvotes: 1
Reputation: 14689
You can delete an hdfs
path in PySpark
without using third party dependencies as follows:
from pyspark.sql import SparkSession

# Example: obtain (or create) a Spark session and its SparkContext.
spark = SparkSession.builder.appName('abc').getOrCreate()
sc = spark.sparkContext

# Build the Hadoop FileSystem handle from the context's Hadoop configuration.
hadoop_fs = sc._jvm.org.apache.hadoop.fs
fs = hadoop_fs.FileSystem.get(sc._jsc.hadoopConfiguration())

path = "Your/hdfs/path"
# Remove the path; the second argument (True) requests a recursive delete.
fs.delete(hadoop_fs.Path(path), True)
To improve one step further, you can wrap the above idea into a helper function that you can re-use across jobs/packages:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('abc').getOrCreate()


def delete_path(spark, path):
    """Recursively delete ``path`` using the JVM Hadoop FileSystem API.

    The filesystem is resolved from the path itself, so fully qualified
    paths (e.g. ``hdfs://host:8020/...``) work as well as paths on the
    default filesystem from the Hadoop configuration.

    Args:
        spark: Active SparkSession.
        path: Path to delete, as a string.

    Returns:
        The boolean returned by Hadoop's ``FileSystem.delete``.
    """
    sc = spark.sparkContext
    hadoop_path = sc._jvm.org.apache.hadoop.fs.Path(path)
    # Path.getFileSystem picks the filesystem that owns this path, rather
    # than always using the default filesystem.
    fs = hadoop_path.getFileSystem(sc._jsc.hadoopConfiguration())
    return fs.delete(hadoop_path, True)


delete_path(spark, "Your/hdfs/path")
Upvotes: 29
Reputation: 2110
from https://diogoalexandrefranco.github.io/interacting-with-hdfs-from-pyspark/ using only PySpark
######
# Get fs handler from java gateway
######
URI = sc._gateway.jvm.java.net.URI
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
# An explicit URI is passed here instead of relying on the default
# filesystem from the Hadoop configuration.
fs = FileSystem.get(URI("hdfs://somehost:8020"), sc._jsc.hadoopConfiguration())
# We can now use the Hadoop FileSystem API (https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html)
fs.listStatus(Path('/user/hive/warehouse'))
# or
# The one-argument delete(Path) overload is deprecated in the Hadoop API;
# pass the recursive flag explicitly.
fs.delete(Path('some_path'), True)
the other solutions didn't work in my case, but this blog post helped :)
Upvotes: 7
Reputation: 330063
You can execute arbitrary shell commands using, for example, subprocess.call
or the sh
library, so something like this should work just fine:
import subprocess

some_path = ...
# -r removes directories recursively (Spark output paths are directories);
# -f suppresses the error when the path does not exist.
subprocess.call(["hadoop", "fs", "-rm", "-r", "-f", some_path])
If you use Python 2.x you can try using spotify/snakebite
:
from snakebite.client import Client

host = ...
port = ...
client = Client(host, port)
# snakebite's delete() takes a list of paths and returns a lazy generator:
# nothing is deleted until the generator is consumed.
list(client.delete([some_path], recurse=True))
hdfs3
is yet another library which can be used to do the same thing:
from hdfs3 import HDFileSystem

hdfs = HDFileSystem(host=host, port=port)
# rm is an instance method: call it on the connected filesystem object,
# not on the HDFileSystem class.
hdfs.rm(some_path)
Apache Arrow Python bindings are the latest option (and they are often already available on a Spark cluster, as they are required for pandas_udf
):
# NOTE(review): `pyarrow.hdfs` is the legacy HDFS interface; newer pyarrow
# releases provide `pyarrow.fs.HadoopFileSystem` instead — confirm against
# the installed pyarrow version.
from pyarrow import hdfs
# Connect to the namenode at host:port (both defined by the caller).
fs = hdfs.connect(host, port)
# Delete the path, recursing into it.
fs.delete(some_path, recursive=True)
Upvotes: 18