Connect ElasticSearch to Cloudera Hadoop using ES-Hadoop.

[CAUTION: Currently the ES-Hadoop jars are causing errors with Cloudera CDH, with Hue reporting that multiple jars were found, so the process below is not working end to end. Use these instructions at your own risk; I have not found a solution yet.]

Environment:

Cloudera CDH 5.12.x

elasticsearch-hadoop-6.2.1

 

Methods to connect ElasticSearch to Hadoop:

There are three ways to connect from Hadoop Hive to ElasticSearch.

Option 1: pass the ES-Hadoop jar on the hive command line:

hive -hiveconf hive.aux.jars.path=/opt/elastic/elasticsearch-hadoop-6.2.1/dist/elasticsearch-hadoop-hive-6.2.1.jar;

Option 2: open a hive session and run the following command before anything else:


ADD JAR /opt/elastic/elasticsearch-hadoop-6.2.1/dist/elasticsearch-hadoop-hive-6.2.1.jar;

The problem with both of these approaches is that you have to tell hive the full path to the elasticsearch jars every single time. The third option avoids this: copy elasticsearch-hadoop-hive-<eshadoopversion>.jar into the same directory on every node in your hadoop cluster. In my case I downloaded https://artifacts.elastic.co/downloads/elasticsearch-hadoop/elasticsearch-hadoop-6.2.1.zip, unzipped it, and moved only the single elasticsearch-hadoop-6.2.1.jar file to the /opt/elastichadoop/elasticsearch-hadoop-6.2.1/dist directory.

Then, in Cloudera Manager, set the Hive Auxiliary JARs Directory (hive.aux.jars.path) property to the /opt/elastichadoop/elasticsearch-hadoop-6.2.1/dist directory.

================================================

Install Steps:

First download the ES-Hadoop jar file to your hadoop master node:

Create a directory /opt/elastichadoop on a hadoop master node and download the ES-Hadoop zip file into it:

/opt/elastichadoop> $ wget https://artifacts.elastic.co/downloads/elasticsearch-hadoop/elasticsearch-hadoop-6.2.1.zip

$ unzip elasticsearch-hadoop-6.2.1.zip

Now let us check whether the ElasticSearch service is running (note: use the correct hostname if elastic is running on a host other than localhost):

$ curl -XGET 'localhost:9200/_cat/health?v&pretty'
epoch      timestamp cluster       status node.total node.data shards pri relo init unassign pending_tasks
1518444681 09:11:21  elasticsearch red    1          1         5      5   0    0    54       0

$ curl -XGET http://localhost:9200
{
  "status" : 200,
  "name" : "xyz",
  "cluster_name" : "elasticsearch",
  "version" : {
    "number" : "5.3.0",
    "build_hash" : "622039121e53e5f520b5ff8720fdbd3d0cb5326b",
    "build_timestamp" : "2017-03-23T03:31:50.652Z",
    "build_snapshot" : false,
    "lucene_version" : "6.4.1"
  },
  "tagline" : "You Know, for Search"
}

Next list all the indexes created in ES:

$ curl -XGET 'localhost:9200/_cat/indices?v&pretty'
health status index          pri rep docs.count docs.deleted store.size pri.store.size
red    open   .marvel-kibana 1   1
red    open   testindex      5   0
yellow open   index1         5   1   145274     157564       669mb      669mb

 

If we need to restart Elastic (optional):

$ sudo service elasticsearch restart

Stopping elasticsearch: [ OK ]
Starting elasticsearch: [ OK ]

To find the column mappings for an index in ES:

$ curl -XGET 'localhost:9200/index1/_mapping?pretty'
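
If an index has multiple types, the request can also be scoped to a single type (typename below is a placeholder for one of your type names):

$ curl -XGET 'localhost:9200/index1/_mapping/typename?pretty'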

 

Next start a hive command line and add the ES-Hadoop jar file (run it with sudo as the hive user, otherwise the CREATE EXTERNAL TABLE below may give a permission error later):

$ sudo -u hive hive

Logging initialized using configuration in jar:file:/opt/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/jars/hive-common-1.1.0-cdh5.12.1.jar!/hive-log4j.properties
WARNING: Hive CLI is deprecated and migration to Beeline is recommended.

hive> ADD JAR file:///home/userxyz/downloads/elasticsearch/elasticsearch-hadoop-6.2.1/dist/elasticsearch-hadoop-6.2.1.jar;
Added [file:///home/userxyz/downloads/elasticsearch/elasticsearch-hadoop-6.2.1/dist/elasticsearch-hadoop-6.2.1.jar] to class path
Added resources: [file:///home/userxyz/downloads/elasticsearch/elasticsearch-hadoop-6.2.1/dist/elasticsearch-hadoop-6.2.1.jar]

Next create a Hive external table that is linked to an ES index:

hive> CREATE EXTERNAL TABLE gcsampledata1 (line int, value bigint, message string)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES ('es.resource' = 'gcsampledata1/line',
'es.index.auto.create' = 'true',
'es.nodes' = 'localhost');

OK
Time taken: 1.448 seconds

The external hive table is created in the default database. The es.resource parameter specifies the index/type to create in Elasticsearch. Note that mapping types are deprecated as of ES 7.x.
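
If Elasticsearch is not running on the Hive nodes themselves, es.nodes can point at the remote host(s), and es.port (default 9200) can be set explicitly. A sketch of the TBLPROPERTIES, with es1.example.com as a placeholder hostname:

TBLPROPERTIES ('es.resource' = 'gcsampledata1/line',
'es.index.auto.create' = 'true',
'es.nodes' = 'es1.example.com',
'es.port' = '9200');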

hive> describe gcsampledata1;
OK
line    int     from deserializer
value   bigint  from deserializer
message string  from deserializer
Time taken: 0.593 seconds, Fetched: 3 row(s)
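
To see the full DDL, including the storage handler and TBLPROPERTIES, you can also run:

hive> show create table gcsampledata1;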

Next load data into the ES index:

hive> INSERT INTO TABLE gcsampledata1 VALUES (1,100,'msg line1 data1'),(2,200,'msg line2 data2');
Query ID = hive_20180212105050_718ead65-e352-48df-adf4-a12cd0778843
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_1512577646298_0072, Tracking URL = http://

OK
Time taken: 16.503 seconds
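
A quick way to confirm the rows landed in the index is a document count; after the insert above it should report "count" : 2:

$ curl -XGET 'localhost:9200/gcsampledata1/_count?pretty'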

Now try to query the data in Elastic:

$ curl -XGET 'localhost:9200/gcsampledata1/line/_search?pretty=true'
{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "gcsampledata1",
        "_type" : "line",
        "_id" : "AWGK2kNZs-JaAcDdF2WF",
        "_score" : 1.0,
        "_source" : {
          "line" : 2,
          "value" : 200,
          "message" : "msg line2 data2"
        }
      },
      {
        "_index" : "gcsampledata1",
        "_type" : "line",
        "_id" : "AWGK2kNZs-JaAcDdF2WE",
        "_score" : 1.0,
        "_source" : {
          "line" : 1,
          "value" : 100,
          "message" : "msg line1 data1"
        }
      }
    ]
  }
}

Congratulations! You have successfully created an index in ElasticSearch and loaded data from Hive to ES using SQL commands.

===============================================

Adding the JAR directly to Cloudera Manager:

One of the problems with the above approach is that you have to keep telling Hive the full path to the elasticsearch jars every single time using ADD JAR. Instead, you can copy elasticsearch-hadoop-hive-<eshadoopversion>.jar into the same directory on every node in your hadoop cluster, as sketched below. In my case I copied it to the /opt/elastichadoop/elasticsearch-hadoop-6.2.1/dist/ directory by unzipping the downloaded ES-Hadoop zip file as shown above.
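
A minimal sketch of that distribution step, assuming passwordless ssh, with node1, node2, node3 as placeholders for your cluster host names:

for h in node1 node2 node3; do
  ssh $h "mkdir -p /opt/elastichadoop/elasticsearch-hadoop-6.2.1/dist"
  scp /opt/elastichadoop/elasticsearch-hadoop-6.2.1/dist/elasticsearch-hadoop-6.2.1.jar \
    $h:/opt/elastichadoop/elasticsearch-hadoop-6.2.1/dist/
done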

CAUTION: Keep only the single jar file elasticsearch-hadoop-6.2.1.jar in the /opt/elastichadoop/elasticsearch-hadoop-6.2.1/dist directory and move the rest of the jars to another location such as /opt/elastichadoop/backup (a cleanup sketch follows the error listing below). Otherwise you will get an error like:

Exception in thread "main" java.lang.Error: Multiple ES-Hadoop versions detected in the classpath; please use only one
jar:file:/opt/elastichadoop/elasticsearch-hadoop-6.2.1/dist/elasticsearch-hadoop-6.2.1.jar
jar:file:/opt/elastichadoop/elasticsearch-hadoop-6.2.1/dist/notneeded/elasticsearch-spark-20_2.11-6.2.1.jar
jar:file:/opt/elastichadoop/elasticsearch-hadoop-6.2.1/dist/notneeded/elasticsearch-hadoop-mr-6.2.1.jar
jar:file:/opt/elastichadoop/elasticsearch-hadoop-6.2.1/dist/notneeded/elasticsearch-hadoop-cascading-6.2.1.jar
jar:file:/opt/elastichadoop/elasticsearch-hadoop-6.2.1/dist/notneeded/elasticsearch-hadoop-pig-6.2.1.jar
jar:file:/opt/elastichadoop/elasticsearch-hadoop-6.2.1/dist/notneeded/elasticsearch-hadoop-hive-6.2.1.jar
jar:file:/opt/elastichadoop/elasticsearch-hadoop-6.2.1/dist/notneeded/elasticsearch-storm-6.2.1.jar
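
A minimal cleanup sketch for the caution above, using the jar names from the error listing:

$ mkdir -p /opt/elastichadoop/backup
$ cd /opt/elastichadoop/elasticsearch-hadoop-6.2.1/dist
$ mv elasticsearch-hadoop-mr-6.2.1.jar elasticsearch-hadoop-hive-6.2.1.jar \
  elasticsearch-hadoop-pig-6.2.1.jar elasticsearch-hadoop-cascading-6.2.1.jar \
  elasticsearch-spark-20_2.11-6.2.1.jar elasticsearch-storm-6.2.1.jar \
  /opt/elastichadoop/backup/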

Next, log in to Cloudera Manager -> Clusters -> Hive -> Advanced and set the value of:

Hive Auxiliary JARs Directory to

/opt/elastichadoop/elasticsearch-hadoop-6.2.1/dist

Restart Hive, and the ES-Hadoop jar will now be available automatically without having to run ADD JAR in hive explicitly.

 

Next, query the table from Hive:

hive> select * from gcsampledata1;

This gives an error:

Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row [Error getting row data with exception java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.IntWritable

The reason for this error is given in the ES documentation:

It is important to remember that automatic mapping uses the payload values to identify its type, using the first document to create the mapping. elasticsearch-hadoop communicates with Elasticsearch through JSON, which does not provide any type information, rather only the field names and their values. One can think of it as type erasure or information loss; for example, JSON does not differentiate integer numeric types: byte, short, int and long are all placed in the same long bucket. This can have unexpected side effects, since the type information is guessed, such as:

Numbers mapped only as long/double

Whenever Elasticsearch encounters a number, it will allocate the largest type for it since it does not know the exact number type of the field. Allocating a small type (such as byte, int or float) can lead to problems if a future document is larger, so Elasticsearch uses a safe default.
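
You can see this behavior directly by indexing a small number into a throwaway index (mappingtest is a hypothetical name) and then inspecting the mapping Elasticsearch generated; the field should come back as long, not integer:

$ curl -XPOST 'localhost:9200/mappingtest/doc' -d '{ "n" : 1 }'
$ curl -XGET 'localhost:9200/mappingtest/_mapping?pretty'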

To avoid this problem we need to explicitly create the ES index with the desired mapping first (if the gcsampledata1 index and Hive table from the earlier test still exist, delete the index and drop the table before recreating them):

curl -XPUT 'localhost:9200/gcsampledata1' -d '{
  "mappings" : {
    "line" : {
      "properties" : {
        "line" : { "type" : "integer" },
        "value" : { "type" : "long" },
        "message" : { "type" : "string" }
      }
    }
  }
}'

{"acknowledged":true,"shards_acknowledged":true}
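
To double-check, fetch the mapping back and confirm the line field is now an integer:

$ curl -XGET 'localhost:9200/gcsampledata1/_mapping?pretty'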

Next create the hive table with 'es.index.auto.create' = 'false':

CREATE EXTERNAL TABLE gcsampledata1 (line int, value bigint, message string)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES ('es.resource' = 'gcsampledata1/line',
'es.index.auto.create' = 'false',
'es.nodes' = 'localhost');

OK

Time taken: 2.005 seconds

hive> INSERT INTO TABLE gcsampledata1 VALUES (1,100,'msg line1 data1'),(2,200,'msg line2 data2');

Stage-Stage-0: Map: 1 Cumulative CPU: 1.53 sec HDFS Read: 4406 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 1 seconds 530 msec
OK
Time taken: 16.503 seconds
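
With the explicit integer mapping in place, the Hive-side read that failed earlier should now work as well (assuming the same aux-jar setup):

hive> select * from gcsampledata1;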

Next, search for the string data2 in the message field:

/root> curl -XGET 'localhost:9200/gcsampledata1/_search?q=message:data2&pretty'

{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 0.25316024,
    "hits" : [
      {
        "_index" : "gcsampledata1",
        "_type" : "line",
        "_id" : "AWGPmNq6s-JaAcDdF2WJ",
        "_score" : 0.25316024,
        "_source" : {
          "line" : 2,
          "value" : 200,
          "message" : "msg line2 data2"
        }
      }
    ]
  }
}

 

This shows that the INSERT into the Hive external table automatically populated the Elastic index, and we can use an ES query to search the data in the index. Thus we have successfully connected Hive with Elastic using an external table.

 

REFERENCES:

https://qbox.io/blog/how-to-offload-elasticsearch-indices-to-hive-hadoop

https://db-blog.web.cern.ch/blog/prasanth-kothuri/2016-03-integrating-hadoop-and-elasticsearch-part-1-loading-and-querying

https://db-blog.web.cern.ch/blog/prasanth-kothuri/2016-05-integrating-hadoop-and-elasticsearch-%E2%80%93-part-2-%E2%80%93-writing-and-querying

https://www.linkedin.com/pulse/how-use-hive-table-elasticsearch-index-khan-arshad/

https://www.rittmanmead.com/blog/2014/11/analytics-with-kibana-and-elasticsearch-through-hadoop-part-1-introduction/

http://www.idata.co.il/2016/06/integrating-elasticsearch-with-hadoop-using-hive/

https://www.elastic.co/guide/en/elasticsearch/hadoop/current/index.html

 

 

 
