Arthur Attout

Reputation: 2926

Reading partitioned parquet files over FTP works with fastparquet but fails with pyarrow

I have partitioned .parquet files hosted on an FTP server with the following structure:

├───train_set=TS04
│       part.0.parquet
│       part.1.parquet
│       part.2.parquet
│
├───train_set=TS05
│       part.10.parquet
│       part.11.parquet
│
├───train_set=TS06
│       part.12.parquet
│       part.13.parquet
│
├───train_set=TS07
│       part.17.parquet
│
├───train_set=TS08
│       part.20.parquet
│
└───train_set=TS09
        part.21.parquet
        part.22.parquet

When I read them with pd.read_parquet and the engine set to fastparquet, everything works just fine:

import pandas as pd

df = pd.read_parquet(
    'ftp://[email protected]:21/root_evts',
    storage_options={'password':'123'},
    engine="fastparquet"
)
len(df)  # => 9510029

However, when I use engine="pyarrow", the same call hangs for about 30 seconds and then throws this weird error:

df = pd.read_parquet(
    'ftp://[email protected]:21/root_evts',
    storage_options={'password':'123'},
    engine="pyarrow"
)

error_reply: 200 Switching to Binary mode.

Here is the full traceback:

---------------------------------------------------------------------------
error_reply                               Traceback (most recent call last)
Cell In[15], line 1
----> 1 o_jru = pd.read_parquet(
      2     'ftp://[email protected]:21/root_evts',
      3     storage_options={'password':'123'},
      4     engine="pyarrow"
      5 )

File ~\source\repos\hotline_tool_venv\Lib\site-packages\pandas\io\parquet.py:670, in read_parquet(path, engine, columns, storage_options, use_nullable_dtypes, dtype_backend, filesystem, filters, **kwargs)
    667     use_nullable_dtypes = False
    668 check_dtype_backend(dtype_backend)
--> 670 return impl.read(
    671     path,
    672     columns=columns,
    673     filters=filters,
    674     storage_options=storage_options,
    675     use_nullable_dtypes=use_nullable_dtypes,
    676     dtype_backend=dtype_backend,
    677     filesystem=filesystem,
    678     **kwargs,
    679 )

File ~\source\repos\hotline_tool_venv\Lib\site-packages\pandas\io\parquet.py:272, in PyArrowImpl.read(self, path, columns, filters, use_nullable_dtypes, dtype_backend, storage_options, filesystem, **kwargs)
    265 path_or_handle, handles, filesystem = _get_path_or_handle(
    266     path,
    267     filesystem,
    268     storage_options=storage_options,
    269     mode="rb",
    270 )
    271 try:
--> 272     pa_table = self.api.parquet.read_table(
    273         path_or_handle,
    274         columns=columns,
    275         filesystem=filesystem,
    276         filters=filters,
    277         **kwargs,
    278     )
    279     result = pa_table.to_pandas(**to_pandas_kwargs)
    281     if manager == "array":

File ~\source\repos\hotline_tool_venv\Lib\site-packages\pyarrow\parquet\core.py:1825, in read_table(source, columns, use_threads, schema, use_pandas_metadata, read_dictionary, memory_map, buffer_size, partitioning, filesystem, filters, use_legacy_dataset, ignore_prefixes, pre_buffer, coerce_int96_timestamp_unit, decryption_properties, thrift_string_size_limit, thrift_container_size_limit, page_checksum_verification)
   1813     # TODO test that source is not a directory or a list
   1814     dataset = ParquetFile(
   1815         source, read_dictionary=read_dictionary,
   1816         memory_map=memory_map, buffer_size=buffer_size,
   (...)
   1822         page_checksum_verification=page_checksum_verification,
   1823     )
-> 1825 return dataset.read(columns=columns, use_threads=use_threads,
   1826                     use_pandas_metadata=use_pandas_metadata)

File ~\source\repos\hotline_tool_venv\Lib\site-packages\pyarrow\parquet\core.py:1468, in ParquetDataset.read(self, columns, use_threads, use_pandas_metadata)
   1460         index_columns = [
   1461             col for col in _get_pandas_index_columns(metadata)
   1462             if not isinstance(col, dict)
   1463         ]
   1464         columns = (
   1465             list(columns) + list(set(index_columns) - set(columns))
   1466         )
-> 1468 table = self._dataset.to_table(
   1469     columns=columns, filter=self._filter_expression,
   1470     use_threads=use_threads
   1471 )
   1473 # if use_pandas_metadata, restore the pandas metadata (which gets
   1474 # lost if doing a specific `columns` selection in to_table)
   1475 if use_pandas_metadata:

File ~\source\repos\hotline_tool_venv\Lib\site-packages\pyarrow\_dataset.pyx:562, in pyarrow._dataset.Dataset.to_table()

File ~\source\repos\hotline_tool_venv\Lib\site-packages\pyarrow\_dataset.pyx:3722, in pyarrow._dataset.Scanner.to_table()

File ~\source\repos\hotline_tool_venv\Lib\site-packages\pyarrow\error.pxi:154, in pyarrow.lib.pyarrow_internal_check_status()

File ~\source\repos\hotline_tool_venv\Lib\site-packages\pyarrow\types.pxi:88, in pyarrow.lib._datatype_to_pep3118()

File ~\source\repos\hotline_tool_venv\Lib\site-packages\fsspec\spec.py:1846, in AbstractBufferedFile.read(self, length)
   1843 if length == 0:
   1844     # don't even bother calling fetch
   1845     return b""
-> 1846 out = self.cache._fetch(self.loc, self.loc + length)
   1847 self.loc += len(out)
   1848 return out

File ~\source\repos\hotline_tool_venv\Lib\site-packages\fsspec\caching.py:189, in ReadAheadCache._fetch(self, start, end)
    187     part = b""
    188 end = min(self.size, end + self.blocksize)
--> 189 self.cache = self.fetcher(start, end)  # new block replaces old
    190 self.start = start
    191 self.end = self.start + len(self.cache)

File ~\source\repos\hotline_tool_venv\Lib\site-packages\fsspec\implementations\ftp.py:326, in FTPFile._fetch_range(self, start, end)
    323         raise TransferDone
    325 try:
--> 326     self.fs.ftp.retrbinary(
    327         f"RETR {self.path}",
    328         blocksize=self.blocksize,
    329         rest=start,
    330         callback=callback,
    331     )
    332 except TransferDone:
    333     try:
    334         # stop transfer, we got enough bytes for this block

File ~\AppData\Local\Programs\Python\Python311\Lib\ftplib.py:436, in FTP.retrbinary(self, cmd, callback, blocksize, rest)
    422 """Retrieve data in binary mode.  A new port is created for you.
    423 
    424 Args:
   (...)
    433   The response code.
    434 """
    435 self.voidcmd('TYPE I')
--> 436 with self.transfercmd(cmd, rest) as conn:
    437     while 1:
    438         data = conn.recv(blocksize)

File ~\AppData\Local\Programs\Python\Python311\Lib\ftplib.py:393, in FTP.transfercmd(self, cmd, rest)
    391 def transfercmd(self, cmd, rest=None):
    392     """Like ntransfercmd() but returns only the socket."""
--> 393     return self.ntransfercmd(cmd, rest)[0]

File ~\AppData\Local\Programs\Python\Python311\Lib\ftplib.py:353, in FTP.ntransfercmd(self, cmd, rest)
    351 size = None
    352 if self.passiveserver:
--> 353     host, port = self.makepasv()
    354     conn = socket.create_connection((host, port), self.timeout,
    355                                     source_address=self.source_address)
    356     try:

File ~\AppData\Local\Programs\Python\Python311\Lib\ftplib.py:327, in FTP.makepasv(self)
    325 """Internal: Does the PASV or EPSV handshake -> (address, port)"""
    326 if self.af == socket.AF_INET:
--> 327     untrusted_host, port = parse227(self.sendcmd('PASV'))
    328     if self.trust_server_pasv_ipv4_address:
    329         host = untrusted_host

File ~\AppData\Local\Programs\Python\Python311\Lib\ftplib.py:839, in parse227(resp)
    835 '''Parse the '227' response for a PASV request.
    836 Raises error_proto if it does not contain '(h1,h2,h3,h4,p1,p2)'
    837 Return ('host.addr.as.numbers', port#) tuple.'''
    838 if resp[:3] != '227':
--> 839     raise error_reply(resp)
    840 global _227_re
    841 if _227_re is None:

error_reply: 200 Switching to Binary mode.

Reading the traceback, the failure happens inside fsspec's FTP layer: retrbinary first sends TYPE I and then opens the data connection with PASV, but parse227 receives 200 Switching to Binary mode, which looks like the reply to the earlier TYPE I command, as if the replies on the control channel are off by one. At first I suspected a bug in the FTP server implementation, but I have tried three of them (FileZilla, garethflowers/docker-ftp-server, delfer/docker-alpine-ftp-server) and they all throw exactly the same error.
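To dig further, the raw FTP conversation can be logged. A minimal sketch, assuming fsspec's FTPFileSystem exposes its ftplib.FTP instance as .ftp (the traceback above suggests it does) and using ftplib's standard set_debuglevel:

import fsspec

# Connect the same way the ftp:// URL above does.
fs = fsspec.filesystem(
    "ftp",
    host="192.168.10.1",
    port=21,
    username="anonymous",
    password="123",
)

# At level 2, ftplib prints every command and reply on the control
# channel, which should show whether the replies really get out of sync.
fs.ftp.set_debuglevel(2)

# Trigger a ranged read the way pyarrow's scanner does.
with fs.open("/root_evts/train_set=TS04/part.0.parquet", "rb") as f:
    f.read(1024)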

I would like to use row-wise filtering in read_parquet, but that only works with the pyarrow engine, which is why I want to switch.
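For reference, this is the kind of call I am aiming for (the speed column is made up, purely to illustrate a row-level filter):

df = pd.read_parquet(
    'ftp://[email protected]:21/root_evts',
    storage_options={'password':'123'},
    engine="pyarrow",
    # one partition filter plus one row-level filter; the row-level
    # part is what only works with pyarrow
    filters=[("train_set", "=", "TS04"), ("speed", ">", 120)],
)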

How can I track down what is causing this error?
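In case it is relevant: mirroring the dataset to local disk first should bypass fsspec's FTP range reads entirely, but I would rather keep reading straight over FTP. A rough sketch (the local target path is arbitrary):

import fsspec
import pandas as pd

fs = fsspec.filesystem("ftp", host="192.168.10.1", port=21,
                       username="anonymous", password="123")

# Download the whole partitioned dataset, then let pyarrow read from disk.
fs.get("/root_evts/", "root_evts_local/", recursive=True)

df = pd.read_parquet("root_evts_local", engine="pyarrow")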

Upvotes: 0

Views: 100

Answers (0)
