Reputation: 97
I want to write unstructured data to a file in a transform. Running the transform in preview mode is successful, but building the dataset is not. Specifically, it seems like the FileSystem object has a files_dir attribute only when previewing and not at build time.
from transforms.api import transform, Input, Output

@transform(
    data_input=Input("..."),
    data_output=Output("..."),
)
def transform(data_input, data_output):
    for file_status in data_input.filesystem().files('*.<file_ending>').collect():
        data = PreProcessor(f"{data_input.filesystem().files_dir}/{file_status.path}")
        # Throws "AttributeError: 'FileSystem' object has no attribute 'files_dir'" at build time
        data.write(f"{data_output.filesystem().files_dir}/transformed_file")
How can I resolve this issue? Why is the files_dir attribute not set at build time?
Upvotes: 1
Views: 172
Reputation: 97
The issue occurs because files_dir is only set at runtime during preview mode, not during a build. Instead, I used the hadoop_path attribute, which fixes the issue.
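As a minimal sketch, the transform from the question could look like the following with hadoop_path swapped in for files_dir (PreProcessor, the dataset paths, and the file ending are placeholders carried over from the question):

from transforms.api import transform, Input, Output

@transform(
    data_input=Input("..."),
    data_output=Output("..."),
)
def transform(data_input, data_output):
    for file_status in data_input.filesystem().files('*.<file_ending>').collect():
        # hadoop_path is available here, unlike files_dir, which only exists in preview
        data = PreProcessor(f"{data_input.filesystem().hadoop_path}/{file_status.path}")
        data.write(f"{data_output.filesystem().hadoop_path}/transformed_file")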
Upvotes: 2
Reputation: 37177
Preview and build use different data volumes, which means you may get slightly different results. Preview uses a subset of files, or a subset of the data (only 1000 rows), so code that works in preview may not hit edge cases if they are not present in the specific subset that was chosen to be computed.
Here's some logic where I was getting all headers from a cluster of CSVs, as an example. It works both in preview and at build time.
from pyspark.sql import types as T
from transforms.api import transform, Input, Output


@transform(
    out=Output("out..."),
    raw=Input("in..."),
)
def extract_all_headers(raw, out, ctx):
    rows = []
    headers = {}
    # List every CSV file in the input dataset and read just its header line
    files = raw.filesystem().ls("*.csv")
    for csv in files:
        with raw.filesystem().open(csv.path) as f:
            header = f.readline()
            for name in header.split(","):
                name = name.strip()
                if name not in headers:
                    headers[name] = True
                    rows.append([name])
    schema = T.StructType([T.StructField("column_name", T.StringType(), True)])
    out.write_dataframe(ctx.spark_session.createDataFrame(rows, schema=schema))
Upvotes: 0