Reputation: 717
I'm using a SentenceTransformer model to embed ~3 million text documents and write the results to OpenSearch. I'm using PySpark's predict_batch_udf and running it as a Kubeflow Pipeline. The model's pytorch_model.bin is ~500 MB, so it should need less than 1 GB of memory to load.
Basic code structure:
import os

import pyspark
import torch
from pyspark.ml.functions import predict_batch_udf
from pyspark.sql.types import ArrayType, FloatType
from sentence_transformers import SentenceTransformer

def main():
    spark = (pyspark.sql.SparkSession.builder.appName('xxx')
             .config("spark.hadoop.fs.s3a.endpoint", "http://xx")
             .config("spark.executor.instances", os.environ['exec_inst'])
             .config("spark.executor.cores", os.environ['exec_core'])
             # ... more configs ...
             .getOrCreate())  # actually written in another SparkUtil object, but same content

    soln_df = spark.read.format("delta").load(db_name)  # db_name defined elsewhere

    ranking_model = SentenceTransformer(os.environ['ranking_model_dir']).eval()
    model_bc = spark.sparkContext.broadcast(ranking_model)

    def make_predict_fn():
        def predict(inputs):
            with torch.no_grad():
                with torch.cuda.amp.autocast():
                    soln_embed = model_bc.value.encode(inputs, normalize_embeddings=True)
                    torch.cuda.empty_cache()
                    return soln_embed
        return predict

    # create a standard pandas UDF from the predict function
    embed = predict_batch_udf(make_predict_fn,
                              return_type=ArrayType(FloatType()),
                              batch_size=8)  # smallest bs to use tensor core 4*32=128

    df = soln_df.withColumn("embeddings", embed('text_to_embed')).drop('text_to_embed')
    spark_util.write_to_opensearch(df, os_index, 'my_id')  # using the same SparkUtil obj
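For context, write_to_opensearch is essentially just a DataFrame write to the index; roughly like the sketch below (I'm assuming the opensearch-hadoop connector here, so the exact format/option names in my SparkUtil may differ):

# Sketch only, assuming the opensearch-hadoop Spark connector; the real helper
# lives in SparkUtil and its endpoint/auth options differ.
def write_to_opensearch(df, index_name, id_col):
    (df.write
       .format("org.opensearch.spark.sql")                          # opensearch-hadoop data source
       .option("opensearch.nodes", "https://opensearch-host:9200")  # hypothetical endpoint
       .option("opensearch.mapping.id", id_col)                     # column used as the document _id
       .mode("append")
       .save(index_name))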
I've tried practically every layout of this code: moving the function definition and model loading in and out of main(), and switching between broadcasting the model and not broadcasting it.
The weird part is that the exact same code just works when I run it in a Kubeflow Jupyter notebook: the cell runs for an hour or two and finishes, with CUDA memory going up to ~15 GB on my 16 GB GPU. But when I put it in a Python script and run it in a Kubeflow job pod, it barely lasts two minutes before crashing. I even tried making main() async and running it in an event loop to simulate the Jupyter notebook (see the sketch below), but that didn't help.
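The async attempt was roughly this (a sketch, not my exact wrapper):

# Sketch of the async attempt: the same main() as above, just driven from an
# event loop to mimic the running loop a Jupyter kernel provides.
import asyncio

async def main_async():
    main()  # the synchronous pipeline shown above

if __name__ == "__main__":
    asyncio.run(main_async())  # behaved the same: OOM within a couple of minutes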
From what I've seen, there are just two distinct types of error. With broadcast:
[GPU info, Spark run config, etc.]
/usr/local/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py:224: FutureWarning: is_categorical_dtype is deprecated and will be removed in a future version. Use isinstance(dtype, CategoricalDtype) instead
24/06/29 06:56:27 ERROR Executor: Exception in task 17.0 in stage 5.0 (TID 80)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/pyspark/ml/functions.py", line 806, in predict
preds = predict_fn(single_input)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/app/process_rerank_embd_gpu.py", line 46, in predict
soln_embed = model_bc.value.encode(inputs, normalize_embeddings=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
......
[bunch of propagation of errors, pytorch this calls that stuff]
......
File "/usr/local/lib/python3.11/site-packages/torch/nn/functional.py", line 2237, in embedding
return torch.embedding(weight, input, padding_idx, scale_grad_by_freq, sparse)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
RuntimeError: CUDA error: out of memory
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1.
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.
Without broadcast:
[same warnings as above]
/usr/local/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py:224: FutureWarning: is_categorical_dtype is deprecated and will be removed in a future version. Use isinstance(dtype, CategoricalDtype) instead
24/06/29 05:16:23 ERROR Executor: Exception in task 38.0 in stage 5.0 (TID 101)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/pyspark/ml/functions.py", line 806, in predict
preds = predict_fn(single_input)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/app/process_rerank_embd_gpu.py", line 45, in predict
soln_embed = ranking_model.encode(inputs, normalize_embeddings=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
......
[bunch of propagation of errors, pytorch this calls that stuff]
......
File "/usr/local/lib/python3.11/site-packages/transformers/models/distilbert/modeling_distilbert.py", line 241, in forward
context = torch.matmul(weights, v) # (bs, n_heads, q_length, dim_per_head)
^^^^^^^^^^^^^^^^^^^^^^^^
torch.cuda.OutOfMemoryError: CUDA out of memory. Tried to allocate 20.00 MiB. GPU 0 has a total capacity of 15.77 GiB of which 19.12 MiB is free. Process 41502 has 1.07 GiB memory in use. Process 2489411 has 614.00 MiB memory in use. Process 2489412 has 614.00 MiB memory in use. Process 2489410 has 784.00 MiB memory in use. Process 2489329 has 792.00 MiB memory in use. Process 2489401 has 796.00 MiB memory in use. Process 2489397 has 614.00 MiB memory in use. Process 2489199 has 796.00 MiB memory in use. Process 2489286 has 614.00 MiB memory in use. Process 2489203 has 634.00 MiB memory in use. Process 2489334 has 614.00 MiB memory in use. Process 2489362 has 614.00 MiB memory in use. Process 2489357 has 614.00 MiB memory in use. Process 2489264 has 614.00 MiB memory in use. Process 2489366 has 788.00 MiB memory in use. Process 2489240 has 772.00 MiB memory in use. Process 2489230 has 614.00 MiB memory in use. Process 2489226 has 614.00 MiB memory in use. Process 2489451 has 614.00 MiB memory in use. Process 2489306 has 614.00 MiB memory in use. Process 2489376 has 614.00 MiB memory in use. Process 2489315 has 614.00 MiB memory in use. Process 2489259 has 614.00 MiB memory in use. Process 2489399 has 432.00 MiB memory in use. Of the allocated memory 411.66 MiB is allocated by PyTorch, and 8.34 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation. See documentation for Memory Management (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)
Both are just one instance out of many errors with the same pattern, and they show multiple processes hitting the GPU at the same time, so executor instances and cores clearly play a role; but reducing them (roughly as in the sketch below) doesn't help. And interestingly, my notebook runs with Spark set to 2 instances and 16 cores, and it still works.
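For reference, "reducing them" just meant turning the executor knobs down in the session builder, e.g. (illustrative values, not my exact config):

# Illustrative only: the kind of values I tried when scaling parallelism down.
spark = (pyspark.sql.SparkSession.builder.appName('xxx')
         .config("spark.executor.instances", "1")  # fewer executors
         .config("spark.executor.cores", "2")      # fewer concurrent tasks per executor
         .getOrCreate())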
Upvotes: 0
Views: 94