Evaldas Buinauskas

Reputation: 14097

Joining streaming data in Apache Spark

Apologies if the title is too vague; I had trouble phrasing it properly.

Basically, I'm trying to figure out whether Apache Spark, together with Apache Kafka, can sync data from my relational database to Elasticsearch.

My plan is to use one of the Kafka connectors to read data from the RDBMS and push it into Kafka topics. Here are the ERD of the model and the DDL. It is quite basic: the Report and Product tables have a many-to-many relationship that is stored in the ReportProduct table.

CREATE TABLE dbo.Report (
    ReportID INT NOT NULL PRIMARY KEY,
    Title NVARCHAR(500) NOT NULL,
    PublishedOn DATETIME2 NOT NULL);

CREATE TABLE dbo.Product (
    ProductID INT NOT NULL PRIMARY KEY,
    ProductName NVARCHAR(100) NOT NULL);

CREATE TABLE dbo.ReportProduct (
    ReportID INT NOT NULL,
    ProductID INT NOT NULL,
    PRIMARY KEY (ReportID, ProductID),
    FOREIGN KEY (ReportID) REFERENCES dbo.Report (ReportID),
    FOREIGN KEY (ProductID) REFERENCES dbo.Product (ProductID));

INSERT INTO dbo.Report (ReportID, Title, PublishedOn)
VALUES (1, N'Yet Another Apache Spark StackOverflow question', '2017-09-12T19:15:28');

INSERT INTO dbo.Product (ProductID, ProductName)
VALUES (1, N'Apache'), (2, N'Spark'), (3, N'StackOverflow'), (4, N'Random product');

INSERT INTO dbo.ReportProduct (ReportID, ProductID)
VALUES (1, 1), (1, 2), (1, 3), (1, 4);

SELECT *
FROM dbo.Report AS R
INNER JOIN dbo.ReportProduct AS RP
    ON RP.ReportID = R.ReportID
INNER JOIN dbo.Product AS P
    ON P.ProductID = RP.ProductID;

My goal is to transform this into a document with the following structure:

{
  "ReportID":1,
  "Title":"Yet Another Apache Spark StackOverflow question",
  "PublishedOn":"2017-09-12T19:15:28+00:00",
  "Product":[
    {
      "ProductID":1,
      "ProductName":"Apache"
    },
    {
      "ProductID":2,
      "ProductName":"Spark"
    },
    {
      "ProductID":3,
      "ProductName":"StackOverflow"
    },
    {
      "ProductID":4,
      "ProductName":"Random product"
    }
  ]
}

I was able to produce this structure using static data that I mocked up locally:

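import org.apache.spark.sql.functions.{collect_list, struct}

// Collect each report's products into an array of structs...
val products = report_product
  .join(product, "product_id")
  .groupBy("report_id")
  .agg(collect_list(struct("product_id", "product_name")).alias("product"))

// ...then attach that array to the report row.
report.join(products, "report_id").show()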

But I realize that this is too basic and streams are going to be way more complicated.

The data changes irregularly: reports and their products change constantly, while the products themselves change only once in a while (mostly on a weekly basis).

I would like to replicate any change made to any of these tables into Elasticsearch.
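To make the difference from the static join concrete, here is a minimal sketch in plain Python (no Spark or Kafka involved) of what a streaming version has to do: keep the latest row per key for each table and rebuild every affected report document whenever any of the three tables changes. The class name, method names and the "Renamed product" value are hypothetical and exist only for illustration. One deliberate deviation from the inner join above: a report with no linked products is still emitted, with an empty Product list.

```python
import json
from collections import defaultdict


class ReportDenormalizer:
    """Hypothetical in-memory state: latest row per key plus link indexes."""

    def __init__(self):
        self.reports = {}                        # ReportID -> report row
        self.products = {}                       # ProductID -> product row
        self.report_products = defaultdict(set)  # ReportID -> {ProductID}
        self.product_reports = defaultdict(set)  # ProductID -> {ReportID}

    def _document(self, report_id):
        report = self.reports.get(report_id)
        if report is None:
            return None  # nothing to index until the report row has arrived
        linked = sorted(self.report_products.get(report_id, ()))
        # Skip links whose product row has not been seen yet.
        products = [self.products[pid] for pid in linked if pid in self.products]
        return {**report, "Product": products}

    def _emit(self, report_ids):
        docs = (self._document(rid) for rid in sorted(report_ids))
        return [doc for doc in docs if doc is not None]

    def upsert_report(self, row):
        self.reports[row["ReportID"]] = row
        return self._emit([row["ReportID"]])

    def upsert_product(self, row):
        # A product change fans out to every report that references it.
        self.products[row["ProductID"]] = row
        return self._emit(self.product_reports.get(row["ProductID"], ()))

    def link(self, report_id, product_id):
        self.report_products[report_id].add(product_id)
        self.product_reports[product_id].add(report_id)
        return self._emit([report_id])

    def unlink(self, report_id, product_id):
        self.report_products[report_id].discard(product_id)
        self.product_reports[product_id].discard(report_id)
        return self._emit([report_id])


state = ReportDenormalizer()
state.upsert_report({
    "ReportID": 1,
    "Title": "Yet Another Apache Spark StackOverflow question",
    "PublishedOn": "2017-09-12T19:15:28+00:00",
})
for pid, name in [(1, "Apache"), (2, "Spark"), (3, "StackOverflow"), (4, "Random product")]:
    state.upsert_product({"ProductID": pid, "ProductName": name})
    state.link(1, pid)

# Renaming one product re-emits the full document of every report that uses it.
changed = state.upsert_product({"ProductID": 4, "ProductName": "Renamed product"})
print(json.dumps(changed[0], indent=2))
```

Each method returns the list of documents that would need to be re-indexed, so a single product rename can produce many Elasticsearch updates. A real stream processor would keep this state in fault-tolerant stores rather than in dictionaries.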

Upvotes: 0

Views: 710

Answers (1)

Robin Moffatt

Reputation: 32140

  1. Use Kafka Connect to pull the data from your source DB. You can use the JDBC Source connector, which is available as part of Confluent Platform (or separately); you may also want to investigate kafka-connect-cdc-mssql.

  2. Once you've got the data in Kafka, use either the Kafka Streams API to manipulate the data as desired, or look at the newly released KSQL. Which one you choose will be driven by things like your preference for coding in Java (with Kafka Streams) or manipulating data in a SQL-like environment (with KSQL). Either way, the output is going to be another Kafka topic.

  3. Finally, stream the Kafka topic from above into Elasticsearch using the Elasticsearch Kafka Connect plugin (available separately, or as part of the Confluent Platform).
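As a rough sketch of steps 1 and 3, the snippet below builds the JSON payloads that could be posted to the Kafka Connect REST API (POST /connectors) to register a JDBC source and an Elasticsearch sink. The connector names, hosts, database name, topic prefix and the "report-documents" topic are all placeholders, and the property names should be checked against the documentation of the connector versions you install. Note that "bulk" mode simply re-reads whole tables on every poll; the incremental modes need a suitable ID or modification-timestamp column, which the DDL in the question does not obviously provide.

```python
import json

# Step 1: JDBC source, one topic per table (placeholder connection details).
jdbc_source = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:sqlserver://localhost:1433;databaseName=Reports",
    "table.whitelist": "Report,Product,ReportProduct",
    "mode": "bulk",                # re-reads each table on every poll
    "poll.interval.ms": "60000",
    "topic.prefix": "mssql-",      # topics become mssql-Report, mssql-Product, ...
}

# Step 3: Elasticsearch sink reading the joined topic produced by step 2.
es_sink = {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://localhost:9200",
    "topics": "report-documents",  # hypothetical output topic of the join
    "key.ignore": "false",         # use the Kafka record key as the document ID
    "schema.ignore": "true",       # let Elasticsearch infer the mapping
}


def connector_payload(name, config):
    """Wrap a config dict in the body shape expected by POST /connectors."""
    return json.dumps({"name": name, "config": config}, indent=2)


print(connector_payload("mssql-source", jdbc_source))
print(connector_payload("es-sink", es_sink))
```

Keeping "key.ignore" set to false matters here: if the joined topic is keyed by ReportID, each new version of a report overwrites the previous document instead of creating a new one.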

Upvotes: 1
