Michael K

Reputation: 439

Databricks hierarchical sql query output as nested json

How can I produce the following nested json output (conversation = parent, message = child) using Databricks with pySpark or SQL

Desired output:

{
    "conversation_id": 1,
    "conversation_detail": "between mike and angie",
    "messages": [
        {
            "message_id": 1,
            "value": "hi mike"
        },
        {
            "message_id": 2,
            "value": "hi angie"
        }
    ]
},
{
    "conversation_id": 2,
    "conversation_detail": "between tim and lisa",
    "messages": [
        {
            "message_id": 1,
            "value": "hi tim"
        },
        {
            "message_id": 2,
            "value": "hi lisa"
        }
    ]
}

Basic output:

+---------------+----------------------+----------+---------------+--------+
|conversation_id|conversation_detail   |message_id|conversation_id|value   |
+---------------+----------------------+----------+---------------+--------+
|1              |between mike and angie|1         |1              |hi mike |
|1              |between mike and angie|2         |1              |hi angie|
|2              |between tim and lisa  |3         |2              |hi tim  |
|2              |between tim and lisa  |4         |2              |hi lisa |
+---------------+----------------------+----------+---------------+--------+

Basic query:

SELECT *
FROM conversation c 
JOIN messages m ON c.conversation_id = m.conversation_id

Setup:

CREATE OR REPLACE TABLE conversation 
(
    conversation_id INT,
    conversation_detail STRING
);

CREATE OR REPLACE TABLE messages 
(
    message_id INT,
    conversation_id INT,
    value STRING
);

INSERT INTO conversation
VALUES
  (1, 'between mike and angie'),
  (2, 'between tim and lisa');
  
INSERT INTO messages
VALUES
  (1, 1, 'hi mike'),
  (2, 1, 'hi angie'),
  (3, 2, 'hi tim'),
  (4, 2, 'hi lisa');

What I've tried:

SQL Server's FOR JSON clause behaves the way I want because it respects the parent/child relationship represented by the join. I haven't found an equivalent that works out of the box in Databricks.

Upvotes: 0

Views: 667

Answers (1)

Michael K

Reputation: 439

I found the solution as follows:

import json
import pyspark.sql.functions as F

# df is the joined result of the basic query above; select c.conversation_id
# only once so the groupBy column is not ambiguous:
# df = spark.sql("""
#     SELECT c.conversation_id, c.conversation_detail, m.message_id, m.value
#     FROM conversation c
#     JOIN messages m ON c.conversation_id = m.conversation_id
# """)

res = (
    df.groupBy("conversation_id", "conversation_detail")
    .agg(
        F.collect_list(
            F.create_map(
                F.lit("message_id"), "message_id",
                F.lit("value"), "value"
            )
        ).alias("messages")
    )
    .toJSON()
    .collect()
)


for r in res:
    print(json.dumps(json.loads(r), indent=2))
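
One caveat: `create_map` forces the keys and values to a common type, so `message_id` comes out as a string (`"message_id": "1"`). If you need it to stay an integer, a sketch of an alternative in Databricks SQL (using the standard Spark SQL functions `collect_list`, `struct`/`named_struct`, and `to_json`) would be:

SELECT to_json(
         named_struct(
           'conversation_id', c.conversation_id,
           'conversation_detail', c.conversation_detail,
           'messages', collect_list(struct(m.message_id, m.value))
         )
       ) AS conversation_json
FROM conversation c
JOIN messages m ON c.conversation_id = m.conversation_id
GROUP BY c.conversation_id, c.conversation_detail;

Each row of the result is one conversation serialized as a JSON string, with `messages` as a nested array of objects whose field types match the table columns.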

Upvotes: 0
