Monday, May 04, 2020

Kudu Integration with Spark


Kudu integrates with Spark through the Data Source API. I downloaded the jar files from the location below:

https://jar-download.com/artifacts/org.apache.kudu/kudu-spark2_2.11/1.10.0/source-code
You can place the jar files in $SPARK_HOME/jars (e.g. /opt/progs/spark-2.4.5-bin-hadoop2.7/jars) if you don't want to use the --jars option with spark-shell.

[solr@te1-hdp-rp-nn01 ~]$ spark-shell --jars /opt/progs/kudu-spark2_2.11-1.10.0.jar
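Alternatively, if the machine has internet access, you can let spark-shell pull the connector from Maven Central with --packages instead of downloading the jar manually (the coordinates below match the version used in this post):

[solr@te1-hdp-rp-nn01 ~]$ spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.10.0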

import org.apache.kudu.spark.kudu._

Query from Kudu

// Create a DataFrame that points to the Kudu table we want to query.

val df = spark.read.options(Map("kudu.master" -> "x.x.44.134:7051", "kudu.table" -> "syslog")).format("kudu").load

df.printSchema();

// Create a view from the DataFrame to make it accessible from Spark SQL.

df.createOrReplaceTempView("spsyslog");

// Run Spark SQL queries against our view of the Kudu table.

spark.sql("select * from spsyslog limit 10").show();


Delete Rows from Kudu

{
import org.apache.kudu.spark.kudu._

// Define the Kudu master address(es).
val kuduMasters = Seq("x.x.44.134:7051").mkString(",")

println()
println("###################  INPUT ###################")
val tableName = readLine("Enter Table Name e.g. syslogdest : ")
println(tableName)

val deleteCriteria = readLine("Enter Deletion Criteria e.g. '2020-01-20T00:10:00' : ")
println(deleteCriteria)
val deleteCondition = "msgtimestamp < " + deleteCriteria

val kuduContext = new KuduContext(kuduMasters, sc)
val df = spark.sqlContext.read.options(Map("kudu.master" -> kuduMasters, "kudu.table" -> tableName)).kudu

// Select the key columns of the existing rows to delete; deleteRows
// identifies rows by the table's primary key columns.
var idToDelete = df.select("msgtimestamp", "beat_hostname", "component_name", "uuid")
idToDelete = idToDelete.filter(deleteCondition)
val rowCount = idToDelete.count()

println()
println("###################  INFO ###################")
println(s"Delete Condition: $deleteCondition")
println(s"Number of rows to be deleted: $rowCount")

println()
println("###################  PROCESS ###################")
// Ask for confirmation before deleting.
val option = readLine(s"Are you sure you want to delete $rowCount row(s) from $tableName Y/n : ")
println(option)
println()
if (option == "Y") {
  val t1 = System.nanoTime
  println(s"Rows are being deleted from Kudu table $tableName .....")
  kuduContext.deleteRows(idToDelete, tableName)
  val duration = (System.nanoTime - t1) / 1e9d
  println(s"$rowCount rows deleted from $tableName")
  println()
  println("Elapsed time: %.1f sec".format(duration))
} else {
  println("Rows were not deleted")
}
System.exit(0)
}


You can save the above code in a file, e.g. /home/solr/KuduDeletion.scala, and run it with spark-shell:

[solr@te1-hdp-rp-nn01 ~]$ spark-shell -i KuduDeletion.scala
OR load it from within a running spark-shell session:
:load PATH_TO_FILE
:load /home/solr/KuduDeletion.scala
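Once the script has run (it exits the shell on completion), you can verify the deletion in a fresh spark-shell session. A minimal sketch reusing the master address, table name, and example criteria from above:

import org.apache.kudu.spark.kudu._

// Re-read the table and count rows still matching the delete condition.
val check = spark.read.options(Map("kudu.master" -> "x.x.44.134:7051", "kudu.table" -> "syslog")).format("kudu").load
println("Rows still matching: " + check.filter("msgtimestamp < '2020-01-20T00:10:00'").count())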
