Jay Yip

Reputation: 119

Changing configuration at runtime for PySpark

I was trying to deploy a trained Faiss index to PySpark and do a distributed search. The whole process includes:

  1. Pre-process
  2. Load the Faiss index (~15 GB) and run the Faiss search
  3. Post-process and write to HDFS

I set CPUs per task to 10 (spark.task.cpus=10) so the search can run multi-threaded. However, steps 1 and 3 can only utilize 1 CPU per task. In order to utilize all CPUs, I want to set spark.task.cpus=1 before steps 1 and 3. I tried the set method of RuntimeConfig, but it seems to make my program hang. Any advice on how to change the configuration at runtime, or how else to optimize this problem?

Code example:

import faiss
import numpy as np


def load_and_search(x, model_path):
    faiss_idx = faiss.read_index(model_path)
    q_vec = np.concatenate(x)
    _, idx_array = faiss_idx.search(q_vec, k=10)
    return idx_array


data = sc.textFile(input_path)

# preprocess, only used one cpu per task
data = data.map(lambda x: x)

# load faiss index and search, used multiple cpus per task
data = data.mapPartitions(lambda x: load_and_search(x, model_path))

# postprocess and write, one cpu per task
data = data.map(lambda x: x).saveAsTextFile(result_path)
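
For reference, the RuntimeConfig call I tried was roughly the following (a sketch; this is the part that seems to hang):

# roughly what I tried: switching spark.task.cpus through RuntimeConfig
spark.conf.set("spark.task.cpus", "1")    # intended before steps 1 and 3
spark.conf.set("spark.task.cpus", "10")   # intended before the Faiss search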

Upvotes: 4

Views: 1532

Answers (3)

Eric

Reputation: 31

A bit late, but instead of mapPartitions I used a pandas_udf together with a broadcast index and repartitioning to perform the distributed search.

import faiss
from pyspark.sql import functions as spark_fn
from pyspark.sql.types import ArrayType, FloatType

k = 10

# Enable Arrow support.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "64")

# Load the index and broadcast it to the available workers
index = faiss.read_index("path_to_index")
bc_index = spark.sparkContext.broadcast(index)

# Define a pandas_udf to perform the search
@spark_fn.pandas_udf(
    returnType=ArrayType(FloatType()),
    functionType=spark_fn.PandasUDFType.SCALAR,
)
def load_and_search(query_vectors):
    import pandas as pd
    import numpy as np
    # Cast the batch of query vectors to a numpy array
    query_vectors = np.asarray(query_vectors.to_list())
    _, index_arr = bc_index.value.search(query_vectors, k=k)
    return pd.Series(index_arr.tolist())

# Repartition the queries and perform the search
queries = queries.coalesce(numPartitions)
queries = queries.withColumn("output", load_and_search(spark_fn.col("vector_col")))
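
The snippet above assumes a DataFrame queries with an array column vector_col holding the query vectors, plus a numPartitions value. A minimal sketch of such an input (the column name, vector dimension and partition count here are only illustrative):

from pyspark.sql.types import StructType, StructField, ArrayType, FloatType

numPartitions = 32  # e.g. number of available executor slots (illustrative)

# Dummy query vectors; the dimension must match the Faiss index dimension.
schema = StructType([StructField("vector_col", ArrayType(FloatType()), False)])
queries = spark.createDataFrame(
    [([0.1] * 128,), ([0.2] * 128,)],
    schema=schema,
)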

Upvotes: 0

Shubham Jain

Reputation: 5526

Well, you can change the SparkContext properties in the following way:

conf = sc._conf.setAll([('spark.task.cpus','1')])
sc._conf.getAll()
# preprocess, one cpu per task
data = data.map(lambda x: x)

conf = sc._conf.setAll([('spark.task.cpus','10')])
sc._conf.getAll()
# load faiss index and search, used multiple cpus per task
data = data.mapPartitions(lambda x: load_and_search(x, model_path))

conf = sc._conf.setAll([('spark.task.cpus','1')])
sc._conf.getAll()
# postprocess and write, one cpu per task
data = data.map(lambda x: x).saveAsTextFile(result_path)

The getAll() calls can be removed; they are only there to check the current configuration.

Upvotes: 1

BlackBear

Reputation: 22979

Alternative idea: use mapPartitions for steps 1 and 3. Then, within each worker, use a multiprocessing pool to map the items of the partition in parallel. This way you can use all CPUs assigned to a worker without changing the configuration (which I don't know is possible at all).

Pseudocode:

import multiprocessing as mp

def item_mapper(item):
    # per-item pre-/post-processing goes here
    return ...

def partition_mapper(partition):
    # map the items of this partition in parallel with a local pool
    with mp.Pool(processes=10) as pool:
        yield from pool.imap(item_mapper, partition)

rdd.mapPartitions(partition_mapper)
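
Applied to the pipeline from the question, this could look roughly as follows (load_and_search, model_path, input_path and result_path are from the question; preprocess, postprocess and the pooled_partition_mapper helper are placeholders introduced here for illustration):

import multiprocessing as mp
from functools import partial

def preprocess(item):
    return item  # placeholder for step 1

def postprocess(item):
    return item  # placeholder for step 3

def pooled_partition_mapper(partition, fn, processes=10):
    # run fn over the items of one partition using a local process pool
    with mp.Pool(processes=processes) as pool:
        yield from pool.imap(fn, partition)

data = sc.textFile(input_path)
# step 1: preprocess using all CPUs of the task via the pool
data = data.mapPartitions(partial(pooled_partition_mapper, fn=preprocess))
# step 2: Faiss search (multi-threaded inside Faiss itself)
data = data.mapPartitions(lambda x: load_and_search(x, model_path))
# step 3: postprocess via the pool, then write out
data = data.mapPartitions(partial(pooled_partition_mapper, fn=postprocess)) \
           .saveAsTextFile(result_path)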

Upvotes: 1
