Reputation: 101
I'm relatively new to programming (you can see it in my code), but I'm currently learning more about Kafka and Java for data processing. With the data in a topic, I need to join against some tables, both to check that the data exists and to fetch other fields, so I run several queries against the database (there are too many fields to retrieve at once, so I split them into separate queries to keep them readable). For each record retrieved from the topic I make some database calls and then, after processing the data, update the tables (I do the updates in batches per table; only that part is fast).
My problem here is the time. In a test with two hundred thousand records it took half an hour; it's too slow. My code looks something like this:
public class TestKafka {
    public static Connection conexion = null;

    public static void main(String[] args) throws SQLException {
        conexion = C3P0DataSource.getInstance().getConnection();
        runConsumer();
    }
    // ...
    public static void runConsumer() {
        try { // (alternatively: try (Connection conexion = C3P0DataSource.getInstance().getConnection()) { ... })
            conexion.setAutoCommit(false);
            ObjectMapper mapper = new ObjectMapper(); // reuse the mapper instead of re-creating it per poll
            while (true) { // with a Kafka connector - I try to simulate data streaming
                final ConsumerRecords<String, String> consumerRecords = consumer.poll(Long.MAX_VALUE);
                List<Map<String, String>> recordData = new ArrayList<>();
                for (ConsumerRecord<String, String> record : consumerRecords) {
                    Map<String, String> map = new HashMap<>();
                    DataStructure_Topic config = mapper.readValue(record.value(), DataStructure_Topic.class);
                    map.put("row_id_1", config.getCodent());
                    map.put("row_id_2", config.getCentalta());
                    map.put("row_id_3", config.getCuenta());
                    datosAComprobar.add(map); // class-level list, declaration omitted
                    recordData = firstConsult(recordData, conexion);
                    if (recordData.size() > 0) {
                        recordData = secondConsult(recordData, conexion);
                        // a few more queries to the database
                        if (recordData.size() > 0) {
                            // ... data processing ... and update
                        }
                    }
                    datosAComprobar.clear();
                }
                consumer.commitSync();
                Thread.sleep(100);
            }
        } catch (Exception e) { /* ... */ }
    }
The database lookups all follow the same structure per query:
public static List<Map<String, String>> firstConsult(List<Map<String, String>> myList, Connection conn) {
    PreparedStatement stmt = null;
    ResultSet rs = null;
    List<Map<String, String>> list = new ArrayList<>();
    String query = "";
    int contador = 1;
    for (Map<String, String> val : myList) {
        query += "select " + val.get("row1") + " as r1, " + val.get("row2") + " as r2, " + val.get("cuenta")
               + " from table_a inner join table_b ...";
        if (contador < myList.size()) {
            query += "\r\nunion\r\n";
        }
        contador += 1;
    }
    try {
        stmt = conn.prepareStatement(query);
        rs = stmt.executeQuery();
        ResultSetMetaData rsmd = rs.getMetaData();
        int columnsNumber = rsmd.getColumnCount();
        while (rs.next()) {
            Map<String, String> map = new HashMap<>();
            for (int i = 1; i <= columnsNumber; i++) {
                String columnValue = rs.getString(i);
                String columnName = rsmd.getColumnName(i);
                map.put(columnName, columnValue);
            }
            if (!map.isEmpty()) {
                list.add(map);
            }
        }
    } catch (SQLException e) { /* ... */ } finally {
        try {
            if (rs != null) rs.close();
            if (stmt != null) stmt.close();
        } catch (SQLException e) { /* ... */ }
    }
    return list;
}
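One reason the lookups get slower as the load grows is that each poll builds a brand-new UNION query by string concatenation, so the database has to parse a different statement text every time and nothing is bound as a parameter. A minimal sketch of an alternative (the table and column names below are hypothetical, not from the original code): build a single parameterized IN-list over the composite key and bind the values with `?` placeholders. Note that row-value `IN` comparisons like this are supported by e.g. MySQL and PostgreSQL, but not by every database.

```java
import java.util.StringJoiner;

public class BatchLookup {
    // Build one parameterized query for the whole batch instead of a UNION of
    // string-concatenated SELECTs. Values are bound with ?, so the statement
    // text is stable (the driver can cache the plan per batch size) and SQL
    // injection is avoided.
    static String buildQuery(int batchSize) {
        StringJoiner tuples = new StringJoiner(", ");
        for (int i = 0; i < batchSize; i++) {
            tuples.add("(?, ?, ?)");
        }
        return "select a.codent, a.centalta, a.cuenta, b.extra_col "
             + "from table_a a inner join table_b b on a.id = b.id "
             + "where (a.codent, a.centalta, a.cuenta) in (" + tuples + ")";
    }

    // Binding sketch (needs a live Connection, so shown as comments):
    // try (PreparedStatement stmt = conn.prepareStatement(buildQuery(batch.size()))) {
    //     int i = 1;
    //     for (Map<String, String> rec : batch) {
    //         stmt.setString(i++, rec.get("row_id_1"));
    //         stmt.setString(i++, rec.get("row_id_2"));
    //         stmt.setString(i++, rec.get("row_id_3"));
    //     }
    //     ResultSet rs = stmt.executeQuery();
    //     ...
    // }

    public static void main(String[] args) {
        System.out.println(buildQuery(2));
    }
}
```

With this shape you send one round trip per poll instead of one per record, which is usually where most of the time goes.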
How can I improve my code or at least the connection to the database to get better times...? As I load more records it is slower. Do I need to close my connection? I close all statements and resultSets ...
Upvotes: 2
Views: 538
Reputation: 32090
As you've identified, this is not an efficient way to do things. A common pattern is that instead of doing lookups against a database, you bring the database into Kafka and do the work there.
You can ingest database table(s) into Kafka topics using CDC (change data capture), and then use a stream processing technology such as Kafka Streams or ksqlDB to join your original Kafka topic against the necessary data from the new topics populated from the database. My talk here shows it in action.
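For illustration, the ksqlDB side of that join might look roughly like this. This is only a sketch: the topic names (`db_table_a`, `original_topic`), column names, and formats are all assumptions, and the CDC ingestion (e.g. via Debezium and Kafka Connect) has to be configured separately.

```sql
-- Database table ingested via CDC into the topic db_table_a, exposed as a ksqlDB table
CREATE TABLE accounts (codent VARCHAR PRIMARY KEY, extra_col VARCHAR)
  WITH (KAFKA_TOPIC = 'db_table_a', VALUE_FORMAT = 'JSON');

-- The original topic exposed as a stream
CREATE STREAM events (codent VARCHAR, centalta VARCHAR, cuenta VARCHAR)
  WITH (KAFKA_TOPIC = 'original_topic', VALUE_FORMAT = 'JSON');

-- Join inside Kafka instead of doing a database lookup per record
CREATE STREAM enriched AS
  SELECT e.codent, e.centalta, e.cuenta, a.extra_col
  FROM events e
  JOIN accounts a ON e.codent = a.codent;
```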
Upvotes: 1
Reputation: 3832
In your case you are fetching data from different database tables and, after processing, updating those tables again.
Kafka is a distributed messaging system that can be parallelized via partitions and consumers: with N partitions and N consumers, you can parallelize the processing up to a factor of N.
So if you are planning to use Kafka, you should parallelize the processing, which will spread the load across processes and reduce your overall processing time.
But keep in mind that in that case you will also have to handle parallel transactions on the database side.
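The scaling idea can be sketched without any Kafka dependency: give each "partition" its own worker, the way N consumers in one consumer group each own a subset of N partitions. The partition contents and the per-record work (a plain sum here) are placeholders for the real lookups and updates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class ParallelBatches {
    // Each "partition" (here just a list of records) is processed by its own
    // worker thread, mirroring how N Kafka partitions can be consumed in
    // parallel by N consumers in the same consumer group.
    static List<Integer> processInParallel(List<List<Integer>> partitions) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(partitions.size());
        try {
            List<Future<Integer>> futures = partitions.stream()
                .map(p -> pool.submit(() -> p.stream().mapToInt(Integer::intValue).sum()))
                .collect(Collectors.toList());
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> f : futures) {
                results.add(f.get()); // collect results in partition order
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<List<Integer>> partitions = List.of(List.of(1, 2), List.of(3, 4), List.of(5));
        System.out.println(processInParallel(partitions)); // [3, 7, 5]
    }
}
```

As the answer notes, once several workers write to the same tables concurrently, the database must be able to cope with the parallel transactions (locking, deadlocks, isolation level).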
Upvotes: 0