Rish Sri

Reputation: 1

Azure Data Factory Data Flow Activity

I want to copy data from Log Analytics table A to table B in the same Log Analytics workspace. To map the hierarchical data from the source to the destination in Azure Data Factory, I am using Data Flows. My configuration is below.

My source response is:

[
  {
    "tables": [
      {
        "name": "PrimaryResult",
        "columns": [
          {
            "name": "TimeGenerated",
            "type": "datetime"
          },
          {
            "name": "day_of_occurrence",
            "type": "datetime"
          },
          {
            "name": "request_id",
            "type": "string"
          },
          {
            "name": "name",
            "type": "string"
          },
          {
            "name": "url",
            "type": "string"
          },
          {
            "name": "item_type",
            "type": "string"
          },
          {
            "name": "cloud_role_name",
            "type": "string"
          },
          {
            "name": "P995_duration",
            "type": "real"
          },
          {
            "name": "P99_duration",
            "type": "real"
          },
          {
            "name": "P95_duration",
            "type": "real"
          },
          {
            "name": "P90_duration",
            "type": "real"
          },
          {
            "name": "P75_duration",
            "type": "real"
          },
          {
            "name": "P50_duration",
            "type": "real"
          },
          {
            "name": "item_count",
            "type": "int"
          }
        ],
        "rows": [
          [
            "2024-05-06T00:00:00Z",
            "2024-05-06T00:00:00Z",
            "",
            "GET /api/version",
            "https://api.vgdev.glint.cloud.dev.microsoft/api/version",
            "",
            "apiserver",
            5.8315,
            5.234724489795918,
            3.174455248506027,
            2.5453802347746817,
            1.2622889021938888,
            1.1403031329836708,
            18762
          ]
        ]
      }
    ]
  }
]

However, the Log Analytics destination accepts a JSON body in this format:

[
  {
    "TimeGenerated": "2024-05-06T03:59:02.9499434Z",
    "day_of_occurrence": "2024-05-06T03:59:02.9499434Z",
    "request_id": "dfasdsad",
    "name": "gfhgfh",
    "url": "gfhgf",
    "item_type": "",
    "cloud_role_name": "fvxccx",
    "P995_duration": 2,
    "P99_duration": 1,
    "P95_duration": 6,
    "P90_duration": 8,
    "P75_duration": 9.1,
    "P50_duration": 50,
    "item_count": 11
  },
  {
    "TimeGenerated": "2024-05-06T03:59:02.9499434Z",
    "day_of_occurrence": "2024-05-06T03:59:02.9499434Z",
    "request_id": "xcxzzz",
    "name": "ewqe",
    "url": "bfvbvb",
    "item_type": "",
    "cloud_role_name": "gjghj",
    "P995_duration": 2,
    "P99_duration": 1,
    "P95_duration": 4,
    "P90_duration": 54,
    "P75_duration": 8.1,
    "P50_duration": 5,
    "item_count": 13
  }
]

How can I transform the data from the source format to the destination format? I am not able to achieve this transformation.
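Just to make the required reshaping clear, outside of ADF it would be something like this (a rough Python illustration only, with the source trimmed to two columns; I need to do this inside Data Flows):

# Rough Python illustration of the reshaping I need (not ADF code):
# zip each positional row with the column names to get flat JSON objects.
import json

source = [
    {
        "tables": [
            {
                "name": "PrimaryResult",
                "columns": [
                    {"name": "TimeGenerated", "type": "datetime"},
                    {"name": "item_count", "type": "int"},
                ],
                "rows": [["2024-05-06T00:00:00Z", 18762]],
            }
        ]
    }
]

records = []
for result in source:
    for table in result["tables"]:
        names = [c["name"] for c in table["columns"]]
        for row in table["rows"]:
            records.append(dict(zip(names, row)))

print(json.dumps(records))
# [{"TimeGenerated": "2024-05-06T00:00:00Z", "item_count": 18762}]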

Upvotes: 0

Views: 57

Answers (1)

Rakesh Govindula

Reputation: 11454

To achieve your requirement, you can follow the step-by-step procedure below. Note that this approach works only for this case, and you need to add the data types manually, since deriving the data types dynamically from the input JSON might not be possible in a Data Flow.

After reading your JSON in the Data Flow source, add a Flatten transformation, flatten the tables array first, and select only the required fields rows and columns from it.


After this, add a second Flatten transformation and flatten the rows array.


Now, add a Derived Column transformation and create a new column with the below expression.

replace(concat('{',replace(replace(replace(toString(mapIndex(rows,concat("'",columns[#index].name,"':",iif(in(['string','datetime'],columns[#index].type),concat("'",#item,"'"),#item)))),'[',''),']',''),'"',''),'}'),"'",'"')
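To make this expression easier to read, this is roughly what it does for each row, shown in plain Python purely as an illustration (it is not part of the Data Flow): string and datetime values are wrapped in quotes, numeric values are left bare, and the name/value pairs are joined into one JSON object string.

# Plain-Python sketch of what the derived column expression builds per row
# (an assumed equivalent for illustration only, not ADF code).
def row_to_json_string(columns, row):
    parts = []
    for col, value in zip(columns, row):
        if col["type"] in ("string", "datetime"):
            # quote string and datetime values
            parts.append('"{}":"{}"'.format(col["name"], value))
        else:
            # leave numeric values unquoted
            parts.append('"{}":{}'.format(col["name"], value))
    return "{" + ",".join(parts) + "}"

columns = [
    {"name": "TimeGenerated", "type": "datetime"},
    {"name": "cloud_role_name", "type": "string"},
    {"name": "item_count", "type": "int"},
]
row = ["2024-05-06T00:00:00Z", "apiserver", "18762"]

print(row_to_json_string(columns, row))
# {"TimeGenerated":"2024-05-06T00:00:00Z","cloud_role_name":"apiserver","item_count":18762}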


In the Data Flow, this expression uses the rows and columns arrays and generates the required JSON object string for every row, like below.
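For the first sample row in your source, the generated string should look something like this (exact spacing may differ slightly):

{"TimeGenerated":"2024-05-06T00:00:00Z","day_of_occurrence":"2024-05-06T00:00:00Z","request_id":"","name":"GET /api/version","url":"https://api.vgdev.glint.cloud.dev.microsoft/api/version","item_type":"","cloud_role_name":"apiserver","P995_duration":5.8315,"P99_duration":5.234724489795918,"P95_duration":3.174455248506027,"P90_duration":2.5453802347746817,"P75_duration":1.2622889021938888,"P50_duration":1.1403031329836708,"item_count":18762}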


Use a Select transformation after this and select only the new column.

Now, use a Parse transformation to get the JSON objects from the above JSON string.

In this, give the new column and specify the output data types.

(TimeGenerated as string,
        day_of_occurrence as string,
        request_id as string,
        name as string,
        url as string,
        item_type as string,
        cloud_role_name as string,
        P995_duration as double,
        P99_duration as double,
        P95_duration as double,
        P90_duration as double,
        P75_duration as double,
        P50_duration as double,
        item_count as integer)

You can change the data types as per your requirement. Here, for the sample, I have used the double data type; you can change it to float as well.


After this, the required columns will be created with the given data types.


To get individual columns, use another Select transformation with rule-based mapping.


This will generate the required output columns.


You can adjust the decimal values from here using Derived Column transformations as per your requirement. Add your target as the sink and the data will get copied.

This is the Data Flow JSON up to the Select transformation, for your reference:

{
    "name": "Logs_dataflow",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "logs_source_json",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [],
            "transformations": [
                {
                    "name": "flatten1"
                },
                {
                    "name": "flatten2"
                },
                {
                    "name": "derivedColumn1"
                },
                {
                    "name": "select1"
                },
                {
                    "name": "select2"
                },
                {
                    "name": "parse1"
                }
            ],
            "scriptLines": [
                "source(output(",
                "          tables as (name as string, columns as (name as string, type as string)[], rows as string[][])[]",
                "     ),",
                "     allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     ignoreNoFilesFound: false,",
                "     documentForm: 'arrayOfDocuments') ~> source1",
                "source1 foldDown(unroll(tables, tables),",
                "     mapColumn(",
                "          columns = tables.columns,",
                "          rows = tables.rows",
                "     ),",
                "     skipDuplicateMapInputs: false,",
                "     skipDuplicateMapOutputs: false) ~> flatten1",
                "flatten1 foldDown(unroll(rows, rows),",
                "     mapColumn(",
                "          rows,",
                "          columns",
                "     ),",
                "     skipDuplicateMapInputs: false,",
                "     skipDuplicateMapOutputs: false) ~> flatten2",
                "flatten2 derive(new = replace(concat('{',replace(replace(replace(toString(mapIndex(rows,concat(\"'\",columns[#index].name,\"':\",iif(in(['string','datetime'],columns[#index].type),concat(\"'\",#item,\"'\"),#item)))),'[',''),']',''),'\"',''),'}'),\"'\",'\"')) ~> derivedColumn1",
                "derivedColumn1 select(mapColumn(",
                "          new",
                "     ),",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true) ~> select1",
                "parse1 select(mapColumn(",
                "          each(new,match(true()))",
                "     ),",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true) ~> select2",
                "select1 parse(new = new ? (TimeGenerated as string,",
                "          day_of_occurrence as string,",
                "          request_id as string,",
                "          name as string,",
                "          url as string,",
                "          item_type as string,",
                "          cloud_role_name as string,",
                "          P995_duration as double,",
                "          P99_duration as double,",
                "          P95_duration as double,",
                "          P90_duration as double,",
                "          P75_duration as double,",
                "          P50_duration as double,",
                "          item_count as integer),",
                "     format: 'json',",
                "     documentForm: 'singleDocument') ~> parse1"
            ]
        }
    }
}

Upvotes: 0
