Lavanya

Reputation: 11

Running an MPI Python script in an Azure ML MPI pipeline

I'm trying to run a distributed Python job through Azure ML pipelines using the MpiStep pipeline class, referring to this example - https://github.com/Azure/MachineLearningNotebooks/blob/master/how-to-use-azureml/machine-learning-pipelines/pipeline-style-transfer/pipeline-style-transfer.ipynb

I implemented the same, but even when I change the node count parameter in the MpiStep class, the running script always shows a size (i.e. comm.Get_size()) of 1. Can you please help me with what I'm missing here? Is there any specific setup required on the cluster?

Code snippets:

Pipeline code snippet:

model_dir = model_ds.path('./'+saved_model_blob+'/',data_reference_name='saved_model_path').as_mount()
label_dir = model_ds.path('./'+model_label_blob+'/',data_reference_name='model_label_blob').as_mount()

input_images = result_ds.path('./'+score_blob_name+'/',data_reference_name='Input_images').as_mount()

output_container = 'abc'
inti_container = 'xyz'

distributed_batch_score_step = MpiStep(
    name="batch_scoring",
    source_directory=SCRIPT_FOLDER,
    script_name="batch_scoring_script_mpi.py",
    arguments=["--dataset_path", input_images,
               "--model_name", model_dir,
               "--label_dir", label_dir,
               "--intermediate_data_container", inti_container,
               "--output_container", output_container],
    compute_target=gpu_cluster,
    inputs=[input_images, model_dir, label_dir],
    pip_packages=["tensorflow", "tensorflow-gpu==1.13.1", "pillow",
                  "azure-keyvault", "azure-storage-blob"],
    conda_packages=["mesa-libgl-cos6-x86_64", "mpi4py==3.0.2",
                    "opencv=3.4.2", "scikit-learn=0.21.2"],
    use_gpu=True,
    allow_reuse=False,
    node_count=nodecount_param,
    process_count_per_node=1,
)

Python Script code snippet:

import pandas as pd
import tensorflow as tf
from mpi4py import MPI

# args, get_file_names, init, score_df, pp_output and currentDT
# come from parts of the script elided here.

def run(input_dataset, comm):
    rank = comm.Get_rank()
    size = comm.Get_size()
    print("Rank:", rank)
    print("Size:", size)  # always shows 1, even when the input node count is > 1
    print(MPI.Get_processor_name())

    file_names = get_file_names(args.dataset_path)
    file_names.sort()  # sort in place; a bare sorted() call discards its result

    # Each rank processes its own contiguous slice of the file list.
    partition_size = len(file_names) // size
    print("partition_size-->", partition_size)
    partitioned_filenames = file_names[rank * partition_size: (rank + 1) * partition_size]
    print("RANK {} - is processing {} images out of the total {}".format(
        rank, len(partitioned_filenames), len(file_names)))

    # call to Function 01

    # call to Function 02

    img_names = score_df['image_name'].unique()
    output_batch = pd.DataFrame()
    for i in img_names:
        # call to Function 3
        output_batch = output_batch.append(pp_output, ignore_index=True)

    # Gather once, after the loop, so every rank calls the collective
    # the same number of times (gathering inside the loop can deadlock
    # when ranks have different numbers of images).
    output_paths_list = comm.gather(output_batch, root=0)

    print("RANK {} - number of pre-aggregated output files {}".format(rank, len(output_batch)))
    print("saved in", currentDT + '\\' + 'data.csv')

    if rank == 0:
        print("RANK {} - number of aggregated output files {}".format(rank, len(output_paths_list)))
        print("RANK {} - end".format(rank))

if __name__ == "__main__":
    with tf.device('/GPU:0'):
        init()
        comm = MPI.COMM_WORLD
        run(args.dataset_path, comm)
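
To isolate the problem from the batch-scoring logic, here is a minimal sanity-check script (a sketch; the idea of running it as a stripped-down step with the same node_count and process_count_per_node settings is an assumption on my part) that only reports rank and size:

# check_mpi.py (hypothetical name) - prints one line per MPI process
from mpi4py import MPI

comm = MPI.COMM_WORLD
print("rank", comm.Get_rank(), "of", comm.Get_size(),
      "on", MPI.Get_processor_name())

If the cluster MPI setup is correct, this should print one line per process with size equal to node_count * process_count_per_node.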

Upvotes: 0

Views: 584

Answers (1)

Lavanya

Reputation: 11

Got to know the issue was due to how the mpi4py package was installed: earlier it was installed via conda with conda_packages=["mpi4py==3.0.2"]; it worked after changing the install to pip with pip_packages=["mpi4py"].
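
For reference, a sketch of the corrected step definition (same variables as in the question; only the package lists change, with mpi4py moved out of conda_packages into pip_packages):

distributed_batch_score_step = MpiStep(
    name="batch_scoring",
    source_directory=SCRIPT_FOLDER,
    script_name="batch_scoring_script_mpi.py",
    arguments=[...],  # same arguments as in the question
    compute_target=gpu_cluster,
    inputs=[input_images, model_dir, label_dir],
    # mpi4py moved from conda_packages to pip_packages:
    pip_packages=["tensorflow", "tensorflow-gpu==1.13.1", "pillow",
                  "azure-keyvault", "azure-storage-blob", "mpi4py"],
    conda_packages=["mesa-libgl-cos6-x86_64",
                    "opencv=3.4.2", "scikit-learn=0.21.2"],
    use_gpu=True,
    allow_reuse=False,
    node_count=nodecount_param,
    process_count_per_node=1,
)

With the pip-installed mpi4py, comm.Get_size() reports the expected number of processes.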

Upvotes: 0
