

Wednesday, November 08, 2017

Using HDP Zeppelin



Apache Zeppelin is a web-based notebook that enables interactive data analytics. With Zeppelin, you can create beautiful, data-driven, interactive and collaborative documents using a rich set of pre-built language backends, or interpreters (an interpreter is a plugin that lets you access processing engines and data sources from the Zeppelin UI), such as Scala (with Apache Spark), Python (with Apache Spark), SparkSQL, Hive, Markdown, Angular, and Shell.


Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. Apache Spark is supported in Zeppelin through the Spark interpreter group, which consists of the interpreters below.

Name               Class                  Description
%spark2            SparkInterpreter       Creates a SparkContext and provides a Scala environment
%spark2.pyspark    PySparkInterpreter     Provides a Python environment
%r                 SparkRInterpreter      Provides an R environment with SparkR support
%sql               SparkSQLInterpreter    Provides a SQL environment
%dep               DepInterpreter         Dependency loader



Add the Zeppelin service using the Ambari Admin -> Stack and Versions menu. Once added, Zeppelin is available at http://<hostname>:9995.
By default you should be able to log in with admin/admin.





Create a notebook
1- Under the “Notebook” tab, choose +Create new note. Type a name for the new note (or accept the default), then press Ok.

2- Type sc.version into a paragraph in the note, and click the “Play” button (blue triangle). A SparkContext, SQLContext and ZeppelinContext will be created automatically and exposed as the variables ‘sc’, ‘sqlContext’ and ‘z’, respectively, in the Scala and Python environments.


SparkContext is the object that manages the connection to the cluster and coordinates the processes running on it. It connects to a cluster manager (YARN on HDP), which manages the actual executors that run the specific computations.
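As a quick sanity check, you can inspect the context Zeppelin created for you. These are standard SparkContext accessors; the values shown in the comments will vary with your cluster:

%spark2
sc.master     // the cluster manager, e.g. "yarn" on an HDP cluster
sc.appName    // the name of the Zeppelin Spark application
sc.version    // the Spark version string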

The first run will take some time, because it is launching a new Spark job to run against YARN. Subsequent paragraphs will run much faster.

When finished, the status indicator on the right will say “FINISHED”. The output should list the version of Spark in your cluster.




By default, the notebook repository path is /usr/hdp/current/zeppelin-server/notebook/ on the server where the Zeppelin service has been added.

Test Shell Interpreter
In a new paragraph, use the shell interpreter (%sh) to verify the emp.csv file in the HDFS location /tmp, where I have already copied it.
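For example, a paragraph like the one below lists the file and shows its first few lines (assuming the file really is at /tmp/emp.csv):

%sh
hdfs dfs -ls /tmp/emp.csv
hdfs dfs -cat /tmp/emp.csv | head -5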





Test Spark Interpreter


In Spark, datasets are represented as a list of entries, where the list is broken up into many different partitions that are each stored on a different machine. Each partition holds a unique subset of the entries in the list. Spark calls datasets that it stores Resilient Distributed Datasets (RDDs).
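To see this partitioning in action, here is a minimal sketch (the variable name nums is just illustrative) that distributes a small local collection across four partitions and inspects them:

%spark2
val nums = sc.parallelize(1 to 10, 4)   // ask Spark for 4 partitions
nums.getNumPartitions                   // Int = 4
nums.glom().collect()                   // one array per partition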


Let's create an RDD from the emp.csv file available in HDFS:
val file = sc.textFile("hdfs://nn01:8020/tmp/emp.csv")




Now we have a freshly created RDD. We have to use an action operation like collect() to gather the data into the driver’s memory and then print out the contents of the file:

file.collect().foreach(println)




Remember that a collect() action on a very large distributed RDD can cause your driver program to run out of memory and crash. So, do not use collect() except when you are prototyping your Spark program on a small dataset.
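On larger datasets, a safer habit is to pull back only a sample, for example just the first few lines:

file.take(5).foreach(println)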

Another way to print the contents of the RDD, streaming them back to the driver one partition at a time instead of materializing everything at once, is

file.toLocalIterator.foreach(println)

You can easily discover other methods that apply to this RDD via auto-completion: type the name of the RDD followed by a dot (in our case file.) and then press Ctrl + . (dot).

Extract Information
Now let’s extract some information from the above data using the map operator. I want to extract the employees' salary information (the sal column). By using the Spark map operator, we will transform our original RDD into a new one.

First let’s filter out the blank lines.

val fltr = file.filter( x => x.length > 0 )

Or

val fltr = file.filter(_.length > 0)

_ is a shortcut or wildcard in Scala that essentially means ‘whatever happens to be passed to me’.
So, in the code above the _ (or the x) stands for each row of our file RDD: if the row's length is greater than 0, i.e. the row is not empty, it is kept in fltr, which is a new RDD.

So, we are invoking the method length on an unknown ‘whatever’ and trusting that Scala will figure out that the thing in each row of the file RDD is actually a String, which supports the length method. In other words, within the parentheses of our filter method we are defining the argument (the ‘whatever’) and the logic to be applied to it.

This pattern of constructing a function inside the argument to a method is one of the fundamental characteristics of Scala, and once you get used to it, it will make sense and speed up your programming a lot. Next, let's split each line into individual columns separated by ',' and grab the 6th column, i.e. the column with index 5.



val keys = fltr.map(_.split(",")).map(a => a(5))

The split function results in an anonymous RDD of arrays, which is passed to the second map function. There, each array is assigned to the variable ‘a’, and we extract its 6th element (index 5), which ends up in the new RDD named ‘keys’ declared at the start of the line. Then print out the values of keys:

keys.collect().foreach(println)

We need to count how many times each key (sal) appears in the csv.
val stateCnt = keys.map(key => (key, 1))

// print stateCnt
stateCnt.collect().foreach(println)


Next, we call a utility method available on our (key, value) RDD that counts, for each key, how many rows contain it:

val lastMap = stateCnt.countByKey




Now, let’s print out the result.

lastMap.foreach(println)






Note that at this point you still have access to all the RDDs you have created during this session. You can reprocess any one of them, for instance by printing out the values contained in the keys RDD again:


keys.collect().foreach(println)

Hive Interaction

You can access Hive tables via Zeppelin in two ways:

1) Use the '%sql' interpreter directly: start a paragraph with %sql and issue statements such as 'show tables' or 'select * from <table>'.



Examples (use the following in Zeppelin):

%sql

select ${groupByColName=host,host|year|month|month_eng|process|event|day_stamp}, count(*) Tot_events from flume.hive_vw_syslogs_current_regex2 group by ${groupByColName=host,host|year|month|month_eng|process|event|day_stamp}



%sql

select * from scott.emp where deptno="${dno=10,10|20|30|40}"

${groupByColName=host,host|year|month|month_eng|process|event|day_stamp} and ${dno=10,10|20|30|40} are Zeppelin dynamic forms: the value before the comma is the default, and the pipe-separated values after it are rendered as a drop-down list in the paragraph.








2) Via the Spark (%spark2) interpreter, by creating a HiveContext and then loading a Hive table into a DataFrame, like this:


%spark2
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

// Queries are expressed in HiveQL 
sqlContext.sql("FROM scott.emp SELECT ename,job").collect().foreach(println)



val tables = sqlContext.sql("show tables")

tables.show()




PySpark (Spark with Python)


Python is a powerful programming language for handling complex data analysis and data munging tasks. It has several built-in libraries and frameworks to do data mining tasks efficiently. Apache Spark comes with an interactive shell for Python, as it does for Scala. The shell for Python is known as “PySpark” and is already available in HDP Zeppelin.


The PySpark shell is responsible for linking the Python API to the Spark core and initializing the SparkContext. It is available in Zeppelin as the %pyspark interpreter.

%pyspark

# Python + sc
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)  # distribute a local Python collection to form an RDD
f = distData.filter(lambda x: x % 2 == 0)  # use filter
f.take(5)
f = distData.map(lambda x: x % 2 == 0)  # now map
f.take(5)


# Reading an external dataset / a file in the PySpark shell in Zeppelin
RDDread = sc.textFile("/tmp/emp.csv")  # SparkContext is already created
RDDread.collect()  # see the contents of the RDD
RDDread.first()  # read the first line of the RDD
RDDread.take(5)  # first 5 lines
RDDread.takeSample(False, 10, 2)  # takeSample(withReplacement, n, [seed])
RDDread.count()


SparkSQL

Spark includes a programming module for structured data processing called Spark SQL. It provides a programming abstraction called DataFrame and can act as a distributed SQL query engine.


%pyspark

# A simple example demonstrating basic Spark SQL features
df = spark.read.csv("/tmp/emp.csv", header=True)  # spark is an existing SparkSession

df.show()  # Displays the content of the DataFrame to stdout
df.printSchema()  # Print the schema in a tree format
df.select("ename").show()  # Select only the "ename" column
df.select(df['ename'], df['sal'] + 1).show()  # Select everybody, but increment the sal by 1
df.filter(df['sal'] > 3000).show()  # Only employees with sal > 3000
df.groupBy("deptno").count().show()  # Count employees per deptno

df.createOrReplaceTempView("emp") # Register the DataFrame as a SQL temporary view
sqlDF = spark.sql("SELECT * FROM emp")
sqlDF.show()

df.createOrReplaceGlobalTempView("emp2") # Register the DataFrame as a global temporary view
# Global temporary view is tied to a system preserved database `global_temp`
# Global temporary view is cross-session



%spark2.pyspark

#spark.sql("SELECT * FROM global_temp.emp2").show()
spark.newSession().sql("SELECT * FROM global_temp.emp2").show()





You can further query the global_temp views using the %sql interpreter:


%sql
SELECT deptno, sum(sal) total FROM global_temp.emp2 GROUP BY deptno

%jdbc Interpreter

The %jdbc interpreter supports access to Apache Hive data; here is a sample paragraph:

%jdbc(hive)
SELECT * FROM scott.dept;


The JDBC interpreter connects to HiveServer2 via Thrift.
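For %jdbc(hive) to work, the jdbc interpreter needs a 'hive' prefix configured in its settings (Interpreter -> jdbc). A typical set of properties looks like the following; the host, port and credentials are placeholders you must adapt to your own HiveServer2 instance:

hive.driver    org.apache.hive.jdbc.HiveDriver
hive.url       jdbc:hive2://<hiveserver2-host>:10000/default
hive.user      <user>
hive.password  <password>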


