Reputation: 53
I am working on a project that converts unstructured logs into structured ones. The logs arrive as a near-real-time stream and are processed in micro-batches of 2 seconds.
The application writes event logs. Each log carries one field inside its message, the event id, which lets us segregate the different events.
The number of fields to extract differs per event. For example, if the event id is 1, parsing the log yields 5 useful fields; for event id 2 it may be 10. The meaning of each field also depends on the event: the value at position 2 of event id 1 is completely different from the value at position 2 of event id 2. This results in a separate schema for each event id.
I want to store these events in Parquet, so I have to specify a strict schema. To do that I segregate all the events and later cast each group to its appropriate schema. But in doing so I lose the sequence in which the events arrived or occurred. I guess this must be a fairly standard problem when structuring data. How can I tackle it efficiently and reliably? I am using a logstash->kafka->spark-streaming stack.
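A minimal sketch of that segregation step in PySpark (column names, event ids, and paths are made up for illustration; the real field extraction depends on the log format):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Toy stand-in for the parsed stream: an event_id plus already-extracted fields
# (names and values are invented for illustration).
parsed_df = spark.createDataFrame(
    [(1, "a", "b"), (2, "x", "y"), (1, "c", "d")],
    ["event_id", "field1", "field2"],
)

# One strict set of columns per event id; writing each group to its own Parquet
# dataset is where the arrival order across event ids gets lost, unless a
# sequence column is carried along and written with every group.
schemas = {1: ["field1", "field2"], 2: ["field1"]}
for event_id, columns in schemas.items():
    (parsed_df
        .filter(F.col("event_id") == event_id)   # segregate by event id
        .select(*columns)                        # keep only that event's fields
        .write.mode("append")
        .parquet(f"/tmp/events/event_id={event_id}"))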
One solution I thought of is to add an increasing counter to each message. For various reasons I cannot add this number when the events are generated, so I have to handle it in Logstash, Kafka, or Spark Streaming. Since Kafka and Spark Streaming are distributed, maintaining a single sequence counter there seems difficult (suggestions for this are also welcome).
So I am thinking of adding it in Logstash, with a single counter that is increased by one for each event. But under heavy load the number grows quickly, and since we redeploy very rarely, a single Logstash start may keep running for something like 5 years, which could push the counter to its maximum value.
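For scale, a quick back-of-the-envelope check (using the ~5000 events/second figure measured below):

# Back-of-the-envelope check, assuming ~5000 events/second:
rate_per_sec = 5000
per_day = rate_per_sec * 60 * 60 * 24   # 432,000,000: one day still fits in a 32-bit int
five_years = per_day * 365 * 5          # ~7.9e11: overflows a 32-bit int, but fits easily in a 64-bit long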
Q1) So I am thinking of implementing the counter such that it resets to 0 at the end of the day, since our data is partitioned by day. I tried to implement it as follows, but the drawback is that on every increment I have to check the condition. Is there a better approach to handle this situation?
# I tested: around 5000 messages arrive in a single second.
from datetime import date

seqCounter = 0
currentDay = date.today()

def increase():
    global seqCounter, currentDay
    seqCounter += 1
    if date.today() != currentDay:  # checking this condition on every increment seems to add overhead
        seqCounter = 0
        currentDay = date.today()
Q2) There is also a reliability issue. If the system fails or Logstash needs to be restarted for some reason, the current sequence is lost and it starts again from 0. To handle this I could cache the current sequence, but I guess that adds potential I/O delays, since the number would need to be written to a file on every increment, and it would also produce duplicate sequence numbers. Is there a better approach to handle this situation?
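For illustration, one way to soften both concerns (per-increment I/O and duplicates after a restart) is to checkpoint the counter only every N increments and, on restart, resume from the checkpoint plus N; the gaps this creates are acceptable per the note below. Path and block size here are arbitrary:

import os

CHECKPOINT = "/var/lib/seq_counter.txt"   # arbitrary path for illustration
BLOCK = 1000                              # persist only every 1000 increments

def load_counter():
    # On restart, skip a whole block past the last checkpoint so numbers
    # already handed out can never be reused (at the cost of gaps).
    if os.path.exists(CHECKPOINT):
        with open(CHECKPOINT) as f:
            return int(f.read().strip()) + BLOCK
    return 0

seq_counter = load_counter()

def next_seq():
    global seq_counter
    seq_counter += 1
    if seq_counter % BLOCK == 0:          # amortize the disk write
        with open(CHECKPOINT, "w") as f:
            f.write(str(seq_counter))
    return seq_counter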
Q3) I could use the Kafka offset, but it is per partition. So if a single topic spreads its data over multiple partitions (which is my case), the same offset values are repeated across partitions. Is there a better approach to handle this situation?
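For context, this is how those per-partition offsets show up in Spark's Kafka source (broker and topic names are placeholders, and the spark-sql-kafka connector is assumed to be available): offset increases only within a partition, so the pair (partition, offset) is unique but gives no single global order.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# The Kafka source exposes key, value, topic, partition, offset, timestamp, ...
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")   # placeholder
      .option("subscribe", "events")                      # placeholder topic name
      .load())

# offset restarts per partition, so sorting on it alone mixes up events from
# different partitions; (partition, offset) is unique but not globally ordered.
offsets = df.select("partition", "offset", "timestamp")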
Q4) Instead of Logstash, is there any other solution to handle this kind of situation?
NOTE: In any solution, the main goal is to be able to get the sequence back. The sequence number should be monotonically increasing so that I can reconstruct the event flow by sorting on it. A continuous (gap-free) sequence is not needed.
Any help is appreciated.
Upvotes: -1
Views: 109
Reputation: 2841
If you are OK with accepting a compound 160-bit field (one int, two longs), Quality's unique_id provides a unique id. Its requirements are pretty simple: unique MAC addresses for your cluster nodes, and not rolling back time on the driver node.
The benefits are that it is always incrementing, related inserts are stored together, and it is fast for both storage and retrieval.
Upvotes: 0
Reputation: 217514
Trying to generate this sequence number in Logstash would only work if Logstash runs with a single worker (i.e., -w 1); otherwise, with several workers, events would not be processed in the order they were generated, but in parallel, depending on how many workers there are.
That being said, by forcing Logstash to work with a single worker, you'd slow it down artificially, so it's not the best idea. Also, as you've rightly pointed out, you'd need to handle cases where Logstash breaks down or needs to be restarted.
That sequence number really needs to be generated outside of Logstash. The first idea that comes to mind would be to use Redis INCR, but there is no official redis filter plugin.
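For illustration, the counter itself is a single atomic Redis command; with the redis-py client (hypothetical host and key name) it looks like this, and every caller gets a distinct, increasing value no matter how many workers ask concurrently, with the value living on the Redis server rather than in the Logstash process:

import redis

r = redis.Redis(host="localhost", port=6379)   # placeholder connection details

def next_seq():
    # INCR is atomic on the server side, so concurrent callers never get
    # the same number, and the counter survives client restarts.
    return r.incr("event_sequence")            # placeholder key name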
Another idea would be to use an SQL sequence (for instance with Postgres) that you can easily query with the jdbc_streaming filter plugin:
input {
  ...
}
filter {
  jdbc_streaming {
    jdbc_driver_library => "/path/to/jdbc-connector.jar"
    jdbc_driver_class => "com.product.jdbc.Driver"
    jdbc_connection_string => "jdbc:xxsql://localhost:1234/mydatabase"
    jdbc_user => "me"
    jdbc_password => "secret"
    statement => "SELECT nextval('sequence');"
    target => "sequence"
  }
}
That would be a viable option only if you already have a DB that supports sequences, but not if you need to set one up just for this. Also, depending on how many events per second are generated, having to query a remote DB for each sequence number might become a bottleneck.
Some plugins exist, such as this one, but the sequence value is only kept in memory and would be lost if Logstash restarts. You could also create your own Java filter plugin to generate an incrementing sequence value, but you'd still need to find a way to persist that value across restarts.
There are some ongoing issues (such as #6997) where work on this has started, but most of them are stale.
The best option you have remains to generate that sequence number right when events are being created.
Upvotes: 1