Thursday, 31 May 2012

Integrating Cassandra data nodes in a Hadoop cluster


1. Networking setup

Make sure the machines are able to reach each other on the network. Also update /etc/hosts on all machines. For example in our setup we use:
# /etc/hosts on all nodes
192.168.1.96 master
192.168.1.97 slave1
192.168.1.98 slave2
192.168.1.99 slave3

2. Java

Install Java 6 if it is not already installed.
$ sudo add-apt-repository "deb http://archive.canonical.com/ubuntu maverick partner"
$ sudo apt-get update
$ sudo apt-get install sun-java6-jre

3. Hadoop installation

Add Cloudera CDH3 apt repository.
$ sudo add-apt-repository "deb http://archive.cloudera.com/debian maverick-cdh3 contrib"
$ wget -O - http://archive.cloudera.com/debian/archive.key | sudo apt-key add -
Install Hadoop.
On master:
$ sudo apt-get install hadoop-0.20-{namenode,datanode,jobtracker,tasktracker}
On slaves:
$ sudo apt-get install hadoop-0.20-{datanode,tasktracker}
 
This will create the hadoop group and the hdfs and mapred users. The namenode and datanode daemons run as the hdfs user; the jobtracker and tasktracker daemons run as the mapred user.
You can find Cloudera's patches for Hadoop here: /usr/lib/hadoop-0.20/cloudera/patches

4. Hadoop configuration.

Configure hadoop on all machines:
Edit core-site.xml
 <property>
   <name>fs.default.name</name>
   <value>hdfs://master:8020</value>
 </property>
Edit hdfs-site.xml
 <property>
   <name>dfs.replication</name>
   <value>2</value>
 </property>
Edit mapred-site.xml
 <property>
   <name>mapred.job.tracker</name>
   <value>master:8021</value>
 </property>
Starting the cluster is done in 2 steps:
Step1: Starting HDFS
The namenode daemon must be started on master:
$ sudo service hadoop-0.20-namenode start
Then the datanode daemons must be started on all machines that run a datanode (in our setup: master, slave1, slave2 and slave3):
$ sudo service hadoop-0.20-datanode start
Check the success or failure by inspecting the logs on master and slaves:
Check namenode log on master
$ sudo less /var/log/hadoop-0.20/hadoop-hadoop-namenode-demo.log
Check the datanode logs on all datanode machines
$ sudo less /var/log/hadoop-0.20/hadoop-hadoop-datanode-demo.log
Step2: Starting MapReduce
The jobtracker daemon must be started on master:
$ sudo service hadoop-0.20-jobtracker start
Then the tasktracker daemons must be started on all machines that run a tasktracker (in our setup: master, slave1, slave2 and slave3):
$ sudo service hadoop-0.20-tasktracker start
Check the success or failure by inspecting the logs on master and slaves:
Check jobtracker log on master
$ sudo less /var/log/hadoop-0.20/hadoop-hadoop-jobtracker-demo.log
Check the tasktracker logs on all tasktracker machines
$ sudo less /var/log/hadoop-0.20/hadoop-hadoop-tasktracker-demo.log

5. Install Cassandra

You must do this on all Cassandra nodes. A common practice is to install Cassandra on every Hadoop datanode, so that every Hadoop datanode is also a Cassandra node.
Import Cassandra's apt repository key:

gpg --keyserver pgp.mit.edu --recv-keys F758CE318D77295D
gpg --export --armor F758CE318D77295D | sudo apt-key add -

Add Cassandra's apt repository entry to /etc/apt/sources.list and install Cassandra.

$ sudo add-apt-repository "deb http://www.apache.org/dist/cassandra/debian unstable main"
$ sudo apt-get update
$ sudo apt-get install cassandra

6. Configure Cassandra

Edit cassandra.yaml on all nodes, replacing hostname_or_ip with the hostname or the IP address of the node.
listen_address: hostname_or_ip
rpc_address: hostname_or_ip
After you have configured all the nodes, you can check the Cassandra cluster status using:
$ /usr/bin/nodetool -host hostname_or_ip ring
Replace hostname_or_ip with the hostname or IP address of a node. If the cluster is configured properly, you will see a list of all the nodes in the cluster along with their assigned tokens.

7. Balance Cassandra Cluster

If you add nodes to your cluster, the ring becomes unbalanced, and the only way to get perfect balance is to compute new tokens for every node and assign them manually with the nodetool move command.
Here's a Python program that calculates new tokens for the nodes (written for Python 3):
def tokens(nodes):
    # Print one evenly spaced token per node across the 0 .. 2**127 range
    for x in range(nodes):
        print(2 ** 127 // nodes * x)
There's also nodetool loadbalance which is essentially a convenience over decommission + bootstrap, only instead of telling the target node where to move on the ring it will choose its location based on the same heuristic as Token selection on bootstrap. You should not use this as it doesn't rebalance the entire ring.
The status of move and balancing operations can be monitored using nodetool with the streams argument.
$ /usr/bin/nodetool -host hostname_or_ip move new_token
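As a sketch of the manual rebalancing step described above, the following Python 3 snippet computes evenly spaced tokens and pairs each one with a nodetool move command line. The host names are the ones from the /etc/hosts example and are only illustrative; the function names are made up for this sketch, and the commands are printed rather than executed.

```python
def balanced_tokens(node_count):
    """Evenly spaced tokens over the 0 .. 2**127 range used above."""
    step = 2 ** 127 // node_count
    return [step * i for i in range(node_count)]


def move_commands(hosts):
    """Build one 'nodetool move' command line per host (nothing is executed)."""
    tokens = balanced_tokens(len(hosts))
    return ["/usr/bin/nodetool -host %s move %d" % (host, token)
            for host, token in zip(hosts, tokens)]


if __name__ == "__main__":
    # Host names are assumptions taken from the /etc/hosts example.
    for command in move_commands(["master", "slave1", "slave2", "slave3"]):
        print(command)
```

Review the printed commands before running them, since each move streams data between nodes.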

Cassandra with hadoop

Cluster Configuration


To configure a Cassandra cluster yourself so that Hadoop may operate over its data, it's best to overlay a Hadoop cluster over your Cassandra nodes.

You'll want to have a separate server for your Hadoop NameNode/JobTracker

Then install a Hadoop TaskTracker and hadoop datanode on each of your Cassandra nodes.
 That will allow the JobTracker to assign tasks to the Cassandra nodes that contain data for those tasks.

Hadoop requires a distributed filesystem to store dependency jars, static data, and intermediate results.
The nice thing about having a TaskTracker on every node is that you get data locality and your analytics engine scales with your data.

You also never need to shuttle around your data once you've performed analytics on it - you simply output to Cassandra and you are able to access that data with high random-read performance. Note that Cassandra implements the same interface as HDFS to achieve data locality.
A note on speculative execution: you may want to disable speculative execution for your hadoop jobs that either read or write to Cassandra. This isn't required, but may be helpful to reduce unnecessary load.


One configuration note on getting the task trackers to be able to perform queries over Cassandra: you'll want to update your HADOOP_CLASSPATH in your <hadoop>/conf/hadoop-env.sh to include the Cassandra lib libraries. For example you'll want to do something like this in the hadoop-env.sh on each of your task trackers:
export HADOOP_CLASSPATH=/opt/cassandra/lib/*:$HADOOP_CLASSPATH

Virtual Datacenter


One thing that many have asked about is whether Cassandra with Hadoop will be usable from a random access perspective. For example, you may need to use Cassandra for serving latency-sensitive web requests. You may also need to run analytics over your data.

In Cassandra 0.7+ there is the NetworkTopologyStrategy which allows you to customize your cluster's replication strategy by datacenter. What you can do with this is create a 'virtual datacenter' to separate nodes that serve data with high random-read performance from nodes that are meant to be used for analytics.

You need to have a snitch configured with your topology and then according to the datacenters defined there (either explicitly or implicitly), you can indicate how many replicas you would like in each datacenter.

You would install task trackers on nodes in your analytics section and make sure that a replica is written to that 'datacenter' in your NetworkTopologyStrategy configuration. The practical upshot of this is your analytics nodes always have current data and your high random-read performance nodes always serve data with predictable performance. 
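To make the idea of per-datacenter replica counts concrete, here is a deliberately simplified Python model of replica placement. It is a sketch, not Cassandra's implementation: racks are ignored, and the datacenter names ("serving", "analytics"), node names and tokens are all hypothetical.

```python
from bisect import bisect_left


def replicas_for(token, ring, replicas_per_dc):
    """Pick replicas per datacenter by walking the ring clockwise.

    ring: list of (token, node, datacenter) tuples.
    replicas_per_dc: e.g. {"serving": 2, "analytics": 1}.
    Simplified model: racks are ignored.
    """
    ordered = sorted(ring)
    # First node whose token is >= the key's token, wrapping around the ring.
    start = bisect_left([t for t, _, _ in ordered], token) % len(ordered)
    chosen = {dc: [] for dc in replicas_per_dc}
    for offset in range(len(ordered)):
        _, node, dc = ordered[(start + offset) % len(ordered)]
        if dc in chosen and len(chosen[dc]) < replicas_per_dc[dc]:
            chosen[dc].append(node)
    return chosen


# Hypothetical four-node ring with one analytics node.
ring = [(0, "web1", "serving"), (25, "web2", "serving"),
        (50, "hadoop1", "analytics"), (75, "web3", "serving")]
print(replicas_for(30, ring, {"serving": 2, "analytics": 1}))
```

Whatever token a row hashes to, one copy always lands in the analytics datacenter, which is the property the virtual datacenter setup relies on.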

For using PIG
  • Set the HADOOP_HOME environment variable to <hadoop_dir>, e.g. /opt/hadoop or /etc/hadoop
  • Set the PIG_CONF environment variable to <hadoop_dir>/conf
  • Set the JAVA_HOME

Hadoop/Cassandra Cluster Configuration

The recommended cluster configuration essentially overlays Hadoop over Cassandra. This involves installing a Hadoop TaskTracker on each Cassandra node. Also – and this is important – one server in the cluster should be dedicated to the following Hadoop components:
  • JobTracker
  • datanode
  • namenode
This dedicated server is required because Hadoop uses HDFS to store JAR dependencies for your job, static data, and other required information. In the overall context of your cluster, this is a very small amount of data, but it is critical to running a MapReduce job.
Apache Cassandra Hadoop MapReduce
Hadoop TaskTrackers and Cassandra Nodes
Running a Hadoop TaskTracker on a Cassandra node requires you to update the HADOOP_CLASSPATH in <hadoop>/conf/hadoop-env.sh to include the Cassandra libraries. For example, add an entry like the following in the hadoop-env.sh on each of the task tracker nodes: 
export HADOOP_CLASSPATH=/opt/cassandra/lib/*:$HADOOP_CLASSPATH
When a Hadoop TaskTracker runs on the same servers as the Cassandra nodes, each TaskTracker is sent tasks only for data belonging to the token range in the local Cassandra node. 

This allows tremendous gains in efficiency and processing times, as the Cassandra nodes receive only queries for which they are the primary replica, avoiding the overhead of the Gossip protocol.
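The relationship between row keys, token ranges and the node that processes them locally can be sketched in Python. This assumes a random (MD5-based) partitioner and a ring where each node owns the range ending at its own token; the node names and the helper names are hypothetical, and the real input format works on token ranges rather than individual keys.

```python
import hashlib
from bisect import bisect_left


def key_token(key):
    """Token for a row key: absolute value of its MD5 digest as a signed integer."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return abs(int.from_bytes(digest, "big", signed=True))


def primary_node(key, ring):
    """ring: list of (token, node). A node owns the range ending at its token."""
    ordered = sorted(ring)
    index = bisect_left([t for t, _ in ordered], key_token(key)) % len(ordered)
    return ordered[index][1]


def keys_by_node(keys, ring):
    """Group keys by the node that holds them as primary replica."""
    groups = {}
    for key in keys:
        groups.setdefault(primary_node(key, ring), []).append(key)
    return groups


# Hypothetical balanced four-node ring.
ring = [(i * (2 ** 127 // 4), "node%d" % i) for i in range(4)]
for node, keys in sorted(keys_by_node([str(i) for i in range(20)], ring).items()):
    print(node, keys)
```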

Handling Input and Output from Cassandra

The class org.apache.cassandra.hadoop.ColumnFamilyInputFormat allows you to read data stored in Cassandra from a Hadoop MapReduce job, and its companion class org.apache.cassandra.hadoop.ColumnFamilyOutputFormat allows you to write the results back into Cassandra. These two classes should be properly set in your code as the format class for input and/or output:

job.setInputFormatClass(ColumnFamilyInputFormat.class);
job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
In the MapReduce job, Cassandra rows or row fragments (pairs of key + SortedMap of columns) can be input to Map tasks for processing, as specified by a SlicePredicate that describes which columns to fetch from each row. For example:
ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);



Cassandra : On Live

live demo

To insert some data and make some queries, open another terminal window and start irb, the Ruby shell:
irb
In the irb prompt, require the Ruby client library:
require 'rubygems'
require 'cassandra'
include SimpleUUID
Now instantiate a client object:
twitter = Cassandra.new('Twitter')
Let’s insert a few things:
user = {'screen_name' => 'buttonscat'}
twitter.insert(:Users, '5', user)

tweet1 = {'text' => 'Nom nom nom nom nom.', 'user_id' => '5'}
twitter.insert(:Statuses, '1', tweet1)

tweet2 = {'text' => '@evan Zzzz....', 'user_id' => '5', 'reply_to_id' => '8'}
twitter.insert(:Statuses, '2', tweet2)
Notice that the two status records do not have all the same columns. Let’s go ahead and connect them to our user record:
twitter.insert(:UserRelationships, '5', {'user_timeline' => {UUID.new => '1'}})
twitter.insert(:UserRelationships, '5', {'user_timeline' => {UUID.new => '2'}})
The UUID.new call creates a collation key based on the current time; our tweet ids are stored in the values.
Now we can query our user’s tweets:
timeline = twitter.get(:UserRelationships, '5', 'user_timeline', :reversed => true)
timeline.map { |time, id| twitter.get(:Statuses, id, 'text') }
# => ["@evan Zzzz....", "Nom nom nom nom nom."]
Two tweet bodies, returned in recency order—not bad at all. In a similar fashion, each time a user tweets, we could loop through their followers and insert the status key into their follower’s home_timeline relationship, for handling general status delivery.

the data model

Cassandra is best thought of as a 4 or 5 dimensional hash. The usual way to refer to a piece of data is as follows: a keyspace, a column family, a key, an optional super column, and a column. At the end of that chain lies a single, lonely value.
Let’s break down what these layers mean.
  • Keyspace (also confusingly called “table”): the outer-most level of organization. This is usually the name of the application. For example, 'Twitter' and 'Wordpress' are both good keyspaces. Keyspaces must be defined at startup in the storage-conf.xml file.
  • Column family: a slice of data corresponding to a particular key. Each column family is stored in a separate file on disk, so it can be useful to put frequently accessed data in one column family, and rarely accessed data in another. Some good column family names might be :Posts, :Users and :UserAudits. Column families must be defined at startup.
  • Key: the permanent name of the record. You can query over ranges of keys in a column family, like :start => '10050', :finish => '10070'—this is the only index Cassandra provides for free. Keys are defined on the fly.
After the column family level, the organization can diverge—this is a feature unique to Cassandra. You can choose either:
  • A column: this is a tuple with a name and a value. Good columns might be 'screen_name' => 'lisa4718' or 'Google' => 'http://google.com'. It is common to not specify a particular column name when requesting a key; the response will then be an ordered hash of all columns. For example, querying for (:Users, '174927') might return:
    {'name' => 'Lisa Jones',
     'gender' => 'f',
     'screen_name' => 'lisa4718'}
    In this case, name, gender, and screen_name are all column names. Columns are defined on the fly, and different records can have different sets of column names, even in the same keyspace and column family. This lets you use the column name itself as either structure or data. Columns can be stored in recency order, or alphabetical by name, and all columns keep a timestamp.
  • A super column: this is a named list. It contains standard columns, stored in recency order. Say Lisa Jones has bookmarks in several categories. Querying (:UserBookmarks, '174927') might return:
    {'work' => {
        'Google' => 'http://google.com',
        'IBM' => 'http://ibm.com'},
     'todo' => {...},
     'cooking' => {...}}
    Here, work, todo, and cooking are all super column names. They are defined on the fly, and there can be any number of them per row. :UserBookmarks is the name of the super column family. Super columns are stored in alphabetical order, with their sub columns physically adjacent on the disk.
Super columns and standard columns cannot be mixed at the same (4th) level of dimensionality. You must define at startup which column families contain standard columns, and which contain super columns with standard columns inside them.
Super columns are a great way to store one-to-many indexes to other records: make the sub column names TimeUUIDs (or whatever you’d like to use to sort the index), and have the values be the foreign key. We saw an example of this strategy in the demo, above.
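The "4 or 5 dimensional hash" can be modelled with plain nested dictionaries. The Python sketch below is only an illustration of the addressing scheme, not of how Cassandra stores data; the lookup helper is made up for this example, and the sample values are the ones used in the text.

```python
def lookup(store, keyspace, column_family, key, *path):
    """Walk keyspace -> column family -> key -> [super column] -> column.

    path holds zero, one or two names; with fewer names the remaining
    nested hash (a row or a super column) is returned instead of a value.
    """
    node = store[keyspace][column_family][key]
    for name in path:
        node = node[name]
    return node


# Sample data mirroring the examples in the text.
store = {
    "Twitter": {
        "Users": {"174927": {"name": "Lisa Jones", "screen_name": "lisa4718"}},
        "UserBookmarks": {
            "174927": {"work": {"Google": "http://google.com",
                                "IBM": "http://ibm.com"}}},
    }
}

print(lookup(store, "Twitter", "Users", "174927", "screen_name"))
print(lookup(store, "Twitter", "UserBookmarks", "174927", "work", "IBM"))
```

A standard column family needs four names to reach a value, a super column family needs five.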
If this is confusing, don’t worry. We’ll now look at two example schemas in depth.

twitter schema

Here is the schema definition we used for the demo, above. It is based on Eric Florenzano’s Twissandra, but updated for 0.7:
{"Twitter":{
    "Users":{
      "comparator_type":"org.apache.cassandra.db.marshal.UTF8Type",
      "column_type":"Standard"},
    "Statuses":{
      "comparator_type":"org.apache.cassandra.db.marshal.UTF8Type",
      "column_type":"Standard"},
    "UserRelationships":{
      "subcomparator_type":"org.apache.cassandra.db.marshal.TimeUUIDType",
      "comparator_type":"org.apache.cassandra.db.marshal.UTF8Type",
      "column_type":"Super"},
    "StatusRelationships":{
      "subcomparator_type":"org.apache.cassandra.db.marshal.TimeUUIDType",
      "comparator_type":"org.apache.cassandra.db.marshal.UTF8Type",
      "column_type":"Super"}
}}
You can load a schema with this command (replace schema.json with your own filename):
bin/cassandra-cli --host localhost --batch < schema.json
The server must be running; as of version 0.7, Cassandra supports updating the schema at runtime.
What could be in StatusRelationships? Maybe a list of users who favorited the tweet? Having a super column family for both record types lets us index each direction of whatever many-to-many relationships we come up with.
Here’s how the data is organized:
Cassandra lets you distribute the keys across the cluster either randomly, or in order, via the Partitioner option in the storage-conf.xml file.
For the Twitter application, if we were using the order-preserving partitioner, all recent statuses would be stored on the same node. This would cause hotspots. Instead, we should use the random partitioner.
Alternatively, we could preface the status keys with the user key, which has less temporal locality. If we used user_id:status_id as the status key, we could do range queries on the user fragment to get tweets-by-user, avoiding the need for a user_timeline super column.

multi-blog schema

Here’s another schema, suggested to me by Jonathan Ellis, the primary Cassandra maintainer. It’s for a multi-tenancy blog platform:
{"Multiblog":{
    "Blogs":{
      "comparator_type":"org.apache.cassandra.db.marshal.TimeUUIDType",
      "column_type":"Standard"},
    "Comments":{
      "comparator_type":"org.apache.cassandra.db.marshal.TimeUUIDType",
      "column_type":"Standard"}
  }}
Imagine we have a blog named 'The Cutest Kittens'. We will insert a row when the first post is made as follows:
require 'rubygems'
require 'cassandra/0.7'
include SimpleUUID

multiblog = Cassandra.new('Multiblog')

multiblog.insert(:Blogs, 'The Cutest Kittens',
  { UUID.new =>
    '{"title":"Say Hello to Buttons Cat","body":"Buttons is a cute cat."}' })
UUID.new generates a unique, sortable column name, and the JSON hash contains the post details. Let’s insert another:
multiblog.insert(:Blogs, 'The Cutest Kittens',
  { UUID.new =>
    '{"title":"Introducing Commie Cat","body":"Commie is also a cute cat"}' })
Now we can find the latest post with the following query:
post = multiblog.get(:Blogs, 'The Cutest Kittens', :reversed => true).to_a.first
On our website, we can build links based on the readable representation of the UUID:
guid = post.first.to_guid
# => "b06e80b0-8c61-11de-8287-c1fa647fd821"
If the user clicks this string in a permalink, our app can find the post directly via:
multiblog.get(:Blogs, 'The Cutest Kittens', :start => UUID.new(guid), :count => 1)
For comments, we’ll use the post UUID as the outermost key:
multiblog.insert(:Comments, guid,
  {UUID.new => 'I like this cat. - Evan'})
multiblog.insert(:Comments, guid,
  {UUID.new => 'I am cuter. - Buttons'})
Now we can get all comments (oldest first) for a post by calling:
multiblog.get(:Comments, guid)
We could paginate them by passing :start with a UUID. See this presentation to learn more about token-based pagination.
We have sidestepped two problems with this data model: we don’t have to maintain separate indexes for any lookups, and the posts and comments are stored in separate files, where they don’t cause as much write contention. Note that we didn’t need to use any super columns, either.
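The :start, :count and :reversed options used above all operate on a row whose columns are kept sorted by name. A rough Python model of that slicing behaviour follows; it uses small integers as stand-ins for time-ordered UUIDs, and get_slice is a hypothetical helper, not part of the Ruby client.

```python
from bisect import bisect_left


def get_slice(row, start=None, count=100, reverse=False):
    """Return up to `count` (name, value) pairs from a row sorted by column name.

    row: dict of column name -> value. `start` is inclusive and stands in
    for the :start option; `reverse` stands in for :reversed.
    """
    names = sorted(row, reverse=reverse)
    if start is not None:
        if reverse:
            names = [n for n in names if n <= start]
        else:
            names = names[bisect_left(names, start):]
    return [(n, row[n]) for n in names[:count]]


# Integers stand in for TimeUUID column names (an assumption for readability).
comments = {1: "I like this cat. - Evan", 2: "I am cuter. - Buttons", 3: "Meow."}
print(get_slice(comments, count=2))                 # first page, oldest first
print(get_slice(comments, start=2, count=2))        # page starting at column 2
print(get_slice(comments, count=1, reverse=True))   # latest comment
```

Because the start column is inclusive, a paginating caller would request one extra column and drop the first result of every page after the first.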

storage layout and api comparison

The storage strategy for Cassandra’s standard model is the same as BigTable’s. Here’s a comparison chart:

(levels run from multi-file, through per-file, to intra-file)
Relational: server, database, table*, primary key, column value
BigTable: cluster, table, column family, key, column name, column value
Cassandra, standard model: cluster, keyspace, column family, key, column name, column value
Cassandra, super column model
* With fixed column names.
Column families are stored in column-major order, which is why people call BigTable a column-oriented database. This is not the same as a column-oriented OLAP database like Sybase IQ—it depends on whether your data model considers keys to span column families or not.
In row-orientation, the column names are the structure, and you think of the column families as containing keys. This is the convention in relational databases.
In column-orientation, the column names are the data, and the column families are the structure. You think of the key as containing the column family, which is the convention in BigTable. (In Cassandra, super columns are also stored in column-major order—all the sub columns are together.)
In Cassandra’s Ruby API, parameters are expressed in storage order, for clarity:
Relational: SELECT `column` FROM `database`.`table` WHERE `id` = key;
BigTable: table.get(key, "column_family:column")
Cassandra, standard model: keyspace.get("column_family", key, "column")
Cassandra, super column model
Note that Cassandra’s internal Thrift interface mimics BigTable in some ways, but this is being changed.

Configuration Settings for cassandra

Edit configuration files

Cassandra configuration files can be found in conf directory under the top directory of binary and source distributions. If you have installed cassandra from RPM packages, configuration files will be placed into /etc/cassandra/conf.

Step 1: Edit cassandra.yaml

The distribution's sample configuration conf/cassandra.yaml contains reasonable defaults for single node operation, but you will need to make sure that the paths exist for data_file_directories, commitlog_directory, and saved_caches_directory.
Verify that storage_port and rpc_port do not conflict with other services on your computer. By default, Cassandra uses 7000 for storage_port, and 9160 for rpc_port. The storage_port must be identical on all Cassandra nodes in a cluster. Cassandra client applications use rpc_port to connect to Cassandra.
It is a good idea to change cluster_name to avoid unnecessary conflicts with existing clusters.
initial_token: you can leave it blank, but I recommend setting it to 0 if you are configuring your first node.

Step 2: Edit log4j-server.properties

conf/log4j-server.properties contains the path of the log file. Edit the line if needed.
# Edit the next line to point to your logs directory
log4j.appender.R.File=/var/log/cassandra/system.log

Step 3: Edit cassandra-env.sh

Cassandra has a JMX (Java Management Extensions) interface, and the JMX_PORT is defined in conf/cassandra-env.sh. Edit the following line if needed.
# Specifies the default port over which Cassandra will be available for
# JMX connections.
JMX_PORT="7199"
By default, Cassandra allocates memory based on the physical memory your system has. For example, it will allocate a 1GB heap on a 2GB system, and a 2GB heap on an 8GB system. If you want to specify the Cassandra heap size, remove the leading pound sign (#) from the following lines and set the memory sizes.
#MAX_HEAP_SIZE="4G"
#HEAP_NEWSIZE="800M"
If you are not familiar with Java GC, 1/4 of MAX_HEAP_SIZE may be a good starting point for HEAP_NEWSIZE.
Cassandra needs more than a few GB of heap for production use, but you can run it with a smaller footprint for a test drive. If you want to assign 128MB as the maximum, edit the lines as follows.
MAX_HEAP_SIZE="128M"
HEAP_NEWSIZE="32M"
If you face OutOfMemory exceptions or massive GCs with this configuration, increase these values. Don't start your production service with such a tiny heap configuration!
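The heap sizing rules above can be written down as a small calculation. The Python sketch below uses an assumed formula, max(min(1/2 RAM, 1024MB), min(1/4 RAM, 8192MB)), chosen because it reproduces the two figures quoted above (1GB on a 2GB system, 2GB on an 8GB system). Check the cassandra-env.sh shipped with your version for the rule it actually applies; the function names here are made up for the example.

```python
def default_max_heap_mb(system_memory_mb):
    """Assumed default: max(min(1/2 RAM, 1024 MB), min(1/4 RAM, 8192 MB))."""
    return max(min(system_memory_mb // 2, 1024),
               min(system_memory_mb // 4, 8192))


def suggested_new_size_mb(max_heap_mb):
    """Rule of thumb from the text: a quarter of MAX_HEAP_SIZE."""
    return max_heap_mb // 4


for ram_mb in (2048, 8192):
    heap = default_max_heap_mb(ram_mb)
    print('%d MB RAM -> MAX_HEAP_SIZE="%dM" HEAP_NEWSIZE="%dM"'
          % (ram_mb, heap, suggested_new_size_mb(heap)))
```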


About NOSQL


What are NoSQL databases

NoSQL databases are data stores that are non-relational (no fixed schemas or joins), distributed and horizontally scalable, and that often do not adhere to the ACID principles (atomicity, consistency, isolation, durability) of traditional relational databases.
Based on the type of data storage, NoSQL databases are broadly classified as:
  • Column family (wide column) stores
  • Graph databases
  • Key-value/tuple stores
  • Document stores
What is JSON
JSON (JavaScript Object Notation) is a lightweight and highly portable data-interchange format. It fits naturally with the web and the browser, and the JSON message format makes interoperability across platforms easy to achieve.
According to JSON.org (www.json.org):
“JSON is built on two structures:

  • A collection of name/value pairs. In various languages, this is realized as an object, record, dictionary, structure, keyed list, hash table or associative array.
  • An ordered list of values. In most languages, this is realized as an array, list, vector, or sequence.

These are universal data structures. Virtually all modern programming languages support them in one form or another. It makes sense that a data format that is interchangeable with programming languages also be based on these structures.”

A typical JSON syntax is as follows:


  • Data is represented as name/value pairs.
  • A name/value pair consists of a member name in double quotes, followed by a colon (":") and the value; string values are also enclosed in double quotes.
  • Name/value pairs are separated by commas.
  • Objects are held within curly brackets ("{ }").
  • Arrays are held within square brackets ("[ ]").
JSON Example:
{"Author":
      {
    "First Name": "Phani Krishna",
    "Last Name":  "Kollapur Gandla",
    "Contact": {
        "URL": "in.linkedin.com/in/phanikrishnakollapurgandla",
        "Mail ID": "phanikrishna_gandla@xyz.com"}
       }
}
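As a quick illustration of how such a document maps onto a programming language's own structures, here is a short Python sketch using the standard json module. It carries its own copy of the author record; nothing in it is specific to any NoSQL product.

```python
import json

document = """
{"Author": {
    "First Name": "Phani Krishna",
    "Last Name": "Kollapur Gandla",
    "Contact": {"URL": "in.linkedin.com/in/phanikrishnakollapurgandla",
                "Mail ID": "phanikrishna_gandla@xyz.com"}}}
"""

author = json.loads(document)["Author"]   # JSON object -> Python dict
print(author["Contact"]["Mail ID"])       # nested objects become nested dicts
print(json.dumps(author, indent=2))       # Python dict -> JSON text
```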
JSON and XML
JSON is similar to XML in several ways:
•  JSON is a plain-text data format
•  JSON is human-readable and self-describing
•  JSON is hierarchical (contains values within values)
•  JSON can be parsed by scripting languages such as JavaScript
•  JSON data can be transported using AJAX
JSON vs. XML
Though JSON and XML are both data formats, JSON has the upper hand over XML for the following reasons:
•  JSON is lighter than XML (no additional tags)
•  JSON is easier for humans to read and understand.
•  JSON is easier for machines to parse and generate.
•  For AJAX-related applications, JSON is faster than XML
Several NoSQL products provide built-in capabilities or readily available tools for loading data from JSON files. Below is a list of import/export utilities for some widely used NoSQL products:

Product | Import/export utilities for JSON
Cassandra | json2sstable -> JSON to Cassandra data structure
Cassandra | sstable2json -> Cassandra data structure to JSON
MongoDB | mongoimport -> JSON/CSV/TSV to MongoDB data structure
MongoDB | mongoexport -> MongoDB data structure to JSON/CSV
CouchDB | tools/load.py -> JSON to CouchDB data structure
CouchDB | tools/dump.py -> CouchDB data structure to JSON
Riak | bucket_importer:import_data -> JSON to Riak data structure
Riak | bucket_exporter:export_data -> Riak data structure to JSON






Cassandra Data Model

Understanding Cassandra data model
The Cassandra data model is designed for highly distributed, large-scale data. It trades the customary database guarantees (ACID compliance) for important benefits in operational manageability, performance and availability.
[Figure: illustration of a Cassandra data model (Cassandra.jpg)]
The basic elements of the Cassandra data model are as follows:
•  Column
•  Super Column
•  Column Family
•  Keyspace
•  Cluster
Column: A column is the basic unit of the Cassandra data model. A column comprises a name, a value and a timestamp (by default). An example of a column in JSON format is as follows:

{ // Example of Column
  "name": "EmployeeID",
  "value": "01234",
  "timestamp": 123456789
}

Super Column: A super column is a dictionary of an unbounded number of columns, each identified by its column name. An example of a super column in JSON format is as follows:

{ // Example of Super Column
  "name": "designation",
  "value": {
    "role" : {"name": "role", "value": "Architect", "timestamp": 123456789},
    "band" : {"name": "band", "value": "6A", "timestamp": 123456789}
  }
}


The major differences between a column and a super column are:
•  A column's value is a string, whereas a super column's value is a collection of columns.
•  A super column has no timestamp of its own (only a name and a value).
Note: Cassandra does not index sub columns, so when a super column is loaded into memory, all of its columns are loaded as well.
Column Family (CF): A column family closely resembles an RDBMS table. It is an ordered collection of rows, each of which is in turn an ordered collection of columns. A column family can be a "standard" or a "super" column family.

A row in a standard column family contains a collection of name/value pairs, whereas a row in a super column family (SCF) holds a collection of super columns (groups of sub columns). An example of a column family is shown below (in JSON):
Employee = { // Employee Column Family
   "01234" : {   // Row key for Employee ID - 01234
        // Collection of name value pairs
        "EmpName" : "Jack",
        "mail" : "Jack@xyz.com",
        "phone" : "9999900000"
        // There can be N number of columns
   },
   "01235" : {   // Row key for Employee ID - 01235
        // Collection of name value pairs
        "EmpName" : "Jill",
        "mail" : "Jill@xyz.com",
        "phone" : "9090909090",
        "VOIP" : "0404787829022",
        "OnsiteMail" : "jackandjill@abcdef.com"
   }
}


Note: Each column contains a timestamp by default. To keep the example short, timestamps are not shown here.
The address of a value in a standard column family is a row key pointing to a column name pointing to a value, while the address of a value in a column family of type "super" is a row key pointing to a super column name pointing to a sub column name pointing to a value. An example of a super column family in JSON format is as follows:
ProjectsExecuted = { // Super column family
   "01234" : {   // Row key for Employee ID - 01234
       // Projects executed by the employee with ID - 01234
       "project1" : {"projcode" : "proj1", "start": "01012011", "end": "03082011", "location": "hyderabad"},
       "project2" : {"projcode" : "proj2", "start": "01042010", "end": "12122010", "location": "chennai"},
       "project3" : {"projcode" : "proj3", "start": "06062009", "end": "01012010", "location": "singapore"}
       // There can be N number of super columns
   },
   "01235" : {   // Row key for Employee ID - 01235
       // Projects executed by the employee with ID - 01235
       "projXYZ" : {"projcode" : "Cod1", "start": "01012011", "end": "03082011", "location": "bangalore"},
       "proj123" : {"projcode" : "Cod2", "start": "01042010", "end": "12122010", "location": "mumbai"}
   }
}

Columns are always sorted by column name within their rows. The data is sorted as soon as it is inserted into the data model.


Keyspace: A keyspace is the outermost grouping for data in Cassandra, closely resembling an RDBMS database. Like a relational database, a keyspace has a name and properties that define its behavior. The keyspace is a container for one or more column families (without any enforced association between them).


Cluster: The cluster (also called the ring) is the outermost structure in Cassandra. Cassandra is designed to be spread across several machines that work together and appear to the end user as a single instance. Cassandra allocates data to the nodes in the cluster by arranging them in a ring.

Relational data model vs. Cassandra data model
Relational data model | Cassandra data model (Standard) | Cassandra data model (Super)
Server | Cluster | Cluster
Database | Keyspace | Keyspace
Table | Column Family | Column Family
Primary Key | Key | Key
Column Value | Column Name -> Column Value | Super Column Name -> Column Name -> Column Value
Unlike a traditional RDBMS, Cassandra doesn’t support:
  • A query language like SQL (T-SQL, PL/SQL etc.). Cassandra provides an API called Thrift through which the data can be accessed.
  • Referential integrity (operations like cascading deletes are not available)
Designing Cassandra data structures
1. Entities – Point of Interest
The best way to model a Cassandra data structure is to identify the entities that most queries focus on and to build the entire structure around them. The activities performed by user applications (generally the use cases) and the way data is retrieved and displayed are the main inputs for designing the Cassandra column families.
For example, a simple employee data model (in any RDBMS) would contain:
•   Employee
•   Employee contact details
•   Employee financial information
•   Employee role information
•   Employee attendance information
•   Employee projects
and so on.
Here "Employee" is the entity of interest, and any application using this design would frame its queries around the employee.
EmpDataModel.jpg
2. De-normalization
Normalization is the set of rules established to aid in the design of tables and their relationships in any RDBMS. The benefits of normalization are:
•   Avoiding repetitive entries
•   Reduction of storage space required
•   Prevention of schema restructuring for future needs
•   Improved speed and flexibility of SQL queries, joins, sorts, and search results.
Sustaining that performance as data volume grows is a challenge for traditional relational data models, and companies often fall back on de-normalization to achieve it. Cassandra does not support foreign key relationships the way a relational database does, so the better approach is to de-normalize the data model. The key point is that, instead of modeling the data first and then framing the queries, with Cassandra you model the queries first and frame the data around them.
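A minimal sketch of the idea, in plain Python rather than against a real cluster: three normalized "tables" are folded into one wide row per employee, so the query "give me everything about employee X" becomes a single key lookup. The sample records, the denormalize function and the "contact:" / "project:" column naming scheme are all made up for illustration.

```python
# Normalized, relational-style input: three "tables" linked by emp_id.
employees = [{"emp_id": "emp001", "name": "Asha"}]
contacts = [{"emp_id": "emp001", "phone": "555-0100", "city": "mumbai"}]
projects = [
    {"emp_id": "emp001", "projcode": "Cod1", "location": "pune"},
    {"emp_id": "emp001", "projcode": "Cod2", "location": "mumbai"},
]


def denormalize(employees, contacts, projects):
    """Build one wide row per employee so one key lookup answers the query."""
    rows = {}
    for emp in employees:
        rows[emp["emp_id"]] = {"name": emp["name"]}
    for contact in contacts:
        row = rows[contact["emp_id"]]
        row["contact:phone"] = contact["phone"]
        row["contact:city"] = contact["city"]
    for project in projects:
        # Repeating project data inside the employee row replaces a join.
        column = "project:" + project["projcode"] + ":location"
        rows[project["emp_id"]][column] = project["location"]
    return rows


employee_rows = denormalize(employees, contacts, projects)
print(employee_rows["emp001"])
```

The trade-off is the usual one: reads get cheaper because no join is needed, while the same fact may now be stored in more than one place and must be kept in sync by the application.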
3. Planning for Concurrent Writes
In Cassandra, every row within a column family is identified by its row key (generally a variable-length string). Unlike a traditional RDBMS primary key, which enforces uniqueness, Cassandra does not reject duplicates: writing to an existing row key does not fail, and it can overwrite or add to the columns already stored under that key. So care must be taken to create rows with unique row keys. Some of the ways to create unique row keys are:
•   Surrogate/ UUID type of row keys
•   Natural row keys
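The two options can be sketched as follows. The function names and the "code:date" layout of the natural key are assumptions made for this example; the only library used is Python's standard uuid module.

```python
import uuid


def surrogate_row_key():
    # Random (version 4) UUID: unique for practical purposes, carries no meaning.
    return str(uuid.uuid4())


def natural_row_key(employee_code, joining_date):
    # Built from business attributes that together identify the entity.
    return "{}:{}".format(employee_code, joining_date)


key_a = surrogate_row_key()
key_b = surrogate_row_key()
print(key_a != key_b)
print(natural_row_key("E1001", "01042010"))
# prints E1001:01042010
```

A surrogate key can be generated by each writer independently, which suits concurrent writes. A natural key is easier to look up, but it is only safe if the chosen attributes really are unique across all writers.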

Data Migration approach (Using ETL)

MigrationApp.png
There are various ways of porting data from relational data structures to Cassandra structures, but migrations involving complex transformations and business validations may need a data processing layer built from ETL utilities.
When in-built data loaders are used, the processed data can be extracted to flat files (in JSON format) and then uploaded to the Cassandra data structures using these loaders. Custom loaders can be built when additional processing rules apply; they can read the data either from the processed data store or from the JSON files.
The overall migration approach would be as follows:
  1. Data preparation as per the JSON file format.
  2. Data extractions into flat files as per the JSON file format or extraction of data from the processed data store using custom data loaders.
  3. Data loading using in-built or custom loaders into Cassandra data structure(s).

The activities for each stage of the migration are discussed in detail in the sections below.
Data Preparation and Extraction
  • ETL is the standard process for data extraction, transformation and loading
  • Reconciliation at the end of the ETL process is an important step. It validates the data against the business processes.
  • The ETL process also involves the validation and enrichment of the data before loading into staging tables.
DataPreparation.png
Data Preparation Activities:
The following activities will be executed during data preparation: 
  1.  Creation of database objects
    • Create the necessary staging tables as per the requirements; these resemble the standard open interface / base table structure.
  2. Validate and transform data from the given source (dumps/flat files) before loading.
    •  Data Cleansing
      • Filter incorrect data as per the JSON file layout specifications.
      • Filter redundant data as per the JSON file layout specifications.
      • Eliminate obsolete data as per the JSON file layout specifications.
  3. Load data into staging area
  4. Data Enrichment
    • Apply default values to incomplete data
    • Derive missing data based on mapping or lookups
    • Restructure data where the layouts differ (1 record in as-is = multiple records in to-be)
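The cleansing and enrichment steps listed above could look roughly like the sketch below. It works on plain Python dictionaries instead of real staging tables, and every name in it (cleanse, enrich, the field names, the sample records) is hypothetical.

```python
def cleanse(records, required_fields):
    """Drop records with missing required fields and exact duplicates."""
    seen = set()
    clean = []
    for record in records:
        if any(not record.get(field) for field in required_fields):
            continue  # incorrect: a required field is empty or absent
        fingerprint = tuple(sorted(record.items()))
        if fingerprint in seen:
            continue  # redundant: the same record was already kept
        seen.add(fingerprint)
        clean.append(record)
    return clean


def enrich(records, defaults, city_lookup):
    """Fill in default values and derive missing values from a lookup table."""
    enriched = []
    for record in records:
        row = dict(defaults)
        row.update(record)  # source values win over defaults
        if "city" not in row and row.get("office_code") in city_lookup:
            row["city"] = city_lookup[row["office_code"]]
        enriched.append(row)
    return enriched


raw = [
    {"emp_id": "emp001", "office_code": "MUM"},
    {"emp_id": "emp001", "office_code": "MUM"},  # duplicate
    {"emp_id": "", "office_code": "PUN"},        # missing key
]
staged = enrich(cleanse(raw, ["emp_id"]), {"status": "active"}, {"MUM": "mumbai"})
print(staged)
```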
Data Extraction Activities (into JSON files):
The following activities will be executed during data extraction into JSON file formats:
  1. Data Selection as per the JSON file layout
  2. Creation of SQL programs based on the JSON file layout
    • Scripts or PL/SQL programs are created based on the data mapping requirements and the ETL processes. These programs serve various purposes, including loading data into staging tables and standard open interface tables.
  3. Data transformation before extraction, as per the JSON file layout specification and mapping documents.
  4. Generation of flat files in JSON format for data loading
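As a sketch of the extraction step, the snippet below selects rows from a staging table and writes them to a JSON flat file. An in-memory SQLite database stands in for the real staging database, and the stg_employee table, its columns and the {row key: {column: value}} file layout are assumptions made for this example; the layout a particular loader expects should be taken from its own specification.

```python
import json
import os
import sqlite3
import tempfile


def extract_to_json(connection, output_path):
    """Select staged rows and write them as {row_key: {column: value}} JSON."""
    cursor = connection.execute(
        "SELECT emp_id, name, city FROM stg_employee ORDER BY emp_id"
    )
    rows = {}
    for emp_id, name, city in cursor:
        rows[emp_id] = {"name": name, "city": city}
    with open(output_path, "w") as handle:
        json.dump(rows, handle, indent=2, sort_keys=True)
    return len(rows)


# In-memory SQLite stands in for the real staging database.
connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE stg_employee (emp_id TEXT, name TEXT, city TEXT)")
connection.execute("INSERT INTO stg_employee VALUES ('emp001', 'Asha', 'mumbai')")

output_path = os.path.join(tempfile.gettempdir(), "employee_extract.json")
print(extract_to_json(connection, output_path))
# prints 1
```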
Data Loading
Cassandra data structures can be accessed from different programming languages, such as .NET, Java, Python and Ruby. Data can be loaded directly from relational databases (Access, SQL Server, Oracle, MySQL, IBM DB2, etc.) using these programming languages. Custom loaders can be used to load data into the Cassandra data structure(s), depending on the rules to be applied, the level of customization and the kind of data processing required.
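A custom loader can be as small as the sketch below. It reads a JSON flat file and hands each row to an insert_row callable. Here that callable writes into an in-memory dictionary so the example runs on its own; in a real migration it would wrap the insert call of whichever Cassandra client library is in use. The function names, the file name and the {row key: {column: value}} layout are assumptions.

```python
import json
import os
import tempfile


def load_json_file(path, insert_row):
    """Read {row_key: {column: value}} JSON and pass each row to insert_row."""
    with open(path) as handle:
        rows = json.load(handle)
    loaded = 0
    for row_key in sorted(rows):
        insert_row(row_key, rows[row_key])
        loaded += 1
    return loaded


# Write a small sample file so the example is self-contained.
path = os.path.join(tempfile.gettempdir(), "employee_load.json")
with open(path, "w") as handle:
    json.dump({"emp001": {"name": "Asha", "city": "mumbai"}}, handle)

column_family = {}  # stand-in for a real Cassandra column family


def insert_row(row_key, columns):
    # Mimics an insert: columns are added to (or overwrite) the existing row.
    column_family.setdefault(row_key, {}).update(columns)


print(load_json_file(path, insert_row))
# prints 1
```

Keeping the insert behind a callable means the same loader can be pointed at a test double first and at the real cluster later.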