AngryCoder

Reputation: 496

Rule-based mapping on Copy Activity in Azure Data Factory

I'm trying to create a dynamic mapping for the Copy Data activity in Azure Data Factory. I want to create a Parquet file that contains the same data I'm reading from the source, but I want to modify some column names to remove the white space in them (it's a bug of the Parquet format), and I want to do that automatically.

I have seen that this is possible in mapping data flow, but I don't see any such functionality on Copy Activity (Mapping data flow is limited to a few connectors as a source, so I can't use it).

As you can see in the image, it seems that I can only modify individual columns, not all the columns that fulfill certain conditions.

enter image description here

How can I do that?

Thanks in advance

Upvotes: 1

Views: 1742

Answers (2)

Jason_450

Reputation: 31

Here is a solution to apply a dynamic column name mapping with ADF so that you can still use the copy data activities with parquet format, even when the source column names have pesky white-space characters which are not supported.

The solution involves three parts:

  1. Dynamically generate your list of mapped column names. The example below demonstrates how you could encode the white-space from an SQL database table source dataset dynamically with a lookup activity (referred to as 'lookup column mapping' below).
;with cols as (
select 
    REPLACE(column_name, ' ', '__wspc__') as new_name,
    column_name as old_name
from INFORMATION_SCHEMA.columns
where table_name = '@{pipeline().parameters.SOURCE_TABLE}'
and table_schema = '@{pipeline().parameters.SOURCE_SCHEMA}'
)
select ' |||'+old_name+'||'+new_name+'|' as mapping
from cols;
  2. Use an expression to repack the column mapping derived in the lookup activity in step 1 into the JSON syntax expected by the copy data activity template. You can insert this into a Set Variable activity with an Array-type variable (referred to as 'column_mapping_list' below).
@json(
    concat(
        '[ ',
        join(
            split(
                join(
                    split(
                        join(
                            split(
                                join(
                                    xpath(
                                        xml(
                                            json(
                                                concat( 
                                                '{\"root_xml_node\": ', 
                                                    string(activity('lookup column mapping').output),
                                                '}' 
                                                )
                                            )
                                        ),
                                    '/root_xml_node/value/mapping/text()'
                                    ),
                                ','
                                ),
                            '|||'
                            ),
                        '{\"source\": { \"name\": \"'
                        ), 
                    '||'
                    ), 
                '\" },\"sink\": { \"name\": \"'
                ),
            '|'
            ), 
        '\" }}'
        ),  
    ' ]'
    )
)

Unfortunately, the expression is more convoluted than we would like: the xpath function requires a single root node, which the lookup activity output does not provide, and the string escaping of the ADF JSON templates makes it hard to simplify.

  3. Lastly, use the column mapping list variable as "dynamic content" in the mapping section of the copy data activity with the following expression:
@json(
    concat(
        '{ \"type\": \"TabularTranslator\", \"mappings\":',
        string(variables('column_mapping_list')),
    '}'
    )
)

Expected results:
Step 1.
'my wspccol' -> '|||my wspccol||my__wspc__wspccol|'

Step 2.
'|||my wspccol||my__wspc__wspccol|' -> ['{ "source": { "name": "my wspccol" }, "sink": { "name": "my__wspc__wspccol" } }']

Step 3.

{ 
    "type": "TabularTranslator", 
    "mappings": [
        { 
            "source": { "name": "my wspccol" }, 
            "sink": { "name": "my__wspc__wspccol" } 
        }
    ] 
}
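To check the delimiter logic outside ADF, the three steps can be approximated in plain Python. This is only a sketch of the string manipulation, not ADF code: the function names `encode_mapping`, `repack` and `build_translator` are made up for illustration, and it assumes that ADF's `split` and `join` behave like Python's `str.split` and `str.join` on these inputs.

```python
import json


def encode_mapping(old_name):
    # Mirrors the step 1 query: replace spaces, then wrap both names in delimiters.
    new_name = old_name.replace(" ", "__wspc__")
    return " |||" + old_name + "||" + new_name + "|"


def repack(mappings):
    # Mirrors the step 2 expression: one join, then three split/join passes
    # that swap each delimiter for a fragment of JSON, then parse the result.
    text = ",".join(mappings)
    text = '{"source": { "name": "'.join(text.split("|||"))
    text = '" },"sink": { "name": "'.join(text.split("||"))
    text = '" }}'.join(text.split("|"))
    return json.loads("[ " + text + " ]")


def build_translator(mapping_list):
    # Mirrors the step 3 expression: wrap the list in a TabularTranslator object.
    return json.loads(
        '{ "type": "TabularTranslator", "mappings":' + json.dumps(mapping_list) + "}"
    )


if __name__ == "__main__":
    encoded = [encode_mapping("my wspccol")]
    print(json.dumps(build_translator(repack(encoded)), indent=4))
```

Note that the scheme relies on column names not containing the `|` or `"` characters; a name that does would break the split/join passes or the final JSON parse.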

Additionally:

Upvotes: 3

wBob

Reputation: 14389

The Copy activity can convert from one format to another, e.g. CSV to JSON or Parquet to a database, but it does not inherently allow any transform, such as changing the content of columns or even adding additional columns.

Alternatively, consider using ADF to call a Databricks notebook for these complex rule-based transforms.
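As a rough idea of what the renaming rule could look like inside such a notebook, here is a minimal Python sketch. The helper name `sanitize_column_names` and the choice of an underscore as the replacement are assumptions for illustration; the commented usage line assumes `df` is a Spark DataFrame already loaded in the notebook.

```python
import re


def sanitize_column_names(names):
    """Replace every whitespace character in each column name with an underscore."""
    return [re.sub(r"\s", "_", name) for name in names]


# Hypothetical use in a notebook, assuming `df` is a Spark DataFrame:
#   df = df.toDF(*sanitize_column_names(df.columns))

if __name__ == "__main__":
    print(sanitize_column_names(["my col", "id", "a b c"]))
```

This simple rule can produce duplicates (for example `a b` and `a_b` both become `a_b`), so a real notebook would probably need a check for collisions.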

Upvotes: 0
