Reading and Writing MySQL with Spark

Notes on how Spark reads from and writes to a MySQL database.
Spark version: 2.2.0

Connecting to MySQL from the spark-shell

The official example

First, let's look at the example from the official documentation:

bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

As you can see, the official docs use PostgreSQL as the example database; MySQL works exactly the same way. The rest of this post walks through connecting Spark to MySQL.

Download the MySQL connector jar

The PostgreSQL example above relies on a driver jar, postgresql-9.4.1207.jar; MySQL needs an equivalent connector jar, which can be downloaded from the MySQL website.

I downloaded and tested two versions, mysql-connector-java-5.0.8-bin.jar and mysql-connector-java-6.0.6-bin.jar.
Both work; the examples below use mysql-connector-java-5.0.8-bin.jar.

Start the spark-shell

./bin/spark-shell --driver-class-path mysql-connector-java-5.0.8/mysql-connector-java-5.0.8-bin.jar --jars mysql-connector-java-5.0.8/mysql-connector-java-5.0.8-bin.jar

If startup fails with an error, try deleting the metastore_db/dbex.lck file and run again:
https://stackoverflow.com/questions/37442910/spark-shell-startup-errors

rm metastore_db/dbex.lck

Read data from a MySQL table

https://docs.databricks.com/spark/latest/data-sources/sql-databases.html

scala> val jdbcUsername = "root"
jdbcUsername: String = root
scala> val jdbcPassword = "xxxxxx"
jdbcPassword: String = xxxxxx
scala> val jdbcHostname = "localhost"
jdbcHostname: String = localhost
scala> val jdbcPort = 3306
jdbcPort: Int = 3306
scala> val jdbcDatabase ="xblog"
jdbcDatabase: String = xblog
scala> val connectionProperties = new Properties()
<console>:23: error: not found: type Properties
val connectionProperties = new Properties()
^
scala> import java.util.Properties
import java.util.Properties
scala> val connectionProperties = new Properties()
connectionProperties: java.util.Properties = {}
scala> connectionProperties.put("user", jdbcUsername)
res0: Object = null
scala> connectionProperties.put("password", jdbcPassword)
res1: Object = null
scala> val jdbc_url = s"jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}"
jdbc_url: String = jdbc:mysql://localhost:3306/xblog
scala> val df_student = spark.read.jdbc(jdbc_url, "student", connectionProperties)
java.sql.SQLException: No suitable driver
at java.sql.DriverManager.getDriver(DriverManager.java:315)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$7.apply(JDBCOptions.scala:84)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$7.apply(JDBCOptions.scala:84)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:83)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:34)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:32)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:306)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:193)
... 48 elided
scala> Class.forName("com.mysql.jdbc.Driver")
res2: Class[_] = class com.mysql.jdbc.Driver
scala> val df_student = spark.read.jdbc(jdbc_url, "student", connectionProperties)
df_student: org.apache.spark.sql.DataFrame = [id: bigint, name: string ... 1 more field]
scala> df_student.show()
+---+----+---+
| id|name|age|
+---+----+---+
| 2| 张三| 18|
| 4| 李四| 20|
+---+----+---+
scala>
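
The first read above fails with "No suitable driver" until Class.forName("com.mysql.jdbc.Driver") registers the MySQL driver. An alternative, shown as a sketch here and assuming the connector jar is on the classpath as described above, is to name the driver class in the connection properties so Spark loads it itself:

// Tell Spark which JDBC driver class to load, instead of calling Class.forName
connectionProperties.put("driver", "com.mysql.jdbc.Driver")
val df_student = spark.read.jdbc(jdbc_url, "student", connectionProperties)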

Write data to MySQL

scala> case class Person(name: String, age: Long)
defined class Person
scala> val df = Seq(Person("Andy", 32)).toDF()
df: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
scala> df.show()
+----+---+
|name|age|
+----+---+
|Andy| 32|
+----+---+
scala> df.write.jdbc(jdbc_url, "student", connectionProperties)
org.apache.spark.sql.AnalysisException: Table or view 'student' already exists. SaveMode: ErrorIfExists.;
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:81)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:472)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:461)
... 48 elided
scala> import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SaveMode
scala> df.write.mode(SaveMode.Append).jdbc(jdbc_url, "student", connectionProperties)
scala> df_student.show()
+---+----+---+
| id|name|age|
+---+----+---+
| 2| 张三| 18|
| 4| 李四| 20|
| 6|Andy| 32|
+---+----+---+
scala>
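
The first write above fails because the default save mode is ErrorIfExists and the student table already exists; SaveMode.Append adds the new rows instead. For completeness, a sketch of the other modes (not run in the session above): as far as I know, Overwrite drops and recreates the table from the DataFrame's schema by default, so the existing rows (and columns such as the auto-increment id) are lost, while Ignore silently skips the write when the table exists.

import org.apache.spark.sql.SaveMode
// Replaces the existing table with one built from df's schema -- use with care
df.write.mode(SaveMode.Overwrite).jdbc(jdbc_url, "student", connectionProperties)
// Does nothing if the table already exists
df.write.mode(SaveMode.Ignore).jdbc(jdbc_url, "student", connectionProperties)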

Command summary

Putting the whole flow together, the commands are:

Class.forName("com.mysql.jdbc.Driver")
// Database connection properties
val jdbcUsername = "root"
val jdbcPassword = "xxxxxx"
val jdbcHostname = "localhost"
val jdbcPort = 3306
val jdbcDatabase = "xblog"
import java.util.Properties
val connectionProperties = new Properties()
connectionProperties.put("user", jdbcUsername)
connectionProperties.put("password", jdbcPassword)
// Read the database table student
val jdbc_url = s"jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}"
val df_student = spark.read.jdbc(jdbc_url, "student", connectionProperties)
df_student.show()
// Create a DataFrame
// https://spark.apache.org/docs/latest/sql-programming-guide.html#creating-datasets
case class Person(name: String, age: Long)
val df = Seq(Person("Andy", 32)).toDF()
df.show()
// Write to the database table
import org.apache.spark.sql.SaveMode
df.write.mode(SaveMode.Append).jdbc(jdbc_url, "student", connectionProperties)
df_student.show()
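
The table argument of spark.read.jdbc does not have to be a bare table name; a parenthesized subquery with an alias also works, letting MySQL do the filtering before the data reaches Spark. A small sketch against the same student table (the alias s is arbitrary):

// Read only a filtered subset; MySQL evaluates the inner query
val df_adults = spark.read.jdbc(jdbc_url, "(SELECT * FROM student WHERE age >= 18) AS s", connectionProperties)
df_adults.show()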

Connecting to MySQL from a Spark application

Before MySQL was involved, the submit command was:

./bin/spark-submit --class "SimpleApp" --master spark://172.16.201.213:7077 target/scala-2.11/simple-project_2.11-1.0.jar

To add MySQL support, the submit command becomes:

./bin/spark-submit --driver-class-path mysql-connector-java-5.0.8-bin.jar --jars mysql-connector-java-5.0.8-bin.jar --class "SimpleApp" --master spark://172.16.201.213:7077 target/scala-2.11/simple-project_2.11-1.0.jar

As in the spark-shell session earlier, the --driver-class-path and --jars options are added to the launch command.
In my testing, both options are required, and they must come before the other arguments; appending them at the end does not work.
The build.sbt file does not need any changes.
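
For reference, here is a minimal sketch of what the same JDBC read could look like inside the compiled application (the object name SimpleApp matches the --class above; the connection values are the same placeholders used in the shell session):

import java.util.Properties
import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SimpleApp").getOrCreate()

    // Same connection settings as in the spark-shell session
    val connectionProperties = new Properties()
    connectionProperties.put("user", "root")
    connectionProperties.put("password", "xxxxxx")
    connectionProperties.put("driver", "com.mysql.jdbc.Driver")

    val jdbc_url = "jdbc:mysql://localhost:3306/xblog"
    val df_student = spark.read.jdbc(jdbc_url, "student", connectionProperties)
    df_student.show()

    spark.stop()
  }
}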