Apache Cassandra is a scalable, open-source, distributed NoSQL database that manages large amounts of data across multiple distributed locations. It is best suited to write-heavy use cases with very large data volumes, although it also serves reads well when tables are modelled around the queries.
Architecture
A Cassandra cluster can be visualized as a ring because internally it uses a consistent hashing algorithm to distribute data.
When a cluster of Cassandra nodes is created, the data is dynamically partitioned across the cluster so that every node in the cluster holds a proportionate range of data.
If the data volume grows, a new node can be added to the cluster, and the ring rebalances itself automatically so that each node again holds a roughly equal proportion of the data.
This is why Cassandra can scale incrementally with the ability to dynamically partition data across nodes.
When a node joins the Cassandra ring, it is assigned a token range that determines its position in the cluster.
Each node is responsible for the range of token values and the associated data assigned to that token range.
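The token ring described above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: the Ring class, the token_for helper, the node names and the 32-bit MD5-based token space are all hypothetical stand-ins, and Cassandra's real partitioners and token assignment are more involved.

```python
import bisect
import hashlib


def token_for(key: str) -> int:
    """Hash a partition key to a token in [0, 2**32) (illustrative hash only)."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big")


class Ring:
    """Toy token ring: each node owns the range that ends at its token."""

    def __init__(self):
        self._tokens = []   # node tokens, kept sorted
        self._owners = {}   # token -> node name

    def add_node(self, name: str, token: int) -> None:
        bisect.insort(self._tokens, token)
        self._owners[token] = name

    def owner(self, key: str) -> str:
        # Find the first node token >= the key's token; wrap to the start of the ring.
        idx = bisect.bisect_left(self._tokens, token_for(key))
        if idx == len(self._tokens):
            idx = 0
        return self._owners[self._tokens[idx]]


ring = Ring()
for i, name in enumerate(["node-a", "node-b", "node-c"]):
    # Spread three hypothetical nodes evenly over the token space.
    ring.add_node(name, (i + 1) * 2**32 // 3 - 1)

print(ring.owner("Host1"), ring.owner("Host2"))
```

When a fourth node is added to this toy ring, only the keys that fall into its new token range change owner; every other key stays where it was, which is the property that makes incremental scaling cheap.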
Cassandra is a distributed system that implements a peer-to-peer architecture. Nodes use a gossip protocol for internal communication. There is no master node and therefore no single point of failure, so every node is able to handle both reads and writes.
Cassandra stores data in a keyspace, which is comparable to a schema in an RDBMS, and a Cassandra table is similar to an RDBMS table.
Cassandra Query Language (CQL) is broadly similar to SQL. However, queries can filter only on primary key columns, in the order in which they are declared; any other query requires a secondary index.
The first member of the primary key list in Cassandra is used as the partition key, which determines the node on which the row is stored.
The rest of the primary key is used as the clustering key, which determines the order in which rows are stored within a partition.
The default (ascending) ordering can be changed by specifying the clustering order when the table is created.
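To make the two roles of the primary key concrete, here is a minimal Python sketch. It assumes rows are plain dictionaries with host_id as the partition key and proc_id as the clustering key (the same columns used in the sample table later in this article); the store_rows function is hypothetical and only mimics the grouping and ordering, not Cassandra's storage engine.

```python
from collections import defaultdict


def store_rows(rows, descending=False):
    """Group rows by partition key, then order each partition by clustering key."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[row["host_id"]].append(row)      # partition key picks the bucket
    for partition in partitions.values():
        # The clustering key fixes the on-disk order inside the partition.
        partition.sort(key=lambda r: r["proc_id"], reverse=descending)
    return dict(partitions)


rows = [
    {"host_id": "Host1", "proc_id": "Proc1"},
    {"host_id": "Host1", "proc_id": "Proc3"},
    {"host_id": "Host2", "proc_id": "Proc2"},
]
stored = store_rows(rows, descending=True)
print([r["proc_id"] for r in stored["Host1"]])
```

Because the order is fixed at write time, a query for one host_id can read its rows back already sorted by proc_id without any sorting step at query time.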
Cassandra supports collection data types such as list, map and set.
These collections help in modelling tables that save space while storing denormalized data.
Index on collections - For map collections, an index can be created on keys, values or entries. For list and set collections, indexes are more straightforward. An indexed collection is queried using CONTAINS, CONTAINS KEY, or a lookup on a specific key.
Replication Strategies
SimpleStrategy - Use for replication within a single data center only. With this strategy, Cassandra places the first replica on the node that owns the token and the remaining replicas on the next nodes in clockwise order around the ring.
NetworkTopologyStrategy - Use this strategy if data is replicated across data centers, or is planned to be, so that replicas exist in more than one data center and the cluster can survive a data center outage.
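The clockwise placement used by SimpleStrategy can be sketched as follows. The function name, the ring representation (a sorted list of token and node pairs) and the token values are assumptions made for illustration; the sketch ignores racks and data centers, which is exactly what NetworkTopologyStrategy adds.

```python
import bisect


def simple_strategy_replicas(ring, key_token, replication_factor):
    """Return the nodes holding a key: the token owner plus the next nodes clockwise.

    ring is a list of (token, node) pairs sorted by token.
    """
    tokens = [token for token, _ in ring]
    # First node whose token is >= the key token, wrapping around the ring.
    start = bisect.bisect_left(tokens, key_token) % len(ring)
    replicas = []
    for step in range(len(ring)):
        node = ring[(start + step) % len(ring)][1]
        if node not in replicas:
            replicas.append(node)
        if len(replicas) == replication_factor:
            break
    return replicas


ring = [(100, "n1"), (200, "n2"), (300, "n3"), (400, "n4")]
print(simple_strategy_replicas(ring, 250, 3))  # ['n3', 'n4', 'n1']
```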
Write Consistency Levels - specify the number of replicas on which the write must succeed before an acknowledgement is returned to the client.
ANY - the write must be accepted by at least one node.
ALL - all replica nodes must complete the write successfully, which results in lower availability.
ONE, TWO and QUORUM are the other main options.
Read Consistency Levels - specify the number of replicas that must be queried before the most recent result is returned.
For example, a read at consistency level ALL fails if even one replica does not respond, whereas a read at consistency level ONE returns data from the first queried replica that has it.
Read consistency levels offer the same options as write consistency levels, except ANY, which applies only to writes.
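The interaction between read and write consistency levels comes down to simple arithmetic: a quorum is a majority of the replicas, and a read is guaranteed to overlap with the latest acknowledged write when the number of replicas read plus the number of replicas written exceeds the replication factor. The helper names below are hypothetical, and ANY is left out because it does not guarantee that any replica holds the data yet.

```python
def quorum(replication_factor: int) -> int:
    """A quorum is a strict majority of the replicas."""
    return replication_factor // 2 + 1


def replicas_required(level: str, replication_factor: int) -> int:
    """Number of replicas that must respond for a given consistency level."""
    required = {
        "ONE": 1,
        "TWO": 2,
        "QUORUM": quorum(replication_factor),
        "ALL": replication_factor,
    }
    return required[level]


def read_sees_latest_write(write_level: str, read_level: str, replication_factor: int) -> bool:
    """True when the read set and the write set must share at least one replica."""
    written = replicas_required(write_level, replication_factor)
    read = replicas_required(read_level, replication_factor)
    return written + read > replication_factor


print(quorum(3))                                      # 2
print(read_sees_latest_write("QUORUM", "QUORUM", 3))  # True
print(read_sees_latest_write("ONE", "ONE", 3))        # False
```

With a replication factor of 3, writing and reading at QUORUM tolerates one unavailable replica while still returning the latest value, which is why that combination is a common default choice.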
In Cassandra, tables should be modelled around the queries to be executed. Data is stored in a denormalized way and, where needed, duplicated in different tables to meet each query requirement. Data must be stored in the required sort order, because arbitrary sorting cannot be done at query time.
Materialized views
In Cassandra, a table is defined by the query it has to serve.
Sometimes several different queries are required against the same data, and a secondary index may not suit the use case.
A secondary index is recommended only when the cardinality of the indexed column is low to moderate. If almost every value is unique (very high cardinality), the secondary index will not be useful.
In such scenarios, a materialized view can be used. It replicates the data from the original table under a new primary key combination that makes it possible to query by the second column. Materialized views add a write overhead of roughly 10%.
Committing data in cassandra
Commit logs - The commit log is an append-only log. All data is first written to the commit log in the order in which it is received.
MemTables - Data is also written to MemTables. A MemTable acts as a write-back cache: it keeps data in memory, serves reads and writes, and is periodically flushed to disk. If the in-memory data is lost because of a restart, it is rebuilt by replaying the commit log. This helps achieve low latency and fast reads and writes. Rows are kept in sorted order.
SSTables - When a MemTable reaches a size threshold, its data is flushed to SSTables (Sorted String Tables) on disk. Data written to SSTables is immutable: it is never modified or removed in place.
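The write path described above (commit log, then MemTable, then flush to an SSTable) can be modelled with a small Python class. Everything here is a simplified, hypothetical stand-in: ToyStore, its memtable_limit threshold and the in-memory lists only mirror the order of operations, not Cassandra's file formats or durability guarantees.

```python
class ToyStore:
    """Toy model of the write path: commit log -> memtable -> immutable SSTables."""

    def __init__(self, memtable_limit=2):
        self.commit_log = []        # append-only record of unflushed writes
        self.memtable = {}          # in-memory, most recent values
        self.sstables = []          # oldest first; each is an immutable sorted tuple
        self.memtable_limit = memtable_limit

    def write(self, key, value):
        self.commit_log.append((key, value))   # 1. record the write for recovery
        self.memtable[key] = value             # 2. apply it in memory
        if len(self.memtable) >= self.memtable_limit:
            self.flush()                       # 3. flush when the threshold is hit

    def flush(self):
        # Persist the memtable as a sorted, immutable table, then start afresh.
        self.sstables.append(tuple(sorted(self.memtable.items())))
        self.memtable = {}
        self.commit_log.clear()                # flushed data no longer needs replay

    def read(self, key):
        if key in self.memtable:
            return self.memtable[key]
        for table in reversed(self.sstables):  # newest SSTable first
            for k, v in table:
                if k == key:
                    return v
        return None


store = ToyStore(memtable_limit=2)
store.write("Host1", 145)
store.write("Host2", 155)   # reaches the threshold and triggers a flush
store.write("Host3", 165)
print(store.read("Host1"), store.read("Host3"), len(store.sstables))
```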
Bloom filter
When a node in Cassandra receives a query, a bloom filter is used to determine which SSTables may hold the data.
A bloom filter is a data structure used to test whether an element is part of a dataset.
A bloom filter may sometimes return true even when the entity is not present (a false positive), but it never returns false when the entity is present. In exchange, it is fast and requires little memory.
So this is an efficient way of narrowing down the SSTables required to be searched for data.
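A minimal bloom filter can be sketched as below to show why false positives are possible but false negatives are not. The class, its default sizes and the use of SHA-256 with a seed prefix are illustrative assumptions, not the implementation Cassandra uses.

```python
import hashlib


class BloomFilter:
    """Toy bloom filter backed by a Python int used as a bit array."""

    def __init__(self, size_bits=1024, num_hashes=3):
        self.size = size_bits
        self.num_hashes = num_hashes
        self.bits = 0

    def _positions(self, item: str):
        # Derive several bit positions by hashing the item with different seeds.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{item}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.size

    def add(self, item: str) -> None:
        for pos in self._positions(item):
            self.bits |= 1 << pos

    def might_contain(self, item: str) -> bool:
        # Every bit set: "maybe present". Any bit clear: definitely absent.
        return all((self.bits >> pos) & 1 for pos in self._positions(item))


sstable_filter = BloomFilter()
for key in ["Host1", "Host2"]:
    sstable_filter.add(key)

print(sstable_filter.might_contain("Host1"))  # True: added keys are always found
```

A key that was added always has all of its bits set, so the answer for it is always true. A key that was never added can still find all of its bits set by other keys, which is the source of false positives, so a true answer only means the SSTable is worth reading.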
Tombstones
Data written to SSTables is not deleted in place; instead it is marked as deleted using tombstones. A delete is therefore itself a write operation, and if it has not reached every replica, deleted ("ghost") data can reappear until the replicas are synced. Space held by data marked with tombstones is freed by a process called compaction.
Compaction
During compaction, new SSTables are created containing only the relevant data, and the old SSTables are eventually purged by the Java garbage collector.
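The effect of tombstones and compaction can be sketched as a merge of several SSTables into one. This is a simplified model under stated assumptions: the TOMBSTONE marker and the compact function are hypothetical, SSTables are passed oldest first so that newer values win, and tombstones are dropped immediately, whereas Cassandra keeps them for a grace period (the gc_grace_seconds table option) so that the delete can reach all replicas first.

```python
TOMBSTONE = object()   # hypothetical marker meaning "this key was deleted"


def compact(sstables):
    """Merge SSTables (oldest first) into one sorted table without deleted rows."""
    merged = {}
    for table in sstables:
        for key, value in table:
            merged[key] = value          # a newer SSTable overrides older values
    return [
        (key, value)
        for key, value in sorted(merged.items(), key=lambda kv: kv[0])
        if value is not TOMBSTONE        # drop rows whose latest state is a delete
    ]


old_table = [("Host1", 145), ("Host2", 155)]
new_table = [("Host1", TOMBSTONE), ("Host3", 165)]   # Host1 was deleted later
print(compact([old_table, new_table]))  # [('Host2', 155), ('Host3', 165)]
```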
Creating a keyspace and table in cassandra - sample commands
https://www.datastax.com/try-it-out can be used to run Cassandra Query Language (CQL) statements in cqlsh directly from your browser.
cqlsh> CREATE KEYSPACE perfmonitor
... WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 1};
cqlsh> USE perfmonitor;
cqlsh:perfmonitor> CREATE TABLE app_instance (
... app_id int,
... app_name varchar,
... proc_id varchar,
... host_id varchar,
... os_priority int,
... cpu_time int,
... num_io_ops int,
... PRIMARY KEY (host_id, proc_id)
... )
... WITH CLUSTERING ORDER BY (proc_id DESC);
cqlsh:perfmonitor> Describe app_instance;
cqlsh:perfmonitor> insert into app_instance
... (app_id, host_id, proc_id, app_name,os_priority,cpu_time,num_io_ops)
... values
... (1,'Host1','Proc1','App1',90,145,250);
cqlsh:perfmonitor>
cqlsh:perfmonitor> insert into app_instance
... (app_id, host_id, proc_id, app_name,os_priority,cpu_time,num_io_ops)
... values
... (2,'Host2','Proc2','App2',60,155,550);
cqlsh:perfmonitor>
cqlsh:perfmonitor> Select * from app_instance;
host_id | proc_id | app_id | app_name | cpu_time | num_io_ops | os_priority
---------+---------+--------+----------+----------+------------+-------------
Host2 | Proc2 | 2 | App2 | 155 | 550 | 60
Host1 | Proc1 | 1 | App1 | 145 | 250 | 90
Queries can filter only on primary key columns, in the order in which they are declared; any other query requires a secondary index. The queries below show both the allowed filters and the errors returned otherwise.
cqlsh:perfmonitor> Select * from app_instance where host_id = 'Host1';
host_id | proc_id | app_id | app_name | cpu_time | num_io_ops | os_priority
---------+---------+--------+----------+----------+------------+-------------
Host1 | Proc1 | 1 | App1 | 145 | 250 | 90
(1 rows)
cqlsh:perfmonitor> Select * from app_instance where host_id = 'Host1' and proc_id = 'Proc1';
host_id | proc_id | app_id | app_name | cpu_time | num_io_ops | os_priority
---------+---------+--------+----------+----------+------------+-------------
Host1 | Proc1 | 1 | App1 | 145 | 250 | 90
(1 rows)
cqlsh:perfmonitor> Select * from app_instance where proc_id = 'Proc1';
InvalidRequest: Error from server: code=2200 .............
cqlsh:perfmonitor> select * from app_instance where app_name ='App1';
InvalidRequest: Error from server: code=2200 .............
cqlsh:perfmonitor> Create index appname_idx on app_instance(app_name);
cqlsh:perfmonitor> select * from app_instance where app_name ='App1';
host_id | proc_id | app_id | app_name | cpu_time | num_io_ops | os_priority
---------+---------+--------+----------+----------+------------+-------------
Host1 | Proc1 | 1 | App1 | 145 | 250 | 90
(1 rows)
cqlsh:perfmonitor>
Create table samples using collections
cqlsh:perfmonitor> CREATE TABLE devices (
id uuid,
device_name text,
ip_address set<text>,
location map<text, text>,
installation_date date,
installation_year int,
manufacturer text,
serial_number text,
PRIMARY KEY (id));
cqlsh:perfmonitor> Insert into devices
(id, device_name, ip_address, location, installation_date, installation_year, manufacturer, serial_number)
Values
(uuid(), 'Server3', {'192.168.0.4'}, {'data center':'DC2', 'rack':'Rack3'}, '2016-12-10', 2016, 'Acme', 'SN7891852');
cqlsh:perfmonitor> Insert into devices
(id, device_name, ip_address, location, installation_date, installation_year, manufacturer, serial_number)
Values
(uuid(), 'Server1', {'192.168.0.1'}, {'data center':'DC1', 'rack':'Rack1'}, '2015-01-20', 2015, 'Acme', 'SN12345');
cqlsh:perfmonitor> Insert into devices
(id, device_name, ip_address, location, installation_date, installation_year, manufacturer, serial_number)
Values
(uuid(), 'Server2', {'192.168.0.2','192.168.0.3'}, {'data center':'DC1', 'rack':'Rack1'}, '2016-02-10', 2016, 'Acme', 'SN32415746');
cqlsh:perfmonitor> select * from devices;
id | device_name | installation_date | installation_year | ip_address | location | manufacturer | serial_number
--------------------------------------+-------------+-------------------+-------------------+--------------------------------+-----------------------------------------+--------------+---------------
cfd4f71b-fc8c-4566-9e74-7c475969d284 | Server2 | 2016-02-10 | 2016 | {'192.168.0.2', '192.168.0.3'} | {'data center': 'DC1', 'rack': 'Rack1'} | Acme | SN32415746
1b63f842-3f9e-4223-85b9-5dd4c8e9b72e | Server3 | 2016-12-10 | 2016 | {'192.168.0.4'} | {'data center': 'DC2', 'rack': 'Rack3'} | Acme | SN7891852
45b9c6c0-9789-44c3-adbc-61c42f1a146e | Server1 | 2015-01-20 | 2015 | {'192.168.0.1'} | {'data center': 'DC1', 'rack': 'Rack1'} | Acme | SN12345
(3 rows)
cqlsh:perfmonitor> create index testIndex on devices(location);
cqlsh:perfmonitor> select * from devices where location contains 'DC1';
id | device_name | installation_date | installation_year | ip_address | location | manufacturer | serial_number
--------------------------------------+-------------+-------------------+-------------------+--------------------------------+-----------------------------------------+--------------+---------------
cfd4f71b-fc8c-4566-9e74-7c475969d284 | Server2 | 2016-02-10 | 2016 | {'192.168.0.2', '192.168.0.3'} | {'data center': 'DC1', 'rack': 'Rack1'} | Acme | SN32415746
45b9c6c0-9789-44c3-adbc-61c42f1a146e | Server1 | 2015-01-20 | 2015 | {'192.168.0.1'} | {'data center': 'DC1', 'rack': 'Rack1'} | Acme | SN12345
(2 rows)