Reputation: 75
I have a Dataproc pipeline with which I do web scraping and store the data in GCP. The task setup looks roughly like this:
create_dataproc_cluster = DataprocCreateClusterOperator(
    task_id='create_dataproc_cluster',
    impersonation_chain=default_args['service_account'],
    cluster_name=default_args['cluster_name'],
    cluster_config=default_args['cluster_config'],
    region=default_args['region'],
    project_id=default_args['project_id'],
    timeout=36000
)
load_file_from_web = DataprocSubmitPySparkJobOperator(
    task_id='load_file_from_web',
    cluster_name=default_args['cluster_name'],
    region=default_args['region'],
    main=default_args['load_from_web_file'],  # a Python script that does the web scraping
    project_id=default_args['project_id'],
    labels={'team': 'AAA',
            'purpose': 'load_file_from_web'}
)
There is also a task to delete the cluster, delete_dataproc_cluster.
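It looks roughly like this (a sketch from memory, not copied verbatim; the trigger_rule is an assumption):

delete_dataproc_cluster = DataprocDeleteClusterOperator(
    task_id='delete_dataproc_cluster',
    cluster_name=default_args['cluster_name'],
    region=default_args['region'],
    project_id=default_args['project_id'],
    trigger_rule='all_done'  # tear the cluster down even if the scraping job fails
)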
start >> create_dataproc_cluster >> load_file_from_web >> delete_dataproc_cluster >> end
I also pull the config and args from a JSON file:
"cluster_config": {
"config_bucket": "AAAA",
"temp_bucket": "BBBB",
"gce_cluster_config": {
"internal_ip_only": true,
"subnetwork_uri": "CCC",
"service_account": "DDDD",
"service_account_scopes": [
"https://www...."
],
"tags": [
"JJJJ",
....
]
},
"master_config": {
"image_uri":"a custom image",
"num_instances": 1,
"machine_type_uri": "..",
"disk_config": {
"boot_disk_type": "...",
"boot_disk_size_gb": ...
}
},
"worker_config": {
"num_instances": 2,
"machine_type_uri": "...",
"disk_config": {
"boot_disk_type": "...",
"boot_disk_size_gb": ...
}
}
}
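This JSON is loaded into default_args roughly like this (the path here is just a placeholder, not the real one):

import json

with open('/path/to/dag_config.json') as f:  # placeholder path
    config = json.load(f)

default_args.update(config)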
However, I got this error:
> Job failed with message [ImportError: html5lib not found, please
> install it]. Additional details can be found at: https://...
Which one is the feasible solution, and how can I implement it? I see three options (I sketched what I think each would look like below):
1. Install the package on the cluster itself.
2. Add an init action.
3. Force the Dataproc job to install the package before running the main code (i.e. install from within the Python script).
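These sketches are untested; the property names, version pin, bucket path and script name are my assumptions, not something I have working.

For option 1, I think I could extend cluster_config in the JSON with cluster properties, something like:

"software_config": {
    "properties": {
        "dataproc:pip.packages": "html5lib==1.1"
    }
}

For option 2, an init action would be a small shell script uploaded to GCS (e.g. running something like /opt/conda/default/bin/pip install html5lib) and referenced from cluster_config:

"initialization_actions": [
    {"executable_file": "gs://MY_BUCKET/install_deps.sh"}
]

For option 3, I would install the package at the top of the PySpark script before anything imports it. As far as I understand, this only installs on the node where the driver code runs, which might be enough since the scraping itself happens in the driver:

import subprocess
import sys

# install html5lib at runtime, before importing anything that needs it
# (may need '--user' depending on permissions on the cluster)
subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'html5lib'])

import pandas as pd  # pandas.read_html is what raises the html5lib ImportError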
Upvotes: 2
Views: 27