
Monday, May 04, 2020

Kudu Integration with Spark


Kudu integrates with Spark through the Data Source API. I downloaded the jar files from the location below:

https://jar-download.com/artifacts/org.apache.kudu/kudu-spark2_2.11/1.10.0/source-code
You can place the jar files in $SPARK_HOME/jars (e.g. /opt/progs/spark-2.4.5-bin-hadoop2.7/jars) if you don't want to use the --jars option with spark-shell.

[solr@te1-hdp-rp-nn01 ~]$ spark-shell --jars /opt/progs/kudu-spark2_2.11-1.10.0.jar

import org.apache.kudu.spark.kudu._
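Once the shell is up, you can quickly check that the connector loaded and that the Kudu master is reachable by creating a KuduContext and testing for a table. A minimal sketch, using the master address and syslog table from the query example below:

// sc is the SparkContext that spark-shell provides.
val kuduContext = new KuduContext("x.x.44.134:7051", sc)

// Returns true if the table exists on the Kudu cluster.
println(kuduContext.tableExists("syslog"))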

Query from Kudu

// Create a DataFrame that points to the Kudu table we want to query.

val df = spark.read.options(Map("kudu.master" -> "x.x.44.134:7051", "kudu.table" -> "syslog")).format("kudu").load()

df.printSchema()

// Create a view from the DataFrame to make it accessible from Spark SQL.

df.createOrReplaceTempView("spsyslog")

// Run Spark SQL queries against our view of the Kudu table.

spark.sql("select * from spsyslog limit 10").show()
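
The same query can also be written with the DataFrame API instead of SQL; Kudu pushes supported filter predicates down to the tablet servers, so the filter is not evaluated by scanning the whole table in Spark. A short sketch; the msgtimestamp and beat_hostname columns and the timestamp literal are taken from the deletion script below and are assumed to exist in the syslog table:

// Project and filter directly on the Kudu-backed DataFrame.
// Supported predicates are pushed down to Kudu.
df.select("msgtimestamp", "beat_hostname")
  .filter("msgtimestamp < '2020-01-20T00:10:00'")
  .show(10)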


Delete Rows from Kudu

{
import org.apache.kudu.spark.kudu._
import scala.io.StdIn.readLine

// Define the Kudu master address(es).
val kuduMasters = Seq("x.x.44.134:7051").mkString(",")

println()
println("###################  INPUT ###################")
val tableName = readLine("Enter table name, e.g. syslogdest: ")
println(tableName)

val deleteCriteria = readLine("Enter deletion criteria, e.g. '2020-01-20T00:10:00': ")
println(deleteCriteria)
val deleteCondition = "msgtimestamp < " + deleteCriteria

val kuduContext = new KuduContext(kuduMasters, sc)
val df = spark.read.options(Map("kudu.master" -> kuduMasters, "kudu.table" -> tableName)).format("kudu").load()

// Select the rows to be deleted. deleteRows expects a DataFrame that
// contains the table's primary key columns for the existing rows.
val idToDelete = df.select("msgtimestamp", "beat_hostname", "component_name", "uuid").filter(deleteCondition)
val rowCount = idToDelete.count()

println()
println("###################  INFO ###################")
println(s"Delete condition: $deleteCondition")
println(s"Number of rows to be deleted: $rowCount")

println()
println("###################  PROCESS ###################")
// Ask for confirmation before actually deleting anything.
val option = readLine(s"Are you sure you want to delete $rowCount row(s) from $tableName? Y/n: ")
println(option)
println()
if (option == "Y") {
  val t1 = System.nanoTime
  println(s"Rows are being deleted from Kudu table $tableName .....")
  kuduContext.deleteRows(idToDelete, tableName)
  val duration = (System.nanoTime - t1) / 1e9d
  println(s"$rowCount rows deleted from $tableName")
  println()
  println("Elapsed time: %.1f sec".format(duration))
} else {
  println("Rows were not deleted")
}
System.exit(0)
}
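
Because the script ends with System.exit(0), the shell quits after the delete. To verify the result, start a fresh spark-shell and count the rows that still match the condition. A minimal sketch, assuming the same master, table, and criteria used above:

import org.apache.kudu.spark.kudu._

// These values mirror the script above; adjust them to your run.
val kuduMasters = "x.x.44.134:7051"
val tableName = "syslogdest"                                  // example table name from the prompt
val deleteCondition = "msgtimestamp < '2020-01-20T00:10:00'"  // example criteria from the prompt

val remaining = spark.read.options(Map("kudu.master" -> kuduMasters, "kudu.table" -> tableName)).format("kudu").load().filter(deleteCondition).count()

println(s"Rows still matching the delete condition: $remaining")  // expect 0 after a successful delete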


You can save the above code in a file, e.g. /home/solr/KuduDeletion.scala, and run it with spark-shell:

[solr@te1-hdp-rp-nn01 ~]$ spark-shell -i KuduDeletion.scala
or load it from within a running spark-shell session:
:load PATH_TO_FILE
:load /home/solr/KuduDeletion.scala
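
If you did not copy the connector jar into $SPARK_HOME/jars, pass it again when running the script:

[solr@te1-hdp-rp-nn01 ~]$ spark-shell --jars /opt/progs/kudu-spark2_2.11-1.10.0.jar -i KuduDeletion.scala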
