Esri GIS Tools for Hadoop

The Esri GIS Tools for Hadoop are a collection of GIS tools for the spatial analysis of big data.

References:

https://github.com/Esri/gis-tools-for-hadoop/tree/master/samples/point-in-polygon-aggregation-hive

 

Aggregation Sample for Hive: point-in-polygon-aggregation-hive

The following steps are taken from the above reference.

Step 1: Make a working directory on the local server where Hive is installed:

$mkdir /tmp/esri-git

Step 2: Clone the git repository:

esri-git# git clone https://github.com/Esri/gis-tools-for-hadoop.git
Cloning into 'gis-tools-for-hadoop'...
Receiving objects: 100% (1677/1677), 18.46 MiB | 1.14 MiB/s, done.

esri-git# ls gis-tools-for-hadoop/
CONTRIBUTING.md header.PNG license.txt README.md samples

$cd /tmp/esri-git

Make an earthquake-demo directory in HDFS:

$sudo -u hive hadoop fs -mkdir /tmp/earthquake-demo

$sudo -u hive hadoop fs -put gis-tools-for-hadoop/samples/data/counties-data /tmp/earthquake-demo

$sudo -u hive hadoop fs -put gis-tools-for-hadoop/samples/data/earthquake-data  /tmp/earthquake-demo

Check that it worked:

$sudo -u hive hadoop fs -ls /tmp/earthquake-demo

Start the Hive Command line (Hive CLI):

$sudo -u hive hive

Add the required external libraries and create temporary functions for the Geometry API calls:

hive> add jar /tmp/esri-git/gis-tools-for-hadoop/samples/lib/esri-geometry-api-2.0.0.jar;
hive> add jar /tmp/esri-git/gis-tools-for-hadoop/samples/lib/spatial-sdk-hive-2.0.0.jar;
hive> add jar /tmp/esri-git/gis-tools-for-hadoop/samples/lib/spatial-sdk-json-2.0.0.jar;

Each add jar command reports, for example: Added [/tmp/esri-git/gis-tools-for-hadoop/samples/lib/spatial-sdk-json-2.0.0.jar] to class path

hive> create temporary function ST_Point as 'com.esri.hadoop.hive.ST_Point';
hive> create temporary function ST_Contains as 'com.esri.hadoop.hive.ST_Contains';
OK
Time taken: 0.052 seconds

This is a minimal subset of the ST_Geometry user-defined functions found in the Hive spatial library; the full list of functions is available in the linked repository.
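Under the hood, ST_Contains(polygon, point) performs a point-in-polygon test. A minimal Python sketch of the classic ray-casting version of that test (the square and coordinates below are made up for illustration; the actual Esri Geometry API implementation is more sophisticated and handles edge cases such as points on boundaries):

```python
def point_in_polygon(lon, lat, polygon):
    """Ray-casting test: count how many polygon edges a horizontal
    ray from (lon, lat) crosses; an odd count means inside."""
    inside = False
    n = len(polygon)
    for i in range(n):
        x1, y1 = polygon[i]
        x2, y2 = polygon[(i + 1) % n]
        # Consider only edges that straddle the point's latitude
        if (y1 > lat) != (y2 > lat):
            # Longitude at which the edge crosses that latitude
            x_cross = x1 + (lat - y1) * (x2 - x1) / (y2 - y1)
            if lon < x_cross:
                inside = not inside
    return inside

# A rough square in (longitude, latitude) order, vertices made up
square = [(-120.0, 35.0), (-118.0, 35.0), (-118.0, 36.0), (-120.0, 36.0)]
print(point_in_polygon(-119.0, 35.5, square))  # True: inside
print(point_in_polygon(-117.0, 35.5, square))  # False: outside
```

Note that ST_Point takes (longitude, latitude) in that order, matching the (x, y) convention used here.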

Drop the earthquakes and counties tables if they already exist:
hive> drop table earthquakes;
hive> drop table counties;

Define a schema for the earthquake data. The earthquake data is in CSV (comma-separated values) format, which is natively supported by Hive.

hive> CREATE TABLE earthquakes (earthquake_date STRING, latitude DOUBLE, longitude DOUBLE, depth DOUBLE, magnitude STRING, eventid STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE;

hive> CREATE TABLE counties (Area STRING, Perimeter STRING, State STRING, County STRING, Name STRING, BoundaryShape BINARY) ROW FORMAT SERDE 'com.esri.hadoop.hive.serde.EsriJsonSerDe' STORED AS INPUTFORMAT 'com.esri.json.hadoop.EnclosedEsriJsonInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';

Time taken: 0.097 seconds
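Each line of the earthquake CSV maps positionally onto the columns of the earthquakes table. A quick Python sketch of that mapping (the sample line and column names are paraphrased from the table definition above, not real data):

```python
import csv
import io

# Column names mirroring the earthquakes table definition
columns = ["earthquake_date", "latitude", "longitude",
           "depth", "magnitude", "eventid"]

# A made-up line in the same comma-delimited layout
line = "1898/04/29 09:59:59, 39.1, -123.5, 0.0, 5.7, ncabc123"

row = next(csv.reader(io.StringIO(line), skipinitialspace=True))
record = dict(zip(columns, row))
print(record["latitude"])  # '39.1' (Hive casts to DOUBLE on read)
```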

Load data into the respective tables:

First, change the ownership of the data files, otherwise the LOAD DATA statements will fail with a permission error:

sudo -u hdfs hdfs dfs -chown hive /tmp/earthquake-demo/earthquake-data/earthquakes.csv

sudo -u hdfs hdfs dfs -chown hive /tmp/earthquake-demo/counties-data/california-counties.json

hive> LOAD DATA INPATH '/tmp/earthquake-demo/earthquake-data/earthquakes.csv' OVERWRITE INTO TABLE earthquakes;
Loading data to table default.earthquakes
Table default.earthquakes stats: [numFiles=1, numRows=0, totalSize=5742716, rawDataSize=0]
OK
Time taken: 0.273 seconds

hive> LOAD DATA INPATH '/tmp/earthquake-demo/counties-data/california-counties.json' OVERWRITE INTO TABLE counties;
Loading data to table default.counties
Table default.counties stats: [numFiles=1, numRows=0, totalSize=1028330, rawDataSize=0]
OK
Time taken: 0.253 seconds

 

Check the rows loaded into the tables:
hive> select count(*) from earthquakes;

77037 rows
Time taken: 23.797 seconds, Fetched: 1 row(s)

 

hive> select count(*) from counties;

30 rows
Time taken: 20.637 seconds, Fetched: 1 row(s)

 

hive> SELECT counties.name, count(*) cnt FROM counties JOIN earthquakes WHERE ST_Contains(counties.boundaryshape, ST_Point(earthquakes.longitude, earthquakes.latitude)) GROUP BY counties.name ORDER BY cnt desc;
Warning: Map Join MAPJOIN[18][bigTable=earthquakes] in task 'Stage-2:MAPRED' is a cross product
Query ID = hive_20180601160707_e2ec6cb9-d0cd-4e6f-a972-6044116313dc
Total jobs = 2
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask

 

To work around this error, disable automatic map-join conversion:

hive> SET hive.auto.convert.join=false;

Running the query again now succeeds:

hive> SELECT counties.name, count(*) cnt FROM counties JOIN earthquakes WHERE ST_Contains(counties.boundaryshape, ST_Point(earthquakes.longitude, earthquakes.latitude)) GROUP BY counties.name ORDER BY cnt desc;
Warning: Shuffle Join JOIN[4][tables = [counties, earthquakes]] in Stage 'Stage-1:MAPRED' is a cross product
Query ID = hive_20180601162121_a40bbcd0-9d1b-4ad3-95c2-b9b2c152f164
Total jobs = 3
Launching Job 1 out of 3

MapReduce Jobs Launched:
Stage-Stage-1: Map: 2 Reduce: 1 Cumulative CPU: 15.26 sec HDFS Read: 6785965 HDFS Write: 541 SUCCESS
Stage-Stage-2: Map: 1 Reduce: 1 Cumulative CPU: 5.31 sec HDFS Read: 4592 HDFS Write: 541 SUCCESS
Stage-Stage-3: Map: 1 Reduce: 1 Cumulative CPU: 5.08 sec HDFS Read: 5462 HDFS Write: 199 SUCCESS
Total MapReduce CPU Time Spent: 25 seconds 650 msec
OK


Kern 36
San Bernardino 35
Imperial 28
Inyo 20
Los Angeles 18
Riverside 14
Monterey 14
Santa Clara 12
Fresno 11
San Benito 11
San Diego 7
Santa Cruz 5
San Luis Obispo 3
Ventura 3
Orange 2
San Mateo 1
Time taken: 63.476 seconds, Fetched: 16 row(s)
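Logically, the query cross-joins every earthquake with every county, keeps the pairs where the point falls inside the county boundary, and counts per county. A sketch of that same aggregation in plain Python, using toy data and a simple bounding-box test as a stand-in for the real ST_Contains polygon test (county boxes and quake coordinates below are invented):

```python
from collections import Counter

# Toy stand-ins: counties as (name, bounding box) and quakes as (lon, lat)
counties = [
    ("Kern", (-120.2, 34.8, -117.6, 35.8)),
    ("Inyo", (-118.8, 35.8, -115.6, 37.5)),
]
earthquakes = [(-119.0, 35.2), (-118.0, 36.5), (-119.5, 35.0), (-117.0, 36.0)]

def contains(box, lon, lat):
    # Bounding-box containment standing in for ST_Contains
    min_lon, min_lat, max_lon, max_lat = box
    return min_lon <= lon <= max_lon and min_lat <= lat <= max_lat

counts = Counter(
    name
    for name, box in counties          # JOIN: every (county, quake) pair
    for lon, lat in earthquakes
    if contains(box, lon, lat)         # WHERE ST_Contains(...)
)

for name, cnt in counts.most_common():  # GROUP BY name ORDER BY cnt DESC
    print(name, cnt)
```

This nested-loop form is exactly why Hive warns about a cross product: with 30 counties and 77,037 earthquakes, the join evaluates the containment predicate roughly 2.3 million times.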

 

Next, run the same query using the Hive on Spark execution engine:

hive> set hive.execution.engine=spark;

hive> SELECT counties.name, count(*) cnt FROM counties JOIN earthquakes WHERE ST_Contains(counties.boundaryshape, ST_Point(earthquakes.longitude, earthquakes.latitude)) GROUP BY counties.name ORDER BY cnt desc;

Status: Running (Hive on Spark job[2])
Job Progress Format
CurrentTime StageId_StageAttemptId: SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]
2018-06-19 10:34:45,537 Stage-10_0: 0/4 Stage-11_0: 0/1 Stage-8_0: 2/2 Finished Stage-9_0: 0(+1)/1
2018-06-19 10:34:46,543 Stage-10_0: 0(+4)/4 Stage-11_0: 0/1 Stage-8_0: 2/2 Finished Stage-9_0: 1/1 Finished
2018-06-19 10:34:47,549 Stage-10_0: 4/4 Finished Stage-11_0: 1/1 Finished Stage-8_0: 2/2 Finished Stage-9_0: 1/1 Finished
Status: Finished successfully in 3.02 seconds.

So Hive on Spark is about 20 times faster for this query (roughly 3 seconds versus 63 seconds on MapReduce).

 

REFERENCES:

https://community.hortonworks.com/articles/44319/geo-spatial-queries-with-hive-using-esri-geometry.html

 

 
