Reputation: 53
Is there a way to use the datastax/spark-cassandra-connector to select the most recent row for each partition key, equivalent to the PER PARTITION LIMIT option introduced in Cassandra 3.6?
In Cassandra 3.6 and later, the PER PARTITION LIMIT option caps the number of rows a query returns from each partition. With a table that spreads data across more than one partition and clusters rows by time_series_date DESC (like the one below), PER PARTITION LIMIT 1 returns only the newest row of each partition.
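For reference, the query itself can be issued directly through the Java driver that is already on the classpath (a minimal sketch; the contact point, credentials, and class name are placeholders for illustration):
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public final class PerPartitionLimitCqlCheck {
    public static void main(String[] args) {
        // Placeholder contact point and credentials; substitute your own cluster settings.
        try (Cluster cluster = Cluster.builder()
                .addContactPoint("127.0.0.1")
                .withCredentials("cassandra", "cassandra")
                .build();
             Session session = cluster.connect("bug")) {
            // PER PARTITION LIMIT 1 returns only the newest row of each partition,
            // because the table clusters rows by time_series_date DESC.
            ResultSet rs = session.execute(
                "SELECT item_uuid, time_series_date, item_uri "
                    + "FROM per_partition_limit_test "
                    + "PER PARTITION LIMIT 1");
            for (Row row : rs) {
                System.out.printf("%s %s %s%n",
                        row.getUUID("item_uuid"),
                        row.getTimestamp("time_series_date"),
                        row.getString("item_uri"));
            }
        }
    }
}
The question is how to get the same per-partition cut-off through the connector from Spark.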
I've tried the methods below with no success:
Cassandra Version
[cqlsh 5.0.1 | Cassandra 3.9.0 | CQL spec 3.4.2 | Native protocol v4]
Main
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import com.datastax.spark.connector.japi.CassandraRow;
import com.datastax.spark.connector.japi.rdd.CassandraJavaPairRDD;
import com.datastax.spark.connector.japi.rdd.CassandraJavaRDD;
import com.datastax.spark.connector.japi.rdd.CassandraTableScanJavaRDD;
import com.google.common.collect.ImmutableMap;
import lombok.extern.slf4j.Slf4j;
import scala.Tuple2;
import scala.Tuple3;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static java.lang.Double.*;
import static java.lang.Integer.*;
@Slf4j
public class Main extends Configured implements Tool {
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(new Main(), args));
}
@Override
public int run(String[] args) throws Exception {
// SPARK_SESSION and the connection constants (SPARK_MASTER, APP_NAME, CASSANDRA_HOST_IPS,
// CASSANDRA_USER_NAME, CASSANDRA_PASSWORD) are class-level fields; their values are omitted here.
SPARK_SESSION = SparkSession
.builder()
.master(SPARK_MASTER)
.appName(APP_NAME)
.config("spark.cassandra.connection.host", CASSANDRA_HOST_IPS)
.config("spark.cassandra.auth.username", CASSANDRA_USER_NAME)
.config("spark.cassandra.auth.password", CASSANDRA_PASSWORD)
.config("pushdown", "true")
.getOrCreate();
try (JavaSparkContext sc = new JavaSparkContext(SPARK_SESSION.sparkContext())) {
insertPerPartitionLimitTestList();
getJavaRddPerPartitionLimitTest(sc);
getTypedJavaRddPerPartitionLimitTest(sc);
getJavaPairRddPerPartitionLimitTest(sc);
getCassandraJavaRddPerPartitionLimitTest(sc);
getTypedCassandraJavaRddPerPartitionLimitTest(sc);
getCassandraTableScanJavaRddPerPartitionLimitTest(sc);
getTypedCassandraTableScanJavaRddPerPartitionLimitTest(sc);
getCassandraJavaRddToJavaRddPerPartitionLimitTest(sc);
getSparkDatasetPerPartitionLimitTest(sc);
getSparkSqlDatasetPerPartitionLimitTest();
log.info("Done");
return 0; // success exit code
} catch (Throwable t) {
log.error("Spark transform failed.", t);
return 1; // failure exit code
}
}
public final Map<String, String> cassandraConfig(String keyspace, String table) {
return ImmutableMap.<String, String>builder()
.put("spark.cassandra.connection.host", CASSANDRA_HOST_IPS)
.put("spark.cassandra.auth.username", CASSANDRA_USER_NAME)
.put("spark.cassandra.auth.password", CASSANDRA_PASSWORD)
.put("pushdown", "true")
.put("keyspace", keyspace)
.put("table", table)
.build();
}
/**
* Generate test data to INSERT INTO the Cassandra bug.per_partition_limit_test table.
*
* @param listSize The number of rows of test data to generate.
* @return {@link List} of {@link PerPartitionLimitTest} containing test data.
*/
public List<PerPartitionLimitTest> buildPerPartitionLimitTestList(Integer listSize){
final Timestamp timeSeriesDate = Timestamp.from(LocalDateTime.now().atZone(ZoneId.of("UTC")).toInstant());
final List<PerPartitionLimitTest> perPartitionLimitTests = new ArrayList<>(listSize);
// Populate List of objects with test data.
for(int i = 0; i < listSize; i++){
final String itemUuid = UUID.randomUUID().toString();
perPartitionLimitTests.add(
PerPartitionLimitTest.of(
itemUuid,
timeSeriesDate,
String.format("/items/%s", itemUuid.toString())
)
);
}
return perPartitionLimitTests;
}
/**
* Generate test data and INSERT the resulting Dataset into the Cassandra table.
* (createDatasetFromList is a helper, not shown here, that builds a typed Dataset from the list.)
*/
public void insertPerPartitionLimitTestList(){
final Map<String, String> cassandraConfig = cassandraConfig("bug", "per_partition_limit_test");
createDatasetFromList(
PerPartitionLimitTest.class,
buildPerPartitionLimitTestList(20)
)
.select("itemUuid", "timeSeriesDate", "itemUri")
.toDF("item_uuid",
"time_series_date",
"item_uri")
.write()
.format("org.apache.spark.sql.cassandra")
.mode(SaveMode.Append)
.options(cassandraConfig)
.save();
}
private PerPartitionLimitTestRowReaderFactory perPartitionLimitTestRowReaderFactory = new PerPartitionLimitTestRowReaderFactory();
public String getPerPartitionLimitTestItemUuidMin(JavaSparkContext sc){
return String.valueOf(
getPerPartitionLimitTestDataset(
PerPartitionLimitTest.class,
"org.apache.spark.sql.cassandra",
cassandraConfig("bug", "per_partition_limit_test")
)
.first()
.getItemUuid());
}
public void getJavaRddPerPartitionLimitTest(JavaSparkContext sc){
final String itemUuidMin = String.valueOf(
getPerPartitionLimitTestDataset(
PerPartitionLimitTest.class,
"org.apache.spark.sql.cassandra",
cassandraConfig("bug", "per_partition_limit_test")
)
.first()
.getItemUuid());
JavaRDD<CassandraRow> javaRDD = javaFunctions(sc)
.cassandraTable("bug", "per_partition_limit_test")
.where(String.format("TOKEN(item_uuid) > TOKEN(%s) PER PARTITION LIMIT 1", itemUuidMin));
log.info(String.format("javaRDD.count() = %s", javaRDD.count()));
}
public void getTypedJavaRddPerPartitionLimitTest(JavaSparkContext sc){
JavaRDD<PerPartitionLimitTest> javaRDD = javaFunctions(sc)
.cassandraTable("bug", "per_partition_limit_test", perPartitionLimitTestRowReaderFactory)
.where("PER PARTITION LIMIT 1");
log.info(String.format("javaRDD.count() = %s", javaRDD.count()));
}
public void getJavaPairRddPerPartitionLimitTest(JavaSparkContext sc){
JavaPairRDD<String, PerPartitionLimitTest> javaPairRDD = javaFunctions(sc)
.cassandraTable("bug", "per_partition_limit_test", perPartitionLimitTestRowReaderFactory)
.where("PER PARTITION LIMIT 1")
.keyBy((Function<PerPartitionLimitTest, String>) PerPartitionLimitTest::getItemUuid);
log.info(String.format("javaPairRDD.count() = %s", javaPairRDD.count()));
}
public void getTypedCassandraJavaRddPerPartitionLimitTest(JavaSparkContext sc){
CassandraJavaRDD<PerPartitionLimitTest> cassandraJavaRDD = javaFunctions(sc)
.cassandraTable("bug", "per_partition_limit_test", perPartitionLimitTestRowReaderFactory)
.where("PER PARTITION LIMIT 1");
log.info(String.format("cassandraJavaRDD.count() = %s", cassandraJavaRDD.count()));
}
public void getCassandraTableScanJavaRddPerPartitionLimitTest(JavaSparkContext sc){
CassandraTableScanJavaRDD<CassandraRow> cassandraTableScanJavaRDD = javaFunctions(sc)
.cassandraTable("bug", "per_partition_limit_test")
.where("PER PARTITION LIMIT 1");
log.info(String.format("cassandraTableScanJavaRDD.count() = %s", cassandraTableScanJavaRDD.count()));
}
public void getTypedCassandraTableScanJavaRddPerPartitionLimitTest(JavaSparkContext sc){
CassandraTableScanJavaRDD<PerPartitionLimitTest> cassandraTableScanJavaRDD = javaFunctions(sc)
.cassandraTable("bug", "per_partition_limit_test", perPartitionLimitTestRowReaderFactory)
.where("PER PARTITION LIMIT 1");
log.info(String.format("cassandraTableScanJavaRDD.count() = %s", cassandraTableScanJavaRDD.count()));
}
public void getCassandraJavaRddToJavaRddPerPartitionLimitTest(JavaSparkContext sc){
CassandraJavaRDD<CassandraRow> cassandraJavaRDD = javaFunctions(sc)
.cassandraTable("bug", "per_partition_limit_test");
JavaRDD<PerPartitionLimitTest> javaRDD = cassandraJavaRDD
.where("PER PARTITION LIMIT 1")
.map((Function<CassandraRow, PerPartitionLimitTest>) cassandraRow -> PerPartitionLimitTest.of(
cassandraRow.getUUID("item_uuid").toString(),
new Timestamp(cassandraRow.getDateTime("time_series_date").getMillis()),
cassandraRow.getString("item_uri")
));
log.info(String.format("javaRDD.count() = %s", javaRDD.count()));
}
/**
* SELECT data from an input data source into a typed {@link Dataset}.
*
* @param clazz {@link Class} The class of type T that Spark should use when converting from its internal Spark SQL
* representation. This tells Spark the type of object each row in this Dataset should be encoded as.
* @param format Specifies the input data source format.
* @param config {@link Map} of {@link String} containing options defining the input data source connection.
* @param <T> the element type of the returned Dataset.
* @return Typed {@link Dataset} containing table data selected from the input data source.
*/
public <T> Dataset<T> getPerPartitionLimitTestDataset(Class<T> clazz, String format, Map<String, String> config) {
final Encoder<T> encoder = Encoders.bean(clazz);
return SPARK_SESSION
.read()
.format(format)
.options(config)
.load()
.select("item_uuid", "time_series_date", "item_uri")
.toDF("itemUuid", "timeSeriesDate", "itemUri")
.as(encoder);
}
public void getSparkDatasetPerPartitionLimitTest(JavaSparkContext sc){
final Dataset<PerPartitionLimitTest> perPartitionLimitTestDataset =
getPerPartitionLimitTestDataset(
PerPartitionLimitTest.class,
"org.apache.spark.sql.cassandra",
cassandraConfig("bug", "per_partition_limit_test")
)
.where("PER PARTITION LIMIT 1");
log.info(String.format("perPartitionLimitTestDataset.count() = %s", perPartitionLimitTestDataset.count()));
}
public void getSparkDatasetPerPartitionLimitTestWithTokenGreaterThan(JavaSparkContext sc){
final String itemUuidMin = getPerPartitionLimitTestItemUuidMin(sc);
final Dataset<PerPartitionLimitTest> perPartitionLimitTestDataset =
getPerPartitionLimitTestDataset(
PerPartitionLimitTest.class,
"org.apache.spark.sql.cassandra",
cassandraConfig("bug", "per_partition_limit_test")
)
.where(String.format("TOKEN(item_uuid) > TOKEN(%s) PER PARTITION LIMIT 1", itemUuidMin));
log.info(String.format("perPartitionLimitTestDataset.count() = %s", perPartitionLimitTestDataset.count()));
}
public void getSparkSqlDatasetPerPartitionLimitTest(){
final Dataset<PerPartitionLimitTest> perPartitionLimitTestDataset =
getPerPartitionLimitTestDataset(PerPartitionLimitTest.class, "org.apache.spark.sql.cassandra", cassandraConfig("bug", "per_partition_limit_test"));
// Register the Dataset as a SQL temporary view.
perPartitionLimitTestDataset.createOrReplaceTempView("perPartitionLimitTests");
final Encoder<PerPartitionLimitTest> perPartitionLimitTestEncoder = Encoders.bean(PerPartitionLimitTest.class);
// Query the data using Spark SQL (note the stray quote before item_uri, which causes the second ParseException below).
final Dataset<PerPartitionLimitTest> perPartitionLimitTestSqlDS = SPARK_SESSION.sql(
"SELECT item_uuid, "
+ "time_series_date, "
+ "'item_uri "
+ "FROM perPartitionLimitTests "
+ "PER PARTITION LIMIT 1")
.as(perPartitionLimitTestEncoder);
log.info(String.format("perPartitionLimitTestSqlDS.count() = %s", perPartitionLimitTestSqlDS.count()));
}
}
PerPartitionLimitTestRowReader
import java.io.Serializable;
import java.sql.Timestamp;
import com.datastax.driver.core.Row;
import com.datastax.spark.connector.CassandraRowMetadata;
import com.datastax.spark.connector.ColumnRef;
import com.datastax.spark.connector.cql.TableDef;
import com.datastax.spark.connector.rdd.reader.RowReader;
import com.datastax.spark.connector.rdd.reader.RowReaderFactory;
import scala.collection.IndexedSeq;
public class PerPartitionLimitTestRowReader extends GenericRowReader<PerPartitionLimitTest> {
private static final long serialVersionUID = 1L;
private static RowReader<PerPartitionLimitTest> reader = new PerPartitionLimitTestRowReader();
public static class PerPartitionLimitTestRowReaderFactory implements RowReaderFactory<PerPartitionLimitTest>, Serializable{
private static final long serialVersionUID = 1L;
@Override
public RowReader<PerPartitionLimitTest> rowReader(TableDef arg0, IndexedSeq<ColumnRef> arg1) {
return reader;
}
@Override
public Class<PerPartitionLimitTest> targetClass() {
return PerPartitionLimitTest.class;
}
}
@Override
public PerPartitionLimitTest read(Row row, CassandraRowMetadata rowMetaData) {
PerPartitionLimitTest perPartitionLimitTest = new PerPartitionLimitTest();
perPartitionLimitTest.setItemUuid(row.getUUID("item_uuid").toString());
perPartitionLimitTest.setTimeSeriesDate(new Timestamp(row.getTimestamp("time_series_date").getTime()));
perPartitionLimitTest.setItemUri(row.getString("item_uri"));
return perPartitionLimitTest;
}
}
GenericRowReader
import java.io.Serializable;
import com.datastax.spark.connector.ColumnRef;
import com.datastax.spark.connector.rdd.reader.RowReader;
import scala.Option;
import scala.collection.Seq;
public abstract class GenericRowReader<T> implements RowReader<T>, Serializable {
private static final long serialVersionUID = 1L;
@Override
public Option<Seq<ColumnRef>> neededColumns() {
return Option.empty();
}
}
PerPartitionLimitTest Domain Entity
import java.io.Serializable;
import java.sql.Timestamp;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlType;
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
@Data
@NoArgsConstructor
@Table(keyspace = "bug", name = "per_partition_limit_test")
@RequiredArgsConstructor(staticName = "of")
@XmlType(name = "PerPartitionLimitTest")
@XmlRootElement(name = "perPartitionLimitTest")
public class PerPartitionLimitTest implements Serializable {
/**
* Type 4 uuid that uniquely identifies the item.
*/
@Valid
@NotNull @NonNull
@Column(name = "item_uuid")
private String itemUuid;
/**
* The timestamp when the data was inserted into Cassandra.
*/
@NotNull @NonNull
@Column(name = "time_series_date")//, codec = TimestampTypeCodec.class)
private Timestamp timeSeriesDate;
/**
* URI that points to an item.
*/
@Column(name = "item_uri")
@NotNull @NonNull
private String itemUri;
}
Cassandra Table:
USE bug;
DROP TABLE IF EXISTS bug.per_partition_limit_test;
CREATE TABLE bug.per_partition_limit_test (
item_uuid uuid,
time_series_date timestamp,
item_uri text static,
PRIMARY KEY ((item_uuid), time_series_date)
) WITH CLUSTERING ORDER BY (time_series_date DESC)
AND comment = 'Table Properties:
default_time_to_live - set to 518400 seconds which is 6 days, data will be automatically dropped after 6 days
Compaction
class - set to TimeWindowCompactionStrategy which is used for time series data stored in tables that use the default TTL for all data
compaction_window_unit - set to DAYS which is time unit used to define the bucket size
compaction_window_size - set to 6 which is how many units per bucket'
AND compaction = {'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy', 'compaction_window_size': '6', 'compaction_window_unit': 'DAYS'}
AND default_time_to_live = 518400
AND gc_grace_seconds = 519400;
Maven References:
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.10</artifactId>
<version>2.0.0-M3</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-mapping</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-extras</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_2.10</artifactId>
<version>2.0.2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>2.0.2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>2.0.2</version>
<scope>compile</scope>
</dependency>
Errors
[Stage 0:> (0 + 8) / 18]ERROR [2017-01-27 04:24:38,061] (Executor task launch worker-1) org.apache.spark.executor.Executor: Exception in task 1.0 in stage 0.0 (TID 1)
com.datastax.driver.core.exceptions.SyntaxError: line 1:154 no viable alternative at input 'PARTITION' (...("item_uuid") <= ? AND [PER] PARTITION...)
at com.datastax.driver.core.Responses$Error.asException(Responses.java:132)
at com.datastax.driver.core.SessionManager$4.apply(SessionManager.java:224)
at com.datastax.driver.core.SessionManager$4.apply(SessionManager.java:200)
at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:906)
at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$1$1.run(Futures.java:635)
... 3 common frames omitted
Wrapped by: com.datastax.driver.core.exceptions.SyntaxError: line 1:154 no viable alternative at input 'PARTITION' (...("item_uuid") <= ? AND [PER] PARTITION...)
at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:58)
at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:24)
at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:113)
at com.datastax.spark.connector.cql.PreparedStatementCache$.prepareStatement(PreparedStatementCache.scala:45)
at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:28)
at com.sun.proxy.$Proxy8.prepare(Unknown Source)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:279)
... 16 common frames omitted
Wrapped by: java.io.IOException: Exception during preparation of SELECT "item_uuid", "time_series_date", "item_uri" FROM "bug"."per_partition_limit_test" WHERE token("item_uuid") > ? AND token("item_uuid") <= ? AND PER PARTITION LIMIT 1 ALLOW FILTERING: line 1:154 no viable alternative at input 'PARTITION' (...("item_uuid") <= ? AND [PER] PARTITION...)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:293)
ERROR [2017-01-27 01:41:50,369] (main) Main: Spark transform failed.
org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input 'PARTITION' expecting <EOF>(line 1, pos 67)
== SQL ==
TOKEN(item_uuid) > TOKEN(13432d97-3849-4158-8405-804447d1b0c3) PER PARTITION LIMIT 1
-------------------------------------------------------------------^^^
ERROR [2017-01-27 04:27:31,265] (main) Main: Spark transform failed.
org.apache.spark.sql.catalyst.parser.ParseException:
extraneous input ''' expecting {'(', 'SELECT', 'FROM', 'ADD', 'AS', 'ALL', 'DISTINCT', 'WHERE', 'GROUP', 'BY', 'GROUPING', 'SETS', 'CUBE', 'ROLLUP', 'ORDER', 'HAVING', 'LIMIT', 'AT', 'OR', 'AND', 'IN', NOT, 'NO', 'EXISTS', 'BETWEEN', 'LIKE', RLIKE, 'IS', 'NULL', 'TRUE', 'FALSE', 'NULLS', 'ASC', 'DESC', 'FOR', 'INTERVAL', 'CASE', 'WHEN', 'THEN', 'ELSE', 'END', 'JOIN', 'CROSS', 'OUTER', 'INNER', 'LEFT', 'SEMI', 'RIGHT', 'FULL', 'NATURAL', 'ON', 'LATERAL', 'WINDOW', 'OVER', 'PARTITION', 'RANGE', 'ROWS', 'UNBOUNDED', 'PRECEDING', 'FOLLOWING', 'CURRENT', 'ROW', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'VIEW', 'REPLACE', 'INSERT', 'DELETE', 'INTO', 'DESCRIBE', 'EXPLAIN', 'FORMAT', 'LOGICAL', 'CODEGEN', 'CAST', 'SHOW', 'TABLES', 'COLUMNS', 'COLUMN', 'USE', 'PARTITIONS', 'FUNCTIONS', 'DROP', 'UNION', 'EXCEPT', 'INTERSECT', 'TO', 'TABLESAMPLE', 'STRATIFY', 'ALTER', 'RENAME', 'ARRAY', 'MAP', 'STRUCT', 'COMMENT', 'SET', 'RESET', 'DATA', 'START', 'TRANSACTION', 'COMMIT', 'ROLLBACK', 'MACRO', 'IF', '+', '-', '*', 'DIV', '~', 'PERCENT', 'BUCKET', 'OUT', 'OF', 'SORT', 'CLUSTER', 'DISTRIBUTE', 'OVERWRITE', 'TRANSFORM', 'REDUCE', 'USING', 'SERDE', 'SERDEPROPERTIES', 'RECORDREADER', 'RECORDWRITER', 'DELIMITED', 'FIELDS', 'TERMINATED', 'COLLECTION', 'ITEMS', 'KEYS', 'ESCAPED', 'LINES', 'SEPARATED', 'FUNCTION', 'EXTENDED', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'LAZY', 'FORMATTED', TEMPORARY, 'OPTIONS', 'UNSET', 'TBLPROPERTIES', 'DBPROPERTIES', 'BUCKETS', 'SKEWED', 'STORED', 'DIRECTORIES', 'LOCATION', 'EXCHANGE', 'ARCHIVE', 'UNARCHIVE', 'FILEFORMAT', 'TOUCH', 'COMPACT', 'CONCATENATE', 'CHANGE', 'CASCADE', 'RESTRICT', 'CLUSTERED', 'SORTED', 'PURGE', 'INPUTFORMAT', 'OUTPUTFORMAT', DATABASE, DATABASES, 'DFS', 'TRUNCATE', 'ANALYZE', 'COMPUTE', 'LIST', 'STATISTICS', 'PARTITIONED', 'EXTERNAL', 'DEFINED', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'REPAIR', 'RECOVER', 'EXPORT', 'IMPORT', 'LOAD', 'ROLE', 'ROLES', 'COMPACTIONS', 'PRINCIPALS', 'TRANSACTIONS', 'INDEX', 'INDEXES', 'LOCKS', 'OPTION', 'ANTI', 'LOCAL', 'INPATH', 'CURRENT_DATE', 'CURRENT_TIMESTAMP', STRING, BIGINT_LITERAL, SMALLINT_LITERAL, TINYINT_LITERAL, INTEGER_VALUE, DECIMAL_VALUE, SCIENTIFIC_DECIMAL_VALUE, DOUBLE_LITERAL, BIGDECIMAL_LITERAL, IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 36)
== SQL ==
SELECT item_uuid, time_series_date, 'item_uri FROM perPartitionLimitTests PER PARTITION LIMIT 1
------------------------------------^^^
at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:197)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:99)
at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:45)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:53)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
at Main.getSparkSqlDatasetPerPartitionLimitTest(Main.java:397)
at Main.run(Main.java:177)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
Upvotes: 2
Views: 1892
Reputation: 16576
No, not at the moment. There is a JIRA to add this feature: https://datastax-oss.atlassian.net/browse/SPARKC-446
I have added a PR for it if you would like to beta test.
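Until that lands, one way to approximate PER PARTITION LIMIT 1 with the current connector is to read the table and reduce per partition key on the Spark side, keeping the row with the latest time_series_date. A sketch, not part of the linked JIRA/PR; MostRecentPerPartition is just an illustrative name:
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import com.datastax.spark.connector.japi.CassandraRow;
import scala.Tuple2;
import java.util.UUID;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

public class MostRecentPerPartition {
    // Returns one CassandraRow per item_uuid: the row with the latest time_series_date.
    public static JavaPairRDD<UUID, CassandraRow> mostRecentPerPartition(JavaSparkContext sc) {
        return javaFunctions(sc)
            .cassandraTable("bug", "per_partition_limit_test")
            // Key every row by its partition key.
            .mapToPair((PairFunction<CassandraRow, UUID, CassandraRow>) row ->
                new Tuple2<>(row.getUUID("item_uuid"), row))
            // Keep only the newest row for each partition key.
            .reduceByKey((Function2<CassandraRow, CassandraRow, CassandraRow>) (a, b) ->
                a.getDateTime("time_series_date").getMillis()
                    >= b.getDateTime("time_series_date").getMillis() ? a : b);
    }
}
Unlike the server-side PER PARTITION LIMIT, this still ships every row of each partition to Spark before discarding the older ones, so it pays in I/O, but it produces the same one-row-per-partition result.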
Upvotes: 4