Ivan
Ivan

Reputation: 7746

Cassandra C# insert seems to be deleting prior data?

I created a cassandra db like this:

cqlsh:timeseries> describe keyspace timeseries;

CREATE KEYSPACE timeseries WITH replication = {
  'class': 'SimpleStrategy',
  'replication_factor': '1'
};

USE timeseries;

CREATE TABLE option_data (
  ts timestamp,
  ask decimal,
  bid decimal,
  expiry timestamp,
  id text,
  strike decimal,
  symbol text,
  PRIMARY KEY ((ts))
) WITH
  bloom_filter_fp_chance=0.010000 AND
  caching='KEYS_ONLY' AND
  comment='' AND
  dclocal_read_repair_chance=0.100000 AND
  gc_grace_seconds=864000 AND
  index_interval=128 AND
  read_repair_chance=0.000000 AND
  replicate_on_write='true' AND
  populate_io_cache_on_flush='false' AND
  default_time_to_live=0 AND
  speculative_retry='99.0PERCENTILE' AND
  memtable_flush_period_in_ms=0 AND
  compaction={'class': 'SizeTieredCompactionStrategy'} AND
  compression={'sstable_compression': 'LZ4Compressor'};

CREATE TABLE underlying_data (
  symbol text,
  ask decimal,
  bid decimal,
  ts bigint,
  PRIMARY KEY ((symbol))
) WITH
  bloom_filter_fp_chance=0.010000 AND
  caching='KEYS_ONLY' AND
  comment='' AND
  dclocal_read_repair_chance=0.100000 AND
  gc_grace_seconds=864000 AND
  index_interval=128 AND
  read_repair_chance=0.000000 AND
  replicate_on_write='true' AND
  populate_io_cache_on_flush='false' AND
  default_time_to_live=0 AND
  speculative_retry='99.0PERCENTILE' AND
  memtable_flush_period_in_ms=0 AND
  compaction={'class': 'SizeTieredCompactionStrategy'} AND
  compression={'sstable_compression': 'LZ4Compressor'};

CREATE INDEX underlying_data_ts_idx ON underlying_data (ts);

cqlsh:timeseries>

I have a C# function:

public void InsertUnderlying(long timestamp, string symbol, decimal bid, decimal ask)
        {
            var batchStmt = new BatchStatement();
            var v2Insert = new SimpleStatement("insert into underlying_data " +
                "(ts, symbol, bid, ask) values(?, ?, ?, ?);");
            batchStmt.Add(v2Insert.Bind(timestamp, symbol, bid, ask));

            session.Execute(batchStmt);
        }

I call this function in realtime to add data. However, when I do a query from the CQL,

cqlsh:timeseries> select * from underlying_data;

I only see one row even though I have called this function many many times. Not sure how I append data instead of overwriting it?

Upvotes: 2

Views: 176

Answers (1)

Aaron
Aaron

Reputation: 57748

In Cassandra, primary keys are unique. Your table underlying_data is only keyed on the symbol column:

PRIMARY KEY ((symbol))

That means all inserts for a particular symbol will overwrite each other:

INSERT INTO underlying_data (symbol, ts, ask, bid) VALUES ('SPX',1412102636,3.1,4.0);
INSERT INTO underlying_data (symbol, ts, ask, bid) VALUES ('SPX',1412102708,3.0,4.4);
INSERT INTO underlying_data (symbol, ts, ask, bid) VALUES ('SPX',1412102731,2.1,5.0);

SELECT * FROM underlying_data;

 symbol | ts         | ask | bid
--------+------------+-----+-----
    SPX | 1412102731 | 2.1 | 5.0

To store each INSERT, add ts to your primary key definition:

PRIMARY KEY (symbol, ts)

Additionally, Cassandra does not differentiate between an INSERT and an UPDATE (essentially an "UPSERT"). While syntactically different, they both accomplish the same thing: storing column values for a specific key. This means that you can insert new records with an UPDATE, as well as update existing records with an INSERT. Ike Walker has a good blog posting that describes this: How to do an Upsert in Cassandra.

Upvotes: 3

Related Questions