Reputation: 465
I am writing code in Palantir using PySpark, and I am getting an error that I cannot figure out.
The error is:
A TransformInput object does not have an attribute withColumn.
Please check the spelling and/or the datatype of the object.
My code, for reference:
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.functions import when
from transforms.api import configure, transform, Input, Output
@transform(
    result=Output('Output_data_file_location'),
    first_input=Input('Input_file1'),
    second_input=Input('Input_file2'),
)
def function_temp(first_input, second_input, result):
    from pyspark.sql.functions import monotonically_increasing_id
    res = ncbs.withColumn("id", monotonically_increasing_id())
    # Recode type
    res = res.withColumn("old_col_type", F.when(
        (F.col("col_type") == 'left') | (F.col("col_type") == 'right'), 'turn'
    ).when(
        (F.col("col_type") == 'up') | (F.col("col_type") == 'down'), 'straight'
    ))
    res = res.withColumnRenamed("old_col_type", "t_old_col_type") \
        .withColumnRenamed("old_col2_type", "t_old_col2_type")
    res = res.filter(res.col_type == 'straight')
    res = res.join(second_input,  # eqNullSafe is like an equal sign but includes null in join
                   (res.col1.eqNullSafe(second_input.pre_col1)) &
                   (res.col2.eqNullSafe(second_input.pre_col2)),
                   how='left')\
        .drop(*["pre_col1", "pre_col2"]).withColumnRenamed("temp_result", "final_answer")
    result.write_dataframe(res)
Can anyone help me with this error? Thanks in advance.
Upvotes: 3
Views: 1189
Reputation: 6998
The error message explains it pretty well: you are calling .withColumn()
on an object that is not a regular Spark DataFrame but a TransformInput
object. You need to call its .dataframe()
method first to get the underlying DataFrame.
The documentation for reference.
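To illustrate the idea without needing a Foundry environment, here is a minimal sketch in plain Python. StubDataFrame and StubTransformInput are hypothetical stand-ins I made up for this example; they are not the real Spark or transforms.api classes and only mimic the relevant behaviour: the wrapper holds a DataFrame but does not expose DataFrame methods itself.

```python
class StubDataFrame:
    """Hypothetical stand-in for a Spark DataFrame; only tracks column names."""

    def __init__(self, columns):
        self.columns = list(columns)

    def withColumn(self, name, value):
        # Mimics the shape of the real method: returns a new frame
        # with one more column (the value is ignored in this stub).
        return StubDataFrame(self.columns + [name])


class StubTransformInput:
    """Hypothetical stand-in for the input wrapper: it holds a DataFrame
    but does not expose DataFrame methods itself."""

    def __init__(self, df):
        self._df = df

    def dataframe(self):
        # Hands back the wrapped DataFrame.
        return self._df


first_input = StubTransformInput(StubDataFrame(["col_type"]))

# Calling a DataFrame method directly on the wrapper fails, as in the question.
try:
    first_input.withColumn("id", None)
    error_message = None
except AttributeError as exc:
    error_message = str(exc)

# Unwrapping first gives an object that does have withColumn.
res = first_input.dataframe().withColumn("id", None)

print(error_message)
print(res.columns)
```

In the transform from the question, the equivalent change would be to start from first_input.dataframe() before the first .withColumn() call, and most likely to pass second_input.dataframe() to the join as well, since second_input is also a TransformInput.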
In addition, you should move the monotonically_increasing_id
import to the top of the file, since Foundry's transform logic-level versioning only works when imports happen at the module level, according to the documentation.
Upvotes: 9