Reputation: 684
I have a Cassandra table 'articles' with 400,000 rows and
primary key (source, created_at desc)
When I query our data using:
select * from articles where source = 'abc' and created_at <= '2016-01-01 00:00:00'
it takes 8 minutes to read 110,000 rows.
This is extremely slow and I don't know where the problem lies.
I would like to read 100,000 rows in less than 10 seconds; I am not sure if that is even possible.
Here are some more details:
I have 3 nodes, replication factor = 2, strategy = SimpleStrategy, 4 CPUs, 32 GB RAM.
I am using the Python cassandra-driver 3.0.0.
I am not sure whether the bottleneck is in Cassandra or in the Python client.
Here is my CQL schema:
CREATE TABLE crawler.articles (
source text,
created_at timestamp,
id text,
category text,
channel text,
last_crawled timestamp,
text text,
thumbnail text,
title text,
url text,
PRIMARY KEY (source, created_at, id)
) WITH CLUSTERING ORDER BY (created_at DESC, id ASC)
AND bloom_filter_fp_chance = 0.01
AND caching = '{"keys":"ALL", "rows_per_partition":"ALL"}'
AND comment = ''
AND compaction = {'sstable_size_in_mb': '160', 'enabled': 'true', 'unchecked_tombstone_compaction': 'false', 'tombstone_compaction_interval': '86400', 'tombstone_threshold': '0.2', 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 604800
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99.0PERCENTILE';
CREATE INDEX articles_id_idx ON crawler.articles (id);
CREATE INDEX articles_url_idx ON crawler.articles (url);
Edit:
I want to query for new articles from the last couple of days, therefore my query is:
SELECT * FROM articles WHERE source = 'any source'
AND created_at >= '2016-01-08 00:00:00'
A sample insert would be:
INSERT INTO articles (source,created_at,id,category,channel,last_crawled,text,thumbnail,title,url)
VALUES ('money',1452417991000,'1290141063','news_video_top','',1452418260000,'','http://inews.gtimg.com/newsapp_ls/0/143487758_150120/0','article title','http://view.inews.qq.com/a/VID2016011002195801');
Client code:
import sys
import logging
from datetime import datetime, timedelta

from cassandra import ConsistencyLevel  # only needed if QUORUM is enabled on the DELETE below
from cassandra.cluster import Cluster

timespan = int(sys.argv[1])
source = str(sys.argv[2])

logging.basicConfig(filename='statistics-%d.log' % (timespan),
                    format='%(asctime)-15s %(filename)s %(name)-8s %(message)s',
                    level=logging.INFO)

class Whitelist(logging.Filter):
    """Only pass log records whose logger name is whitelisted."""
    def __init__(self, *whitelist):
        self.whitelist = [logging.Filter(name) for name in whitelist]

    def filter(self, record):
        return any(f.filter(record) for f in self.whitelist)

for handler in logging.root.handlers:
    handler.addFilter(Whitelist('statistics'))
log = logging.getLogger('statistics')

# Which counter drives the 'speed' ranking for each source.
# (An earlier variant computed a delta instead, e.g. comments - comments_1.)
SPEED_FIELD = {
    'toutiao': 'comments',
    'weibohao': 'shares',
    'tencent': 'comments',
    'zhihu': 'likes',
    'buluo': 'reads',
    'zhihuzero': 'likes',
}

try:
    log.info('[%d] connecting cassandra...' % (timespan))
    # one Cluster object is enough; connect() only selects the keyspace
    cluster = Cluster(['xxx', 'xxx', 'xxx'])
    session = cluster.connect('crawler')
    session_statis = cluster.connect('statistics')

    created_at = datetime.utcnow() - timedelta(hours=timespan)
    print "[%s] FINDING ..." % (datetime.utcnow().isoformat())

    statuses = {}
    stmt = session.prepare('SELECT * FROM articles WHERE source = ? AND created_at >= ?')
    category_stmt = session.prepare('SELECT category FROM channels WHERE source = ? AND id = ?')

    # fetch every candidate article for the timespan (this is the slow read)
    rows = session.execute(stmt, [source, created_at])
    for row in rows:
        try:
            if row.channel and source != 'toutiao':
                category = session.execute(
                    category_stmt,
                    ['zhihu' if row.source == 'zhihuzero' else row.source, row.channel])
                statuses[row.id] = {'source': row.source, 'timespan': str(timespan), 'id': row.id,
                                    'title': row.title, 'thumbnail': row.thumbnail, 'url': row.url,
                                    'text': row.text, 'created_at': row.created_at,
                                    'category': category[0].category, 'author': '',
                                    'genre': row.category}
            else:
                statuses[row.id] = {'source': row.source, 'timespan': str(timespan), 'id': row.id,
                                    'title': row.title, 'thumbnail': row.thumbnail, 'url': row.url,
                                    'text': row.text, 'created_at': row.created_at,
                                    'category': row.category, 'author': '', 'genre': ''}
        except Exception:
            continue

    print "%s weibos ..." % (len(statuses))
    print "[%s] CALCULATING ..." % (datetime.utcnow().isoformat())

    stmt = session.prepare('SELECT article, MAX(comments) AS comments, MAX(likes) AS likes, '
                           'MAX(reads) AS reads, MAX(shares) AS shares '
                           'FROM axes WHERE article = ? AND at >= ?')
    # one sequential query per article
    for statuses_id, status in statuses.iteritems():
        rows = session.execute(stmt, [statuses_id, datetime.utcnow() - timedelta(hours=timespan)])
        for row in rows:
            if source not in SPEED_FIELD:
                continue
            if row.article is not None:
                status['reads'] = row.reads
                status['likes'] = row.likes
                status['shares'] = row.shares
                status['comments'] = row.comments
                status['speed'] = getattr(row, SPEED_FIELD[source])
            else:
                status['reads'] = status['likes'] = status['shares'] = 0
                status['comments'] = status['speed'] = 0

    # keep the 1000 fastest-moving articles
    statuses = sorted(statuses.iteritems(), key=lambda (k, v): (v['speed'], k), reverse=True)[:1000]

    print "[%s] TRUNCATING ..." % (datetime.utcnow().isoformat())
    session_statis.execute('DELETE FROM statistics WHERE source = %s AND timespan = %s',
                           (source, str(timespan)))  # , consistency_level=ConsistencyLevel.QUORUM

    print "[%s] UPDATING ..." % (datetime.utcnow().isoformat())
    for i, status in statuses:
        if status['speed'] > 0:
            session_statis.execute(
                'INSERT INTO statistics.statistics '
                '(source, timespan, id, title, thumbnail, url, text, created_at, category, genre, '
                'author, reads, likes, comments, shares, speed) '
                'VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)',
                (status['source'], status['timespan'], status['id'], status['title'],
                 status['thumbnail'], status['url'], status['text'], status['created_at'],
                 status['category'], status['genre'], status['author'], status['reads'],
                 status['likes'], status['comments'], status['shares'], status['speed']))
        else:
            print status['id'], status['url']

    print "[%s] DONE ..." % (datetime.utcnow().isoformat())
    log.info('[%d] done' % (timespan))
except Exception, e:
    print 'except ===:', e
Thanks for your replies!
Upvotes: 4
Views: 7226
Reputation: 1
Hmm, you should think more than twice before using a database like Cassandra for this kind of small database with relational use cases. Use, for example, PostgreSQL; that will make your life much happier.
I tested the selections you describe on one of our really large PostgreSQL databases, a table with more than 200 columns and approximately a billion rows, and you get sub-second responses without any special tuning beyond the relevant indexes.
Remember: Cassandra is only relevant for very specialized use cases.
Upvotes: 0
Reputation: 4061
Your data modelling is not optimal. You have three primary key columns and two secondary indexes. Cassandra is a write-optimised database, meaning every read is costly. Multiple primary key columns and indexes will really slow down your throughput. An ideal table has a single primary key tailored to one query. If you need to query by multiple parameters, the table needs to be duplicated, with the different keys as the primary key of each copy (see the sketch below). It is a compromise between efficiency and data duplication.
IMHO, if you try to model Cassandra like a SQL database, you are going to have a very bad experience with efficiency.
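To make that duplication concrete, here is a minimal sketch; the articles_by_url table, its reduced column set, and the save_article helper are illustrative assumptions, not part of the original schema:

# Hypothetical sketch: one table per query path instead of secondary indexes.
# 'articles_by_url' and its column set are made up for illustration.
from cassandra.cluster import Cluster

cluster = Cluster(['xxx', 'xxx', 'xxx'])
session = cluster.connect('crawler')

session.execute("""
    CREATE TABLE IF NOT EXISTS articles_by_url (
        url text PRIMARY KEY,
        source text, created_at timestamp, id text, title text)
""")

insert_main = session.prepare(
    'INSERT INTO articles (source, created_at, id, title, url) VALUES (?, ?, ?, ?, ?)')
insert_by_url = session.prepare(
    'INSERT INTO articles_by_url (url, source, created_at, id, title) VALUES (?, ?, ?, ?, ?)')

def save_article(source, created_at, article_id, title, url):
    # write the same row to both tables so each read pattern hits a single partition
    session.execute(insert_main, (source, created_at, article_id, title, url))
    session.execute(insert_by_url, (url, source, created_at, article_id, title))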
Upvotes: 3
Reputation: 16400
How are you doing your reads? If you are using synchronous calls, the latency between your client app and the server limits your throughput, so be sure to use the async mechanism: http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Session.execute_async. But be careful to limit the number of in-flight requests so you don't overrun your server; probably keep it equal to the number of concurrent_reads in cassandra.yaml. With only 4 cores you probably won't benefit much from increasing it.
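As an illustration (not the poster's code), here is a sketch of how the sequential per-article MAX() loop in the client code could be parallelised with the driver's execute_concurrent_with_args helper, which wraps execute_async and caps the number of in-flight requests; article_ids and since are placeholders for the caller's data:

# Sketch: bounded concurrent reads instead of a sequential per-article loop.
from datetime import datetime, timedelta
from cassandra.cluster import Cluster
from cassandra.concurrent import execute_concurrent_with_args

cluster = Cluster(['xxx', 'xxx', 'xxx'])
session = cluster.connect('crawler')

stmt = session.prepare(
    'SELECT article, MAX(comments) AS comments, MAX(likes) AS likes, '
    'MAX(reads) AS reads, MAX(shares) AS shares '
    'FROM axes WHERE article = ? AND at >= ?')

since = datetime.utcnow() - timedelta(hours=24)
article_ids = ['1290141063', '1290141064']  # placeholder ids

# 'concurrency' caps in-flight requests; keep it near concurrent_reads in cassandra.yaml
results = execute_concurrent_with_args(
    session, stmt, [(aid, since) for aid in article_ids],
    concurrency=32, raise_on_first_error=False)

for success, result in results:
    if not success:
        continue  # 'result' holds the exception for failed requests
    for row in result:
        pass  # aggregate counters here as in the original loop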
The Python client isn't the most efficient either and you can be pretty limited on the client side, so you may want to utilize your application server more by spinning up multiple instances, or use the Java driver (https://github.com/datastax/java-driver).
Unless your data model is super wide with hotspots (i.e. the partition key needs to become a composite including source and something else) or you are using slow drives/CPUs, 100,000 rows could in theory be read on the order of tens of seconds or less.
A couple of notes on the schema: disable row caching, it's rarely a good idea. Don't use secondary indexes unless you REALLY know what you're doing. You probably need to break up your partitions, e.g. PRIMARY KEY ((source, day), created_at, id), where day is a field holding values like '2016-01-12' for January 12th, and so on; a sketch of what reads look like with that bucketing follows.
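A minimal sketch of reading such day buckets, assuming a hypothetical articles_by_day table with PRIMARY KEY ((source, day), created_at, id); a "last N days" read becomes one small partition query per day:

# Hypothetical sketch: querying a day-bucketed table, one partition per day.
# 'articles_by_day' and its 'day' text column are assumptions, not the poster's schema.
from datetime import datetime, timedelta
from cassandra.cluster import Cluster

cluster = Cluster(['xxx', 'xxx', 'xxx'])
session = cluster.connect('crawler')

stmt = session.prepare(
    'SELECT * FROM articles_by_day WHERE source = ? AND day = ? AND created_at >= ?')

since = datetime.utcnow() - timedelta(days=3)
rows = []
day = since
while day.date() <= datetime.utcnow().date():
    # each iteration reads exactly one (source, day) partition
    rows.extend(session.execute(stmt, ['abc', day.strftime('%Y-%m-%d'), since]))
    day += timedelta(days=1)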
Upvotes: 3
Reputation: 9475
Your use case is a little unusual. Cassandra is intended more for transactional operations on a small number of rows than for the kind of bulk processing you might do in Hadoop.
The way you are doing your query, you are accessing one partition on a single node and transferring the 100K rows to your client. That's a lot of data to move across the network and I'm not sure why you would want to do that. You're doing everything sequentially, so you're getting no parallelism or benefit from having three nodes.
Usually if you want to do bulk processing on a lot of rows in Cassandra, you'd use Spark to do distributed processing on each node rather than sequentially fetch a lot of data to a client.
Also, the two indexes you are creating don't look like they will work very well. Cassandra indexes are intended for fields that have a low cardinality, but you appear to be creating indexes on high-cardinality fields. Cassandra indexes are very different from indexes in relational databases.
I'd have to see your client code to know if you are doing something inefficient there. Usually fetching a lot of rows would trigger paging, so I'm not sure how you're handling that.
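For reference, this is roughly how paging is usually driven from the Python driver (a sketch; fetch_size sets the page size, and further pages are fetched transparently while iterating the result set):

# Sketch: explicit paging control in the Python driver.
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement

cluster = Cluster(['xxx', 'xxx', 'xxx'])
session = cluster.connect('crawler')

query = SimpleStatement(
    "SELECT * FROM articles WHERE source = 'abc' AND created_at >= '2016-01-08 00:00:00'",
    fetch_size=5000)  # rows per page

count = 0
for row in session.execute(query):  # next pages are fetched as you iterate
    count += 1
print(count)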
Upvotes: 5