user1753838

Reputation: 65

OperationTimeoutException Cassandra cluster AWS / EMR

I have a Java app running on Amazon against a Cassandra cluster managed by Priam.

We use Amazon's Elastic MapReduce (EMR) service, and at certain moments, while an EMR job is running and I try to get some data that was inserted into Cassandra, I get an exception: OperationTimeoutException.

These are the configuration parameters passed when I create my Cassandra pool over Astyanax:

ConnectionPoolConfigurationImpl conPool = new ConnectionPoolConfigurationImpl(getConecPoolName())
    .setMaxConnsPerHost(20)
    .setSeeds("ec2-xx-xxx-xx-xx.compute-1.amazonaws.com")
    .setMaxOperationsPerConnection(100)
    .setMaxPendingConnectionsPerHost(20)
    .setConnectionLimiterMaxPendingCount(20)
    .setTimeoutWindow(10000)
    .setConnectionLimiterWindowSize(1000)
    .setMaxTimeoutCount(3)
    .setConnectTimeout(5000)
    .setMaxFailoverCount(-1)
    .setLatencyAwareBadnessThreshold(20)
    .setLatencyAwareUpdateInterval(1000)
    .setLatencyAwareResetInterval(10000)
    .setLatencyAwareWindowSize(100)
    .setLatencyAwareSentinelCompare(100f);


AstyanaxContext<Keyspace> context = new AstyanaxContext.Builder()
    .forCluster("clusterName")
    .forKeyspace("keyspaceName")
    .withAstyanaxConfiguration(
        new AstyanaxConfigurationImpl().setDiscoveryType(NodeDiscoveryType.NONE))
    .withConnectionPoolConfiguration(conPool)
    .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
    .buildKeyspace(ThriftFamilyFactory.getInstance());

Full stack trace:

ERROR com.s1mbi0se.dg.input.service.InputService (main): EXCEPTION:OperationTimeoutException: [host=ec2-xx-xxx-xx-xx.compute-1.amazonaws.com(10.100.6.242):9160, latency=10004(10004), attempts=1]TimedOutException()

com.netflix.astyanax.connectionpool.exceptions.OperationTimeoutException: OperationTimeoutException: [host=ec2-54-224-65-18.compute-1.amazonaws.com(10.100.6.242):9160, latency=10004(10004), attempts=1]TimedOutException()
    at com.netflix.astyanax.thrift.ThriftConverter.ToConnectionPoolException(ThriftConverter.java:171)
    at com.netflix.astyanax.thrift.AbstractOperationImpl.execute(AbstractOperationImpl.java:61)
    at com.netflix.astyanax.thrift.ThriftColumnFamilyQueryImpl$1$2.execute(ThriftColumnFamilyQueryImpl.java:206)
    at com.netflix.astyanax.thrift.ThriftColumnFamilyQueryImpl$1$2.execute(ThriftColumnFamilyQueryImpl.java:198)
    at com.netflix.astyanax.thrift.ThriftSyncConnectionFactoryImpl$ThriftConnection.execute(ThriftSyncConnectionFactoryImpl.java:151)
    at com.netflix.astyanax.connectionpool.impl.AbstractExecuteWithFailoverImpl.tryOperation(AbstractExecuteWithFailoverImpl.java:69)
    at com.netflix.astyanax.connectionpool.impl.AbstractHostPartitionConnectionPool.executeWithFailover(AbstractHostPartitionConnectionPool.java:253)
    at com.netflix.astyanax.thrift.ThriftColumnFamilyQueryImpl$1.execute(ThriftColumnFamilyQueryImpl.java:196)
    at com.s1mbi0se.dg.input.service.InputService.searchUserByKey(InputService.java:833)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:771)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:375)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1132)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: TimedOutException()
    at org.apache.cassandra.thrift.Cassandra$get_slice_result.read(Cassandra.java:7874)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
    at org.apache.cassandra.thrift.Cassandra$Client.recv_get_slice(Cassandra.java:594)
    at org.apache.cassandra.thrift.Cassandra$Client.get_slice(Cassandra.java:578)
    at com.netflix.astyanax.thrift.ThriftColumnFamilyQueryImpl$1$2.internalExecute(ThriftColumnFamilyQueryImpl.java:211)
    at com.netflix.astyanax.thrift.ThriftColumnFamilyQueryImpl$1$2.internalExecute(ThriftColumnFamilyQueryImpl.java:198)
    at com.netflix.astyanax.thrift.AbstractOperationImpl.execute(AbstractOperationImpl.java:56)

I don't know which direction to take to solve this, because the problem may be in the Astyanax pool configuration, the EC2 machine configuration (more memory?), the Priam configuration, or some other configuration that Cassandra or the EMR service on AWS requires in my code. Any hints?


The stack trace follows:

INFO org.apache.hadoop.mapred.TaskLogsTruncater (main): Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
WARN org.apache.hadoop.mapred.Child (main): Error running child
java.lang.RuntimeException: InvalidRequestException(why:Start key's token sorts after end token)
    at org.apache.cassandra.hadoop.ColumnFamilyRecordReader$WideRowIterator.maybeInit(ColumnFamilyRecordReader.java:453)
    at org.apache.cassandra.hadoop.ColumnFamilyRecordReader$WideRowIterator.computeNext(ColumnFamilyRecordReader.java:459)
    at org.apache.cassandra.hadoop.ColumnFamilyRecordReader$WideRowIterator.computeNext(ColumnFamilyRecordReader.java:406)
    at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
    at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
    at org.apache.cassandra.hadoop.ColumnFamilyRecordReader.getProgress(ColumnFamilyRecordReader.java:103)
    at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.getProgress(MapTask.java:522)
    at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:547)
    at org.apache.hadoop.mapreduce.MapContext.nextKeyValue(MapContext.java:67)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:143)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:771)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:375)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1132)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: InvalidRequestException(why:Start key's token sorts after end token)
    at org.apache.cassandra.thrift.Cassandra$get_paged_slice_result.read(Cassandra.java:14168)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
    at org.apache.cassandra.thrift.Cassandra$Client.recv_get_paged_slice(Cassandra.java:769)
    at org.apache.cassandra.thrift.Cassandra$Client.get_paged_slice(Cassandra.java:753)
    at org.apache.cassandra.hadoop.ColumnFamilyRecordReader$WideRowIterator.maybeInit(ColumnFamilyRecordReader.java:438)
    ... 
INFO org.apache.hadoop.mapred.Task (main): Runnning cleanup for the task

Upvotes: 1

Views: 2626

Answers (2)

user1753838

Reputation: 65

We solved the problem. (Dean, I already answered this on the Cassandra users group, but I will repeat here what we did to solve it.)

  • First, we upgraded Cassandra to version 1.2.3.
  • After the upgrade, a new exception was thrown, "No hosts to borrow from", and we discovered that the call "ConnectionPoolConfigurationImpl(...).setConnectTimeout(-1)" was the cause.
  • So we changed it to .setConnectTimeout(2000).
  • We increased the other values in the Astyanax pool configuration, and our app finally worked.

Basically, I think our initial problem was that Amazon latency was too high, so we changed our pool configuration and things worked fine.

Thanks to all for the help (mainly Dean)!

Below is our current pool configuration, which worked on Amazon:

ConnectionPoolConfigurationImpl conPool = new ConnectionPoolConfigurationImpl(getConecPoolName())
    .setMaxConnsPerHost(CONNECTION_POOL_SIZE_PER_HOST)
    .setSeeds(getIpSeeds())
    .setMaxOperationsPerConnection(10000)
    .setMaxPendingConnectionsPerHost(20)
    .setConnectionLimiterMaxPendingCount(20)
    .setTimeoutWindow(10000)
    .setConnectionLimiterWindowSize(2000)
    .setMaxTimeoutCount(3)
    .setConnectTimeout(100) // overridden by the call on the next line
    .setConnectTimeout(2000)
    .setMaxFailoverCount(-1)
    .setLatencyAwareBadnessThreshold(20)
    .setLatencyAwareUpdateInterval(1000) // 10000
    .setLatencyAwareResetInterval(10000) // 60000
    .setLatencyAwareWindowSize(100) // 100
    .setLatencyAwareSentinelCompare(100f)
    .setSocketTimeout(30000)
    .setMaxTimeoutWhenExhausted(10000)
    .setInitConnsPerHost(10);

AstyanaxContext<Keyspace> context = new AstyanaxContext.Builder()
    .forCluster(clusterName)
    .forKeyspace(keyspaceName)
    .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
        .setDiscoveryType(NodeDiscoveryType.NONE)
        .setConnectionPoolType(ConnectionPoolType.ROUND_ROBIN)
        .setDiscoveryDelayInSeconds(10000))
    .withConnectionPoolConfiguration(conPool)
    .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
    .buildKeyspace(ThriftFamilyFactory.getInstance());
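For readers unsure how a connect timeout differs from a socket (read) timeout, here is a minimal sketch using only the JDK. It is not Astyanax code. It assumes that setConnectTimeout and setSocketTimeout correspond to the usual socket-level connect and read timeouts, which I have not verified in the Astyanax source. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

public class TimeoutDemo {

    /**
     * Connects to a local server socket that never sends a reply, then waits
     * for one byte. Returns true if the wait ended with a read timeout.
     *
     * connectTimeoutMs bounds how long establishing the connection may take;
     * readTimeoutMs bounds how long a single read may block afterwards.
     */
    public static boolean readTimesOut(int connectTimeoutMs, int readTimeoutMs) {
        try (ServerSocket silentServer = new ServerSocket(0); // any free port
             Socket client = new Socket()) {
            // The connection is established by the OS even though the
            // server side never calls accept().
            client.connect(
                new InetSocketAddress("127.0.0.1", silentServer.getLocalPort()),
                connectTimeoutMs);
            client.setSoTimeout(readTimeoutMs);
            try {
                client.getInputStream().read(); // nothing is ever sent
                return false;
            } catch (SocketTimeoutException e) {
                return true; // the read timeout fired, not the connect timeout
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("read timed out: " + readTimesOut(2000, 200));
    }
}
```

In this sketch the connection succeeds quickly and only the read times out, so raising the connect timeout alone would not change the outcome.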

Upvotes: 1

Dean Hiller

Reputation: 20204

So, what happens if you set your timeouts to -1 instead? Personally, I would dig into the Astyanax code and try to figure out how to disable the timeouts. Run your job again and it should keep going, though of course your cluster may be hammered if you are getting timeouts. I am assuming you are okay with that.

EDIT (after post edit): Shoot, I forgot to ask which version of Cassandra you are using. I am looking at this code, but its line 346 is your line 438. You are using the wide-row iterator, which most likely means a row scan (i.e. pieces of the row).

http://grepcode.com/file/repo1.maven.org/maven2/org.apache.cassandra/cassandra-all/1.2.2/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java#ColumnFamilyRecordReader.0split

in which we can at least see that this is getting a key range, but paged, as rows may be too wide (there is another iterator for rows that are not too wide for memory). I believe you are correct that you cannot use two partitioner types. To get more info on this, I highly suggest modifying ColumnFamilyRecordReader.java to log the ColumnFamilySplit (it has a toString on it). You can log that in the initialize method, and log the JobRange too (which also has a toString on it).

i.e.

logger.warn("my split range="+split+" job's total range="+jobRange);

Your version will have a lot of similarities with this code. What version are you on?

I would also log the KeySlice in addition to the split, just in case, as either can cause that error if I remember correctly. Let me know what version you are on, and add some logs to get more info on your situation. (Their code typically compiles very easily out of the box with no issues.)
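As a rough sketch of the kind of logging suggested above, a helper along these lines could make a suspicious range stand out in the task logs. It is not part of Cassandra. The class and method names are hypothetical, and it assumes the start and end tokens are available as decimal strings (as with RandomPartitioner-style tokens). A range whose start sorts after its end can be a legitimate wrap-around range in some contexts, so the flag only marks candidates worth inspecting.

```java
import java.math.BigInteger;

public class SplitRangeCheck {

    /** True when the start token sorts strictly after the end token. */
    public static boolean startSortsAfterEnd(BigInteger startToken, BigInteger endToken) {
        return startToken.compareTo(endToken) > 0;
    }

    /**
     * Builds a single log line for a token range and marks it when the
     * start token sorts after the end token.
     */
    public static String describe(String label, String startToken, String endToken) {
        BigInteger start = new BigInteger(startToken);
        BigInteger end = new BigInteger(endToken);
        String verdict = startSortsAfterEnd(start, end) ? "START AFTER END" : "ok";
        return label + " range=(" + start + ", " + end + "] " + verdict;
    }

    public static void main(String[] args) {
        // Made-up token values, purely for illustration.
        System.out.println(describe("split", "100", "200"));
        System.out.println(describe("split", "300", "200"));
    }
}
```

The returned string could be passed to the same logger.warn call shown earlier, next to the split and job range.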

Dean

Upvotes: 0
