stn53

Reputation: 97

AttributeError thrown at build time but not in preview mode

I want to write unstructured data to a file in a transform. Running the transform in preview mode succeeds, but building the dataset fails. Specifically, it seems that the FileSystem object only has a files_dir attribute when previewing, not at build time.

from transforms.api import transform, Input, Output


@transform(
    data_input=Input("..."),
    data_output=Output("..."),
)
def transform(data_input, data_output):
    for file_status in data_input.filesystem().files('*.<file_ending>').collect():
        # PreProcessor is a custom helper class
        data = PreProcessor(f"{data_input.filesystem().files_dir}/{file_status.path}")

        # Throws "AttributeError: 'FileSystem' object has no attribute 'files_dir'"
        data.write(f"{data_output.filesystem().files_dir}/transformed_file")

How can I resolve this issue? Why is the files_dir attribute not set at build time?

Upvotes: 1

Views: 172

Answers (2)

stn53

Reputation: 97

The issue occurs because files_dir is only set at runtime during preview mode, not at build time. I used the hadoop_path attribute instead, which fixed the issue.
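
For reference, a minimal sketch of the working version (PreProcessor and the dataset paths are placeholders from the question):

from transforms.api import transform, Input, Output


@transform(
    data_input=Input("..."),
    data_output=Output("..."),
)
def transform(data_input, data_output):
    fs_in = data_input.filesystem()
    fs_out = data_output.filesystem()
    for file_status in fs_in.files('*.<file_ending>').collect():
        # hadoop_path is populated both in preview and at build time
        data = PreProcessor(f"{fs_in.hadoop_path}/{file_status.path}")
        data.write(f"{fs_out.hadoop_path}/transformed_file")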

Upvotes: 2

fmsf

Reputation: 37177

Preview and build run against different data volumes, which means you may get slightly different results. Preview uses a subset of files, or a subset of data (only 1000 rows), so code that works in preview may not hit edge cases that are absent from the particular subset chosen for computation.

As an example, here's some logic where I extract all headers from a collection of CSV files. It works both in preview and at build time.

from pyspark.sql import types as T
from transforms.api import transform, Input, Output


@transform(
    out=Output("out..."),
    raw=Input("in..."),
)
def extract_all_headers(raw, out, ctx):
    rows = []
    headers = {}
    # List every CSV file in the input dataset
    files = raw.filesystem().ls("*.csv")
    for csv in files:
        with raw.filesystem().open(csv.path) as f:
            # Only the header line of each file is needed
            header = f.readline()
            for name in header.split(","):
                name = name.strip()
                if name not in headers:
                    headers[name] = True
                    rows.append([name])

    schema = T.StructType([T.StructField("column_name", T.StringType(), True)])

    out.write_dataframe(ctx.spark_session.createDataFrame(rows, schema=schema))

Upvotes: 0
