Note: All the posts are based on a practical approach, avoiding lengthy theory. All have been tested on development servers. Please don't try any post on a production server until you are sure.

Monday, July 10, 2017

Working with Apache Spark SQL


What is Spark?
Apache Spark is a fast, in-memory cluster-computing technology designed for rapid computation. Spark does not depend on Hadoop, because it has its own cluster management; Hadoop is just one way to deploy Spark, which can use it for storage (HDFS). Spark extends the MapReduce model to efficiently support more types of computation, including interactive queries and stream processing.




Components of Spark

The following illustration depicts the different components of Spark.



Spark Core

Spark Core contains the basic functionality of Spark, including components for task scheduling, memory management, fault recovery, interacting with storage systems, and more. Spark Core is also home to the API that defines resilient distributed datasets (RDDs), which are Spark’s main programming abstraction.


Spark SQL
Spark SQL is Spark’s package for working with structured data. It allows querying data via SQL as well as the Apache Hive variant of SQL (HiveQL), and it supports many sources of data, including Hive tables, Parquet, and JSON.

Spark Streaming

It enables processing of live streams of data. Examples of data streams include logfiles generated by production web servers, or queues of messages containing status updates posted by users of a web service. It ingests data in mini-batches and performs RDD (Resilient Distributed Datasets) transformations on those mini-batches of data.
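The mini-batch model can be sketched in plain Python (the event data and batch size below are made up for illustration): the stream is chopped into fixed-size batches, and the same transformation is applied to every batch.

```python
def micro_batches(stream, batch_size):
    """Group an (possibly unbounded) event stream into fixed-size mini-batches."""
    batch = []
    for event in stream:
        batch.append(event)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:          # flush the final partial batch
        yield batch

# Each mini-batch gets the same transformation, mimicking per-batch RDD ops.
events = ["GET /a", "GET /b", "POST /a", "GET /a", "POST /c"]
counts_per_batch = [
    sum(1 for line in batch if line.startswith("GET"))
    for batch in micro_batches(events, 2)
]
print(counts_per_batch)   # one result per mini-batch
```

Spark Streaming does the same thing at scale: the live stream becomes a sequence of small RDDs, each processed with the normal RDD API.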

MLlib

Spark comes with a library containing common machine learning (ML) functionality, called MLlib. MLlib provides multiple types of machine learning algorithms, including classification, regression, clustering, and collaborative filtering, as well as supporting functionality such as model evaluation and data import.

GraphX
GraphX is a library for manipulating graphs (e.g., a social network’s friend graph) and performing graph-parallel computations.

Cluster Managers
Under the hood, Spark is designed to efficiently scale up from one to many thousands of compute nodes. To achieve this while maximizing flexibility, Spark can run over a variety of cluster managers, including Hadoop YARN, Apache Mesos, and a simple cluster manager included in Spark itself called the Standalone Scheduler. If you are just installing Spark on an empty set of machines, the Standalone Scheduler provides an easy way to get started; if you already have a Hadoop YARN or Mesos cluster, however, Spark’s support for these cluster managers allows your applications to also run on them. 


RDD — Resilient Distributed Dataset                                   

Resilient Distributed Dataset (aka RDD) is the primary data abstraction in Apache Spark and the core of Spark. It represents an immutable, partitioned collection of elements/records that can be operated on in parallel.
Spark uses the concept of RDD to achieve fast and efficient MapReduce-style operations. Classic MapReduce is slow when data must be shared between jobs, because sharing goes through replication, serialization, and disk I/O. In most Hadoop applications, more than 90% of the time is spent doing HDFS read/write operations. When a user runs ad-hoc queries on the same subset of data, each query performs disk I/O against stable storage (e.g. HDFS), which can dominate the application's execution time.



RDD supports distributed in-memory processing: it keeps intermediate state in memory as objects across jobs, and those objects are shareable between jobs. Sharing data in memory is 10 to 100 times faster than sharing it over the network or via disk. If distributed memory (RAM) is not sufficient to store intermediate results (the state of the job), Spark spills those results to disk.
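As a rough illustration (plain Python, not the Spark API), an RDD can be thought of as an immutable collection split into partitions, where every transformation produces a new collection and leaves the original untouched:

```python
class TinyRDD:
    """Toy stand-in for an RDD: an immutable list of partitions.
    Transformations return a new TinyRDD; nothing is mutated in place."""
    def __init__(self, partitions):
        self.partitions = [tuple(p) for p in partitions]

    def map(self, f):
        return TinyRDD([[f(x) for x in p] for p in self.partitions])

    def filter(self, pred):
        return TinyRDD([[x for x in p if pred(x)] for p in self.partitions])

    def collect(self):
        return [x for p in self.partitions for x in p]

rdd = TinyRDD([[1, 2, 3], [4, 5]])     # two "partitions"
doubled = rdd.map(lambda x: x * 2)     # a new RDD; rdd itself is unchanged
print(doubled.collect())               # [2, 4, 6, 8, 10]
print(rdd.collect())                   # [1, 2, 3, 4, 5]
```

In real Spark, each partition lives on a different node and the per-partition work runs in parallel; immutability is what makes recomputing a lost partition (fault recovery) safe.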


Installing Spark

Java and Scala must be installed first, so install them if they are not already present.
[hdpsysuser@dn04 ~]$ java -version
java version "1.8.0_121"
Java(TM) SE Runtime Environment (build 1.8.0_121-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.121-b13, mixed mode)
[hdpsysuser@dn04 ~]$ scala -version
bash: scala: command not found...

Download Scala (scala-2.12.2.tgz) from the URL below and extract it to install.
http://www.scala-lang.org/download/


[root@dn04 hadoopsw]# tar xvf scala-2.12.2.tgz

[hdpsysuser@dn04 ~]$ vi ~/.bash_profile
[hdpsysuser@dn04 ~]$ cat ~/.bash_profile
# .bash_profile

# Get the aliases and functions
if [ -f ~/.bashrc ]; then
        . ~/.bashrc
fi

# User specific environment and startup programs

PATH=$PATH:$HOME/.local/bin:$HOME/bin

export PATH

export PATH=$PATH:/usr/hadoopsw/scala-2.12.2/bin

[hdpsysuser@dn04 ~]$ source ~/.bash_profile

[hdpsysuser@dn04 ~]$ scala -version
Scala code runner version 2.12.2 -- Copyright 2002-2017, LAMP/EPFL and Lightbend, Inc.


Download Spark from the location below and extract it to install, then verify.
http://spark.apache.org/downloads.html

[hdpsysuser@dn04 ~]$ tar xvf spark-2.1.0-bin-hadoop2.7.tgz
[hdpsysuser@dn04 ~]$ vi ~/.bash_profile
[hdpsysuser@dn04 ~]$ cat ~/.bash_profile
# .bash_profile

# Get the aliases and functions
if [ -f ~/.bashrc ]; then
        . ~/.bashrc
fi

# User specific environment and startup programs

PATH=$PATH:$HOME/.local/bin:$HOME/bin

export PATH


## Spark-related variables
export PATH=$PATH:/usr/hadoopsw/scala-2.12.2/bin
export PATH=$PATH:/usr/hadoopsw/spark-2.1.0-bin-hadoop2.7/bin

[hdpsysuser@dn04 ~]$ source ~/.bash_profile
[hdpsysuser@dn04 ~]$ spark-shell
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/07/03 11:40:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/07/03 11:40:15 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/07/03 11:40:15 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
17/07/03 11:40:17 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://192.168.44.138:4040
Spark context available as 'sc' (master = local[*], app id = local-1499071203229).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_121)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

Working with Spark Shell

Spark Shell is a powerful tool for analyzing data interactively. It is available in Scala and Python.

Create a simple RDD from the text file.
scala> val inputfile = sc.textFile("test.txt")
inputfile: org.apache.spark.rdd.RDD[String] = test.txt MapPartitionsRDD[1] at textFile at <console>:24

RDD Transformations
RDD transformations return a pointer to a new RDD and allow you to create dependencies between RDDs. Each RDD in the dependency chain (string of dependencies) has a function for calculating its data and a pointer (dependency) to its parent RDD.

Spark is lazy, so nothing is executed until you call an action that triggers job creation and execution. A chain of RDD transformations is therefore not a set of data but a step in a program (possibly the only step) telling Spark how to get the data and what to do with it.
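This lazy behavior can be sketched with Python generators (a loose analogy, not Spark code): chaining transformations does no work at all until something consumes the pipeline.

```python
log = []

def lazy_map(f, xs):
    for x in xs:
        log.append(f"map({x})")   # record when work actually happens
        yield f(x)

def lazy_filter(pred, xs):
    for x in xs:
        if pred(x):
            yield x

# Build the pipeline: nothing runs yet, just like chained RDD transformations.
pipeline = lazy_filter(lambda x: x > 4, lazy_map(lambda x: x * 2, [1, 2, 3]))
print(log)                # [] -- no work done yet

result = list(pipeline)   # the "action": triggers the whole chain
print(result)             # [6]
print(log)                # ['map(1)', 'map(2)', 'map(3)']
```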



1. map(func): Returns a new distributed dataset, formed by passing each element of the source through a function func.
2. filter(func): Returns a new dataset formed by selecting those elements of the source on which func returns true.
3. flatMap(func): Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
4. mapPartitions(func): Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> ⇒ Iterator<U> when running on an RDD of type T.
5. mapPartitionsWithIndex(func): Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) ⇒ Iterator<U> when running on an RDD of type T.
6. sample(withReplacement, fraction, seed): Sample a fraction of the data, with or without replacement, using a given random number generator seed.
7. union(otherDataset): Returns a new dataset that contains the union of the elements in the source dataset and the argument.
8. intersection(otherDataset): Returns a new RDD that contains the intersection of elements in the source dataset and the argument.
9. distinct([numTasks]): Returns a new dataset that contains the distinct elements of the source dataset.
10. groupByKey([numTasks]): When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. Note: if you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance.
11. reduceByKey(func, [numTasks]): When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V, V) ⇒ V. As with groupByKey, the number of reduce tasks is configurable through an optional second argument.
12. aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]): When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different from the input value type, while avoiding unnecessary allocations. As with groupByKey, the number of reduce tasks is configurable through an optional second argument.
13. sortByKey([ascending], [numTasks]): When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the Boolean ascending argument.
14. join(otherDataset, [numTasks]): When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
15. cogroup(otherDataset, [numTasks]): When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.
16. cartesian(otherDataset): When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
17. pipe(command, [envVars]): Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin, and lines output to its stdout are returned as an RDD of strings.
18. coalesce(numPartitions): Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
19. repartition(numPartitions): Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
20. repartitionAndSortWithinPartitions(partitioner): Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.
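The difference between groupByKey and reduceByKey noted above can be sketched in plain Python (the sample pairs are made up): groupByKey materializes every value per key, while reduceByKey folds values into a single accumulator per key as they arrive, which is why it moves far less data in a real shuffle.

```python
from collections import defaultdict

pairs = [("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5)]

def group_by_key(kv):
    """groupByKey semantics: collect every value for each key."""
    groups = defaultdict(list)
    for k, v in kv:
        groups[k].append(v)
    return dict(groups)

def reduce_by_key(kv, f):
    """reduceByKey semantics: combine values per key as they arrive."""
    acc = {}
    for k, v in kv:
        acc[k] = f(acc[k], v) if k in acc else v
    return acc

print(group_by_key(pairs))                        # {'a': [1, 3, 5], 'b': [2, 4]}
print(reduce_by_key(pairs, lambda x, y: x + y))   # {'a': 9, 'b': 6}
```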


Actions

The following is a list of actions, which return values.

1. reduce(func): Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
2. collect(): Returns all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
3. count(): Returns the number of elements in the dataset.
4. first(): Returns the first element of the dataset (similar to take(1)).
5. take(n): Returns an array with the first n elements of the dataset.
6. takeSample(withReplacement, num, [seed]): Returns an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
7. takeOrdered(n, [ordering]): Returns the first n elements of the RDD using either their natural order or a custom comparator.
8. saveAsTextFile(path): Writes the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark calls toString on each element to convert it to a line of text in the file.
9. saveAsSequenceFile(path) (Java and Scala): Writes the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc.).
10. saveAsObjectFile(path) (Java and Scala): Writes the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
11. countByKey(): Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
12. foreach(func): Runs a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems. Note: modifying variables other than Accumulators outside of foreach() may result in undefined behavior. See Understanding closures for more details.
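A few of these actions have direct analogues in plain Python (the sample data below is made up), which may help fix their semantics:

```python
import heapq
from functools import reduce

data = [5, 1, 4, 2, 3]

total = reduce(lambda a, b: a + b, data)  # reduce(func): commutative + associative
first = data[0]                           # first(): similar to take(1)
take3 = data[:3]                          # take(n): no need to scan everything
smallest2 = heapq.nsmallest(2, data)      # takeOrdered(n): n smallest by ordering

print(total, first, take3, smallest2)     # 15 5 [5, 1, 4] [1, 2]
```

Unlike transformations, each of these returns a concrete value to the caller, which is exactly what makes them "actions" that trigger execution in Spark.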

Inspect the current RDD
scala> inputfile.toDebugString
res2: String =
(2) test.txt MapPartitionsRDD[1] at textFile at <console>:24 []
 |  test.txt HadoopRDD[0] at textFile at <console>:24 []

Caching the Transformations
You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes.
scala> inputfile.cache()
res3: inputfile.type = test.txt MapPartitionsRDD[1] at textFile at <console>:24

Applying the Action
scala> inputfile.count()
res9: Long = 4

scala> inputfile.saveAsTextFile("output1")

This creates a folder named output1; check its contents:

[hdpsysuser@dn04 ~]$ ls output1
part-00000  part-00001  _SUCCESS

See the storage space for the application
If you want to see the storage space used by this application, open the following URL in your browser.

http://localhost:4040


Un-Persist the Storage
If you want to un-persist the storage space of a particular RDD, use the following command.
scala> inputfile.unpersist()
res12: inputfile.type = test.txt MapPartitionsRDD[3] at textFile at <console>:24


Spark SQL                                                      

Now that we have a working knowledge of Spark, let's move to our original topic: Spark SQL.

Spark introduces a programming module for structured data processing called Spark SQL. It provides a programming abstraction called DataFrame and can act as a distributed SQL query engine. Spark SQL originated in the effort to run Apache Hive on top of Spark (the Shark project) and is now integrated with the Spark stack. Apache Hive had certain limitations, mentioned below; Spark SQL was built to overcome these drawbacks and replace Apache Hive.

Limitations With Hive

Hive launches MapReduce jobs internally to execute ad-hoc queries, and MapReduce lags in performance when analyzing medium-sized datasets (10 to 200 GB).
Hive has no resume capability. This means that if processing dies in the middle of a workflow, you cannot resume from where it got stuck.

Features of Spark SQL

The following are the features of Spark SQL 

Integrated − Seamlessly mix SQL queries with Spark programs. Spark SQL lets you query structured data as a distributed dataset (RDD) in Spark, with integrated APIs in Python, Scala and Java. This tight integration makes it easy to run SQL queries alongside complex analytic algorithms.

Unified Data Access − Load and query data from a variety of sources. Schema-RDDs provide a single interface for efficiently working with structured data, including Apache Hive tables, parquet files and JSON files.

Hive Compatibility − Run unmodified Hive queries on existing warehouses. Spark SQL reuses the Hive frontend and MetaStore, giving you full compatibility with existing Hive data, queries, and UDFs. Simply install it alongside Hive.

Standard Connectivity − Connect through JDBC or ODBC. Spark SQL includes a server mode with industry standard JDBC and ODBC connectivity.

Scalability − Use the same engine for both interactive and long queries. Spark SQL takes advantage of the RDD model to support mid-query fault tolerance, letting it scale to large jobs too. Do not worry about using a different engine for historical data.




Working with Spark SQL - DataFrames

A DataFrame (inspired by the data frame in R) is a distributed collection of data organized into named columns. It can be constructed from an array of different sources such as Hive tables, structured data files, external databases, or existing RDDs. It supports different data formats (Avro, CSV, Elasticsearch, Cassandra) and storage systems (HDFS, Hive tables, MySQL, etc.).

SQLContext
SQLContext is a class used for initializing the functionalities of Spark SQL. A SparkContext object (sc) is required to initialize an SQLContext. By default, the SparkContext object is initialized with the name sc when the spark-shell starts. (In Spark 2.x, SQLContext is deprecated in favor of SparkSession, available in the shell as spark, which is why the shell prints a deprecation warning below.)

You can create an SQLContext as below:

scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
sqlcontext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@7f2542f

Read CSV File
scala> val dfcsv = sqlcontext.read.csv("/data/emp.csv")
dfcsv: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 6 more fields]

You could use the .json method if the file is in JSON format.

Show Data
scala> dfcsv.show()
+----+------+---------+----+---------+----+----+---+
| _c0|   _c1|      _c2| _c3|      _c4| _c5| _c6|_c7|
+----+------+---------+----+---------+----+----+---+
|7369| SMITH|    CLERK|7902|17-DEC-80| 800|null| 20|
|7499| ALLEN| SALESMAN|7698|20-FEB-81|1600| 300| 30|
|7521|  WARD| SALESMAN|7698|22-FEB-81|1250| 500| 30|
|7566| JONES|  MANAGER|7839|02-APR-81|2975|null| 20|
|7654|MARTIN| SALESMAN|7698|28-SEP-81|1250|1400| 30|
|7698| BLAKE|  MANAGER|7839|01-MAY-81|2850|null| 30|
|7782| CLARK|  MANAGER|7839|09-JUN-81|2450|null| 10|
|7788| SCOTT|  ANALYST|7566|19-APR-87|3000|null| 20|
|7839|  KING|PRESIDENT|null|17-NOV-81|5000|null| 10|
|7844|TURNER| SALESMAN|7698|08-SEP-81|1500|   0| 30|
|7876| ADAMS|    CLERK|7788|23-MAY-87|1100|null| 20|
|7900| JAMES|    CLERK|7698|03-DEC-81| 950|null| 30|
|7902|  FORD|  ANALYST|7566|03-DEC-81|3000|null| 20|
|7934|MILLER|    CLERK|7782|23-JAN-82|1300|null| 10|
+----+------+---------+----+---------+----+----+---+

Show Schema Info
scala> dfcsv.printSchema()
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)

Select Specific Column

scala> dfcsv.select("_c1").show()
+------+
|   _c1|
+------+
| ename|
| SMITH|
| ALLEN|
|  WARD|
| JONES|
|MARTIN|
| BLAKE|
| CLARK|
| SCOTT|
|  KING|
|TURNER|
| ADAMS|
| JAMES|
|  FORD|
|MILLER|
+------+

Use Filter
scala> dfcsv.filter(dfcsv("_c5") > 3000).show()
+----+----+---------+----+---------+----+----+---+
| _c0| _c1|      _c2| _c3|      _c4| _c5| _c6|_c7|
+----+----+---------+----+---------+----+----+---+
|7839|KING|PRESIDENT|null|17-NOV-81|5000|null| 10|
+----+----+---------+----+---------+----+----+---+

Use Group By 
scala> dfcsv.groupBy(dfcsv("_c7")).count().show()
+------+-----+
|   _c7|count|
+------+-----+
|    30|    6|
|    20|    5|
|    10|    3|
+------+-----+
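The select/filter/groupBy operations above can be mimicked in plain Python over a few CSV rows (the rows below are a made-up subset shaped like emp.csv), which shows what the DataFrame calls are doing conceptually:

```python
import csv, io
from collections import Counter

# A small subset of rows in the shape empno,ename,job,deptno (values assumed).
raw = """7369,SMITH,CLERK,20
7499,ALLEN,SALESMAN,30
7521,WARD,SALESMAN,30
7934,MILLER,CLERK,10"""

rows = list(csv.reader(io.StringIO(raw)))

select_c1 = [r[1] for r in rows]                     # like df.select("_c1")
filtered  = [r for r in rows if r[2] == "SALESMAN"]  # like df.filter(...)
by_dept   = Counter(r[3] for r in rows)              # like df.groupBy(...).count()

print(select_c1)
print([r[1] for r in filtered])
print(dict(by_dept))
```

The real DataFrame versions run the same logic distributed across partitions, with the Catalyst optimizer planning the execution.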

Starting a Cluster Manually                                                  

Start Master
You can start a standalone master server by executing:
[hdpsysuser@dn04 ~]$ cd /usr/hadoopsw/spark-2.1.0-bin-hadoop2.7/sbin/
[hdpsysuser@dn04 sbin]$ ./start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /usr/hadoopsw/spark-2.1.0-bin-hadoop2.7/logs/spark-hdpsysuser-org.apache.spark.deploy.master.Master-1-dn04.out

Check the log file and open the Master UI in a browser, i.e., localhost:8071 in my case. You will find "Successfully started service 'sparkMaster' on port 7077" in the log; this is the port workers will use to connect.




Once started, you can connect workers to the master using the sparkMaster URL shown in the UI (above), i.e., spark://dn04:7077, or pass it as the "master" argument to SparkContext. You can also find this URL on the master's web UI, which is http://localhost:8081 in my case.


Similarly, you can start one or more workers and connect them to the master via:
Start Slave
[hdpsysuser@dn04 sbin]$ ./start-slave.sh spark://dn04:7077
starting org.apache.spark.deploy.worker.Worker, logging to /usr/hadoopsw/spark-2.1.0-bin-hadoop2.7/logs/spark-hdpsysuser-org.apache.spark.deploy.worker.Worker-1-dn04.out

Connecting an Application to the Cluster                           
To run an application on the Spark cluster, simply pass the spark://IP:PORT URL of the master to the SparkContext constructor.

To run an interactive Spark shell against the cluster, run the following command:

[hdpsysuser@dn04 sbin]$ spark-shell --master spark://dn04:7077
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/07/04 09:49:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/07/04 09:49:27 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/07/04 09:49:27 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
17/07/04 09:49:29 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://192.168.44.138:4040
Spark context available as 'sc' (master = spark://dn04:7077, app id = app-20170704094914-0000).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_121)
Type in expressions to have them evaluated.
Type :help for more information.

scala>:quit


[hdpsysuser@dn04 sbin]$ ./stop-slave.sh
stopping org.apache.spark.deploy.worker.Worker


Reading a JSON file in HDFS
scala> val df = spark.read.json("/tmp/rawTweets.json") //HDFS file
scala> df.show(2)
scala> df.select("text").show(2)
scala> df.select($"text",$"id").show(2)
scala> df.select().count()

Running SQL Queries Programmatically

The sql function on a SparkSession enables applications to run SQL queries programmatically and returns the result as a DataFrame.
scala> df.createOrReplaceTempView("tweets")
scala> val sqlDF = spark.sql("SELECT * FROM tweets")
scala> sqlDF.show(2)


Global Temporary View
Temporary views in Spark SQL are session-scoped and disappear when the session that created them terminates. If you want a temporary view that is shared among all sessions and kept alive until the Spark application terminates, you can create a global temporary view. A global temporary view is tied to the system-preserved database global_temp, and you must use the qualified name to refer to it, e.g. SELECT * FROM global_temp.view1.

scala> df.createGlobalTempView("tweets")
scala> spark.sql("SELECT * FROM global_temp.tweets").show(2)
scala> spark.newSession().sql("SELECT * FROM global_temp.tweets").show(2)
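The register-then-query pattern behind these views can be sketched with SQLite from the Python standard library (an analogy only; the table name and data are made up): you register data under a name, then run SQL against that name, just as createOrReplaceTempView makes a DataFrame queryable by name.

```python
import sqlite3

# Register data under a name, then query it with SQL, in the same spirit as
# df.createOrReplaceTempView("tweets") followed by spark.sql(...).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tweets (id INTEGER, text TEXT)")
conn.executemany("INSERT INTO tweets VALUES (?, ?)",
                 [(1, "hello"), (2, "spark sql"), (3, "hello again")])

rows = conn.execute(
    "SELECT count(*) FROM tweets WHERE text LIKE 'hello%'").fetchone()
print(rows[0])   # 2
conn.close()
```

The key difference is scope: a SQLite in-memory table dies with its connection, while a Spark global temp view outlives individual sessions and dies with the application.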




Hive Tables                                                     

Spark SQL also supports reading and writing data stored in Apache Hive. However, since Hive has a large number of dependencies, these dependencies are not included in the default Spark distribution. If Hive dependencies can be found on the classpath, Spark will load them automatically. Note that these Hive dependencies must also be present on all of the worker nodes, as they will need access to the Hive serialization and deserialization libraries (SerDes) in order to access data stored in Hive.

Configuration of Hive is done by placing your hive-site.xml, core-site.xml (for security configuration), and hdfs-site.xml (for HDFS configuration) file in conf/.



When working with Hive, one must instantiate SparkSession with Hive support, including connectivity to a persistent Hive metastore, support for Hive serdes, and Hive user-defined functions. Users who do not have an existing Hive deployment can still enable Hive support. When not configured by hive-site.xml, the context automatically creates metastore_db in the current directory and creates a directory configured by spark.sql.warehouse.dir, which defaults to the directory spark-warehouse in the current directory in which the Spark application is started. Note that the hive.metastore.warehouse.dir property in hive-site.xml is deprecated since Spark 2.0.0. Instead, use spark.sql.warehouse.dir to specify the default location of databases in the warehouse. You may need to grant write privilege to the user who starts the Spark application.



Remote Metastore Server
In remote metastore setup, all Hive Clients will make a connection to a metastore server which in turn queries the datastore (MySQL in this example) for metadata. Metastore server and client communicate using Thrift Protocol. Starting with Hive 0.5.0, you can start a Thrift server by executing the following command:
hive --service metastore

[hdpclient@en01 ~]$ export LD_LIBRARY_PATH=$HADOOP_HOME/lib/native/:$LD_LIBRARY_PATH
Remove the below from SPARK_HOME/conf/hive-site.xml
<property>
<name>hive.metastore.local</name>
<value>false</value>
</property>

Add the below in SPARK_HOME/conf/hive-site.xml
<property>
  <name>hive.metastore.uris</name>
  <value>thrift://en01:9083</value>
  <description>IP address (or fully-qualified domain name) and port of the metastore host</description>
</property>

Start the worker/slave node
[hdpclient@en01 ~]$ cd /usr/hadoopsw/spark-2.1.0-bin-hadoop2.7/sbin/
[hdpclient@en01 sbin]$ ./start-slave.sh spark://dn04:7077
starting org.apache.spark.deploy.worker.Worker, logging to /usr/hadoopsw/spark-2.1.0-bin-hadoop2.7/logs/spark-hdpclient-org.apache.spark.deploy.worker.Worker-1-en01.out

Start Spark Shell
[hdpclient@en01 ~]$ spark-shell --master spark://dn04:7077

You can stop worker as below
[hdpclient@en01 sbin]$ ./stop-slave.sh

Note: you can add more nodes if you want with the same start-slave.sh script after installing Spark 


Example using Scala (note: entering the chained builder expression line by line in the shell evaluates each line separately, as the intermediate res0..res4 values below show; use :paste to enter it as a single expression):
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
scala> case class Record(key: Int, value: String)
defined class Record
scala> val warehouseLocation = "spark-warehouse"
warehouseLocation: String = spark-warehouse
scala> val spark = SparkSession
spark: org.apache.spark.sql.SparkSession.type = org.apache.spark.sql.SparkSession$@44e3f3e5
scala> .builder()
res0: spark.Builder = org.apache.spark.sql.SparkSession$Builder@2b66bf1c
scala> .appName("Spark Hive Example")
res1: spark.Builder = org.apache.spark.sql.SparkSession$Builder@2b66bf1c
scala> .config("spark.sql.warehouse.dir", warehouseLocation)
res2: spark.Builder = org.apache.spark.sql.SparkSession$Builder@2b66bf1c
scala> .enableHiveSupport()
res3: spark.Builder = org.apache.spark.sql.SparkSession$Builder@2b66bf1c
scala> .getOrCreate()
17/07/10 11:12:48 WARN sql.SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
res4: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@2a666c8e


scala> sql("SELECT * FROM scott.dept").show()
+------+----------+--------+
|deptno|     dname|     loc|
+------+----------+--------+
|    10|ACCOUNTING|NEW YORK|
|    20|  RESEARCH|  DALLAS|
|    30|     SALES| CHICAGO|
|    40|OPERATIONS|  BOSTON|
+------+----------+--------+
scala> sql("SELECT count(*) FROM scott.dept").show()
+--------+
|count(1)|
+--------+
|       4|
+--------+



Performance comparison of Hive, Spark and Presto (same query, same data volume, same hardware)

hive (default)> SELECT count(*) FROM scott.wordcount limit 10;
.....
OK
c0
295379535
Time taken: 47.643 seconds, Fetched: 1 row(s)
hive (default)>

scala> sql("SELECT count(*) FROM scott.wordcount limit 10").show();
+---------+
| count(1)|
+---------+
|295379535|
+---------+


scala> val duration = (System.nanoTime - t1) / 1e9d   // t1 = System.nanoTime, captured before running the query
duration: Double = 31.702014249

scala> println("Elapsed time: %.1f s".format(duration))
Elapsed time: 31.7 s


presto> SELECT count(*) FROM hive.scott.wordcount limit 10;
   _col0
-----------
 295379535
(1 row)

Query 20170710_093143_00004_uzmfy, FINISHED, 3 nodes
Splits: 384 total, 384 done (100.00%)
0:05 [295M rows, 16.6GB] [57.3M rows/s, 3.22GB/s]


Connect to Spark SQL via JDBC/ODBC                                

1- Run the thrift server on the Spark SQL master.

[hdpsysuser@en01 sbin]$ ./start-thriftserver.sh
starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging to /usr/hadoopsw/spark-2.1.0-bin-hadoop2.7/logs/spark-hdpsysuser-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-en01.out

Start beeline and connect using the thrift server port, i.e., 10000 by default. If you want to change this port, set the environment variable below before running the thrift server. The Spark Thrift Server must run on the same host as HiveServer2.

export HIVE_SERVER2_THRIFT_PORT=10005


2- Connect with beeline
1: jdbc:hive2://en01:10000> !connect jdbc:hive2://en01:10005
Connecting to jdbc:hive2://en01:10005
Enter username for jdbc:hive2://en01:10005:
Enter password for jdbc:hive2://en01:10005:
Connected to: Spark SQL (version 2.1.0)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
2: jdbc:hive2://en01:10005> show databases;
+---------------+--+
| databaseName  |
+---------------+--+
| default       |
| flume         |
| scott         |
+---------------+--+
3 rows selected (0.518 seconds)


3- Run the SQL statements

2: jdbc:hive2://en01:10005> select count(*) from scott.emp;
+-----------+--+
| count(1)  |
+-----------+--+
| 14        |
+-----------+--+
1 row selected (1.424 seconds)


4- Download the Spark ODBC driver from Microsoft, create the DSN, and use it in the desired application, e.g., Excel.

https://www.microsoft.com/en-us/download/details.aspx?id=49883


