- git clone tpch-kit, check out the Spark-compatible version, and build dbgen with make
- export DSS_CONFIG=tpch-kit/dbgen
- run via spark-shell:
cd $SPARK_HOME
./bin/spark-shell \
  --master "local[4]" \
  --conf spark.default.parallelism=40 \
  --conf spark.driver.cores=5 \
  --conf spark.driver.memory=16g \
  --driver-class-path /usr/local/bin/hive/lib/mysql-connector-j-8.0.33.jar \
  --jars ~/ResearchHub/repos/spark-sql-perf/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar
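Once the shell is up, a quick sanity check confirms the flags were picked up; the expected values simply mirror the command line above:
// sanity check inside the shell: the SparkContext should reflect the --conf flags
sc.defaultParallelism                  // expected: 40
spark.conf.get("spark.driver.memory")  // expected: "16g"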
- inside the spark-shell, run the steps below (they could later be consolidated into a standalone program that uses the spark-sql-perf*.jar as a library; see the sketch at the end)
import com.databricks.spark.sql.perf.tpch.TPCHTables
import com.databricks.spark.sql.perf.Tables
// 1. generate data into HDFS
val dbgenDir_ = "/Users/chenghao/Documents/spark3-tpc/gen-dataset/dataset/tpch-kit/dbgen"
val scaleFactor_ = "1"
val useDoubleForDecimal_ = false
val useStringForDate_ = false
val tables = new TPCHTables(spark.sqlContext,
  dbgenDir = dbgenDir_,
  scaleFactor = scaleFactor_,
  useDoubleForDecimal = useDoubleForDecimal_,
  useStringForDate = useStringForDate_,
  generatorParams = Nil)
val location_ = s"hdfs://localhost:8020/user/spark_benchmark/tpch_${scaleFactor_}/dataset"
val format_ = "Parquet"
val overwrite_ = true
val partitionTables_ = true
val clusterByPartitionColumns_ = true
val filterOutNullPartitionValues_ = true
val tableFilter_ = "" // unused below: genData is invoked once per table with tableFilter = tableName
val tableNames = tables.tables.map(_.name)
// true if the named table has partition columns (only referenced by the
// commented-out clusterByPartitionColumns alternative below)
def isPartitioned(tables: Tables, tableName: String): Boolean =
  util.Try(tables.tables.find(_.name == tableName).get.partitionColumns.nonEmpty).getOrElse(false)
// wall-clock timer for the steps below
def time[R](block: => R): R = {
  val t0 = System.currentTimeMillis()
  val result = block // call-by-name
  val t1 = System.currentTimeMillis()
  println("Elapsed time: " + (t1 - t0) + " ms")
  result
}
tableNames.foreach { tableName =>
  // generate data for one table at a time
  time {
    tables.genData(
      location = location_,
      format = format_,
      overwrite = overwrite_,
      partitionTables = partitionTables_,
      // whether to coalesce output by partition column; note that non-partitioned
      // tables get a single writer, which is slow
      clusterByPartitionColumns = clusterByPartitionColumns_, // alternative: if (isPartitioned(tables, tableName)) false else true
      filterOutNullPartitionValues = filterOutNullPartitionValues_,
      tableFilter = tableName,
      // numPartitions controls the parallelism of datagen and the number of writers
      // (= number of files for non-partitioned tables); in general we want many
      // writers to S3, and smaller tasks at large scale factors to avoid OOM and
      // shuffle errors
      numPartitions = 8)
  }
}
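After datagen completes, it's worth checking that the per-table directories actually landed in HDFS. A minimal check via the Hadoop FileSystem API, assuming the session's Hadoop configuration can reach the same HDFS as location_:
// optional sanity check: list the table directories under location_
import org.apache.hadoop.fs.{FileSystem, Path}
val fs = FileSystem.get(new java.net.URI(location_), spark.sparkContext.hadoopConfiguration)
fs.listStatus(new Path(location_)).foreach(s => println(s.getPath))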
// 2. load data into database
val databaseName = s"tpch_${scaleFactor_}" // name of database to create.
time {
  println(s"Creating external tables at $location_")
  tables.createExternalTables(location_, format_, databaseName, overwrite = overwrite_, discoverPartitions = true)
}
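To confirm the tables were registered in the metastore (this assumes the shell has Hive support wired up, which the mysql-connector on the driver class path above suggests):
sql(s"SHOW TABLES IN $databaseName").show(false)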
sql(s"USE $databaseName") // make the new database current so the bare table names below resolve
tableNames.foreach { tableName =>
  println(s"Table $tableName has " + util.Try(sql(s"SHOW PARTITIONS $tableName").count() + " partitions").getOrElse(s"no partitions"))
  util.Try(sql(s"VACUUM $tableName RETAIN 0.0 HOURS")).getOrElse(println(s"Cannot VACUUM $tableName"))
  sql(s"DESCRIBE EXTENDED $tableName").show(999, false)
  println()
}
// 3. collect table- and column-level statistics for the optimizer
tables.analyzeTables(databaseName, analyzeColumns = true)
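With statistics in place, a small smoke test confirms the database is queryable end to end. The aggregate below is just an illustrative query over the standard TPC-H lineitem table, not one of the official 22 queries:
sql("SELECT l_returnflag, l_linestatus, count(*) AS cnt FROM lineitem GROUP BY l_returnflag, l_linestatus").show()

As noted at the top, the whole session can be consolidated into a standalone program that uses the spark-sql-perf jar as a library. A minimal skeleton, with a hypothetical object name and the configuration hard-coded for brevity (the body is the same sequence of calls as the shell session above):
import org.apache.spark.sql.SparkSession
import com.databricks.spark.sql.perf.tpch.TPCHTables

object TPCHDataGen {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("tpch-datagen")
      .enableHiveSupport() // needed for createExternalTables / analyzeTables
      .getOrCreate()
    val tables = new TPCHTables(spark.sqlContext,
      dbgenDir = "tpch-kit/dbgen", // path to the built dbgen directory
      scaleFactor = "1",
      useDoubleForDecimal = false,
      useStringForDate = false,
      generatorParams = Nil)
    // ... genData / createExternalTables / analyzeTables exactly as above ...
    spark.stop()
  }
}

Submit it with spark-submit, passing the spark-sql-perf jar via --jars as in the spark-shell command above.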