Reputation: 21
I have a Cassandra cluster with two nodes. I have set up a Spark job to query this cluster, which has 3,651,568 keys.
import com.datastax.spark.connector.rdd.ReadConf
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.SparkSession

// SparkSession manages its own SparkContext, so no separate SparkConf/SparkContext is needed
val spark = SparkSession.builder().master("local").appName("Spark_Cassandra").config("spark.cassandra.connection.host", "hostname").getOrCreate()
// cassandraFormat takes the table name first and the keyspace second
val studentsDF = spark.read.cassandraFormat("tablename", "keyspacename").options(ReadConf.SplitSizeInMBParam.option(32)).load()
studentsDF.show(1000)
I am able to query the first 1000 rows, but I cannot find a way to read from the 1001st row to the 2000th row, so that I can read data batch-wise from the Cassandra table using the Spark job.
As per the recommendation, I started using the Java driver.
I have to query the Cassandra database using the DataStax Java driver. I am using DataStax Java driver version cassandra-java-driver-3.5.1 and Apache Cassandra version apache-cassandra-3.0.9.
I have tried resolving the dependencies by installing jars. I have also checked the yaml file: seeds, listen_address and rpc_address all point to my host, and start_native_transport is set to true.
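For reference, the settings I verified look roughly like this in cassandra.yaml (an excerpt, not the full file; the address is masked the same way as in the code below):

seed_provider:
  - class_name: org.apache.cassandra.locator.SimpleSeedProvider
    parameters:
      - seeds: "***.***.*.*"
listen_address: ***.***.*.*
rpc_address: ***.***.*.*
start_native_transport: true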
Here is my Java code to establish a connection to the Cassandra database:
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public class Started {

    public void connect() {
        try {
            // Build the cluster object against the contact point and shorten the read timeout
            Cluster cluster = Cluster.builder().addContactPoints("***.***.*.*").build();
            cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(2000);
            System.out.println("Connected to cluster:");

            // Open a session on the "demo" keyspace and run a test query
            Session session = cluster.connect("demo");
            Row row = session.execute("SELECT ename FROM demo.emp").one();
            System.out.println(row.getString("ename"));

            cluster.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Started st = new Started();
        st.connect();
    }
}
I have only one node in the Cassandra cluster and it is up and running, and I am able to cqlsh to it on port 9042. So far so good, but when I run my Java program I get this error/exception message:
Connected to cluster:
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /***.***.*.*:9042 (com.datastax.driver.core.exceptions.TransportException: [/***.***.*.*:9042] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:232)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:79)
at com.datastax.driver.core.Cluster$Manager.negotiateProtocolVersionAndConnect(Cluster.java:1631)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1549)
at com.datastax.driver.core.Cluster.init(Cluster.java:160)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:342)
at com.datastax.driver.core.Cluster.connect(Cluster.java:292)
at Started.connect(Started.java:22)
at Started.main(Started.java:34)
Can anyone please help?
Upvotes: 1
Views: 1545
Reputation: 21
That was a driver compatibility issue. Initially I was using cassandra-java-driver-3.5.1 with apache-cassandra-3.0.9.
Switch to cassandra-java-driver-3.0.8 with apache-cassandra-3.0.9, and also install a few jar files:
slf4j-log4j12-1.7.7.jar
log4j-1.2.17.jar
netty-all-4.0.39.Final.jar
Works fine for me :)
Upvotes: 0
Reputation: 16576
This may be a bad fit for Spark. show, for example, just displays 1000 records, but the order of the records is not guaranteed; multiple invocations could produce different results.
Your best bet within Spark is probably to get the results as a local iterator if you want to page through them, but again, this is probably not the best way to do things. Spark is a system for working on data on a remote cluster, which means doing your processing within the DataFrame API.
If you really just want to slowly page through records, you can use toLocalIterator to grab batches back to your driver machine (not recommended). But you could accomplish something similar by just doing a SELECT * with the Java driver. The result set iterator which is returned will page through results automatically as you progress through them.
https://docs.datastax.com/en/developer/java-driver/3.2/manual/paging/
ResultSet rs = session.execute("your query");
for (Row row : rs) {
    // Process the row ...
    // By default this will only pull a new "page" of data from Cassandra
    // when the previous page has been fully iterated through. See the
    // docs for more details.
}
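If you want to bound how many rows each of those pages holds, here is a minimal sketch, assuming the driver 3.x API and the session/table from your question (the 1000-row fetch size is just an illustrative value):

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;

// The fetch size sets how many rows the driver pulls per network round trip
// (the driver default is 5000)
Statement stmt = new SimpleStatement("SELECT ename FROM demo.emp");
stmt.setFetchSize(1000);
ResultSet rs = session.execute(stmt);
for (Row row : rs) {
    // A new page is fetched transparently once the current one is exhausted
    System.out.println(row.getString("ename"));
}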
RDD Docs for Cassandra
Dataframe Docs for Cassandra

// RDD API
sparkContext.cassandraTable("ks","tab").foreach(row => //processRow)

// Dataframe API - although a similar foreach is available here as well
spark.read.format("org.apache.spark.sql.cassandra")
  .load()
  .select(//do some transforms)
  .write(//pick output of request)
Why you might want to do this, with an example:
// This reads all data in large blocks to executors; those blocks are then pulled one at a time back to the Spark driver.
sparkContext.cassandraTable("ks","tab").toLocalIterator
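If you do go the local-iterator route anyway, here is roughly what explicit 1000-row batches could look like. This is a sketch in Java (to match the driver code above), assuming an existing SparkSession named spark and the keyspace/table names from the question:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

Dataset<Row> df = spark.read()
        .format("org.apache.spark.sql.cassandra")
        .option("keyspace", "keyspacename")
        .option("table", "tablename")
        .load();

// Rows stream back to the driver lazily; group them into 1000-row batches
Iterator<Row> rows = df.toLocalIterator();
List<Row> batch = new ArrayList<>(1000);
while (rows.hasNext()) {
    batch.add(rows.next());
    if (batch.size() == 1000) {
        // rows 1-1000 on the first pass, 1001-2000 on the next, and so on
        batch.clear();
    }
}
// any rows left over form the final, smaller batch

Keep in mind the earlier caveat: since row order is not guaranteed, each "batch" is relative to this run's iteration order, not a stable slice of the table.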
Upvotes: 1