Reputation: 1740
I have 2000 parquet files in a directory. Each parquet file is roughly 20MB in size. The compression used is SNAPPY. Each parquet file has rows that look like the following:
+------------+-----------+-----------------+
| customerId | productId | randomAttribute |
+------------+-----------+-----------------+
| ID1 | PRODUCT1 | ATTRIBUTE1 |
| ID2 | PRODUCT2 | ATTRIBUTE2 |
| ID2 | PRODUCT3 | ATTRIBUTE3 |
+------------+-----------+-----------------+
Each column entry is a string. I am using a p3.8xlarge EC2 instance with the following configurations:
I am trying the following code:
import cudf

def read_all_views(parquet_file_lst):
    df_lst = []
    for file in parquet_file_lst:
        df = cudf.read_parquet(file, columns=['customerId', 'productId'])
        df_lst.append(df)
    return cudf.concat(df_lst)
This crashes after processing the first 180 files with the following runtime error:
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 9, in read_all_views
  File "/home/ubuntu/miniconda3/lib/python3.7/site-packages/cudf/io/parquet.py", line 54, in read_parquet
    use_pandas_metadata,
  File "cudf/_lib/parquet.pyx", line 25, in cudf._lib.parquet.read_parquet
  File "cudf/_lib/parquet.pyx", line 80, in cudf._lib.parquet.read_parquet
RuntimeError: rmm_allocator::allocate(): RMM_ALLOC: unspecified launch failure
Only 10% of both GPU memory and CPU RAM is in use at any given time. Any ideas on how to debug this, or what workarounds exist?
Upvotes: 1
Views: 3964
Reputation: 4214
cuDF is a single-GPU library. 2000 files of 20 MB each come to about 40 GB of data, which is more than fits in the memory of a single V100 GPU.
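As a rough back-of-the-envelope check, the estimate above can be written out as follows. This is only a sketch: the 16 GB per-GPU figure is my assumption for the V100s in a p3.8xlarge, and the on-disk Snappy-compressed size understates what the data occupies once decompressed.

```python
import math

# Rough estimate only: compressed on-disk size versus memory on one GPU.
n_files = 2000
file_size_mb = 20      # approximate size per file, from the question
gpu_memory_gb = 16     # assumption: 16 GB V100, as in a p3.8xlarge

total_gb = n_files * file_size_mb / 1000
# Lower bound on GPU count; ignores decompression and working memory.
min_gpus = math.ceil(total_gb / gpu_memory_gb)

print(f"dataset on disk: ~{total_gb:.0f} GB")
print(f"fits on one GPU: {total_gb <= gpu_memory_gb}")
print(f"GPUs needed at minimum: {min_gpus}")
```

The real requirement is likely higher, since reading only two of the three columns reduces the footprint while decompression increases it.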
For workflows that require more than a single GPU, cuDF relies on Dask. The following example illustrates how you could use cuDF + Dask to read data into distributed GPU memory with multiple GPUs in a single node. This doesn't answer your debugging question, but should hopefully solve your problem.
First, I use a few lines of code to create a Dask cluster of two GPUs.
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
import dask_cudf
cluster = LocalCUDACluster() # by default use all GPUs in the node. I have two.
client = Client(cluster)
client
# The print output of client:
#
# Client
# Scheduler: tcp://127.0.0.1:44764
# Dashboard: http://127.0.0.1:8787/status
# Cluster
# Workers: 2
# Cores: 2
# Memory: 404.27 GB
Next I'll create a couple of parquet files for this example.
import os
import cudf
from cudf.datasets import randomdata
if not os.path.exists('example_output'):
    os.mkdir('example_output')
for x in range(2):
    df = randomdata(nrows=10000,
                    dtypes={'a':int, 'b':str, 'c':str, 'd':int},
                    seed=12)
    df.to_parquet('example_output/df')
Let's look at the memory on each of my GPUs with nvidia-smi.
nvidia-smi
Thu Sep 26 19:13:46 2019
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 410.104      Driver Version: 410.104      CUDA Version: 10.0     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  Tesla T4            On   | 00000000:AF:00.0 Off |                    0 |
| N/A   51C    P0    29W /  70W |   6836MiB / 15079MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   1  Tesla T4            On   | 00000000:D8:00.0 Off |                    0 |
| N/A   47C    P0    28W /  70W |   5750MiB / 15079MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID   Type   Process name                             Usage      |
|=============================================================================|
+-----------------------------------------------------------------------------+
Notice the two memory values: 6836 MiB on GPU 0 and 5750 MiB on GPU 1 (I happen to have unrelated data already in memory on these GPUs). Now let's read our entire directory of two parquet files with Dask cuDF and then persist it. Persisting forces computation: Dask execution is lazy, so just calling read_parquet only adds a task to the task graph. ddf is a Dask DataFrame.
ddf = dask_cudf.read_parquet('example_output/df')
ddf = ddf.persist()
Now let's look at nvidia-smi again.
Thu Sep 26 19:13:52 2019
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 410.104      Driver Version: 410.104      CUDA Version: 10.0     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  Tesla T4            On   | 00000000:AF:00.0 Off |                    0 |
| N/A   51C    P0    29W /  70W |   6938MiB / 15079MiB |      2%      Default |
+-------------------------------+----------------------+----------------------+
|   1  Tesla T4            On   | 00000000:D8:00.0 Off |                    0 |
| N/A   47C    P0    28W /  70W |   5852MiB / 15079MiB |      2%      Default |
+-------------------------------+----------------------+----------------------+
+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID   Type   Process name                             Usage      |
|=============================================================================|
+-----------------------------------------------------------------------------+
Memory usage rose by 102 MiB on each GPU (6836 to 6938 MiB on GPU 0, 5750 to 5852 MiB on GPU 1). Dask handles distributing our data across both GPUs for us.
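Applied to the directory from the question, the same pattern might look like the sketch below. I haven't run this against your data, so treat it as a starting point: the path "/path/to/parquet_dir" is a placeholder, the function name is reused from the question for illustration, and it assumes a machine with NVIDIA GPUs and dask_cuda and dask_cudf installed.

```python
# Sketch only: requires NVIDIA GPUs plus the dask_cuda and dask_cudf packages.
COLUMNS = ["customerId", "productId"]  # the two columns read in the question


def read_all_views(parquet_dir):
    # Local imports so this module can be loaded on a machine without GPUs.
    from dask.distributed import Client
    from dask_cuda import LocalCUDACluster
    import dask_cudf

    cluster = LocalCUDACluster()  # by default, one worker per visible GPU
    client = Client(cluster)
    # Lazy: this only builds the task graph for every file in the directory.
    ddf = dask_cudf.read_parquet(parquet_dir, columns=COLUMNS)
    # persist() triggers the read and spreads partitions across the GPUs.
    return client, ddf.persist()


# Hypothetical usage; "/path/to/parquet_dir" is a placeholder:
# client, ddf = read_all_views("/path/to/parquet_dir")
# print(len(ddf))
```

Returning the client along with the DataFrame keeps the cluster alive for as long as you hold on to the persisted data.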
Upvotes: 9