Reputation: 684
I get a stream of data which I am able to completely crawl. The data all gets put into Kafka and afterwards it gets send to Cassandra. Now the kafka consumer is very slow, much slower then the producer. I want them to be exactly the same. What can I do to achieve this result or what is wrong with my code?
Here is my Kafka consumer code in python:
import logging
from cassandra.cluster import Cluster
from kafka.consumer.kafka import KafkaConsumer
from kafka.consumer.multiprocess import MultiProcessConsumer
from kafka.client import KafkaClient
from kafka.producer.simple import SimpleProducer
import json
from datetime import datetime, timedelta
from cassandra import ConsistencyLevel
from dateutil.parser import parse
logging.basicConfig(filename='consumer.log', format='[%(asctime)-15s] %(name)s %(levelname)s %(message)s', level=logging.DEBUG)
class Whitelist(logging.Filter):
def __init__(self, *whitelist):
self.whitelist = [logging.Filter(name) for name in whitelist]
def filter(self, record):
return any(f.filter(record) for f in self.whitelist)
for handler in logging.root.handlers:
handler.addFilter(Whitelist('consumer'))
log = logging.getLogger('consumer')
try:
cluster = Cluster(['localhost']); session = cluster.connect(keyspace)
kafka = KafkaClient('localhost')
consumer = MultiProcessConsumer(kafka, b'default',kafkatopic,num_procs=16, max_buffer_size=None)
article_lookup_stmt = session.prepare("SELECT * FROM articles WHERE id in ?")
article_lookup_stmt.consistency_level = ConsistencyLevel.QUORUM
article_insert_stmt = session.prepare("INSERT INTO articles(id, thumbnail, title, url, created_at, scheduled_for, source, category, channel,genre) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
article_by_created_at_insert_stmt = session.prepare("INSERT INTO article_by_created_at(source, created_at, article) VALUES (?, ?, ?)")
article_by_url_insert_stmt = session.prepare("INSERT INTO article_by_url(url, article) VALUES (?, ?)")
schedules_insert_stmt = session.prepare("INSERT INTO schedules(source,type,scheduled_for,id) VALUES (?,?,?,?)")
axes_insert_stmt = session.prepare("INSERT INTO axes(article,at,comments,likes,reads,shares) VALUES (?, ?, ?, ?, ?, ?)")
while True:
messages = consumer.get_messages(count=16)
if len(messages) == 0:
print 'IDLE'
continue
for message in messages:
try:
response = json.loads(message.value)
data = json.loads(response['body'])
print response['body']
articles = data['articles']
idlist = [r['id'] for r in articles]
if len(idlist)>0:
article_rows = session.execute(article_lookup_stmt,[idlist])
rows = [r.id for r in article_rows]
for article in articles:
try:
if not article['id'] in rows:
article['created_at'] = parse(article['created_at'])
scheduled_for=(article['created_at'] + timedelta(minutes=60)).replace(second=0, microsecond=0)
session.execute(article_insert_stmt, (article['id'], article['thumbnail'], article['title'], article['url'], article['created_at'], scheduled_for, article['source'], article['category'], article['channel'],article['genre']))
session.execute(article_by_created_at_insert_stmt, (article['source'], article['created_at'], article['id']))
session.execute(article_by_url_insert_stmt, (article['url'], article['id']))
session.execute(schedules_insert_stmt,(article['source'],'article',scheduled_for,article['id']))
log.debug('%s %s' % (article['id'],article['created_at']))
session.execute(axes_insert_stmt,(article['id'],datetime.utcnow(),article['axes']['comments'],article['axes']['likes'],0,article['axes']['shares']))
except Exception as e:
print 'error==============:',e
continue
except Exception as e:
print 'error is:',e
log.exception(e.message)
except Exception as e:
log.exception(e.message)
EDIT:
I also added my profile results and the slow line of code seems to be
article_rows = session.execute(article_lookup_stmt,[idlist])
Sun Feb 14 16:01:01 2016 consumer.out
395793 function calls (394232 primitive calls) in 23.074 seconds
Ordered by: internal time
ncalls tottime percall cumtime percall filename:lineno(function)
141 10.695 0.076 10.695 0.076 {select.select}
7564 10.144 0.001 10.144 0.001 {method 'acquire' of 'thread.lock' objects}
1 0.542 0.542 23.097 23.097 consumer.py:5(<module>)
1510 0.281 0.000 0.281 0.000 {method 'recv' of '_socket.socket' objects}
38 0.195 0.005 0.195 0.005 /usr/local/lib/python2.7/json/decoder.py:371(raw_decode)
13 0.078 0.006 0.078 0.006 {time.sleep}
2423 0.073 0.000 0.137 0.000 /usr/local/lib/python2.7/logging/__init__.py:242(__init__)
22112 0.063 0.000 0.095 0.000 /usr/local/lib/python2.7/site-packages/kafka/util.py:73(relative_unpack)
3 0.052 0.017 0.162 0.054 /usr/local/lib/python2.7/site-packages/kafka/protocol.py:386(decode_metadata_response)
2006/2005 0.047 0.000 0.055 0.000 /usr/local/lib/python2.7/site-packages/cassandra/policies.py:350(make_query_plan)
1270 0.032 0.000 0.034 0.000 /usr/local/lib/python2.7/threading.py:259(__init__)
3 0.024 0.008 0.226 0.075 /usr/local/lib/python2.7/site-packages/kafka/client.py:456(load_metadata_for_topics)
33 0.024 0.001 0.031 0.001 /usr/local/lib/python2.7/collections.py:288(namedtuple)
15374 0.024 0.000 0.024 0.000 {built-in method new of type object at 0x788ee0}
141 0.023 0.000 11.394 0.081 /usr/local/lib/python2.7/site-packages/kafka/client.py:153(_send_broker_aware_request)
288 0.020 0.000 0.522 0.002 /usr/local/lib/python2.7/site-packages/kafka/conn.py:84(_read_bytes)
2423 0.018 0.000 0.029 0.000 /usr/local/lib/python2.7/logging/__init__.py:1216(findCaller)
115 0.018 0.000 11.372 0.099 /usr/local/lib/python2.7/site-packages/kafka/consumer/kafka.py:303(fetch_messages)
2423 0.018 0.000 0.059 0.000 /usr/local/lib/python2.7/logging/__init__.py:1303(callHandlers)
24548 0.017 0.000 0.017 0.000 {_struct.unpack}
44228/43959 0.016 0.000 0.016 0.000 {len}
Thank you for your reply.
Upvotes: 2
Views: 3239
Reputation: 5708
You can try running the consumer without saving to C*, so you can observe how much difference does it make.
If it turns out that saving to C* is a choke point (which I assume it is), you could have a thread pool (larger than 16 threads your consumer spawns) whose sole responsibility is to write to C*.
That way, you would offload the slow part of the code, which would leave only trivial parts in the consumer code.
You could use a from multiprocessing import Pool
.
More here.
Upvotes: 2