Reputation: 1019
I am trying to write an integration test for the following Python code:
import xx.settings.config as stg
from xx.infrastructure.utils import csvReader, dataframeWriter
from pyspark.sql import SparkSession
from typing import List
from awsglue.utils import getResolvedOptions
import sys
# Glue job entry point: for every table in stg.data_schema, read the tab-separated
# CSV data from the bronze bucket/path and write it to the pre-silver bucket/path.
# NOTE(review): this excerpt is truncated and lost its indentation — the argument
# list of getResolvedOptions( and the body of the __main__ guard are missing, and
# `argv` is not used in any visible line.
def main(argv: List[str]) -> None:
args = getResolvedOptions(
s3_bronze_bucket_name = args['S3_BRONZE_BUCKET_NAME']
s3_pre_silver_bucket_name = args['S3_PRE_SILVER_BUCKET_NAME']
s3_bronze_path = args['S3_BRONZE_PATH']
s3_pre_silver_path = args['S3_PRE_SILVER_PATH']
spark = SparkSession.builder.getOrCreate()
spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
for table in list(stg.data_schema.keys()):
raw_data = stg.data_schema[table].columns.to_dict()
# NOTE(review): `schema` is passed below but only `raw_data` is assigned above;
# presumably a line building `schema` from raw_data was lost — confirm.
# csvReader/dataframeWriter were imported with `from ... import`, so they are
# looked up in THIS module's globals; a test must patch them on this module.
df = csvReader(spark, s3_bronze_bucket_name, s3_bronze_path, table, schema, '\t')
dataframeWriter(df, s3_pre_silver_bucket_name, s3_pre_silver_path, table, stg.data_schema[table].partitionKey)
if __name__ == '__main__':
I loop over a list of tables, read each table's content (CSV format) from S3, and write it back to S3 in Parquet format.
These are the definitions of `csvReader` and `dataframeWriter`:
# Read the CSV data for `table` with a header row and the given separator.
# NOTE(review): excerpt is truncated — the start of the reader chain and the
# schema/load(path) calls are missing; presumably the path is built from
# bucket/path/table as an S3 URL — confirm.
def csvReader(spark: SparkSession, bucket: str, path: str, table: str, schema: StructType, sep: str) -> DataFrame:
return ('csv')
.option('header', 'true')
.option('sep', sep)
# Write `df` for `table` under bucket/path (Parquet, per the question text);
# presumably partitioned by partition_key. NOTE(review): body missing from this excerpt.
def dataframeWriter(df: DataFrame, bucket: str, path: str, table: str, partition_key: str) -> None:
For my integration tests I would like to replace the S3 interaction with local file interaction (read CSV from local files and write Parquet to local files). This is what I did:
import os
from unittest import TestCase
from unittest.mock import patch, Mock
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
import xx.application.perfmarket_pre_silver as perfmarket_pre_silver
from dvr_config_utils.config import initialize_settings
# Test stand-in for csvReader that reads from the local filesystem instead of S3.
# NOTE(review): main() calls csvReader with six positional arguments
# (spark, bucket, path, table, schema, sep), but this stub accepts four, so once the
# patch actually takes effect the call raises TypeError — mirror the real signature.
# The reader chain is truncated in this excerpt.
def local_csvReader(spark: SparkSession, table: str, schema: StructType, sep: str):
"""Stand-in for the real csvReader: reads the table from local disk rather than S3."""
return ('csv')
.option('header', 'true')
.option('sep', sep)
# Test stand-in for dataframeWriter that writes under ../output_mock/<table>/.
# NOTE(review): main() calls dataframeWriter with five arguments
# (df, bucket, path, table, partition_key), but this stub accepts three, so the
# patched call raises TypeError — mirror the real signature.
# The output path is relative, so it depends on the test's working directory.
# The body of the `if` is missing here; os.makedirs(output_dir, exist_ok=True)
# would replace the exists-check.
def local_dataframeWriter(df, table: str, partition_key: str):
"""Stand-in for the real dataframeWriter: writes to local disk rather than S3."""
output_dir = f'../output_mock/{table}/'
if not os.path.exists(output_dir):
# Integration test for perfmarket_pre_silver.main that swaps the S3 reader/writer for
# local-file stand-ins and checks the written Parquet output per table.
# NOTE(review): this excerpt lost its indentation and several lines (the main() call,
# the end of the dict literal and of the assertions); notes below describe only
# what is visible.
class TestPerfmarketSilver(TestCase):
# NOTE(review): unittest calls setUpClass/tearDownClass on the class, so both need
# @classmethod; no decorator is visible here — confirm it is present.
def setUpClass(cls):
cls.spark = SparkSession.builder.master('local').appName('TestPerfmarketSilver').getOrCreate()
cls.spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
def tearDownClass(cls):
"""Clean up the Spark session and test data."""
# NOTE(review): only the output-directory removal is visible; cls.spark.stop() is
# not shown. shutil.rmtree would avoid shelling out to `rm -rf`.
os.system('rm -rf ../output_mock')
# NOTE(review): why the patches have no effect — main's module does
# `from xx.infrastructure.utils import csvReader, dataframeWriter`, and this test file
# imports perfmarket_pre_silver at the top, so main already holds the real functions
# before these patches start. Patching xx.infrastructure.utils.* (or the different
# module name src.xx.infrastructure.utils.*) does not rebind them. Patch
# 'xx.application.perfmarket_pre_silver.csvReader' and
# 'xx.application.perfmarket_pre_silver.dataframeWriter' instead.
# NOTE(review): stacked @patch decorators are applied bottom-up, so the first mock
# parameter receives the dataframeWriter mock and the second the csvReader mock —
# the names below are swapped. mocked_get_resolved_options needs its own outer
# @patch, which is not visible here.
@patch('src.xx.infrastructure.utils.csvReader', side_effect=local_csvReader)
@patch('xx.infrastructure.utils.dataframeWriter', side_effect=local_dataframeWriter)
def test_main(self, mock_csvreader, mock_datawriter, mocked_get_resolved_options: Mock):
# Must contain an entry for every table in stg.data_schema, otherwise the
# expected_results[table] lookup below raises KeyError.
expected_results = {'chemins': {'nbRows': 8}}
# NOTE(review): getResolvedOptions is also bound via `from awsglue.utils import ...`
# in main's module, so its patch must target that module as well.
mocked_get_resolved_options.return_value = {
'JOB_NAME': 'perfmarket_pre_silver_test',
'S3_BRONZE_BUCKET_NAME': 'test_bronze',
'S3_PRE_SILVER_BUCKET_NAME': 'test_pre_silver',
'S3_BRONZE_PATH': '../input_mock',
'S3_PRE_SILVER_PATH': '../output_mock'
# NOTE(review): the call to perfmarket_pre_silver.main(...) is not visible, and `stg`
# is not among the imports shown for this test module.
for table in stg.data_schema.keys():
# Verify that the output Parquet file is created
output_path = f'../output_mock/{table}/'
# Read the written Parquet file and check the data
written_df =
self.assertEqual(written_df.count(), expected_results[table]['nbRows']) # Check row count
# Presumably compares the configured column names with written_df.columns;
# the expression is truncated in this excerpt.
for table in stg.data_schema.values()
for column_data in table['columns'].values()
== written_df.columns
What I wanted to do with these two lines:
# NOTE(review): these targets do not reach main() — its module imported csvReader and
# dataframeWriter by name, so they must be patched as
# 'xx.application.perfmarket_pre_silver.csvReader' / '...dataframeWriter'.
# The 'src.' prefix also names a different module from 'xx.infrastructure.utils'.
@patch('src.xx.infrastructure.utils.csvReader', side_effect=local_csvReader)
@patch('xx.infrastructure.utils.dataframeWriter', side_effect=local_dataframeWriter)
The intent is to replace the definition of `csvReader` with `local_csvReader` and the definition of `dataframeWriter` with `local_dataframeWriter`.
Unfortunately, the code fails with the following error:
py4j.protocol.Py4JJavaError: An error occurred while calling o39.load.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
This is my project structure:
├── src/
│ └── xx/
│ ├── application/
│ │ └──
│ ├──
│ ├── infrastructure/
│ │ ├──
│ │ └──
│ └── other_modules/
└── tests/
└── integration_tests/
└── application/
Both `csvReader` and `dataframeWriter` are defined in the `xx.infrastructure.utils` module.
The error points to the `csvReader` call in the main code (first snippet).
So my patching technique is clearly not working.
What am I doing wrong?
Upvotes: 0
Views: 90
Reputation: 1019
This is what I finally did:
from moto import mock_aws
# Test stand-in for csvReader that reads from the local filesystem instead of S3.
# NOTE(review): the real csvReader is called with six positional arguments
# (spark, bucket, path, table, schema, sep); this stub accepts four, so a patched call
# with that argument list raises TypeError — mirror the real signature.
# The reader chain is truncated in this excerpt.
def local_csvReader(spark: SparkSession, table: str, schema: StructType, sep: str):
"""Stand-in for the real csvReader: reads the table from local disk rather than S3."""
return ('csv')
.option('header', 'true')
.option('sep', sep)
# Test stand-in for dataframeWriter that writes under ../output_mock/<table>/.
# NOTE(review): the real dataframeWriter is called with five arguments
# (df, bucket, path, table, partition_key); this stub accepts three — mirror the
# real signature. The body of the `if` is missing in this excerpt.
def local_dataframeWriter(df, table: str, partition_key: str):
"""Stand-in for the real dataframeWriter: writes to local disk rather than S3."""
output_dir = f'../output_mock/{table}/'
if not os.path.exists(output_dir):
# Final version of the test: import main inside the test and route the patched
# reader/writer mocks to the local stand-ins through side_effect.
# NOTE(review): the @patch decorators, any use of the imported mock_aws, the
# definition of mock_args and the call to main() are not visible in this excerpt.
# NOTE(review): importing main here works only if xx.application.perfmarket_pre_silver
# is first imported while the patches are active; if it was already imported
# elsewhere, the cached module keeps the real csvReader/dataframeWriter. Patching
# 'xx.application.perfmarket_pre_silver.csvReader' removes that import-order dependence.
def test_main(self, mock_csv_reader, mock_dataframe_writer):
from xx.application.perfmarket_pre_silver import main
mock_csv_reader.side_effect = local_csvReader
mock_dataframe_writer.side_effect = local_dataframeWriter
with patch('sys.argv', mock_args):
Upvotes: 0
Reputation: 13460
Replace the `s3a://` URLs with `file://` URLs and Spark will simply read from and write to your local filesystem.
Trying to emulate S3 semantics is a really complex piece of work, and I would seriously advise against it. If you do have to follow that path, either deploy MinIO locally or try Adobe's S3Mock, which supports all the S3 operations the S3A connector uses — at least I think so. We only test the S3A code against real S3 endpoints (S3 Standard, S3 Express, Google GCS (!) and some third-party stores): marginally slower, but far better at finding regressions and quirks.
Upvotes: -1