Kudu integrates with Spark through the Data Source API. I downloaded the jar files from the location below:
https://jar-download.com/artifacts/org.apache.kudu/kudu-spark2_2.11/1.10.0/source-code
You can place the jar files in $SPARK_HOME/jars (e.g. /opt/progs/spark-2.4.5-bin-hadoop2.7/jars) if you don't want to use the --jars option with spark-shell:
[solr@te1-hdp-rp-nn01 ~]$ spark-shell --jars /opt/progs/kudu-spark2_2.11-1.10.0.jar
import org.apache.kudu.spark.kudu._
Query from Kudu
// Create a DataFrame that points to the Kudu table we want to query.
val df = spark.read.options(Map("kudu.master" -> "x.x.44.134:7051", "kudu.table" -> "syslog")).format("kudu").load
df.printSchema()
// Create a view from the DataFrame to make it accessible from Spark SQL.
df.createOrReplaceTempView("spsyslog")
// Run Spark SQL queries against our view of the Kudu table.
spark.sql("select * from spsyslog limit 10").show()
Delete Rows from Kudu
{
  import org.apache.kudu.spark.kudu._

  // Define related variables
  val kuduMasters = Seq("x.x.44.134:7051").mkString(",")

  println()
  println("################### INPUT ###################")
  val tableName = readLine("Enter table name, e.g. syslogdest: ")
  println(tableName)
  val deleteCriteria = readLine("Enter deletion criteria, e.g. '2020-01-20T00:10:00': ")
  println(deleteCriteria)
  val deleteCondition = "msgtimestamp < " + deleteCriteria

  val kuduContext = new KuduContext(kuduMasters, sc)
  val df = spark.sqlContext.read.options(Map("kudu.master" -> kuduMasters, "kudu.table" -> tableName)).kudu

  // Query using the Spark API and get the count of rows to be deleted.
  // The selected columns identify existing rows: deleteRows locates rows
  // by key, so these columns are the table's primary key columns.
  val idToDelete = df.select("msgtimestamp", "beat_hostname", "component_name", "uuid")
                     .filter(deleteCondition)
  val rowCount = idToDelete.count()

  println()
  println("################### INFO ###################")
  println(s"Delete condition: $deleteCondition")
  println(s"Number of rows to be deleted: $rowCount")
  println()
  println("################### PROCESS ###################")

  // Now delete the rows, after asking for confirmation.
  val option = readLine(s"Are you sure you want to delete $rowCount row(s) from $tableName? Y/n: ")
  println(option)
  println()
  if (option == "Y") {
    val t1 = System.nanoTime
    println(s"Rows are being deleted from Kudu table $tableName .....")
    kuduContext.deleteRows(idToDelete, tableName)
    val duration = (System.nanoTime - t1) / 1e9d
    println(s"$rowCount rows deleted from $tableName")
    println()
    println("Elapsed time: %.1f sec".format(duration))
  } else {
    println("Rows were not deleted")
  }
  System.exit(0)
}
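Besides deleteRows, KuduContext exposes the other row operations in the same style: insertRows, updateRows, and upsertRows. A minimal sketch of an upsert against the same table, assuming updatedDf is a DataFrame whose schema matches the Kudu table (the filter shown is only an illustrative example, not part of the script above):

// Upsert: inserts new rows and updates existing ones by primary key.
val updatedDf = df.filter("beat_hostname = 'host01'")
kuduContext.upsertRows(updatedDf, tableName)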
You can save the delete script above in a file, e.g. /home/solr/KuduDeletion.scala, and run it with spark-shell:
[solr@te1-hdp-rp-nn01 ~]$ spark-shell -i KuduDeletion.scala
or load it from within a running spark-shell session:
:load PATH_TO_FILE
:load /home/solr/KuduDeletion.scala
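If the Kudu jar was not copied into $SPARK_HOME/jars, the two spark-shell flags shown earlier can be combined in a single command:
[solr@te1-hdp-rp-nn01 ~]$ spark-shell --jars /opt/progs/kudu-spark2_2.11-1.10.0.jar -i KuduDeletion.scala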