1. `git clone` tpch-kit, check out the Spark-compatible version, and run `make` to build `dbgen`

  2. export DSS_CONFIG=tpch-kit/dbgen

  3. run via spark-shell (the MySQL connector JAR on the driver classpath is for a MySQL-backed Hive metastore; the spark-sql-perf JAR provides the TPC-H generator classes)

cd $SPARK_HOME
./bin/spark-shell \
--master "local[4]" \
--conf spark.default.parallelism=40 \
--conf spark.driver.cores=5 \
--conf spark.driver.memory=16g \
--driver-class-path /usr/local/bin/hive/lib/mysql-connector-j-8.0.33.jar \
--jars ~/ResearchHub/repos/spark-sql-perf/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar

  4. inside the spark-shell (this interactive session could later be consolidated into a standalone program that uses the spark-sql-perf*.jar as a library):
import com.databricks.spark.sql.perf.tpch.TPCHTables
import com.databricks.spark.sql.perf.Tables

// 1. generate data into HDFS

// path to the dbgen binary built in step 1
val dbgenDir_ = "/Users/chenghao/Documents/spark3-tpc/gen-dataset/dataset/tpch-kit/dbgen"
val scaleFactor_ = "1"           // dataset size in GB
val useDoubleForDecimal_ = false // keep DECIMAL columns as DECIMAL
val useStringForDate_ = false    // keep DATE columns as DATE
val tables = new TPCHTables(spark.sqlContext,
                            dbgenDir = dbgenDir_,
                            scaleFactor = scaleFactor_,
                            useDoubleForDecimal = useDoubleForDecimal_,
                            useStringForDate = useStringForDate_,
                            generatorParams = Nil)

// target location in HDFS for the generated dataset
val location_ = s"hdfs://localhost:8020/user/spark_benchmark/tpch_${scaleFactor_}/dataset"
val format_ = "Parquet"
val overwrite_ = true
val partitionTables_ = true              // partition tables by their partition columns
val clusterByPartitionColumns_ = true    // cluster output by partition columns before writing
val filterOutNullPartitionValues_ = true // drop rows with null partition values
val tableFilter_ = ""                    // empty = all tables

val tableNames = tables.tables.map(_.name)

// true if the named table defines partition columns
def isPartitioned(tables: Tables, tableName: String): Boolean =
  util.Try(tables.tables.find(_.name == tableName).get.partitionColumns.nonEmpty).getOrElse(false)

// times a block and prints the elapsed wall-clock time
def time[R](block: => R): R = {
  val t0 = System.currentTimeMillis()
  val result = block // call-by-name
  val t1 = System.currentTimeMillis()
  println("Elapsed time: " + (t1 - t0) + "ms")
  result
}

tableNames.foreach { tableName =>
  // generate data, one table at a time
  time {
    tables.genData(
      location = location_,
      format = format_,
      overwrite = overwrite_,
      partitionTables = partitionTables_,
      // whether to coalesce into a single file (only one writer for
      // non-partitioned tables = slow)
      clusterByPartitionColumns = clusterByPartitionColumns_, // alternative: if (isPartitioned(tables, tableName)) false else true
      filterOutNullPartitionValues = filterOutNullPartitionValues_,
      tableFilter = tableName,
      // numPartitions controls the parallelism of datagen and the number of
      // writers (= number of files for non-partitioned tables); in general we
      // want many writers to S3, and smaller tasks at large scale factors to
      // avoid OOMs and shuffle errors
      numPartitions = 8
    )
  }
}
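
As an optional sanity check, the generated Parquet can be read straight back from HDFS (spark-sql-perf lays each table out under `$location_/<tableName>`); at scale factor 1, `lineitem` should contain 6,001,215 rows:

// optional sanity check: read one generated table back from HDFS
val lineitem = spark.read.parquet(s"$location_/lineitem")
println(s"lineitem rows: ${lineitem.count()}") // expect 6,001,215 at SF=1
lineitem.printSchema()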


// 2. load the data into a database (external tables over the generated files)
val databaseName = s"tpch_${scaleFactor_}" // name of the database to create
time {
  println(s"Creating external tables at $location_")
  tables.createExternalTables(location_, format_, databaseName, overwrite = true, discoverPartitions = true)
}
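
Optionally, verify that the database and its external tables are now registered in the catalog:

sql(s"USE $databaseName")
sql("SHOW TABLES").show(truncate = false)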

tableNames.foreach { tableName =>
  println(s"Table $tableName has " + util.Try(sql(s"SHOW PARTITIONS $tableName").count() + " partitions").getOrElse("no partitions"))
  // VACUUM is Databricks/Delta-specific; on open-source Spark this falls
  // through to the getOrElse branch
  util.Try(sql(s"VACUUM $tableName RETAIN 0.0 HOURS")).getOrElse(println(s"Cannot VACUUM $tableName"))
  sql(s"DESCRIBE EXTENDED $tableName").show(999, false)
  println()
}

// collect table- and column-level statistics for the cost-based optimizer
tables.analyzeTables(databaseName, analyzeColumns = true)
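
With the tables created and analyzed, the benchmark itself can be run. A minimal sketch, assuming the `TPCH` class in spark-sql-perf exposes its 22 queries as `queries` and the usual `runExperiment`/`waitForFinish` API from `Benchmark`:

import com.databricks.spark.sql.perf.tpch.TPCH

sql(s"USE $databaseName")
val tpch = new TPCH(sqlContext = spark.sqlContext)
// one pass over all 22 queries; results are written as JSON under the
// benchmark's resultsLocation
val experiment = tpch.runExperiment(tpch.queries, iterations = 1)
experiment.waitForFinish(60 * 60 * 10) // wait up to 10 hours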
