Reputation: 2926
I have partitioned .parquet files hosted on an FTP server with the following structure:
├───train_set=TS04
│ part.0.parquet
│ part.1.parquet
│ part.2.parquet
│
├───train_set=TS05
│ part.10.parquet
│ part.11.parquet
│
├───train_set=TS06
│ part.12.parquet
│ part.13.parquet
│
├───train_set=TS07
│ part.17.parquet
│
├───train_set=TS08
│ part.20.parquet
│
└───train_set=TS09
part.21.parquet
part.22.parquet
When attempting to parse them with pd.read_parquet and the engine set to fastparquet, this works just fine:
df = pd.read_parquet(
'ftp://[email protected]:21/root_evts',
storage_options={'password':'123'},
engine="fastparquet"
)
len(df) # => 9510029
However, when I use engine=pyarrow, it hangs for 30 seconds and then throws this weird error:
df = pd.read_parquet(
'ftp://[email protected]:21/root_evts',
storage_options={'password':'123'},
engine="pyarrow"
)
error_reply: 200 Switching to Binary mode.
Here is the full traceback:
---------------------------------------------------------------------------
error_reply Traceback (most recent call last)
Cell In[15], line 1
----> 1 o_jru = pd.read_parquet(
2 'ftp://[email protected]:21/root_evts',
3 storage_options={'password':'123'},
4 engine="pyarrow"
5 )
File ~\source\repos\hotline_tool_venv\Lib\site-packages\pandas\io\parquet.py:670, in read_parquet(path, engine, columns, storage_options, use_nullable_dtypes, dtype_backend, filesystem, filters, **kwargs)
667 use_nullable_dtypes = False
668 check_dtype_backend(dtype_backend)
--> 670 return impl.read(
671 path,
672 columns=columns,
673 filters=filters,
674 storage_options=storage_options,
675 use_nullable_dtypes=use_nullable_dtypes,
676 dtype_backend=dtype_backend,
677 filesystem=filesystem,
678 **kwargs,
679 )
File ~\source\repos\hotline_tool_venv\Lib\site-packages\pandas\io\parquet.py:272, in PyArrowImpl.read(self, path, columns, filters, use_nullable_dtypes, dtype_backend, storage_options, filesystem, **kwargs)
265 path_or_handle, handles, filesystem = _get_path_or_handle(
266 path,
267 filesystem,
268 storage_options=storage_options,
269 mode="rb",
270 )
271 try:
--> 272 pa_table = self.api.parquet.read_table(
273 path_or_handle,
274 columns=columns,
275 filesystem=filesystem,
276 filters=filters,
277 **kwargs,
278 )
279 result = pa_table.to_pandas(**to_pandas_kwargs)
281 if manager == "array":
File ~\source\repos\hotline_tool_venv\Lib\site-packages\pyarrow\parquet\core.py:1825, in read_table(source, columns, use_threads, schema, use_pandas_metadata, read_dictionary, memory_map, buffer_size, partitioning, filesystem, filters, use_legacy_dataset, ignore_prefixes, pre_buffer, coerce_int96_timestamp_unit, decryption_properties, thrift_string_size_limit, thrift_container_size_limit, page_checksum_verification)
1813 # TODO test that source is not a directory or a list
1814 dataset = ParquetFile(
1815 source, read_dictionary=read_dictionary,
1816 memory_map=memory_map, buffer_size=buffer_size,
(...)
1822 page_checksum_verification=page_checksum_verification,
1823 )
-> 1825 return dataset.read(columns=columns, use_threads=use_threads,
1826 use_pandas_metadata=use_pandas_metadata)
File ~\source\repos\hotline_tool_venv\Lib\site-packages\pyarrow\parquet\core.py:1468, in ParquetDataset.read(self, columns, use_threads, use_pandas_metadata)
1460 index_columns = [
1461 col for col in _get_pandas_index_columns(metadata)
1462 if not isinstance(col, dict)
1463 ]
1464 columns = (
1465 list(columns) + list(set(index_columns) - set(columns))
1466 )
-> 1468 table = self._dataset.to_table(
1469 columns=columns, filter=self._filter_expression,
1470 use_threads=use_threads
1471 )
1473 # if use_pandas_metadata, restore the pandas metadata (which gets
1474 # lost if doing a specific `columns` selection in to_table)
1475 if use_pandas_metadata:
File ~\source\repos\hotline_tool_venv\Lib\site-packages\pyarrow\_dataset.pyx:562, in pyarrow._dataset.Dataset.to_table()
File ~\source\repos\hotline_tool_venv\Lib\site-packages\pyarrow\_dataset.pyx:3722, in pyarrow._dataset.Scanner.to_table()
File ~\source\repos\hotline_tool_venv\Lib\site-packages\pyarrow\error.pxi:154, in pyarrow.lib.pyarrow_internal_check_status()
File ~\source\repos\hotline_tool_venv\Lib\site-packages\pyarrow\types.pxi:88, in pyarrow.lib._datatype_to_pep3118()
File ~\source\repos\hotline_tool_venv\Lib\site-packages\fsspec\spec.py:1846, in AbstractBufferedFile.read(self, length)
1843 if length == 0:
1844 # don't even bother calling fetch
1845 return b""
-> 1846 out = self.cache._fetch(self.loc, self.loc + length)
1847 self.loc += len(out)
1848 return out
File ~\source\repos\hotline_tool_venv\Lib\site-packages\fsspec\caching.py:189, in ReadAheadCache._fetch(self, start, end)
187 part = b""
188 end = min(self.size, end + self.blocksize)
--> 189 self.cache = self.fetcher(start, end) # new block replaces old
190 self.start = start
191 self.end = self.start + len(self.cache)
File ~\source\repos\hotline_tool_venv\Lib\site-packages\fsspec\implementations\ftp.py:326, in FTPFile._fetch_range(self, start, end)
323 raise TransferDone
325 try:
--> 326 self.fs.ftp.retrbinary(
327 f"RETR {self.path}",
328 blocksize=self.blocksize,
329 rest=start,
330 callback=callback,
331 )
332 except TransferDone:
333 try:
334 # stop transfer, we got enough bytes for this block
File ~\AppData\Local\Programs\Python\Python311\Lib\ftplib.py:436, in FTP.retrbinary(self, cmd, callback, blocksize, rest)
422 """Retrieve data in binary mode. A new port is created for you.
423
424 Args:
(...)
433 The response code.
434 """
435 self.voidcmd('TYPE I')
--> 436 with self.transfercmd(cmd, rest) as conn:
437 while 1:
438 data = conn.recv(blocksize)
File ~\AppData\Local\Programs\Python\Python311\Lib\ftplib.py:393, in FTP.transfercmd(self, cmd, rest)
391 def transfercmd(self, cmd, rest=None):
392 """Like ntransfercmd() but returns only the socket."""
--> 393 return self.ntransfercmd(cmd, rest)[0]
File ~\AppData\Local\Programs\Python\Python311\Lib\ftplib.py:353, in FTP.ntransfercmd(self, cmd, rest)
351 size = None
352 if self.passiveserver:
--> 353 host, port = self.makepasv()
354 conn = socket.create_connection((host, port), self.timeout,
355 source_address=self.source_address)
356 try:
File ~\AppData\Local\Programs\Python\Python311\Lib\ftplib.py:327, in FTP.makepasv(self)
325 """Internal: Does the PASV or EPSV handshake -> (address, port)"""
326 if self.af == socket.AF_INET:
--> 327 untrusted_host, port = parse227(self.sendcmd('PASV'))
328 if self.trust_server_pasv_ipv4_address:
329 host = untrusted_host
File ~\AppData\Local\Programs\Python\Python311\Lib\ftplib.py:839, in parse227(resp)
835 '''Parse the '227' response for a PASV request.
836 Raises error_proto if it does not contain '(h1,h2,h3,h4,p1,p2)'
837 Return ('host.addr.as.numbers', port#) tuple.'''
838 if resp[:3] != '227':
--> 839 raise error_reply(resp)
840 global _227_re
841 if _227_re is None:
error_reply: 200 Switching to Binary mode.
At first, I suspected a bug in the FTP server implementation, but I have tried 3 different implementations (FileZilla, garethflowers/docker-ftp-server, delfer/docker-alpine-ftp-server) and they all throw the exact same error.
I would like to use row-wise filtering in read_parquet, but this only works with pyarrow, which is why I want to switch engines.
How can I trace back what is causing the error?
Upvotes: 0
Views: 100