Reputation: 73
I'm currently processing a large volume of Kafka messages in .NET with a Kafka consumer.
Step 1 in my processing is to parse the JSON and discard many of the messages based on the value of a specific field in the JSON.
I'd like not to process (and, specifically, not to download) those unwanted messages in the first place.
It looks like a ksqlDB query - written as a push query - can effectively filter the messages down to just the ones I need to process.
How can I consume these via .NET, though? I saw some documents mentioning a REST API, but I doubt that this is a good idea: I need to process in excess of 100 000 records per minute at peak times of day. (If I can selectively download and process messages, I will only be processing about one third of the current volume.)
Unfortunately I don't have control over the publisher, so I can't change what/how the messages are published.
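To illustrate what step 1 looks like today, here is a minimal sketch of the client-side parse-and-discard filter (the field name "Type" and the accepted value are made up for illustration; the real field and value aren't shown in the question):

```csharp
using System;
using System.Text.Json;

// Hypothetical version of the current step 1: every message is downloaded,
// parsed, and then most are thrown away based on one field.
static bool ShouldProcess(string json)
{
    using var doc = JsonDocument.Parse(json);
    return doc.RootElement.TryGetProperty("Type", out var type) // "Type" is a placeholder name
           && type.GetString() == "WANTED";                     // placeholder accepted value
}

Console.WriteLine(ShouldProcess("{\"Type\":\"WANTED\"}")); // True
Console.WriteLine(ShouldProcess("{\"Type\":\"OTHER\"}"));  // False
```

The point of the question is to push this predicate to the server side so the two-thirds of messages that fail it are never downloaded at all.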
Upvotes: 7
Views: 5255
Reputation: 777
You can use the ksqlDB LINQ provider in the following manner.
Install package with Nuget package manager:
Install-Package ksqlDB.RestApi.Client
Create query with C# (.NET):
var ksqlDbUrl = @"http://localhost:8088";
var contextOptions = new KSqlDBContextOptions(ksqlDbUrl);

await using var context = new KSqlDBContext(contextOptions);

using var subscription = context.CreateQueryStream<Message>() // stream name
    .Where(p => p.RowTime >= 1510923225000) // add your own conditions
    //....
    .Select(l => new { l.Id, l.Message, l.RowTime })
    .Subscribe(
        onNext: message =>
        {
            // handle each filtered message as it arrives
        },
        onError: error => { /* handle errors */ },
        onCompleted: () => { /* the push query ended */ });
The above C# code is equivalent to the following ksql:
SELECT Id, Message, RowTime FROM Messages WHERE RowTime >= 1510923225000 EMIT CHANGES;
See the project's Wiki for more operators.
Upvotes: 4
Reputation: 32130
Yes, you can use ksqlDB to do this.
-- Declare a stream on the source topic
-- Because it's JSON you'll need to specify the schema
CREATE STREAM my_source (COL1 VARCHAR, COL2 INT)
WITH (KAFKA_TOPIC='my_source_topic', VALUE_FORMAT='JSON');
-- Apply the filter to the stream, with the results written
-- to a new stream (backed by a new topic)
CREATE STREAM target WITH (KAFKA_TOPIC='my_target_topic') AS
SELECT * FROM my_source WHERE COL1='FOO';
Then, using the REST API from within your application, run a push query, which will consume just the filtered messages:
SELECT * FROM target EMIT CHANGES;
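For completeness, here is a minimal sketch of running that push query over the REST API with plain HttpClient. It assumes ksqlDB is reachable at http://localhost:8088 and that the `target` stream above exists; a push query never completes, so the response must be streamed rather than buffered:

```csharp
using System;
using System.Net.Http;
using System.Text;

using var client = new HttpClient { BaseAddress = new Uri("http://localhost:8088") };

// The /query endpoint takes the statement and any streams properties as JSON.
var body = "{\"ksql\":\"SELECT * FROM target EMIT CHANGES;\",\"streamsProperties\":{}}";
var request = new HttpRequestMessage(HttpMethod.Post, "/query")
{
    Content = new StringContent(body, Encoding.UTF8, "application/vnd.ksql.v1+json")
};

// ResponseHeadersRead is essential: the body is an endless chunked stream.
using var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
response.EnsureSuccessStatusCode();

using var stream = await response.Content.ReadAsStreamAsync();
using var reader = new System.IO.StreamReader(stream);

string? line;
while ((line = await reader.ReadLineAsync()) != null)
{
    if (string.IsNullOrWhiteSpace(line)) continue; // skip keep-alive blank lines
    Console.WriteLine(line); // each non-empty line contains a JSON row payload
}
```

In practice a client library (such as the ksqlDB.RestApi.Client package mentioned in another answer) wraps this plumbing for you, but this shows that the hot path is a single long-lived streaming HTTP response rather than request-per-record, so the REST API is not inherently a throughput problem.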
Aside from ksqlDB, you might also want to have a look at this recently released project from the community: https://github.com/LGouellec/kafka-streams-dotnet
Upvotes: 2