$ cat pi.scala
// Monte Carlo estimation of Pi: sample random points in the unit square;
// the fraction landing inside the quarter circle approximates Pi/4.
val NUM_SAMPLES = 100000
val count = sc.parallelize(1 to NUM_SAMPLES).filter { _ =>
  val x = math.random
  val y = math.random
  x*x + y*y < 1 // point falls inside the quarter circle
}.count()
println(s"Pi is roughly ${4.0 * count / NUM_SAMPLES}")
// running:
scala> :load pi.scala
Loading pi.scala...
NUM_SAMPLES: Int = 100000
count: Long = 78560
Pi is roughly 3.1424
scala> val textFile = sc.textFile("/Users/mchinnappan/text-docs/t8.shakespeare.txt")
textFile: org.apache.spark.rdd.RDD[String] = /Users/mchinnappan/text-docs/t8.shakespeare.txt MapPartitionsRDD[16] at textFile at <console>:24
scala> val lineLengths = textFile.map(s => s.length)
lineLengths: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[17] at map at <console>:26
scala> val totalLength = lineLengths.reduce(_ + _)
totalLength: Int = 5333743
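// lineLengths is recomputed for every action; if it will be reused, it
// can be kept in memory (a quick sketch, not from the original session):
lineLengths.persist()
val maxLength = lineLengths.reduce((a, b) => math.max(a, b)) // reuses the cached RDD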
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
scala> val spark = SparkSession.builder().appName("Spark SQL").getOrCreate()
18/02/06 14:55:07 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@1a1c308b
scala> import spark.implicits._
import spark.implicits._
scala> val df = spark.read.json("/Users/mchinnappan/text-docs/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.show()
+---+--------------+
|age| name|
+---+--------------+
| 76| Ken Thompson|
| 76|Dennis Ritchie|
| 62| James Gosling|
| 83| Niklaus Wirth|
| 63| Bill Joy|
+---+--------------+
scala> df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
scala> df.select("name").show()
+--------------+
| name|
+--------------+
| Ken Thompson|
|Dennis Ritchie|
| James Gosling|
| Niklaus Wirth|
| Bill Joy|
+--------------+
scala> // Select everybody, but increment the age by 1
scala> df.select($"name", $"age" + 1).show()
+--------------+---------+
|          name|(age + 1)|
+--------------+---------+
|  Ken Thompson|       77|
|Dennis Ritchie|       77|
| James Gosling|       63|
| Niklaus Wirth|       84|
|      Bill Joy|       64|
+--------------+---------+
scala> // Select people older than 70
scala> df.filter($"age" > 70).show()
+---+--------------+
|age|          name|
+---+--------------+
| 76|  Ken Thompson|
| 76|Dennis Ritchie|
| 83| Niklaus Wirth|
+---+--------------+
scala> // Count people by age
scala> df.groupBy("age").count().show()
+---+-----+
|age|count|
+---+-----+
| 63| 1|
| 83| 1|
| 62| 1|
| 76| 2|
+---+-----+
scala> // Register the DataFrame as a SQL temporary view
scala> df.createOrReplaceTempView("people")
scala> val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> sqlDF.show()
+---+--------------+
|age| name|
+---+--------------+
| 76| Ken Thompson|
| 76|Dennis Ritchie|
| 62| James Gosling|
| 83| Niklaus Wirth|
| 63| Bill Joy|
+---+--------------+
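// createOrReplaceTempView is session-scoped; for a view shared across
// SparkSessions, a global temp view can be used instead (a sketch, not
// from the original session):
df.createGlobalTempView("people_global")
// global temp views are registered under the reserved global_temp database
spark.sql("SELECT * FROM global_temp.people_global").show()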
scala> case class Person(name: String, age: Long)
defined class Person
scala> val caseClassDS = Seq(Person("Ken Thompson", 76)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
scala> caseClassDS.show()
+------------+---+
| name|age|
+------------+---+
|Ken Thompson| 76|
+------------+---+
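// Encoders for most common types are provided by spark.implicits._;
// a quick sketch with a Dataset of Ints (not from the original session):
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // returns Array(2, 3, 4)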
scala> val jsonPath = "/Users/mchinnappan/text-docs/people.json"
jsonPath: String = /Users/mchinnappan/text-docs/people.json
scala> val peopleDS = spark.read.json(jsonPath).as[Person]
peopleDS: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]
scala> peopleDS.show()
+---+--------------+
|age|          name|
+---+--------------+
| 76|  Ken Thompson|
| 76|Dennis Ritchie|
| 62| James Gosling|
| 83| Niklaus Wirth|
| 63|      Bill Joy|
+---+--------------+
val peopleDF = spark.sparkContext.textFile("/Users/mchinnappan/text-docs/people.txt")
.map(_.split(","))
.map(attributes => Person(attributes(0), attributes(1).trim.toInt))
.toDF()
peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
scala> peopleDF.show()
+--------------+---+
| name|age|
+--------------+---+
| Ken Thompson| 76|
|Dennis Ritchie| 76|
| James Gosling| 62|
| Niklaus Wirth| 83|
| Bill Joy| 63|
+--------------+---+
scala> peopleDF.createOrReplaceTempView("people2")
scala> var seventyPlusDF = spark.sql("SELECT name,age FROM people2 WHERE age BETWEEN 70 AND 90")
seventyPlusDF: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
scala> seventyPlusDF.show()
+--------------+---+
| name|age|
+--------------+---+
| Ken Thompson| 76|
|Dennis Ritchie| 76|
| Niklaus Wirth| 83|
+--------------+---+
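// When a case class cannot be defined up front, the schema can be built
// programmatically and applied to an RDD of Rows (a sketch, assuming the
// same comma-separated people.txt as above; peopleDF3 is a hypothetical name):
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))
val rowRDD = spark.sparkContext.textFile("/Users/mchinnappan/text-docs/people.txt")
  .map(_.split(","))
  .map(attrs => Row(attrs(0), attrs(1).trim.toInt))
val peopleDF3 = spark.createDataFrame(rowRDD, schema)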
scala> val usersDF = spark.read.load("/Users/mchinnappan/text-docs/users.parquet")
usersDF: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]
scala> usersDF.show()
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+
scala> usersDF.createOrReplaceTempView("users")
scala> val uDF = spark.sql("SELECT * FROM users")
uDF: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]
scala> uDF.show()
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+
scala> usersDF.select("name", "favorite_color").show()
+------+--------------+
| name|favorite_color|
+------+--------------+
|Alyssa| null|
| Ben| red|
+------+--------------+
// Parquet is a columnar format that is supported by many other data processing systems.
val peopleDF = spark.read.format("json")
.load("/Users/mchinnappan/text-docs/people.json")
peopleDF.select("name", "age")
.write.format("parquet")
.save("name_and_age.parquet")
$ tree name_and_age.parquet/
name_and_age.parquet/
├── _SUCCESS
└── part-00000-1a7c7e94-95c4-44ae-895a-b7711c510a30-c000.snappy.parquet
0 directories, 2 files
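// Re-running the save above fails because name_and_age.parquet already
// exists; a SaveMode controls that behavior (a sketch, not from the
// original session):
import org.apache.spark.sql.SaveMode
peopleDF.select("name", "age")
  .write.format("parquet")
  .mode(SaveMode.Overwrite) // or Append, Ignore, ErrorIfExists (the default)
  .save("name_and_age.parquet")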
scala> val sqlUsersDF = spark.sql("SELECT * FROM parquet.`/Users/mchinnappan/text-docs/name_and_age.parquet`")
18/02/06 16:26:04 WARN ObjectStore: Failed to get database parquet, returning NoSuchObjectException
sqlUsersDF: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
scala> sqlUsersDF.show()
+--------------+---+
| name|age|
+--------------+---+
| Ken Thompson| 76|
|Dennis Ritchie| 76|
| James Gosling| 62|
| Niklaus Wirth| 83|
| Bill Joy| 63|
+--------------+---+
scala> val sqlPeopleJsonDF = spark.sql("SELECT * FROM json.`/Users/mchinnappan/text-docs/people.json`")
18/02/06 16:28:12 WARN ObjectStore: Failed to get database json, returning NoSuchObjectException
sqlPeopleJsonDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> sqlPeopleJsonDF.show()
+---+--------------+
|age| name|
+---+--------------+
| 76| Ken Thompson|
| 76|Dennis Ritchie|
| 62| James Gosling|
| 83| Niklaus Wirth|
| 63| Bill Joy|
+---+--------------+
// A DataFrame can be created for a JSON dataset represented by
// a Dataset[String] storing one JSON object per string
scala> val otherPeopleDataset = spark.createDataset(
| """{"name":"Mohan Chinnappan","address":{"city":"New Found City","state":"NH"}}""" :: Nil)
otherPeopleDataset: org.apache.spark.sql.Dataset[String] = [value: string]
scala> val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople: org.apache.spark.sql.DataFrame = [address: struct<city: string, state: string>, name: string]
scala> otherPeople.show()
+-------------------+----------------+
| address| name|
+-------------------+----------------+
|[New Found City,NH]|Mohan Chinnappan|
+-------------------+----------------+
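// Fields of the nested address struct can be reached with dot notation
// (a sketch, not from the original session):
otherPeople.select($"name", $"address.city", $"address.state").show()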
scala> usersDF.show()
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+
scala> usersDF.write.partitionBy("favorite_color").saveAsTable("people_partitioned")
$ tree spark-warehouse/
spark-warehouse/
└── people_partitioned
├── _SUCCESS
├── favorite_color=__HIVE_DEFAULT_PARTITION__
│ └── part-00000-e6dadc4f-24b6-4bee-b764-8bbc7ac2053e.c000.snappy.parquet
└── favorite_color=red
└── part-00000-e6dadc4f-24b6-4bee-b764-8bbc7ac2053e.c000.snappy.parquet
scala> val numBuckets = 10
numBuckets: Int = 10
scala> usersDF.write.partitionBy("favorite_color")
.bucketBy(numBuckets, "name")
.saveAsTable("people_partitioned_bucketed")
18/02/06 16:46:31 WARN HiveExternalCatalog: Persisting bucketed data source table `default`.`people_partitioned_bucketed` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
$ tree spark-warehouse/
spark-warehouse/
├── people_partitioned
│ ├── _SUCCESS
│ ├── favorite_color=__HIVE_DEFAULT_PARTITION__
│ │ └── part-00000-e6dadc4f-24b6-4bee-b764-8bbc7ac2053e.c000.snappy.parquet
│ └── favorite_color=red
│ └── part-00000-e6dadc4f-24b6-4bee-b764-8bbc7ac2053e.c000.snappy.parquet
└── people_partitioned_bucketed
├── _SUCCESS
├── favorite_color=__HIVE_DEFAULT_PARTITION__
│ └── part-00000-d9ec5ec3-98ea-4bb6-90ed-e370c22fba3e_00004.c000.snappy.parquet
└── favorite_color=red
└── part-00000-d9ec5ec3-98ea-4bb6-90ed-e370c22fba3e_00008.c000.snappy.parquet
scala> usersDF.write
.partitionBy("favorite_color")
.bucketBy(numBuckets, "name")
.sortBy("favorite_numbers")
.saveAsTable("people_partitioned_bucketed_sorted")
18/02/06 16:53:20 WARN HiveExternalCatalog: Persisting bucketed data source table `default`.`people_partitioned_bucketed_sorted` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
$ tree spark-warehouse/
spark-warehouse/
└── people_partitioned_bucketed_sorted
├── _SUCCESS
├── favorite_color=__HIVE_DEFAULT_PARTITION__
│ └── part-00000-4d811338-dda7-48ff-882d-cd4dbbf44205_00004.c000.snappy.parquet
└── favorite_color=red
└── part-00000-4d811338-dda7-48ff-882d-cd4dbbf44205_00008.c000.snappy.parquet
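// The saved tables live in the metastore (spark-warehouse here) and can
// be read back by name (a sketch, not from the original session):
spark.table("people_partitioned_bucketed").show()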
// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")
scala> squaresDF.show()
+-----+------+
|value|square|
+-----+------+
| 1| 1|
| 2| 4|
| 3| 9|
| 4| 16|
| 5| 25|
+-----+------+
// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")
scala> cubesDF.show()
+-----+----+
|value|cube|
+-----+----+
| 6| 216|
| 7| 343|
| 8| 512|
| 9| 729|
| 10|1000|
+-----+----+
scala> // Read the partitioned table
scala> val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF: org.apache.spark.sql.DataFrame = [value: int, square: int ... 2 more fields]
// The final schema consists of all 3 columns in the Parquet files,
// together with the partitioning column that appears in the partition directory paths
scala> mergedDF.printSchema()
root
|-- value: integer (nullable = true)
|-- square: integer (nullable = true)
|-- cube: integer (nullable = true)
|-- key: integer (nullable = true)
$ tree data
data
└── test_table
├── key=1
│ ├── _SUCCESS
│ ├── part-00000-1a8f3060-7618-472c-80f3-b0c815201fe1-c000.snappy.parquet
│ ├── part-00001-1a8f3060-7618-472c-80f3-b0c815201fe1-c000.snappy.parquet
│ ├── part-00002-1a8f3060-7618-472c-80f3-b0c815201fe1-c000.snappy.parquet
│ ├── part-00003-1a8f3060-7618-472c-80f3-b0c815201fe1-c000.snappy.parquet
│ ├── part-00004-1a8f3060-7618-472c-80f3-b0c815201fe1-c000.snappy.parquet
│ ├── part-00005-1a8f3060-7618-472c-80f3-b0c815201fe1-c000.snappy.parquet
│ ├── part-00006-1a8f3060-7618-472c-80f3-b0c815201fe1-c000.snappy.parquet
│ └── part-00007-1a8f3060-7618-472c-80f3-b0c815201fe1-c000.snappy.parquet
└── key=2
├── _SUCCESS
├── part-00000-bc60e4d1-e390-4aba-83a5-8aad324b73f7-c000.snappy.parquet
├── part-00001-bc60e4d1-e390-4aba-83a5-8aad324b73f7-c000.snappy.parquet
├── part-00002-bc60e4d1-e390-4aba-83a5-8aad324b73f7-c000.snappy.parquet
├── part-00003-bc60e4d1-e390-4aba-83a5-8aad324b73f7-c000.snappy.parquet
├── part-00004-bc60e4d1-e390-4aba-83a5-8aad324b73f7-c000.snappy.parquet
├── part-00005-bc60e4d1-e390-4aba-83a5-8aad324b73f7-c000.snappy.parquet
├── part-00006-bc60e4d1-e390-4aba-83a5-8aad324b73f7-c000.snappy.parquet
└── part-00007-bc60e4d1-e390-4aba-83a5-8aad324b73f7-c000.snappy.parquet
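// Schema merging is off by default because it is relatively expensive.
// Besides the per-read option used above, it can be enabled globally
// (a sketch, not from the original session):
spark.conf.set("spark.sql.parquet.mergeSchema", "true")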
// start the shell
// $ spark-shell --driver-class-path postgresql-9.4-1201-jdbc41.jar --jars postgresql-9.4-1201-jdbc41.jar
scala> var jdbcDF = spark.read.format("jdbc")
.option("url", "jdbc:postgresql:dbname")
.option("dbtable","zipcodes")
.option("user", "postgres")
.option("password","password-here")
.load()
jdbcDF: org.apache.spark.sql.DataFrame = [zipcode: string, city: string ... 3 more fields]
scala> jdbcDF.show()
+-------+------------------+-----+--------+---------+
|zipcode| city|state|latitude|longitude|
+-------+------------------+-----+--------+---------+
| 00705| AIBONITO| PR| 18.14| -66.26|
| 00610| ANASCO| PR| 18.28| -67.14|
| 00612| ARECIBO| PR| 18.45| -66.73|
| 00601| ADJUNTAS| PR| 18.16| -66.72|
| 00602| AGUADA| PR| 18.38| -67.18|
| 00603| AGUADILLA| PR| 18.43| -67.15|
| 00703| AGUAS BUENAS| PR| 18.25| -66.1|
| 00704| AGUIRRE| PR| 17.96| -66.22|
| 07675| WESTWOOD| NJ| 40.98| -74.03|
| 07677| WOODCLIFF LAKE| NJ| 41.02| -74.05|
| 07885| WHARTON| NJ| 40.89| -74.58|
| 07981| WHIPPANY| NJ| 40.82| -74.41|
| 07999| WHIPPANY| NJ| 40.82| -74.41|
...
// db
dbname=# \d testtable
   Table "public.testtable"
 Column | Type | Modifiers
--------+------+-----------
 name   | text |
dbname=# table testtable;
   name
----------
 testname
(1 row)
scala> import java.util.Properties
import java.util.Properties
scala> val connectionProperties = new Properties()
connectionProperties: java.util.Properties = {}
scala> connectionProperties.put("user", "postgres")
res4: Object = null
scala> connectionProperties.put("password", "pwd")
res5: Object = null
scala> val jdbcDF2 = spark.read
.jdbc("jdbc:postgresql:dbname", "testtable", connectionProperties)
jdbcDF2: org.apache.spark.sql.DataFrame = [name: string]
scala> jdbcDF2.show()
+--------+
| name|
+--------+
|testname|
+--------+
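// Writing back over JDBC uses the same connection properties; the target
// table name new_testtable below is hypothetical (a sketch):
jdbcDF2.write.jdbc("jdbc:postgresql:dbname", "new_testtable", connectionProperties)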
var df = spark.read.json("/tmp/farming/fruitStock.json")
df.show()
+----------+-------+---+
| date| fruit| id|
+----------+-------+---+
|2018-01-01| Apple| 1f|
|2018-01-02| Apple| 2f|
|2018-02-01| Pear| 3f|
|2018-03-01| Mango| 4f|
|2018-04-01| Pear| 5f|
|2018-05-01|Apricot| 6f|
|2018-06-01| Peach| 7f|
|2018-06-01| Apple| 8f|
+----------+-------+---+
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala> val result = df.withColumn("date", date_format(col("date"), "MMM yyyy"))
.groupBy("fruit")
.pivot("date")
.agg(countDistinct("id").alias("count")).na.fill(0)
scala> result.show()
+-------+--------+--------+--------+--------+--------+--------+
| fruit|Apr 2018|Feb 2018|Jan 2018|Jun 2018|Mar 2018|May 2018|
+-------+--------+--------+--------+--------+--------+--------+
| Peach| 0| 0| 0| 1| 0| 0|
| Pear| 1| 1| 0| 0| 0| 0|
| Mango| 0| 0| 0| 0| 1| 0|
|Apricot| 0| 0| 0| 0| 0| 1|
| Apple| 0| 0| 2| 1| 0| 0|
+-------+--------+--------+--------+--------+--------+--------+
# ref: https://databricks.com/session/pivoting-data-with-sparksql
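// Supplying the pivot values explicitly avoids the extra pass Spark
// makes to compute them (a sketch; values copied from the data above):
val result2 = df.withColumn("date", date_format(col("date"), "MMM yyyy"))
  .groupBy("fruit")
  .pivot("date", Seq("Jan 2018", "Feb 2018", "Mar 2018", "Apr 2018", "May 2018", "Jun 2018"))
  .agg(countDistinct("id").alias("count")).na.fill(0)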
val ratings_raw = sc.textFile("/Users/mchinnappan/farming/ml-1m/ratings.dat")
case class Rating(user:Int, movie:Int, rating: Int)
val ratings = ratings_raw.map(_.split("::").map(_.toInt)).map(r => Rating(r(0),r(1),r(2))).toDF
scala> ratings.show
+----+-----+------+
|user|movie|rating|
+----+-----+------+
| 1| 1193| 5|
| 1| 661| 3|
| 1| 914| 3|
| 1| 3408| 4|
| 1| 2355| 5|
| 1| 1197| 3|
| 1| 1287| 5|
| 1| 2804| 5|
| 1| 594| 4|
...
val users_raw = sc.textFile("/Users/mchinnappan/farming/ml-1m/users.dat")
case class User(user:Int, gender:String, age: Int)
val users = users_raw.map(_.split("::")).map( u => User(u(0).toInt, u(1), u(2).toInt)).toDF
scala> users.show
+----+------+---+
|user|gender|age|
+----+------+---+
| 1| F| 1|
| 2| M| 56|
| 3| M| 25|
| 4| M| 45|
| 5| M| 25|
| 6| F| 50|
...
val sample_users = users.where( expr("gender = 'F' or ( rand() * 5 < 2)"))
scala> sample_users.groupBy("gender").count().show()
+------+-----+
|gender|count|
+------+-----+
|     F| 1709|
|     M| 1691|
+------+-----+
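// The expr filter above keeps every 'F' row plus roughly 40% of the rest;
// stratified sampling can also be done directly with sampleBy (a sketch,
// fractions mirroring the filter; not from the original session):
val sample_users2 = users.stat.sampleBy("gender", Map("F" -> 1.0, "M" -> 0.4), seed = 42L)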
// 100 most popular movies
scala> val popular = ratings.groupBy("movie").count().orderBy($"count".desc).limit(100)
scala> popular.show()
+-----+-----+
|movie|count|
+-----+-----+
| 2858| 3428|
| 260| 2991|
| 1196| 2990|
...
val df = Seq(
  ("G", Some(4), 2, None),
  ("H", None, 4, Some(5))
).toDF("A", "X", "Y", "Z")
df.show()
+---+----+---+----+
| A| X| Y| Z|
+---+----+---+----+
| G| 4| 2|null|
| H|null| 4| 5|
+---+----+---+----+
df.select($"A", expr("stack(3, 'X', X, 'Y', Y, 'Z', Z) as (B, C)"))
.where("C is not null").show
+---+---+---+
| A| B| C|
+---+---+---+
| G| X| 4|
| G| Y| 2|
| H| Y| 4|
| H| Z| 5|
+---+---+---+
# Ref: https://svds.com/pivoting-data-in-sparksql/
// Structured Streaming output modes:
Complete Mode - The entire updated Result Table is written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table.
Append Mode - Only the new rows appended to the Result Table since the last trigger are written to the external storage. This applies only to queries where existing rows in the Result Table are not expected to change.
Update Mode - Only the rows that were updated in the Result Table since the last trigger are written to the external storage (available since Spark 2.1.1). Unlike Complete Mode, this mode outputs only the rows that have changed since the last trigger. If the query doesn't contain aggregations, it is equivalent to Append Mode.
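A minimal Structured Streaming sketch showing where the output mode is set (assumes a socket source feeding text lines on localhost:9999; not from the original session):
import org.apache.spark.sql.functions._
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
// running word count over the stream; aggregation queries support
// the complete and update output modes
val wordCounts = lines
  .select(explode(split($"value", " ")).as("word"))
  .groupBy("word")
  .count()
val query = wordCounts.writeStream
  .outputMode("complete") // write the whole Result Table on every trigger
  .format("console")
  .start()
query.awaitTermination()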