Reputation: 119
I was trying to deploy a trained Faiss index to PySpark and do a distributed search. The whole process includes three steps:

1. Preprocess the input data
2. Load the Faiss index and search
3. Postprocess the results and write them out
I set CPUs per task to 10 (spark.task.cpus=10) in order to do a multi-threaded search in step 2. However, steps 1 and 3 can only utilize 1 CPU per task, so in order to utilize all CPUs I want to set spark.task.cpus=1 before steps 1 and 3. I have tried the set method of RuntimeConfig, but it seems to make my program hang. Any advice on how to change the config at runtime, or how to otherwise optimize this problem?
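The RuntimeConfig attempt looked roughly like this (a sketch of what I tried, assuming spark is the active SparkSession, which is not shown in the code example below):

spark.conf.set("spark.task.cpus", "1")   # before step 1 (preprocess)
# ... step 1 ...
spark.conf.set("spark.task.cpus", "10")  # before step 2 (Faiss search)
# ... step 2 ...
spark.conf.set("spark.task.cpus", "1")   # before step 3 (postprocess and write)
# ... step 3 ...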
Code example:
import faiss
import numpy as np

def load_and_search(x, model_path):
    faiss_idx = faiss.read_index(model_path)
    # x is the partition iterator, so materialize it before concatenating
    q_vec = np.concatenate(list(x))
    _, idx_array = faiss_idx.search(q_vec, k=10)
    return idx_array

data = sc.textFile(input_path)

# Step 1: preprocess, only uses one CPU per task
data = data.map(lambda x: x)

# Step 2: load the Faiss index and search, uses multiple CPUs per task
data = data.mapPartitions(lambda x: load_and_search(x, model_path))

# Step 3: postprocess and write, one CPU per task
data.map(lambda x: x).saveAsTextFile(result_path)
Upvotes: 4
Views: 1532
Reputation: 31
A bit late, but instead of mapPartitions I used a pandas_udf together with a broadcast index and repartitioning to perform the distributed search.
import faiss
import numpy as np
import pandas as pd
import pyspark.sql.functions as spark_fn
from pyspark.sql.types import ArrayType, FloatType

k = 10

# Enable Arrow support.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "64")

# Load the index and broadcast it to the available workers
index = faiss.read_index("path_to_index")
bc_index = spark.sparkContext.broadcast(index)

# Define a pandas_udf to perform the search
@spark_fn.pandas_udf(
    returnType=ArrayType(FloatType()),
    functionType=spark_fn.PandasUDFType.SCALAR,
)
def load_and_search(query_vectors):
    # Cast the pandas Series of vectors to a 2-D numpy array (Faiss expects float32)
    query_vectors = np.asarray(query_vectors.to_list(), dtype=np.float32)
    _, index_arr = bc_index.value.search(query_vectors, k=k)
    return pd.Series(index_arr.tolist())

# Repartition the queries and perform the search
queries = queries.coalesce(numPartitions)
queries = queries.withColumn("output", load_and_search(spark_fn.col("vector_col")))
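For reference, queries and numPartitions are not defined in the snippet above. A minimal sketch of the assumed setup, with a hypothetical DataFrame holding the query vectors in a float-array column named vector_col:

from pyspark.sql.types import StructType, StructField, ArrayType, FloatType

numPartitions = 32  # assumption: roughly the total number of executor cores

# Toy DataFrame with 128-dimensional query vectors in "vector_col"
schema = StructType([StructField("vector_col", ArrayType(FloatType()))])
queries = spark.createDataFrame(
    [([0.1] * 128,), ([0.2] * 128,)],
    schema=schema,
)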
Upvotes: 0
Reputation: 5526
Well, you can change the SparkContext properties in the following way:
conf = sc._conf.setAll([('spark.task.cpus', '1')])
sc._conf.getAll()
# preprocess, one cpu per task
data = data.map(lambda x: x)

conf = sc._conf.setAll([('spark.task.cpus', '10')])
sc._conf.getAll()
# load the faiss index and search, multiple cpus per task
data = data.mapPartitions(lambda x: load_and_search(x, model_path))

conf = sc._conf.setAll([('spark.task.cpus', '1')])
sc._conf.getAll()
# postprocess and write, one cpu per task
data.map(lambda x: x).saveAsTextFile(result_path)
The getAll() calls can be removed; they were added just to check the current configuration.
Upvotes: 1
Reputation: 22979
Alternative idea: use mapPartitions for steps 1 and 3 as well. Then, use a multiprocessing pool within each worker to map the items in the partition in parallel. This way, you can use all CPUs assigned to a worker without changing the configuration (which I do not know is possible at all).
Pseudocode:
import multiprocessing as mp

def item_mapper(item):
    return ...

def partition_mapper(partition):
    with mp.Pool(processes=10) as pool:
        yield from pool.imap(item_mapper, partition)

rdd.mapPartitions(partition_mapper)
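Plugged into the question's pipeline, step 1 could look roughly like this (a sketch; preprocess_item is a hypothetical stand-in for the question's lambda x: x, and the pool size should match spark.task.cpus):

import multiprocessing as mp

# Hypothetical per-item preprocessing function (stand-in for lambda x: x)
def preprocess_item(item):
    return item

def preprocess_partition(partition):
    # One process per CPU assigned to the task (here spark.task.cpus=10)
    with mp.Pool(processes=10) as pool:
        yield from pool.imap(preprocess_item, partition)

# Step 1, parallelized within each task
data = data.mapPartitions(preprocess_partition)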
Upvotes: 1