We use ElasticSearch at my job for web front-end searches. Performance is critical, and for our purposes, the data is mostly static. We update the search indexes daily, but have no problems running on old indexes for weeks. The majority of the traffic to this cluster is search; it is a "read heavy" cluster. We had some performance hiccups at the beginning, but we worked closely with Shay Banon of ElasticSearch to eliminate those problems. Now our front-end clusters are very reliable, resilient, and fast.
I am now working to implement a centralized logging infrastructure that meets compliance requirements, but is also useful. The goal of the logging infrastructure is to emulate as much of the Splunk functionality as possible. My previous write-up on logging explains why we decided against Splunk.
After evaluating a number of options, I've decided to utilize ElasticSearch as the storage back-end for that system. This type of cluster is very different from the cluster we've implemented for heavy search loads.
Translations
A Russian translation of this post provided by EveryCloud.
A Chinese translation of this post provided by FangPeishi.
A Ukrainian translation of this post provided by Open Source Initiative.
Index Layouts
The two popular open source log routing systems are Graylog2 and LogStash. As of this writing, the stable Graylog2 release supports only writing/reading from a single index. As I pointed out in a prior article, this presents enormous scaling issues for Graylog2. The 0.10.0 release of Graylog2 will include the ability to index to multiple indexes. However, my experience has been with LogStash indexes as that was the only scalable option in the past.
In order to get the most out of ElasticSearch for logging, you need to use multiple indexes. There are a few ways to decide when to roll over to a new index, but LogStash's default of automatic daily rotation turns out to make the most sense. So, you'll have something like:
- logstash-2012.12.19
- logstash-2012.12.20
- logstash-2012.12.21
- logstash-THE_WORLD_HAS_ENDED
You could keep track of how many documents are in each index and roll over after a million, or a billion, or whatever arbitrary number you decide, but you're just creating more work for yourself later. There are some edge cases where other indexing schemes may be more efficient, but for most users, an index a day is the simplest, most efficient use of your resources.
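Since the index names encode the date, a daily maintenance script can derive them trivially; a minimal sketch (GNU `date` assumed):

```shell
# Today's and yesterday's LogStash index names, in UTC.
today="logstash-$(date -u +%Y.%m.%d)"
yesterday="logstash-$(date -u -d yesterday +%Y.%m.%d)"
echo "$today"
echo "$yesterday"
```

Anything date-based like this composes nicely with cron, which is part of why daily rotation is the low-maintenance choice.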
Get serious, or go home.
Both LogStash and Graylog2 ship with built-in ElasticSearch implementations. This is great for demonstration or development purposes. DO NOT USE THIS BUILT-IN SERVER FOR REAL PURPOSES! I am surprised by the number of LogStash and Graylog2 users who end up in #elasticsearch on irc.freenode.org running the built-in ElasticSearch storage engine, surprised that it falls over!
Run a standalone ElasticSearch Cluster!
You will need separate hardware for this. Java applications like LogStash and ElasticSearch are memory and disk-cache intensive. Dedicate hardware to the log-processing boxes, and separate boxes to the ElasticSearch cluster. Java has some weird issues with memory: we've found that you don't want to go past 32 GB of RAM dedicated to ElasticSearch, and you should reserve at least 8 GB for the OS's file-system caching.
My cluster is handling ~60 GB of log data a day in my development environment with 3 search nodes at 24 GB of RAM each, and is underwhelmed. This brings up the next question: how many servers do I need in my cluster? Start with 3 servers in your ElasticSearch cluster. This gives you the flexibility to shut down a server and maintain full use of your cluster. You can always add more hardware!
Installing ElasticSearch
I'm not going to cover installing ElasticSearch; you can read more about it on the documentation site. You may even decide to utilize the .deb, or possibly roll an RPM, and create a recipe for managing ElasticSearch with Puppet or Chef. The only thing I will say about installation is that, despite how much it hurts, it's best to run ElasticSearch under the Sun JVM. This is how the developers of ElasticSearch run ElasticSearch, and so can you!
ElasticSearch Configuration: OS and Java
There are some things you really need to configure on the host system. I'm assuming you're running Linux as the host system here. You should run ElasticSearch as an unprivileged user. My cluster runs as the 'elasticsearch' user, so we tweak the kernel limits on processes and memory in '/etc/security/limits.conf':
# Ensure ElasticSearch can open files and lock memory!
elasticsearch soft nofile 65536
elasticsearch hard nofile 65536
elasticsearch - memlock unlimited
You should also configure ElasticSearch's minimum and maximum pool of memory to be set to the same value. This takes care of all the memory allocation at startup, so you don't have threads waiting to get more memory from the kernel. I've built ElasticSearch on a RedHat system and have this in my '/etc/sysconfig/elasticsearch', which sets environment variables for the daemon at startup:
# Allocate 14 Gigs of RAM
ES_MIN_MEM=14g
ES_MAX_MEM="$ES_MIN_MEM"
This file is managed by Puppet and sets the memory equal to 50% of the RAM + 2 gigs. This isn't rocket science, and it's covered in every ElasticSearch tuning guide.
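A minimal sketch of that sizing rule as my Puppet template might compute it. The cap at 31 GB is my safety margin under the 32 GB ceiling mentioned earlier; the function name is just for illustration:

```shell
# Heap = (total RAM / 2) + 2 GB, capped below the ~32 GB threshold
# discussed above (past which the JVM behaves badly for ElasticSearch).
heap_gb() {
  ram=$1
  heap=$(( ram / 2 + 2 ))
  if [ "$heap" -gt 31 ]; then heap=31; fi
  echo "$heap"
}
echo "ES_MIN_MEM=$(heap_gb 24)g"   # a 24 GB node gets a 14g heap
```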
ElasticSearch Configuration: elasticsearch.yml
There are some things we can tune in the 'elasticsearch.yml' file which will dramatically improve performance for write-heavy nodes. The first is to set `bootstrap.mlockall` to true. This forces the JVM to allocate all of `ES_MIN_MEM` immediately. This means Java has all the memory it needs at start up! Another concern of a write-heavy cluster is the imbalance of memory allocated to the indexing/bulk engine. ElasticSearch assumes you're going to be using it mostly for searches, so the majority of your memory allocation is safeguarded for those searches. This isn't the case with this cluster, so by tweaking `indices.memory.index_buffer_size` we can restore the balance we need for this use case. In my setup, I also raise the refresh interval and the transaction count for log flushing. Otherwise, ElasticSearch would be flushing the translog nearly every second.
The other thing we need to tweak to avoid catastrophic failure is the threadpool settings. ElasticSearch will do what it believes is best to achieve the best performance. We've found out, in production, that this can mean spawning thousands upon thousands of threads to handle incoming requests. This will knock your whole cluster over quickly under heavy load. To avoid this, we set the max number of threads per pool: search, index, and bulk. The majority of our operations will be bulks, so we give that 60 threads and the other operations 20. We also set the maximum number of requests that can queue for processing to 200 for bulk, and 100 for everything else. This way, if the cluster becomes overloaded it will reject new requests, but it will leave you enough file descriptors and PIDs to ssh into the boxes and figure out what went wrong.
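The back-pressure math here is simple: with a fixed pool, roughly threads-plus-queue requests can be in flight before rejections start. A quick sketch using the numbers above:

```shell
# A fixed pool accepts (threads + queue_size) requests before
# ElasticSearch starts rejecting new ones instead of spawning threads.
pool_capacity() {
  threads=$1
  queue=$2
  echo $(( threads + queue ))
}
echo "bulk:   $(pool_capacity 60 200) in-flight requests max"
echo "search: $(pool_capacity 20 100) in-flight requests max"
```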
Pulling that all together, here's my config file:
##################################################################
# /etc/elasticsearch/elasticsearch.yml
#
# Base configuration for a write heavy cluster
#
# Cluster / Node Basics
cluster.name: logng
# Nodes can have arbitrary attributes we can use for routing
node.name: logsearch-01
node.datacenter: amsterdam
# Force all memory to be locked, forcing the JVM to never swap
bootstrap.mlockall: true
## Threadpool Settings ##
# Search pool
threadpool.search.type: fixed
threadpool.search.size: 20
threadpool.search.queue_size: 100
# Bulk pool
threadpool.bulk.type: fixed
threadpool.bulk.size: 60
threadpool.bulk.queue_size: 300
# Index pool
threadpool.index.type: fixed
threadpool.index.size: 20
threadpool.index.queue_size: 100
# Indices settings
indices.memory.index_buffer_size: 30%
indices.memory.min_shard_index_buffer_size: 12mb
indices.memory.min_index_buffer_size: 96mb
# Cache Sizes
indices.fielddata.cache.size: 15%
indices.fielddata.cache.expire: 6h
indices.cache.filter.size: 15%
indices.cache.filter.expire: 6h
# Indexing Settings for Writes
index.refresh_interval: 30s
index.translog.flush_threshold_ops: 50000
# Minimum nodes alive to constitute an operational cluster
discovery.zen.minimum_master_nodes: 2
# Unicast Discovery (disable multicast)
discovery.zen.ping.multicast.enabled: false
discovery.zen.ping.unicast.hosts: [ "logsearch-01", "logsearch-02", "logsearch-03" ]
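The `discovery.zen.minimum_master_nodes: 2` value above is the quorum for a 3-node cluster; a minimal sketch of the arithmetic:

```shell
# minimum_master_nodes should be a majority of master-eligible nodes:
# floor(n / 2) + 1. For this 3-node cluster that's 2, which prevents a
# split-brain while still tolerating one node being down.
quorum() {
  nodes=$1
  echo $(( nodes / 2 + 1 ))
}
echo "3 nodes -> minimum_master_nodes = $(quorum 3)"
echo "5 nodes -> minimum_master_nodes = $(quorum 5)"
```

Remember to bump this setting if you grow the cluster.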
ElasticSearch Configuration: Index Templates
As I stated, I developed this cluster based on LogStash due to the shortcomings of the Graylog2 implementation at the time. This section will contain the word "logstash", but you can easily adapt this to a Graylog2 or homemade index mapping.
Since we've decided to create an index a day, there are two ways to configure the mapping and features of each index. We can either create the indexes explicitly with the settings we want, or we can use a template so that any index created implicitly by writing data to it has the features and configuration we want! Templates make the most sense in this case, so we'll create them on the now-running cluster!
My template settings are:
{
"template": "logstash-*",
"settings" : {
"index.number_of_shards" : 3,
"index.number_of_replicas" : 1,
"index.query.default_field" : "@message",
"index.routing.allocation.total_shards_per_node" : 2,
"index.auto_expand_replicas": false
},
"mappings": {
"_default_": {
"_all": { "enabled": false },
"_source": { "compress": false },
"dynamic_templates": [
{
"fields_template" : {
"mapping": { "type": "string", "index": "not_analyzed" },
"path_match": "@fields.*"
}
},
{
"tags_template" : {
"mapping": { "type": "string", "index": "not_analyzed" },
"path_match": "@tags.*"
}
}
],
"properties" : {
"@fields": { "type": "object", "dynamic": true, "path": "full" },
"@source" : { "type" : "string", "index" : "not_analyzed" },
"@source_host" : { "type" : "string", "index" : "not_analyzed" },
"@source_path" : { "type" : "string", "index" : "not_analyzed" },
"@timestamp" : { "type" : "date", "index" : "not_analyzed" },
"@type" : { "type" : "string", "index" : "not_analyzed" },
"@message" : { "type" : "string", "analyzer" : "whitespace" }
}
}
}
}
To apply the settings to the cluster, we create or update the template with a PUT:
curl -XPUT 'http://localhost:9200/_template/template_logstash/' -d @logstash-template.json
Setting the template to `logstash-*` means all new indexes whose names start with 'logstash-' will have these settings applied. I override the default search behavior by disabling the `_all` field and setting the default search field to `@message`. This field will be the raw syslog message. It's also the only field that doesn't have the analyzer disabled. Don't freak out. This is saving space and indexing time. It means searching other fields in the document will match using exact matches rather than fuzzy searches, but that's O.K. We can still get that warm fuzzy feeling by searching the `@message` field! This will dramatically reduce the storage size.
In previous write-ups, before ElasticSearch 0.19, you may have seen the `"_source": { "compress": true }` attribute set. This is not recommended for logging data. This attribute determines whether each document (read: log message) is stored using compression. As these documents tend to be very small, compression doesn't really save much space, and it does cost extra processing at the time of indexing and retrieval. It's best to explicitly disable compression for a logging cluster. The setting which enables store compression in our `elasticsearch.yml` uses block-level compression, which is much more efficient.
Index Settings
The index settings are tuned to a 3 node cluster. We can change everything but the `index.number_of_shards` on the fly if we need to grow or shrink the cluster. This setup isn't exactly perfect, as we sometimes end up with orphaned (unallocated) shards. This is easy enough to correct by moving shards around with the ElasticSearch API.
Instead of replicating the entire index to the entire cluster, we add storage capacity as we add nodes. This way we have a "RAID like" setup for shard allocation. I have a 3 node cluster, and I create 3 shards per index. This means the master or "write" shard can be balanced to one on each node. For redundancy, I set the number of replicas to one. This means there are 6 shards for each index. Each node is only allowed to have 2 shards per index.
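The shard arithmetic above can be sketched out; the numbers are from my setup (3 primaries, 1 replica, 2 shards per node):

```shell
# Total shard copies per index = primaries * (1 + replicas).
# With total_shards_per_node capped, allocating everything needs
# at least ceil(total / cap) nodes.
shard_copies() { echo $(( $1 * (1 + $2) )); }
nodes_needed() {
  total=$(shard_copies "$1" "$2")
  cap=$3
  echo $(( (total + cap - 1) / cap ))
}
echo "copies per index: $(shard_copies 3 1)"   # 3 primaries, 1 replica -> 6
echo "nodes needed:     $(nodes_needed 3 1 2)" # 6 copies, 2 per node -> 3
```

Note that the numbers work out exactly: every node holds one primary and one replica of each index, so the write load stays balanced.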
You'll need to experiment with these settings for your needs. Take into account how many nodes you can afford to lose before you lose functionality, and adjust the number of replicas based on that. I've gone with a simple recipe here of a single shard replica, which means I can only afford to have one node out of the cluster at a time. So far, I've found that setting `number_of_replicas` equal to (2/3 * number of nodes) - 1 is a good rule of thumb, YMMV.
Automatically Expand Replicas
It's also best to disable ElasticSearch's default behavior to automatically expand the number of replicas based on how many nodes are in the cluster. We assume responsibility for managing this manually and gain performance, especially when we need to stop or restart a node in the cluster. Auto-expansion is a great feature for search-heavy indexes with small to medium data sets. Without reconfiguring, adding another node will increase performance. However, if you have a lot of data in your indexes and this feature is enabled here's what happens when a node restarts:
- Everything is good. Number of replicas = 1.
- Node A shuts down
- Cluster notices node down, goes yellow
- replicas = 0, expected 1
- number of nodes now = 1
- number of replicas expected = 0 now
- Cluster health upgraded to green, Everything Spiffy
- Node A comes back online
- Cluster sends number of replicas expected and actual for all indexes
- Node A realizes its shards are unnecessary, and deletes the data
- Cluster increments number of nodes, replicas expected = 1, actual = 0
- Node A is notified that number of replicas is not yet met
- Node A replicates every shard back into its index, over the network
As you can see, this is less than desirable, especially with a busy cluster. Please be aware of this behavior in production, and watch your network graphs when you add/remove nodes from your cluster. If you see spikes, you may want to manage this manually. You lose some of the magic, but you may find it to be black magic anyway. By disabling the auto-expansion of replicas, this happens instead:
- Everything is good.
- Node A shuts down
- Cluster notices node down, cluster status yellow
- Cluster health does not recover, expected replicas != actual replicas
- Node A comes back up
- Cluster sends number of replicas expected and actual for all indexes
- Node A notifies cluster that it has copies of shards
- Cluster expected and actual replicas now equal, health green
- Cluster checksums the shards and replicates any out-of-date shards to Node A
This is what most people expect the cluster to do by default, but the logic involved in determining cluster state makes it difficult to accomplish. Again, the magic behavior of `auto_expand_replicas` makes sense in most use cases, but not in our case.
Maintenance and Monitoring
I wrote a few scripts for working with ElasticSearch in a production environment. They include exporting metrics to Graphite and Cacti, as well as a very configurable Nagios monitoring check. We use these utilities to keep track of the performance and health of our various clusters, including the logging cluster. I'll be updating them in the next few days to include my logstash index maintenance script.
As you write your data to the log cluster, ElasticSearch is creating Lucene indexes of the log messages in the background. There is a buffer of incoming documents and, based on your settings, that data is flushed to a Lucene index. Lucene indexes are expensive to create/update, but fast to search. This means a single shard may contain hundreds of Lucene indexes, often referred to as segments. These segments can each be searched quickly, but only one can be processed per thread. This can begin to have a negative effect on performance; we have seen a 10% degradation in search speed with indexes of 20+ segments.
Luckily, ElasticSearch provides an API for optimizing the Lucene segments. You shouldn't optimize an index that's currently indexing data; the new data will just create more segments on those shards. So how do we know that we're done writing to an index? Well, if you remember, I recommended using daily indexes. This means you can run a cron job daily (or hourly) to check for any indexes with yesterday's date or older and make sure they're optimized (with `max_num_segments=1`). If you've chosen some other scheme for creating index names, you've just created more work for yourself.
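A minimal sketch of such a cron job, assuming the cluster listens on localhost:9200 and GNU `date` (both assumptions for your environment):

```shell
# Build the _optimize call that collapses yesterday's index
# down to a single Lucene segment.
ES_HOST="localhost:9200"
index="logstash-$(date -u -d yesterday +%Y.%m.%d)"
url="http://${ES_HOST}/${index}/_optimize?max_num_segments=1"
echo "$url"
# In the real cron job, issue the request:  curl -XPOST "$url"
```

Older indexes that were already optimized are unaffected by re-running this, so the job is safe to schedule blindly.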
Future explorations
This post is substantially longer than I expected. I'm just scratching the surface of the design and implementation of ElasticSearch clusters for logging data. My cluster will be moving from development into production soon (though it currently provides production functionality). When I do, I'm going to face some additional challenges, and I have a notebook full of ideas on how to structure indexes and the cluster to handle the load, and on some of the privacy-related problems that arise when you suddenly provide simple, fast access to massive amounts of data.