Reputation: 469
I have a CSV file that I read into a 2D list, and I want to use the scatter method in MPI (mpi4py) to send different chunks of this list to different processing elements, as follows:
df = []
with open("data_tiny.csv") as csv_file:
    csv_reader = csv.reader(csv_file, delimiter=',')
    for row in csv_reader:
        df.append(row)
recvbuf = [[""] * (len(df[0])) for _ in range(math.ceil(len(df)//size))]
recvbuf= comm.scatter(df, root=0)
print('Rank: ',rank, ', recvbuf received: ',recvbuf)
for t in recvbuf[:]:
    if t[7] != 'o3':
        recvbuf.remove(t)
comm.gather(recvbuf, df, root=0)
if rank == 0:
    print('Rank: ',rank, ', recvbuf received: ',df)
and I get the following error:
Traceback (most recent call last):
  File "MPI_1.py", line 21, in <module>
    recvbuf= comm.scatter(df, root=0)
  File "mpi4py/MPI/Comm.pyx", line 1267, in mpi4py.MPI.Comm.scatter
  File "mpi4py/MPI/msgpickle.pxi", line 730, in mpi4py.MPI.PyMPI_scatter
  File "mpi4py/MPI/msgpickle.pxi", line 119, in mpi4py.MPI.Pickle.dumpv
ValueError: expecting 4 items, got 54
The error says scatter expects 4 items but got 54 (df, the 2D list, has 54 rows, which is why it reports 54). My question is: how can I pass a 2D list to the scatter method (without using NumPy) and resolve this error?
The input data has 9 columns and 54 rows, such as:
a, aa, aaa, aaaa, aaaaa, aaaaaa, ab, abb, abbb
a1, aa1, aaa1, aaaa1, aaaaa1, aaaaaa1, ab1, abb1, abbb1
a2, aa2, aaa2, aaaa2, aaaaa2, aaaaaa2, ab2, abb2, abbb2
a3, aa3, aaa3, aaaa3, aaaaa3, aaaaaa3, ab3, abb3, abbb3
.....
.....
Upvotes: 2
Views: 1759
Reputation: 51393
ValueError: expecting 4 items, got 54
This happens because the scatter routine:
recvbuf= comm.scatter(df, root=0)
expects df to have the same length as the number of running processes (i.e., comm.size). Since you are running with 4 processes and df has 54 elements, you get the error.
> ValueError: expecting 4 items, got 54
To solve this, you need to pack df so that it contains as many elements as there are processes, where each element can be a list holding the items to be sent to the corresponding process.
For example, if you are running with 4 processes and df=[1,2,3,4,5,6,7,8], you would need to make df=[[1,2],[3,4],[5,6],[7,8]], where df[0] goes to process 0, df[1] to process 1, and so on.
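To make the packing concrete, here is a minimal sketch in plain Python (no MPI needed to run it). The helper name pack_for_scatter is hypothetical and not part of mpi4py; it only shows one way to build a list with exactly one chunk per process, including the case where the rows do not divide evenly (54 rows over 4 processes).

```python
def pack_for_scatter(items, n_procs):
    """Split items into exactly n_procs contiguous chunks of near-equal size.

    Hypothetical helper for illustration; not an mpi4py function.
    """
    base, extra = divmod(len(items), n_procs)
    chunks = []
    start = 0
    for i in range(n_procs):
        # The first `extra` chunks each take one additional item.
        stop = start + base + (1 if i < extra else 0)
        chunks.append(items[start:stop])
        start = stop
    return chunks


# The 8-element example from the text, packed for 4 processes.
print(pack_for_scatter([1, 2, 3, 4, 5, 6, 7, 8], 4))
# prints [[1, 2], [3, 4], [5, 6], [7, 8]]

# 54 rows over 4 processes: the outer list has length 4, as scatter expects.
print([len(chunk) for chunk in pack_for_scatter(list(range(54)), 4)])
# prints [14, 14, 13, 13]
```

The outer list always has length n_procs, which is the condition comm.scatter checks on the root.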
An example of a possible solution:
import csv
import math
from mpi4py import MPI

def split(a, n):
    # Split a into n contiguous chunks whose lengths differ by at most one.
    k, m = divmod(len(a), n)
    return list(a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

df = []
if rank == 0:
    # Only the root reads the file and packs it into one chunk per process.
    with open("data_tiny.csv") as csv_file:
        csv_reader = csv.reader(csv_file, delimiter=',')
        for row in csv_reader:
            df.append(row)
    df = split(df, size)

# Every rank calls scatter; each one receives its own chunk of rows.
recvbuf = comm.scatter(df, root=0)
print(recvbuf)
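The question also filters each chunk and gathers the results back. In mpi4py, the lowercase comm.gather(sendobj, root=0) returns the collected list on the root (and None on the other ranks) rather than filling a buffer passed as an argument, so the root ends up with one list per rank that still needs flattening. The sketch below simulates that step in plain Python so it runs without MPI; keep_matching and flatten are hypothetical helper names, and the two hard-coded chunks are made-up stand-ins for what each rank would receive from scatter.

```python
def keep_matching(rows, column, value):
    """Return the rows whose entry at index `column` equals `value`."""
    return [row for row in rows if row[column] == value]


def flatten(gathered):
    """Merge a list of per-rank row lists into a single list of rows."""
    return [row for chunk in gathered for row in chunk]


# Made-up stand-in for the chunks that two ranks would receive from scatter.
chunks = [
    [["a", "x"], ["b", "o3"]],
    [["c", "o3"], ["d", "y"]],
]

# Each rank would filter its own chunk; on the root,
# comm.gather(filtered, root=0) would return a list shaped like `gathered`.
gathered = [keep_matching(chunk, 1, "o3") for chunk in chunks]

result = flatten(gathered)
print(result)
# prints [['b', 'o3'], ['c', 'o3']]
```

In the real program this would look like gathered = comm.gather(keep_matching(recvbuf, 7, 'o3'), root=0), followed by flatten(gathered) inside the if rank == 0: branch. Building a new filtered list also avoids removing items from recvbuf while looping over it.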
Upvotes: 4