Operations

Indexing

Let’s try to index some twitter-like information (demo taken from Elasticsearch). First, let’s create a twitter user and add some tweets (the twitter index will be created automatically; see automatic index and mapping creation in the Elasticsearch documentation):

curl -XPUT -H 'Content-Type: application/json' 'http://localhost:9200/twitter/user/kimchy' -d '{ "name" : "Shay Banon" }'
curl -XPUT -H 'Content-Type: application/json' 'http://localhost:9200/twitter/tweet/1' -d '
{
    "user": "kimchy",
    "postDate": "2009-11-15T13:12:00",
    "message": "Trying out Elassandra, so far so good?"
}'
curl -XPUT -H 'Content-Type: application/json' 'http://localhost:9200/twitter/tweet/2' -d '
{
    "user": "kimchy",
    "postDate": "2009-11-15T14:12:12",
    "message": "Another tweet, will it be indexed?"
}'

You now have two rows in the Cassandra twitter.tweet table.

cqlsh
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 2.1.8 | CQL spec 3.2.0 | Native protocol v3]
Use HELP for help.
cqlsh> select * from twitter.tweet;
 _id | message                                    | postDate                     | user
-----+--------------------------------------------+------------------------------+------------
   2 |     ['Another tweet, will it be indexed?'] | ['2009-11-15 15:12:12+0100'] | ['kimchy']
   1 | ['Trying out Elassandra, so far so good?'] | ['2009-11-15 14:12:00+0100'] | ['kimchy']
(2 rows)

Apache Cassandra is a column store that only supports upsert operations. This means that deleting a cell or a row involves the creation of a tombstone (the insertion of a null) kept until a later compaction removes both the obsolete data and the tombstone (see this blog post about Cassandra tombstones).

By default, when using the Elasticsearch API to replace a document with a new one, Elassandra inserts a row corresponding to the new document, including null for unset fields. Without these nulls (cell tombstones), old fields not present in the new document would be kept at the Cassandra level as zombie cells. If you store immutable data, you can set the index setting index.index_insert_only to true to avoid the storage overhead generated by these tombstones.
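
For example, assuming you create the index explicitly before indexing immutable data, a minimal sketch of setting this option at index creation time could look like this:

curl -XPUT -H 'Content-Type: application/json' 'http://localhost:9200/twitter/' -d '{
    "settings" : { "index.index_insert_only" : true }
}'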

Moreover, indexing with op_type=create (see Elasticsearch indexing) requires a Cassandra PAXOS transaction to check whether the document already exists in the underlying datacenter. This comes with an unnecessary performance cost if you use an automatically generated document ID (see Automatic ID generation), as this ID will be the Cassandra primary key.

Depending on the op_type and document ID, CQL requests are issued as follows when indexing with the Elasticsearch API:

 op_type | Generated ID            | Provided ID                                         | Comment
---------+-------------------------+-----------------------------------------------------+--------------------------------------------
 create  | INSERT INTO … VALUES(…) | INSERT INTO … VALUES(…) IF NOT EXISTS (1)           | Index a new document.
 index   | INSERT INTO … VALUES(…) | DELETE FROM … WHERE … then INSERT INTO … VALUES(…)  | Replace a document that may already exist.

(1) The IF NOT EXISTS comes with the cost of the PAXOS transaction. If you don’t need to check the uniqueness of the provided ID, add the parameter check_unique_id=false.
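
As a hedged sketch (assuming check_unique_id is passed as a URL parameter of the index request), indexing with op_type=create without the PAXOS check could look like this:

curl -XPUT -H 'Content-Type: application/json' 'http://localhost:9200/twitter/tweet/3?op_type=create&check_unique_id=false' -d '
{
    "user": "kimchy",
    "postDate": "2009-11-15T15:12:12",
    "message": "A third tweet"
}'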

GETting

Now, let’s see if the information was added by GETting it:

curl -XGET 'http://localhost:9200/twitter/user/kimchy?pretty=true'
curl -XGET 'http://localhost:9200/twitter/tweet/1?pretty=true'
curl -XGET 'http://localhost:9200/twitter/tweet/2?pretty=true'

Elasticsearch state now reflects the new twitter index. Because we are currently running on one node, the token_ranges routing attribute matches 100% of the ring from Long.MIN_VALUE to Long.MAX_VALUE.

curl -XGET 'http://localhost:9200/_cluster/state/?pretty=true'
{
  "cluster_name" : "Test Cluster",
  "version" : 5,
  "master_node" : "74ae1629-0149-4e65-b790-cd25c7406675",
  "blocks" : { },
  "nodes" : {
    "74ae1629-0149-4e65-b790-cd25c7406675" : {
      "name" : "localhost",
      "status" : "ALIVE",
      "transport_address" : "inet[localhost/127.0.0.1:9300]",
      "attributes" : {
        "data" : "true",
        "rack" : "RAC1",
        "data_center" : "DC1",
        "master" : "true"
      }
    }
  },
  "metadata" : {
    "version" : 3,
    "uuid" : "74ae1629-0149-4e65-b790-cd25c7406675",
    "templates" : { },
    "indices" : {
      "twitter" : {
        "state" : "open",
        "settings" : {
          "index" : {
            "creation_date" : "1440659762584",
            "uuid" : "fyqNMDfnRgeRE9KgTqxFWw",
            "number_of_replicas" : "1",
            "number_of_shards" : "1",
            "version" : {
              "created" : "1050299"
            }
          }
        },
        "mappings" : {
          "user" : {
            "properties" : {
              "name" : {
                "type" : "string"
              }
            }
          },
          "tweet" : {
            "properties" : {
              "message" : {
                "type" : "string"
              },
              "postDate" : {
                "format" : "dateOptionalTime",
                "type" : "date"
              },
              "user" : {
                "type" : "string"
              }
            }
          }
        },
        "aliases" : [ ]
      }
    }
  },
  "routing_table" : {
    "indices" : {
      "twitter" : {
        "shards" : {
          "0" : [ {
            "state" : "STARTED",
            "primary" : true,
            "node" : "74ae1629-0149-4e65-b790-cd25c7406675",
            "token_ranges" : [ "(-9223372036854775808,9223372036854775807]" ],
            "shard" : 0,
            "index" : "twitter"
          } ]
        }
      }
    }
  },
  "routing_nodes" : {
    "unassigned" : [ ],
    "nodes" : {
      "74ae1629-0149-4e65-b790-cd25c7406675" : [ {
        "state" : "STARTED",
        "primary" : true,
        "node" : "74ae1629-0149-4e65-b790-cd25c7406675",
        "token_ranges" : [ "(-9223372036854775808,9223372036854775807]" ],
        "shard" : 0,
        "index" : "twitter"
      } ]
    }
  },
  "allocations" : [ ]
}

Updates

In Cassandra, an update is an upsert operation (if the row does not exist, it is inserted). As in Elasticsearch, Elassandra issues a GET operation before any update. Then, to keep the same semantics as Elasticsearch, update operations are converted to upserts with the ALL consistency level, so that later GET operations are consistent. (Consider the CQL UPDATE operation to avoid this performance cost.)

Scripted updates and upserts (scripted_upsert and doc_as_upsert) are also supported.
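
For example, a minimal partial-update sketch using the standard Elasticsearch _update API (the new message value is illustrative):

curl -XPOST -H 'Content-Type: application/json' 'http://localhost:9200/twitter/tweet/1/_update' -d '
{
    "doc" : { "message" : "Trying out Elassandra, so far so good!" },
    "doc_as_upsert" : true
}'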

Searching

Let’s find all the tweets that kimchy posted:

curl -XGET 'http://localhost:9200/twitter/tweet/_search?q=user:kimchy&pretty=true'

We can also use the JSON query language Elasticsearch provides instead of a query string:

curl -XGET 'http://localhost:9200/twitter/tweet/_search?pretty=true' -d '
{
    "query" : {
        "match" : { "user": "kimchy" }
    }
}'

To avoid duplicate results when the Cassandra replication factor is greater than one, Elassandra adds a token_ranges filter to every query distributed to all nodes. Because every document contains a _token field computed at index time, this ensures that a node only retrieves documents for the requested token ranges. The token_ranges parameter is a conjunction of Lucene NumericRangeQuery clauses built from the Elasticsearch routing tables to cover the entire Cassandra ring.

curl -XGET 'http://localhost:9200/twitter/tweet/_search?pretty=true&token_ranges=(0,9223372036854775807)' -d '
{
    "query" : {
        "match" : { "user": "kimchy" }
    }
}'

Of course, if the token range filter covers all ranges (Long.MIN_VALUE to Long.MAX_VALUE), Elassandra automatically removes the useless filter.

Finally, you can restrict a query to the coordinator node with the preference=_only_local parameter, for all token_ranges, as shown below:

curl -XGET 'http://localhost:9200/twitter/tweet/_search?pretty=true&preference=_only_local&token_ranges=' -d '
{
    "query" : {
        "match" : { "user": "kimchy" }
    }
}'

Optimizing search requests

The search strategy

Elassandra supports various search strategies to distribute a search request over the Elasticsearch cluster. A search strategy is configured at index-level with the index.search_strategy_class dynamic parameter.

  • org.elassandra.cluster.routing.PrimaryFirstSearchStrategy (default): searches on all alive nodes in the datacenter. All alive nodes respond for their primary token ranges, and for replica token ranges when some nodes are unavailable. This strategy is always used to build the routing table in the cluster state.
  • org.elassandra.cluster.routing.RandomSearchStrategy: for each query, randomly distributes the search request to a minimum number of nodes to reduce network traffic. For example, if your underlying keyspace replication factor is N, a search only involves 1/N of the nodes.

You can create an index with the RandomSearchStrategy as shown below (or change it dynamically).

curl -XPUT -H "Content-Type: application/json" "http://localhost:9200/twitter/" -d '{
   "settings" : {
      "index.search_strategy_class":"RandomSearchStrategy"
   }
}'
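
Since index.search_strategy_class is a dynamic setting, it can also be changed on an existing index through the Elasticsearch settings API, for example:

curl -XPUT -H "Content-Type: application/json" "http://localhost:9200/twitter/_settings" -d '{
   "index.search_strategy_class" : "RandomSearchStrategy"
}'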

Tip

When changing a keyspace replication factor, you can force an Elasticsearch routing table update by closing and re-opening all associated Elasticsearch indices. To troubleshoot search request routing, set the logging level to DEBUG for class org.elassandra.cluster.routing in the conf/logback.xml file.
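
For example, the following logger declaration in conf/logback.xml enables this debug logging:

<logger name="org.elassandra.cluster.routing" level="DEBUG" />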

Caching features

Compared to Elasticsearch, Elassandra adds a token ranges filter to each query and fetches fields through a CQL request at the Cassandra layer.

Token Ranges Query Cache

Token ranges filters depend on the node or vnode configuration; they are quite stable and shared by all keyspaces having the same replication factor. These filters only change when the datacenter topology changes, for example when a node is temporarily down or when a node is added to the datacenter. So, Elassandra uses a cache to keep these queries (a conjunction of Lucene NumericRangeQuery clauses), which are often reused across search requests.

Following a classic caching strategy, the token_ranges_query_expire setting controls how long unused token ranges filter queries are kept in memory. The default is 5 minutes.

Token Ranges Bitset Cache

When enabled, the token ranges bitset cache keeps in memory the result of the token ranges filter for each Lucene segment. This in-memory bitset, acting like the liveDocs Lucene tombstones mechanism, is then reused for subsequent Lucene search queries. For each Lucene segment, this document bitset is updated when the Lucene tombstones count increases (with a bitwise AND between the actual Lucene tombstones and the token ranges filter result), or removed when the corresponding token ranges query is evicted from the token ranges query cache because it is unused.

You can enable the token ranges bitset cache at the index level by setting index.token_ranges_bitset_cache to true (default is false), or configure its default value for newly created indices at the cluster or system level.
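
For example, a minimal sketch enabling the cache at index creation time:

curl -XPUT -H "Content-Type: application/json" "http://localhost:9200/twitter/" -d '{
   "settings" : { "index.token_ranges_bitset_cache" : true }
}'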

You can also bypass this cache by adding token_ranges_bitset_cache=false to your search request:

curl -XGET "http://localhost:9200/twitter/_search?token_ranges_bitset_cache=false&q=*:*"

Finally, you can check the in-memory size of the token ranges bitset cache with the Elasticsearch stats API, and clear it when clearing the Elasticsearch query_cache :

curl -XGET "http://localhost:9200/_stats?pretty=true"
...
"segments" : {
       "count" : 3,
       "memory_in_bytes" : 26711,
       "terms_memory_in_bytes" : 23563,
       "stored_fields_memory_in_bytes" : 1032,
       "term_vectors_memory_in_bytes" : 0,
       "norms_memory_in_bytes" : 384,
       "doc_values_memory_in_bytes" : 1732,
       "index_writer_memory_in_bytes" : 0,
       "index_writer_max_memory_in_bytes" : 421108121,
       "version_map_memory_in_bytes" : 0,
       "fixed_bit_set_memory_in_bytes" : 0,
       "token_ranges_bit_set_memory_in_bytes" : 240
     },
 ...

Cassandra Key and Row Cache

To improve CQL fetch requests response time, Cassandra provides key and row caching features configured for each Cassandra table as follows :

ALTER TABLE ... WITH caching = {'keys': 'ALL', 'rows_per_partition': '1'};

To enable Cassandra row caching, set the row_cache_size_in_mb parameter in your conf/cassandra.yaml, and set row_cache_class_name: org.apache.cassandra.cache.OHCProvider to use off-heap memory.
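
For example, a minimal conf/cassandra.yaml excerpt enabling a 64MB off-heap row cache (the size is illustrative):

row_cache_size_in_mb: 64
row_cache_class_name: org.apache.cassandra.cache.OHCProvider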

Tip

Elasticsearch also provides a Lucene query cache, used for segments having more than 10k documents and for some frequent queries (queries executed more than 5 or 20 times, depending on the nature of the query). The shard request cache can also be enabled if the token ranges bitset cache is disabled.

Create, delete and rebuild index

In order to create an Elasticsearch index from an existing Cassandra table, you can specify the underlying keyspace. In the following example, all columns except message are automatically mapped with the default mapping, and message is explicitly mapped with a custom mapping.

curl -XPUT -H 'Content-Type: application/json' 'http://localhost:9200/twitter_index' -d '{
    "settings": { "keyspace":"twitter" }
    "mappings": {
        "tweet" : {
            "discover":"^(?!message).*",
            "properties" : {
               "message" : { "type":"keyword", "cql_collection":"singleton" }
            }

        }
    }
}'

Caution

Elassandra requires keyspaces configured with the NetworkTopologyStrategy in order to map the Elasticsearch index.number_of_replicas to the Cassandra replication factor minus one. You can change your Cassandra replication factor as explained here.
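
For example, assuming a datacenter named DC1 and a desired replication factor of 2, the keyspace can be switched to the NetworkTopologyStrategy as follows:

ALTER KEYSPACE twitter WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1': '2'};

With this setting, the Elasticsearch index.number_of_replicas of the associated indices is 1.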

Tip

By default, as in standard Elasticsearch, index creation only returns a response to the client when all primary shards have been started, or when the request times out (default is 30 seconds). To emulate the Elasticsearch routing table, shards hosted by dead nodes are reported as primary or not according to the underlying Cassandra replication factor. So, when there are some dead nodes and the number of dead nodes is lower than the number of replicas in your create index request, index creation succeeds immediately with shards_acknowledged=true and the index status is yellow; otherwise, index creation times out, shards_acknowledged=false and the index status is red, meaning that search requests will be inconsistent. Finally, the Elasticsearch parameter wait_for_active_shards is useless in Elassandra, because Cassandra ensures write consistency.

Deleting an Elasticsearch index does not remove any Cassandra data: it keeps the underlying Cassandra tables but removes the Elasticsearch index files.

curl -XDELETE 'http://localhost:9200/twitter_index'

To re-index your existing data, for example after a mapping change to index a new column, run a nodetool rebuild_index as follows :

nodetool rebuild_index [--threads <N>] <keyspace> <table> elastic_<table>_idx

Tip

By default, rebuild index runs on a single thread. In order to improve re-indexing performance, Elassandra comes with a multi-threaded rebuild_index implementation. The --threads parameter allows you to specify the number of threads dedicated to re-indexing a Cassandra table. The number of indexing threads should be tuned carefully to avoid CPU exhaustion. Moreover, indexing throughput is limited by locking at the Lucene level, but this limit can be exceeded by using a partitioned index involving many independent shards.
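
For example, rebuilding the index of the twitter.tweet table with 4 threads (the thread count is illustrative):

nodetool rebuild_index --threads 4 twitter tweet elastic_tweet_idx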

Re-indexing existing data relies on the Cassandra compaction manager. You can trigger a Cassandra compaction when:

  • Creating the first Elasticsearch index on a Cassandra table with existing data automatically involves an index rebuild executed by the compaction manager,
  • Running a nodetool rebuild_index command,
  • Running a nodetool repair on a keyspace having indexed tables (a repair actually creates new SSTables triggering index build).

If the compaction manager is busy, secondary index rebuild is added as a pending task and executed later on. You can check current running compactions with a nodetool compactionstats and check pending compaction tasks with a nodetool tpstats.

nodetool -h 52.43.156.196 compactionstats
pending tasks: 1
                                  id         compaction type   keyspace      table   completed       total    unit   progress
052c70f0-8690-11e6-aa56-674c194215f6   Secondary index build     lastfm   playlist    66347424   330228366   bytes     20,09%
Active compaction remaining time :   0h00m00s

To stop a compaction task (including a rebuild index task), you can either use nodetool stop --compaction-id <uuid> or use the JMX management operation stopCompactionById (on MBean org.apache.cassandra.db.CompactionManager).
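
For example, using the compaction id from the sample output above:

nodetool stop --compaction-id 052c70f0-8690-11e6-aa56-674c194215f6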

Open, close index

Open and close operations allow an Elasticsearch index to be opened and closed. Even if the Cassandra secondary index remains in the CQL schema while the index is closed, it has no overhead. It’s just a dummy function call. Obviously, when several Elasticsearch indices are associated with the same Cassandra table, data is indexed in opened indices, but not in closed ones.

curl -XPOST 'localhost:9200/my_index/_close'
curl -XPOST 'localhost:9200/my_index/_open'

Warning

Elasticsearch translog is disabled in Elassandra, so you might lose some indexed documents when closing an index if index.flush_on_close is false.

Flush, refresh index

A refresh makes all index updates performed since the last refresh available for search. By default, refresh is scheduled every second. By design, setting refresh=true on an index operation has no effect with Elassandra, because write operations are converted to CQL queries and documents are indexed later by a custom secondary index. So, the per-index refresh interval should be set carefully according to your needs (see the settings example below).

curl -XPOST 'localhost:9200/my_index/_refresh'
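
For example, a minimal sketch setting the dynamic index.refresh_interval (the 30s value is illustrative):

curl -XPUT -H 'Content-Type: application/json' 'localhost:9200/my_index/_settings' -d '{
    "index" : { "refresh_interval" : "30s" }
}'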

A flush basically writes a Lucene index to disk. Because the document _source is stored in the Cassandra table in Elassandra, it makes sense to execute a nodetool flush <keyspace> <table> to flush both Cassandra Memtables to SSTables and the Lucene files of all associated Elasticsearch indices. Moreover, remember that a nodetool snapshot also involves a flush before creating a snapshot.

curl -XPOST 'localhost:9200/my_index/_flush'
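
For example, flushing the twitter.tweet table at the Cassandra level, which also flushes the Lucene files of the associated Elasticsearch indices:

nodetool flush twitter tweet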

Tip

Elasticsearch automatically triggers a flush when an index shard is inactive for more than indices.memory.shard_inactive_time (default is 5 minutes) or when the translog size is greater than index.translog.flush_threshold_size (default is 512MB). Elassandra implements a dummy translog to track the size of indexed data and triggers a flush on the same size threshold. Elassandra also triggers an Elasticsearch flush when flushing Cassandra SSTables.

Managing Elassandra nodes

You can add, remove or replace an Elassandra node by using the same procedure as for Cassandra (see Adding nodes to an existing cluster). Even if it is technically possible, you should never bootstrap more than one node at a time.

During the bootstrap process, data pulled from existing nodes is automatically indexed by Elasticsearch on the new node, providing a kind of automatic Elasticsearch resharding. You can monitor and resume the Cassandra bootstrap process with the nodetool bootstrap command.

After the bootstrap successfully ends, you should clean up nodes to throw out any data that is no longer owned by that node, with a nodetool cleanup. Because cleanup involves a delete-by-query in Elasticsearch indices, it is recommended to schedule cleanups smoothly, one node at a time, in your datacenter.

Backup and restore

By design, Elassandra synchronously updates Elasticsearch indices on the Cassandra write path. Flushing a Cassandra table involves a flush of all associated Elasticsearch indices. Therefore, Elassandra can back up data by taking a snapshot of Cassandra SSTables and Elasticsearch Lucene files at the same time on each node, as follows:

  1. nodetool snapshot --tag <snapshot_name> <keyspace_name>

  2. For all indices associated to <keyspace_name>

    cp -al $CASSANDRA_DATA/elasticsearch.data/<cluster_name>/nodes/0/indices/<index_name>/0/index/(_*|segment*) $CASSANDRA_DATA/elasticsearch.data/snapshots/<index_name>/<snapshot_name>/

Restoring a snapshot

Restoring Cassandra SSTables and Elasticsearch Lucene files allows recovery of a keyspace and its associated Elasticsearch indices without stopping any node (but it is not intended to duplicate data to another virtual datacenter or cluster; this kind of operation requires the sstableloader, see https://docs.datastax.com/en/cassandra/3.0/cassandra/tools/toolsBulkloader.html).

To perform a hot restore of a Cassandra keyspace and its Elasticsearch indices:

  1. Depending on your situation:
    • If you want to overwrite an existing Elasticsearch index, first truncate the underlying Cassandra tables.
    • If you want to restore a deleted index or keyspace, first restore the CQL schema of the keyspace and lost tables by applying the schema.cql files from your snapshot. This re-creates empty Elasticsearch indices.
  2. Close the associated Elasticsearch indices.
  3. Restore the Cassandra table with your snapshot on each node.
  4. Restore the Elasticsearch snapshot data on each node (if the Elasticsearch index is open during nodetool refresh, this causes an Elasticsearch index rebuild by the compaction manager, usually with 2 threads).
  5. Load the restored SSTables with a nodetool refresh.
  6. Open all indices associated with the keyspace.
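
As an illustration of steps 2, 5 and 6 with the twitter keyspace (index and table names are those used earlier; this is a sketch, not a full procedure):

curl -XPOST 'localhost:9200/twitter_index/_close'
# restore Cassandra SSTables and Elasticsearch Lucene files on each node (steps 3 and 4)
nodetool refresh twitter tweet
curl -XPOST 'localhost:9200/twitter_index/_open'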

Point in time recovery

Point-in-time recovery is intended to recover the data at any given point in time. This requires restoring the last available Cassandra and Elasticsearch snapshot taken before your recovery point and then applying the commitlogs from that snapshot up to the recovery point. In this case, replaying commitlogs on startup also re-indexes data in Elasticsearch indices, ensuring consistency at the recovery point.

Of course, when stopping a production cluster is not possible, you should restore on a temporary cluster, make a full snapshot, and restore it on your production cluster as described by the hot restore procedure.

To perform a point-in-time recovery of a Cassandra keyspace and its Elasticsearch indices, on all nodes at the same time:

  1. Stop all the datacenter nodes.
  2. Restore the last Cassandra snapshot taken before the recovery point and the commitlogs from that snapshot up to the recovery point.
  3. Restore the last Elasticsearch snapshot taken before the recovery point.
  4. Restart your nodes.

Restoring to a different cluster

When restoring data from another cluster, data distribution is not preserved, and the sstableloader sends each restored row to the appropriate nodes depending on the token ranges distribution. If the Elasticsearch indices are STARTED before restoring, data is automatically re-indexed in Elasticsearch on each node while restoring with sstableloader.

To restore a Cassandra keyspace and its associated Elasticsearch indices from/to another cluster:

  1. On the target cluster, create the same Cassandra schema without any custom secondary indices.
  2. From the source cluster, extract the mapping of your associated indices and apply it to your destination cluster. Your keyspace and indices should be open and empty at this step.

If you are restoring into a new cluster having the same number of nodes, configure it with the same token ranges (see https://docs.datastax.com/en/Cassandra/2.1/cassandra/operations/ops_snapshot_restore_new_cluster.html). In this case, you can restore from Cassandra and Elasticsearch snapshots as described in steps 1, 3 and 4 of the snapshot restore procedure.

Otherwise, when the number of nodes and the token ranges of the source and destination clusters do not match, use the sstableloader to restore your Cassandra snapshots (see https://docs.datastax.com/en/cassandra/2.0/cassandra/tools/toolsBulkloader_t.html). In this approach, all rows are read from the SSTables and injected into the Cassandra cluster, causing a full Elasticsearch index rebuild.
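
For example, a hedged sketch of loading restored SSTables with sstableloader (the target node address and path are illustrative; the last two path components must be the keyspace and table names):

sstableloader -d 192.168.1.10 /var/lib/cassandra/restore/twitter/tweet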

Data migration

Migrating from Cassandra to Elassandra

Because Elassandra is Cassandra, you can upgrade an existing Cassandra cluster or just a datacenter to Elassandra, as long as your Cassandra version is compatible with the Elassandra one:

  • Stop your Cassandra nodes.
  • Start Elassandra with your existing data directory (containing data, commitlog, saved_caches).

Before creating your first Elasticsearch index, deploy the following classes in a jar on all your Cassandra-only nodes to avoid a ClassNotFoundException. You can extract these classes from lib/elasticsearch-<version>.jar (a packaging sketch follows the list):

  • org/elassandra/index/ExtendedElasticSecondaryIndex$DummySecondaryIndex.class
  • org/elassandra/index/ExtendedElasticSecondaryIndex.class
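
A hedged sketch of packaging these classes (the jar name and the target lib/ directory of the Cassandra-only nodes are assumptions):

jar xf lib/elasticsearch-<version>.jar 'org/elassandra/index/ExtendedElasticSecondaryIndex.class' 'org/elassandra/index/ExtendedElasticSecondaryIndex$DummySecondaryIndex.class'
jar cf elassandra-stub.jar org/
# copy elassandra-stub.jar into the lib/ directory of each Cassandra-only node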

You can move back to standard Cassandra by restarting Cassandra binaries or just starting Cassandra from your Elassandra installation:

  • For tarball installation, run bin/cassandra (don’t use the -e flag to enable Elasticsearch)
  • For APT installation, set CASSANDRA_DAEMON in /etc/default/cassandra
  • For RPM installation, set CASSANDRA_DAEMON in /etc/sysconfig/cassandra

Cassandra automatically builds new secondary indices with one thread. If you want to rebuild faster, stop the on-going rebuild on each node and restart it with the desired number of threads.

Migrating from Elasticsearch to Elassandra

Because of data distribution and because Elassandra stores the _source document in Cassandra SSTables, restoring an Elasticsearch snapshot won’t work. In order to import data from an existing Elasticsearch cluster to Elassandra, you can use the logstash elasticsearch input plugin and the cassandra output plugin.