Reputation: 691
I am trying to import a list of files from HDFS in Python. Locally I do the following; how can I do the same from HDFS?
import glob
import pandas as pd

path = r'/my_path'
allFiles = glob.glob(path + "/*.csv")

df_list = []
for file_ in allFiles:
    df = pd.read_csv(file_, index_col=None, header=0, sep=';')
    df_list.append(df)
I think subprocess.Popen does the trick, but how do I extract only the file names?
import subprocess

p = subprocess.Popen("hdfs dfs -ls /my_path/",
                     shell=True,
                     stdout=subprocess.PIPE,
                     stderr=subprocess.STDOUT)

for line in p.stdout.readlines():
    print(line)
The output looks like this:
b'Found 32 items\n'
b'-rw------- 3 user hdfs 42202621 2019-01-21 10:05 /my_path/file1.csv\n'
b'-rw------- 3 user hdfs 99320020 2019-01-21 10:05 /my_path/file2.csv\n'
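Perhaps, instead of just printing each line, I could split it on whitespace and keep the last column? Something like this (untested sketch):

file_paths = []
for line in p.stdout.readlines():
    fields = line.decode('utf-8').strip().split()
    # the last whitespace-separated column of `hdfs dfs -ls` is the full path
    if fields and fields[-1].endswith('.csv'):
        file_paths.append(fields[-1])

Is there a cleaner way to do this?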
Upvotes: 2
Views: 2587
Reputation: 43
The accepted answer to this question is far more roundabout than it needs to be: you can read and write HDFS natively with pyarrow. The syntax is as follows:
from pyarrow import fs
import pyarrow.parquet as pq

# connect to hadoop
hdfs = fs.HadoopFileSystem('hostname', 8020)

# read a single file from hdfs
with hdfs.open_input_file(path) as pqt:
    df = pq.read_table(pqt).to_pandas()

# read a directory full of partitioned parquets (i.e. from spark)
df = pq.ParquetDataset(path, hdfs).read().to_pandas()
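Since the question is actually about CSVs rather than parquet, the same pattern should work with pyarrow.csv. A rough sketch, assuming the same placeholder hostname/port and the semicolon separator from the question:

from pyarrow import fs
import pyarrow.csv as pv

hdfs = fs.HadoopFileSystem('hostname', 8020)

# list the directory and keep only the .csv entries
selector = fs.FileSelector('/my_path')
csv_files = [info.path for info in hdfs.get_file_info(selector)
             if info.is_file and info.path.endswith('.csv')]

# read each file straight from HDFS into a pandas DataFrame
dfs = []
for path in csv_files:
    with hdfs.open_input_file(path) as f:
        table = pv.read_csv(f, parse_options=pv.ParseOptions(delimiter=';'))
        dfs.append(table.to_pandas())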
Upvotes: 0
Reputation: 29742
Disclaimer: This will be long and tedious. But given the circumstances, I'll try to make it as general and reproducible as possible.
Given the requirement of no external libraries (except for pandas?), there isn't much of a choice. I suggest utilizing WebHDFS as much as possible.
AFAIK, an HDFS installation includes WebHDFS by default. The following solution relies heavily on WebHDFS.
To begin with, you must be aware of WebHDFS URLs. WebHDFS runs on the HDFS namenode(s), and the default port is 50070.
Therefore, we start with http://[namenode_ip]:50070/webhdfs/v1/, where /webhdfs/v1/ is the common prefix of every request.
For the sake of example, let's assume it is http://192.168.10.1:50070/webhdfs/v1.
Ordinarily, one can use curl to list the contents of an HDFS directory. For a detailed explanation, refer to WebHDFS REST API: List a Directory.
If you were to use curl, the following provides the FileStatuses of all the files inside a given directory.
curl "http://192.168.10.1:50070/webhdfs/v1/<PATH>?op=LISTSTATUS"
^^^^^^^^^^^^ ^^^^^ ^^^^ ^^^^^^^^^^^^^
Namenode IP Port Path Operation
As mentioned, this returns the FileStatuses in a JSON object:
{
  "FileStatuses":
  {
    "FileStatus":
    [
      {
        "accessTime"      : 1320171722771,
        "blockSize"       : 33554432,
        "group"           : "supergroup",
        "length"          : 24930,
        "modificationTime": 1320171722771,
        "owner"           : "webuser",
        "pathSuffix"      : "a.patch",
        "permission"      : "644",
        "replication"     : 1,
        "type"            : "FILE"
      },
      {
        "accessTime"      : 0,
        "blockSize"       : 0,
        "group"           : "supergroup",
        "length"          : 0,
        "modificationTime": 1320895981256,
        "owner"           : "szetszwo",
        "pathSuffix"      : "bar",
        "permission"      : "711",
        "replication"     : 0,
        "type"            : "DIRECTORY"
      },
      ...
    ]
  }
}
The same result can be achieved using Python's requests package:
import requests
my_path = '/my_path/'
curl = requests.get('http://192.168.10.1:50070/webhdfs/v1/%s?op=LISTSTATUS' % my_path)
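Strictly speaking, requests is itself a third-party package; if the no-external-libraries requirement is taken literally, roughly the same call can be made with just the standard library:

import json
from urllib.request import urlopen

my_path = '/my_path/'
with urlopen('http://192.168.10.1:50070/webhdfs/v1/%s?op=LISTSTATUS' % my_path) as resp:
    file_statuses = json.load(resp)['FileStatuses']['FileStatus']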
Either way, as shown above, the actual status of each file sits two levels down in the resulting JSON. In other words, to get the FileStatus of each file:
curl.json()['FileStatuses']['FileStatus']
[
  {
    "accessTime"      : 1320171722771,
    "blockSize"       : 33554432,
    "group"           : "supergroup",
    "length"          : 24930,
    "modificationTime": 1320171722771,
    "owner"           : "webuser",
    "pathSuffix"      : "a.patch",
    "permission"      : "644",
    "replication"     : 1,
    "type"            : "FILE"
  },
  {
    "accessTime"      : 0,
    "blockSize"       : 0,
    "group"           : "supergroup",
    "length"          : 0,
    "modificationTime": 1320895981256,
    "owner"           : "szetszwo",
    "pathSuffix"      : "bar",
    "permission"      : "711",
    "replication"     : 0,
    "type"            : "DIRECTORY"
  },
  ...
]
Since you now have all the information you need, all that is left is parsing.
import os

file_paths = []
for file_status in curl.json()['FileStatuses']['FileStatus']:
    file_name = file_status['pathSuffix']
    # this is the file name in the queried directory
    if file_name.endswith('.csv'):
        # the if statement is only required if the directory contains unwanted files (i.e. non-csvs)
        file_paths.append(os.path.join(my_path, file_name))
        # os.path.join ensures the result is an absolute path

file_paths
['/my_path/file1.csv',
'/my_path/file2.csv',
...]
Now that you know the file paths and how to form the WebHDFS links, pandas.read_csv can handle the rest of the work.
import pandas as pd

dfs = []
web_url = "http://192.168.10.1:50070/webhdfs/v1/%s?op=OPEN"
#                                                  ^^^^^^^
#                                    Operation is now OPEN
for file_path in file_paths:
    file_url = web_url % file_path
    # http://192.168.10.1:50070/webhdfs/v1/my_path/file1.csv?op=OPEN
    dfs.append(pd.read_csv(file_url))
And there you go, with all the .csvs imported and assigned to dfs.
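One detail worth carrying over from the local version in the question: pd.read_csv accepts the same keyword arguments when given a URL, so the semicolon separator can be kept, e.g.:

dfs.append(pd.read_csv(file_url, index_col=None, header=0, sep=';'))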
If your HDFS is configured for HA (High Availability), there will be multiple namenodes, and thus namenode_ip must be set accordingly: it must be the IP of the active namenode.
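A rough sketch of picking the active namenode, assuming two hypothetical namenode IPs; a standby namenode will refuse WebHDFS read operations, so the first one that answers successfully should be the active one:

import requests

# hypothetical namenode IPs; replace with your own
namenodes = ['192.168.10.1', '192.168.10.2']

def active_namenode(namenodes, port=50070):
    # return the first namenode that successfully answers a WebHDFS read request
    for ip in namenodes:
        url = 'http://%s:%d/webhdfs/v1/?op=LISTSTATUS' % (ip, port)
        try:
            if requests.get(url, timeout=5).status_code == 200:
                return ip
        except requests.exceptions.RequestException:
            continue
    raise RuntimeError('No active namenode found')

namenode_ip = active_namenode(namenodes)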
Upvotes: 1