Philip

Reputation: 343

How to get an arbitrary file SHA into Airflow's GithubOperator?

It seems like this should be a fairly standard use case for GithubOperator, so I must be missing something obvious. Surely this can't be difficult to do!

High level task: Using Airflow, orchestrate the update of contents of a certain arbitrary file on a certain arbitrary Github repo.

Attempted implementation: The Github API requires the SHA hash of the existing file in order to update it. I have managed to get that SHA, using another GithubOperator instance whose return value becomes an XCom, so here I will assume we already have an arbitrary SHA and focus on the argument passing into this GithubOperator, which is failing.
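
For reference, here is a minimal sketch of that working SHA-retrieval task (names and paths are illustrative, not my exact code); the operator pushes whatever result_processor returns as the task's XCom:

from airflow.providers.github.operators.github import GithubOperator

retrieve_xcom_blob_sha = GithubOperator(
    task_id="retrieve_xcom_blob_sha",
    github_method="get_repo",
    github_method_args={"full_name_or_id": "our-account/our-repo"},
    # The return value of result_processor is pushed as this task's XCom
    # under the default "return_value" key.
    result_processor=lambda repo: repo.get_contents(
        "some-dir/dummy01.json", ref="main"
    ).sha,
)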

Of course, if there's some other solution that doesn't require using XCom to move the value of the SHA from task to task, that is OK too. (One alternative, writing the whole thing as a Python TaskFlow task, is still on the table but seems more complex than just getting the appropriate operator to work!)

What's not working

When I run the following code I see the unrendered template in my logs; the task then fails an assertion on the type of the supposed SHA (i.e., it's not a string, because the template didn't render):

[2023-06-01, 17:14:50 EDT] {{my_dag_file.py:33}} INFO - in main_result_processor the_sha is {{ task_instance.xcom_pull(task_ids='retrieve_xcom_blob_sha', dag_id='push_a_change_to_github', key='return_value') }}
[2023-06-01, 17:14:50 EDT] {{taskinstance.py:1768}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.10/site-packages/airflow/providers/github/operators/github.py", line 72, in execute
    return self.result_processor(github_result)
  File "/usr/local/airflow/dags/my_dag.py", line 59, in <lambda>
    result_processor=lambda repo: main_result_processor(repo, blob_sha),
  File "/usr/local/airflow/dags/my_dag.py", line 34, in main_result_processor
    repo.update_file(
  File "/usr/local/airflow/.local/lib/python3.10/site-packages/github/Repository.py", line 2220, in update_file
    assert isinstance(sha, str)
AssertionError
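
My working theory: Airflow renders Jinja only in the fields an operator lists in template_fields, and for GithubOperator that appears to be just github_method_args, so the XComArg stringified inside my result_processor lambda gets passed through verbatim. A quick way to check (assuming the provider exposes template_fields in the usual way):

from airflow.providers.github.operators.github import GithubOperator

# Only field names listed here are Jinja-rendered by Airflow.
print(GithubOperator.template_fields)  # expected: ('github_method_args',)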

DAG code:

"""
Minimal case of what's failing -- since we can get as far as extracting the SHA, let's just
hardcode that SHA and see if we can get it into the GithubOperator.
"""

import json
import logging

from airflow.decorators import dag, task
from airflow.providers.github.operators.github import GithubOperator
from airflow.utils.dates import days_ago

PARTIAL_URL = "our-account/our-repo"
default_args = {}


@dag(
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(365),
    dag_id='push_a_change_to_github',
)
def push_a_change_to_github_main():
    dummy01_contents = json.dumps(
        {
            "revision": 1,
            "message": "This is a dummy JSON file",
        }
    )
    dummy_path = "some-dir/dummy01.json"
    def main_result_processor(repo, the_sha):
        logging.info(f"in main_result_processor the_sha is {the_sha}")
        repo.update_file(
            path=dummy_path,
            message=f"""Update {dummy_path}

            Automated commit by an Airflow DAG
            """,
            content=dummy01_contents,  # already JSON-encoded above; avoid double json.dumps
            sha=the_sha,
        )

    @task(retries=1)
    def retrieve_xcom_blob_sha():
        # We need this task in here because in the real world we can't just hardcode a
        # SHA at DAG scope. This forces us to use the XCom mechanism to return a value from a task.
        # Once it works we can replace this taskflow Python task with the GithubOperator task to
        # query Github for the SHA, which we've verified does work.
        some_sha = "abcd0123ef01abcdabcd0123ef01abcd456789ab"
        return some_sha

    blob_sha = retrieve_xcom_blob_sha()
    push_an_update = GithubOperator(
        task_id="push_an_update",
        retries=1,
        github_method="get_repo",
        github_method_args={"full_name_or_id": PARTIAL_URL, },
        result_processor=lambda repo: main_result_processor(repo, blob_sha),
    )


push_a_change_to_github_main()

Upvotes: 0

Views: 162

Answers (1)

Philip

Reputation: 343

I deemed it plausible that the GithubOperator may not support this kind of use case. As an alternative, we can use the PythonVirtualenvOperator and TaskFlow syntax:

    @task.virtualenv(requirements=['PyGithub==1.58.2', ], retries=0, system_site_packages=False)
    def push_one_file(sha_arg):
        from github import Github   # careful with the case!

        # Local vars need to be [re]defined here: the virtualenv task runs in
        # its own process and cannot see DAG-scope variables.
        token = "gh_123"  # placeholder; pull a real token from a Connection or secrets backend
        owner_repo = "mygithub/myrepo"
        file_path = "dir/subdir/file_to_update.py"
        new_contents = """# This is our new file"""
        gh = Github(token)
        repo = gh.get_repo(owner_repo)
        contents = repo.get_contents(file_path, ref="main")
        response = repo.update_file(contents.path, "Automated commit message", new_contents,
                                    sha=sha_arg)

Do not confuse the PyGithub package with the unrelated github package on PyPI; install the former (pip install PyGithub), even though it is imported as github.

Note that you can get the SHA from a previous invocation of the GithubOperator by returning it from your result_processor, which makes it a standard XCom return value. Then, when you call this TaskFlow task, it is ready to pass in as a function argument.
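
A sketch of that wiring, using the same illustrative names as above:

    from airflow.providers.github.operators.github import GithubOperator

    get_blob_sha = GithubOperator(
        task_id="get_blob_sha",
        github_method="get_repo",
        github_method_args={"full_name_or_id": "mygithub/myrepo"},
        # Returning a value from result_processor pushes it as this task's XCom.
        result_processor=lambda repo: repo.get_contents(
            "dir/subdir/file_to_update.py", ref="main"
        ).sha,
    )

    # The operator's .output XComArg resolves to the pushed SHA at runtime,
    # so it can be passed to the TaskFlow task like a normal argument.
    push_one_file(sha_arg=get_blob_sha.output)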

Nevertheless, I hope there is a way to do this with GithubOperator, because it seems cleaner than my solution here.

Upvotes: 0
