Reputation: 1903
I have this task:
load_io_monthly_billing_to_snowflake = SnowflakeLoadOperator(
    task_id='load_io_monthly_billing_to_Snowflake',
    table=IOMonthlyBillingSnowflakeTable(),
    merge=False,
    partition=MonthlyBillingS3PartitionLatestFile(
        source_resolver=get_latest_io_monthly_billing_file_from_new_prefix,
        bucket=Variable.get('s3_bucket'),
        location_base="BTR/",
        partition_type=None
    )
)
As source_resolver, the operator is given this function:
def get_latest_io_monthly_billing_file_from_new_prefix(*args, **kwargs):
    'does stuff to return the S3 file path'
    return file
And in my partition class:
class MonthlyBillingS3PartitionLatestFile(Partition):
    def __init__(self,
                 source_resolver,
                 *args,
                 obj=None,
                 **kwargs):
        super().__init__(*args, **kwargs)
        self.object = obj
        self.source_resolver = source_resolver

    @property
    def formatted_partition_value(self):
        source = self.source_resolver
        prefix, filename = os.path.split(source)
The line prefix, filename = os.path.split(source) raises the error below.
I don't understand why, because get_latest_io_monthly_billing_file_from_new_prefix
definitely returns a path. Why is this error happening?
    prefix, filename = os.path.split(source)
  File "/usr/local/lib/python3.8/posixpath.py", line 103, in split
    p = os.fspath(p)
TypeError: expected str, bytes or os.PathLike object, not function
Upvotes: 1
Views: 1899
Reputation: 83527
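You pass the function itself as source_resolver, so self.source_resolver holds a function object, not the path it returns. You just need to call the function:
    source = self.source_resolver()
Note the parentheses. You will also need to supply any parameters the function requires.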
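To make the difference concrete, here is a minimal, self-contained sketch. The names get_latest_file and LatestFilePartition, and the path "BTR/2021/billing.csv", are hypothetical stand-ins for your resolver, your partition class, and whatever path your resolver really returns. It assumes the resolver can be called without arguments.

```python
import os


def get_latest_file(*args, **kwargs):
    # Hypothetical stand-in for the real resolver; returns a fixed path.
    return "BTR/2021/billing.csv"


class LatestFilePartition:
    # Simplified stand-in for the partition class (no Partition base class).
    def __init__(self, source_resolver):
        # Store the callable itself; it is not invoked here.
        self.source_resolver = source_resolver

    @property
    def formatted_partition_value(self):
        # Call the resolver to get the path string, then split it.
        source = self.source_resolver()
        prefix, filename = os.path.split(source)
        return prefix, filename


partition = LatestFilePartition(source_resolver=get_latest_file)
result = partition.formatted_partition_value
```

Dropping the parentheses inside the property would hand the function object to os.path.split, which is what produces the TypeError in the question.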
Upvotes: 1