Reputation: 105
i'd like to unit test the following Flask API using pytest and mock:
#api.py
@api.route("/get_job/<job_name>")
class JobStatus(Resource):
@api.marshal_with(m_res)
def get(self, job_name):
try:
job_status = gcp.generate_signed_url("bucket_name", blob_path)
except ValueError as e:
try:
client = kube.SparkApplicationClient()
job_status = client.get(job_name)["status"]["applicationState"]["state"]
except ApiException as e:
abort(404, "job doesn't exist: %s\n" % e)
return {"job_status": job_status}
i wrote the following test for the case when both exception are sent but the patches don't work:
#test_api.py
import pytest
from unittest.mock import patch, MagicMock
from kubernetes.client.rest import ApiException
from api import server
@pytest.fixture
def app():
app = server.app
return app
@patch("api.gcp.generate_signed_url", MagicMock(side_effect=Exception(ValueError)))
@patch("api.kube.SparkApplicationClient.get", MagicMock(side_effect=Exception(ApiException)))
def test_get_job_status_doesnt_exists(client):
response = client.get(
"/api/get_job/68987af5-ccf4-4c11-a89a-d6645b7f7"
)
assert response.status_code == 404
basically i'm trying to get gcp.generate_signed_url
& client.get(job_name)
to raise an exception while testing
thanks!!! :)
Upvotes: 1
Views: 922
Reputation: 11414
Try:
@patch(
"api.gcp.generate_signed_url",
side_effect=ValueError)
)
And:
@patch(
"api.kube.SparkApplicationClient.get",
**{
'return_value.get.side_effect': ApiException
}
)
Upvotes: 1