Reputation: 1106
I wrote the two functions below:
def dataproc_first_job(self, task_id, app, job):
    return DataProcSparkOperator(
        task_id=task_id,
        dataproc_spark_jars=self.jar,
        cluster_name=self.cluster,
        main_class=self.main_name,
        dataproc_spark_properties={
            'spark.driver.extraJavaOptions': '-DAppConfig.appName=%s -DTrmRaw.Options.jobName=%s' % (app, job)
        },
        trigger_rule=TriggerRule.ALL_DONE
    )
def dataproc_second_job(self, task_id, app, job, prefix, dataset):
    return DataProcSparkOperator(
        task_id=task_id,
        dataproc_spark_jars=self.jar,
        cluster_name=self.cluster,
        main_class=self.main_name,
        dataproc_spark_properties={
            'spark.driver.extraJavaOptions': '-DAppConfig.appName=%s -DTrmRaw.Options.jobName=%s -DTrm.Options.prefix=%s -DTrm.Metadata.outputBase=%s' % (app, job, prefix, dataset)
        },
        trigger_rule=TriggerRule.ALL_DONE
    )
My aim is to refactor the Python code to use just one function instead of two. I thought about using decorators, but I am not sure that is the best solution.
Any ideas on how to handle this, please?
Upvotes: 0
Views: 53
Reputation: 1565
You could use default arguments to the function:
def dataproc_job(self, task_id, app, job, prefix=None, dataset=None):
    if prefix is not None and dataset is not None:
        props = {
            'spark.driver.extraJavaOptions': '-DAppConfig.appName=%s -DTrmRaw.Options.jobName=%s -DTrm.Options.prefix=%s -DTrm.Metadata.outputBase=%s' % (app, job, prefix, dataset)
        }
    else:
        props = {
            'spark.driver.extraJavaOptions': '-DAppConfig.appName=%s -DTrmRaw.Options.jobName=%s' % (app, job)
        }
    return DataProcSparkOperator( ... )  # Build your object as normal, passing dataproc_spark_properties=props
Or, if there are potentially more arguments to come, you can use kwargs:
def dataproc_job(self, task_id, app, job, **kwargs):
    if kwargs.get("prefix") is not None and kwargs.get("dataset") is not None:
        props = {
            'spark.driver.extraJavaOptions': '-DAppConfig.appName=%s -DTrmRaw.Options.jobName=%s -DTrm.Options.prefix=%s -DTrm.Metadata.outputBase=%s' % (app, job, kwargs["prefix"], kwargs["dataset"])
        }
    else:
        props = {
            'spark.driver.extraJavaOptions': '-DAppConfig.appName=%s -DTrmRaw.Options.jobName=%s' % (app, job)
        }
    return DataProcSparkOperator( ... )  # Build your object as normal, passing dataproc_spark_properties=props
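Note that the check uses kwargs.get("prefix") rather than kwargs["prefix"]: plain indexing raises a KeyError when the caller simply omits the argument, while .get() returns None, which is what the test expects. A quick illustration:
kwargs = {}
kwargs["prefix"]      # KeyError: 'prefix'
kwargs.get("prefix")  # None, so the else branch is taken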
At this stage, either way, I would refactor again and extract the properties-building section into a separate function:
def get_dataproc_spark_properties(prefix=None, dataset=None):
    if prefix is not None and dataset is not None:
        # etc...
        # Build and return the properties dict
or
def get_dataproc_spark_properties(**kwargs):
    if kwargs.get("prefix") is not None and kwargs.get("dataset") is not None:
        # etc...
        # Build and return the properties dict
And then call this function in your dataproc_job function:
def dataproc_job(self, task_id, app, job, prefix=None, dataset=None):
    props = get_dataproc_spark_properties(prefix, dataset)
or
def dataproc_job(self, task_id, app, job, **kwargs):
    props = get_dataproc_spark_properties(**kwargs)
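Putting it together, a complete sketch of the default-argument variant might look like the following. This is only a sketch: the operator arguments and the self.jar / self.cluster / self.main_name attributes are taken from the question, the imports assume the Airflow 1.x contrib location implied by the DataProcSparkOperator name, and the helper here also takes app and job because the property string needs them.
from airflow.contrib.operators.dataproc_operator import DataProcSparkOperator
from airflow.utils.trigger_rule import TriggerRule

def get_dataproc_spark_properties(app, job, prefix=None, dataset=None):
    # Start with the options every job needs...
    options = '-DAppConfig.appName=%s -DTrmRaw.Options.jobName=%s' % (app, job)
    # ...and append the extra flags only when both optional values are supplied
    if prefix is not None and dataset is not None:
        options += ' -DTrm.Options.prefix=%s -DTrm.Metadata.outputBase=%s' % (prefix, dataset)
    return {'spark.driver.extraJavaOptions': options}

def dataproc_job(self, task_id, app, job, prefix=None, dataset=None):
    # Intended to live on the same class as the two original methods
    return DataProcSparkOperator(
        task_id=task_id,
        dataproc_spark_jars=self.jar,
        cluster_name=self.cluster,
        main_class=self.main_name,
        dataproc_spark_properties=get_dataproc_spark_properties(app, job, prefix, dataset),
        trigger_rule=TriggerRule.ALL_DONE,
    )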
Upvotes: 0
Reputation: 86
I don't think that you need decorators. I'd do it this way:
def dataproc_job(self, task_id, app, job, prefix=None, dataset=None):
    if prefix is not None and dataset is not None:
        dataproc_spark_properties = {
            'spark.driver.extraJavaOptions': '-DAppConfig.appName=%s -DTrmRaw.Options.jobName=%s -DTrm.Options.prefix=%s -DTrm.Metadata.outputBase=%s' % (app, job, prefix, dataset)
        }
    else:
        dataproc_spark_properties = {
            'spark.driver.extraJavaOptions': '-DAppConfig.appName=%s -DTrmRaw.Options.jobName=%s' % (app, job)
        }
    return DataProcSparkOperator(
        task_id=task_id,
        dataproc_spark_jars=self.jar,
        cluster_name=self.cluster,
        main_class=self.main_name,
        dataproc_spark_properties=dataproc_spark_properties,
        trigger_rule=TriggerRule.ALL_DONE,
    )
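Both of the original helpers can then be replaced by calls to this single method; the task ids and argument values below are made up purely for illustration:
# Was dataproc_first_job(...)
first = self.dataproc_job('first_task', 'my_app', 'my_job')

# Was dataproc_second_job(...)
second = self.dataproc_job('second_task', 'my_app', 'my_job',
                           prefix='my_prefix', dataset='my_dataset')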
Upvotes: 1