Reputation: 33
I currently receive csv files in the following format:
Col1, Col2, Col3, Col4
Header1, , ,
Val1, Val2, Val3, Val4
Val1, Val2, Val3, Val4
Val1, Val2, Val3, Val4
Header2, , ,
Val1, Val2, Val3, Val4
Header3, , ,
Val1, Val2, Val3, Val4
Val1, Val2, Val3, Val4
The number of rows per header can vary and the Headers can contain any words.
The expected result should be one of the following options.
Option 1: save the headers to an additional column in a single file.
File: abc/abc/complete_output
Col1, Col2, Col3, Col4, Col5
Val1, Val2, Val3, Val4, Header1
Val1, Val2, Val3, Val4, Header1
Val1, Val2, Val3, Val4, Header1
Val1, Val2, Val3, Val4, Header2
Val1, Val2, Val3, Val4, Header3
Val1, Val2, Val3, Val4, Header3
Option 2: create a separate file per header.
File1: abc/abc/Header1
Col1, Col2, Col3, Col4
Val1, Val2, Val3, Val4
Val1, Val2, Val3, Val4
Val1, Val2, Val3, Val4
File2: abc/abc/Header2
Col1, Col2, Col3, Col4
Val1, Val2, Val3, Val4
File3: abc/abc/Header3
Col1, Col2, Col3, Col4
Val1, Val2, Val3, Val4
Val1, Val2, Val3, Val4
The received file should either be split into separate files per header, or the header rows should be mapped to an additional column. Can this be done in Azure Data Factory, including Data Flow options? There is no access to a Databricks cluster.
P.S. I know this would be easy with a Python script, but I hope to be able to build the complete flow in ADF.
I tried splitting the file with a Conditional Split. However, this does not work, as it only allows selecting rows; it could only be used if (one of) the row values gave an indication of the header. No other options seem usable to me.
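For reference, this is roughly what I mean by "easy with a Python script": a minimal sketch of Option 2, where "input.csv" and the "abc/abc" output folder are placeholder paths.

import csv

# Split the incoming file into one file per header row (Option 2).
# "input.csv" and the "abc/abc" output folder are placeholders.
with open("input.csv", newline="") as src:
    reader = csv.reader(src, skipinitialspace=True)
    columns = next(reader)                       # Col1, Col2, Col3, Col4
    out, writer = None, None
    for row in reader:
        if all(cell == "" for cell in row[1:]):  # header row: only the first cell is filled
            if out:
                out.close()
            out = open(f"abc/abc/{row[0]}", "w", newline="")
            writer = csv.writer(out)
            writer.writerow(columns)             # repeat the column names per file
        elif writer:
            writer.writerow(row)
    if out:
        out.close()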
Edit: added desired output options as asked
Upvotes: 0
Views: 1046
Reputation: 6114
In the source dataset of the "file as text" Lookup activity, I unselected the First row as header option and used ; as the column delimiter and | as the row delimiter, so that the whole file is read into a single column (Prop_0) with its original line breaks preserved.

The header Set Variable activity extracts the header line (Col1,Col2,Col3,Col4) from the Lookup output:
@first(array(split(activity('file as text').output.value[0]['Prop_0'],decodeUriComponent('%0A'))))

The each file Set Variable activity stores all the data for each output file. I initialized it with the header variable value.

get first filename extracts the name of the first file (Header1 in this case) using collection and string functions:
@replace(first(skip(array(split(activity('file as text').output.value[0]['Prop_0'],decodeUriComponent('%0A'))),1)),',,,','')

The ForEach activity iterates sequentially over the remaining lines:
@skip(array(split(activity('file as text').output.value[0]['Prop_0'],decodeUriComponent('%0A'))),2)

Inside it, an If Condition activity checks whether the current line is a header line (to be used as a file name) or a data line. Data lines are concatenated onto the each file variable; when a header line is found, a Copy Data activity writes the accumulated data to a file named after the previous header, the filename variable is updated, and the each file variable is re-initialized with the header value. A final Copy activity after the loop writes the last file.

The complete pipeline JSON:
{
"name": "pipeline1",
"properties": {
"activities": [
{
"name": "file as text",
"type": "Lookup",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "DelimitedTextSource",
"storeSettings": {
"type": "AzureBlobFSReadSettings",
"recursive": true,
"enablePartitionDiscovery": false
},
"formatSettings": {
"type": "DelimitedTextReadSettings"
}
},
"dataset": {
"referenceName": "csv1",
"type": "DatasetReference"
},
"firstRowOnly": false
}
},
{
"name": "header",
"type": "SetVariable",
"dependsOn": [
{
"activity": "file as text",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"variableName": "header",
"value": {
"value": "@first(array(split(activity('file as text').output.value[0]['Prop_0'],decodeUriComponent('%0A'))))",
"type": "Expression"
}
}
},
{
"name": "each file",
"type": "SetVariable",
"dependsOn": [
{
"activity": "header",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"variableName": "each file",
"value": {
"value": "@variables('header')",
"type": "Expression"
}
}
},
{
"name": "get first filename",
"type": "SetVariable",
"dependsOn": [
{
"activity": "each file",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"variableName": "filename",
"value": {
"value": "@replace(first(skip(array(split(activity('file as text').output.value[0]['Prop_0'],decodeUriComponent('%0A'))),1)),',,,','')",
"type": "Expression"
}
}
},
{
"name": "ForEach1",
"type": "ForEach",
"dependsOn": [
{
"activity": "get first filename",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"items": {
"value": "@skip(array(split(activity('file as text').output.value[0]['Prop_0'],decodeUriComponent('%0A'))),2)",
"type": "Expression"
},
"isSequential": true,
"activities": [
{
"name": "If Condition1",
"type": "IfCondition",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"expression": {
"value": "@contains(item(),',,,')",
"type": "Expression"
},
"ifFalseActivities": [
{
"name": "each row",
"type": "SetVariable",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"variableName": "each row",
"value": {
"value": "@concat(variables('each file'),decodeUriComponent('%0A'),item())",
"type": "Expression"
}
}
},
{
"name": "complete data",
"type": "SetVariable",
"dependsOn": [
{
"activity": "each row",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"variableName": "each file",
"value": {
"value": "@variables('each row')",
"type": "Expression"
}
}
}
],
"ifTrueActivities": [
{
"name": "create each file",
"type": "Copy",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "DelimitedTextSource",
"additionalColumns": [
{
"name": "req",
"value": {
"value": "@variables('each file')",
"type": "Expression"
}
}
],
"storeSettings": {
"type": "AzureBlobFSReadSettings",
"recursive": true,
"enablePartitionDiscovery": false
},
"formatSettings": {
"type": "DelimitedTextReadSettings"
}
},
"sink": {
"type": "DelimitedTextSink",
"storeSettings": {
"type": "AzureBlobFSWriteSettings"
},
"formatSettings": {
"type": "DelimitedTextWriteSettings",
"quoteAllText": true,
"fileExtension": ".txt"
}
},
"enableStaging": false,
"translator": {
"type": "TabularTranslator",
"mappings": [
{
"source": {
"name": "req",
"type": "String"
},
"sink": {
"type": "String",
"physicalType": "String",
"ordinal": 1
}
}
],
"typeConversion": true,
"typeConversionSettings": {
"allowDataTruncation": true,
"treatBooleanAsNumber": false
}
}
},
"inputs": [
{
"referenceName": "demo",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "op_files",
"type": "DatasetReference",
"parameters": {
"fileName": {
"value": "@variables('filename')",
"type": "Expression"
}
}
}
]
},
{
"name": "change filename",
"type": "SetVariable",
"dependsOn": [
{
"activity": "create each file",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"variableName": "filename",
"value": {
"value": "@replace(item(),',,,','')",
"type": "Expression"
}
}
},
{
"name": "re initialise each file value",
"type": "SetVariable",
"dependsOn": [
{
"activity": "change filename",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"variableName": "each file",
"value": {
"value": "@variables('header')",
"type": "Expression"
}
}
}
]
}
}
]
}
},
{
"name": "for last file within csv",
"type": "Copy",
"dependsOn": [
{
"activity": "ForEach1",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "DelimitedTextSource",
"additionalColumns": [
{
"name": "req",
"value": {
"value": "@variables('each file')",
"type": "Expression"
}
}
],
"storeSettings": {
"type": "AzureBlobFSReadSettings",
"recursive": true,
"enablePartitionDiscovery": false
},
"formatSettings": {
"type": "DelimitedTextReadSettings"
}
},
"sink": {
"type": "DelimitedTextSink",
"storeSettings": {
"type": "AzureBlobFSWriteSettings"
},
"formatSettings": {
"type": "DelimitedTextWriteSettings",
"quoteAllText": true,
"fileExtension": ".txt"
}
},
"enableStaging": false,
"translator": {
"type": "TabularTranslator",
"mappings": [
{
"source": {
"name": "req",
"type": "String"
},
"sink": {
"type": "String",
"physicalType": "String",
"ordinal": 1
}
}
],
"typeConversion": true,
"typeConversionSettings": {
"allowDataTruncation": true,
"treatBooleanAsNumber": false
}
}
},
"inputs": [
{
"referenceName": "demo",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "op_files",
"type": "DatasetReference",
"parameters": {
"fileName": {
"value": "@variables('filename')",
"type": "Expression"
}
}
}
]
}
],
"variables": {
"header": {
"type": "String"
},
"each file": {
"type": "String"
},
"filename": {
"type": "String"
},
"each row": {
"type": "String"
}
},
"annotations": []
}
}
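For clarity, the ForEach / If Condition logic above follows a simple accumulate-and-flush pattern. The following is an illustration only, not part of the pipeline; it mirrors the header, each file and filename variables, with placeholder paths.

from pathlib import Path

# Same control flow as the pipeline: accumulate data lines, flush on header lines.
# "input.csv" and the "abc/abc" output folder are placeholder paths.
lines = Path("input.csv").read_text().splitlines()
header = lines[0]                            # the "header" variable
filename = lines[1].replace(",,,", "")       # "get first filename"
each_file = header                           # the "each file" variable
for line in lines[2:]:                       # ForEach over the remaining lines
    if ",,," in line:                        # header line: write out the previous file
        Path("abc/abc", filename).write_text(each_file)
        filename = line.replace(",,,", "")   # "change filename"
        each_file = header                   # "re initialise each file value"
    else:                                    # data line: append to the buffer
        each_file = each_file + "\n" + line  # "each row" / "complete data"
Path("abc/abc", filename).write_text(each_file)  # "for last file within csv"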
Upvotes: 2
Reputation: 563
If the input dataset is static and the second option is the requirement, you can go with the following approach:
Add a Filter transformation after the source with the expression: !startsWith(Col1, 'Header')
Add a Surrogate Key transformation to create the incremental identity column (Id).
Add a Conditional Split transformation to split the data into three parts with these expressions (see the sketch after these steps):
stream1: Id>=1 && Id<=3
stream2: Id==4
stream3: Default
Use a Select transformation to deselect the Id column.
Add a Sink transformation to load the data to csv files.
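For illustration, this sketch with pandas shows how the sample rows map to those Id ranges once the header rows are filtered out (placeholder input path; in the Data Flow itself the Id column comes from the Surrogate Key transformation):

import pandas as pd

# Reproduce the row numbering the Conditional Split conditions rely on.
# "input.csv" is a placeholder path for the sample data shown in the question.
df = pd.read_csv("input.csv", skipinitialspace=True)
df = df[~df["Col1"].str.startswith("Header")]     # Filter: !startsWith(Col1, 'Header')
df.insert(0, "Id", list(range(1, len(df) + 1)))   # incremental identity column
stream1 = df[(df["Id"] >= 1) & (df["Id"] <= 3)].drop(columns="Id")  # Header1 rows
stream2 = df[df["Id"] == 4].drop(columns="Id")                      # Header2 row
stream3 = df[df["Id"] > 4].drop(columns="Id")                       # Header3 rows (Default)

Because the Id ranges are hardcoded, this only works when the number of rows per header never changes, which is why the answer is limited to a static input dataset.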
Upvotes: 0