Reading PostgreSQL json data in Spark and processing it into a structured form
Spark can read json in a structured way and turn it into a schema it understands; PostgreSQL has a native json data type. Combining the two gives a powerful setup for data analysis. This post walks through reading json data from PostgreSQL into Spark and processing it into a structured form.
For PostgreSQL's json data type, this document is a good reference:
We can create a database table with a column of the json type. Here is an example:
$ psql astro
psql (9.6.2)
Type "help" for help.
astro=# \d items;
Table "public.items"
Column | Type | Modifiers
-----------+-----------------------------+----------------------------------------------------
id | integer | not null default nextval('items_id_seq'::regclass)
timestamp | timestamp without time zone | not null default now()
info | json | not null
Indexes:
"items_pkey" PRIMARY KEY, btree (id)
As you can see, the info column is of type json, and PostgreSQL can read such a column in a structured way. Here is some sample data:
astro=# select info from items limit 1;
-[ RECORD 1 ]----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
info | {"distinct_id":"SDK为每个用户生成的唯一ID","info":{"token":"当前产品的token","timestamp":"事件发生的时间戳","type":"track","event":"PageView","fields":{"$lib":"javascript","$lib_version":"0.0.1","$screen_width":375,"$screen_height":667,"$os":"MacOSX10.1","$ip":true}}}
You can see the json document stored in the info field. Next, read its contents in a structured way:
astro=# select info -> 'distinct_id' from items limit 1;
-[ RECORD 1 ]-------------------------
?column? | "SDK为每个用户生成的唯一ID"
astro=#
This is the json support that PostgreSQL provides. Note that the json type stores an exact text copy of the input and re-parses it on every access; the related jsonb type stores a decomposed binary representation instead, which is generally faster to query and can be indexed.
On the Spark side, what we need is the json data in string form. For how to read data from a database into Spark, see my earlier article:
Following that article, read the data from the PostgreSQL database into Spark:
$ spark-shell --driver-class-path postgresql-42.2.5.jar --jars /usr/local/lib/postgresql-42.2.5.jar
2018-09-23 19:16:19 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2018-09-23 19:16:23 WARN Utils:66 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Spark context Web UI available at http://ovpn-12-118.pek2.redhat.com:4041
Spark context available as 'sc' (master = local[*], app id = local-1537701383911).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.0
      /_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_151)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val url = "jdbc:postgresql://localhost:5432/astro?user=weli"
url: String = jdbc:postgresql://localhost:5432/astro?user=weli
scala> val driver = "org.postgresql.Driver"
driver: String = org.postgresql.Driver
scala> val dbDataFrame = spark.read.format("jdbc").option("url", url).option("dbname", "astro").option("dbtable", "items").option("driver", driver).load()
2018-09-23 19:16:59 WARN ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
dbDataFrame: org.apache.spark.sql.DataFrame = [id: int, timestamp: timestamp ... 1 more field]
scala> val infoDf = dbDataFrame.select("info")
infoDf: org.apache.spark.sql.DataFrame = [info: string]
scala>
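A side note on the read above: as far as I can tell, the dbname option is not interpreted by the JDBC source (the database is already named in the URL), so it can be dropped. The dbtable option, on the other hand, also accepts a subquery, which lets PostgreSQL ship only the column we need. A sketch using the same connection settings (not part of the session above):

// Push a subquery down to PostgreSQL so only the json column is transferred.
val infoOnlyDf = spark.read
  .format("jdbc")
  .option("url", url)
  .option("driver", driver)
  .option("dbtable", "(select info from items) as items_info")
  .load()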
At this point we have the infoDf dataframe, but inside it the json documents exist only as strings wrapped in Rows:
scala> infoDf.take(1)
res0: Array[org.apache.spark.sql.Row] = Array([{"distinct_id":"SDK为每个用户生成的唯一ID","info":{"token":"当前产品的token","timestamp":"事件发生的时间戳","type":"track","event":"PageView","fields":{"$lib":"javascript","$lib_version":"0.0.1","$screen_width":375,"$screen_height":667,"$os":"MacOSX10.1","$ip":true}}}])
scala> infoDf.count()
res21: Long = 30
Data in this form still has the json inside it unparsed; we need to turn it into Spark's own json structure. Below is a first, exploratory route; there may well be more streamlined solutions (one shortcut is sketched right after the transcript below).
The first step is to turn the list of Rows into a list of strings. Here is the code:
scala> import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoders
scala> val jsons = infoDf.as(Encoders.STRING).collectAsList();
jsons: java.util.List[String] = [{"distinct_id":"SDK为每个用户生成的唯一ID","info":{"token":"当前产品的token","timestamp":"事件发生的时间戳","type":"track","event":"PageView","fields":{"$lib":"javascript","$lib_version":"0.0.1","$screen_width":375,"$screen_height":667,"$os":"MacOSX10.1","$ip":true}}}, {"event":"$web_event","properties":{"$os":"Mac OS X","$browser":"Safari","$current_url":"http://astro.hianalyst.com/","$browser_version":11.1,"$screen_height":1050,"$screen_width":1680,"$lib":"web","$lib_version":"0.0.1","distinct_id":"165cde64d485e-03a10a70743ec88-49183707-1aeaa0-165cde64d49b48","$initial_referrer":"$direct","$initial_referring_domain":"$direct","$title":"Astrology","$event_type":"pageview","$ce_version":1,"$host":"astro.hianalyst.com","$pathname":"/"},"distinct_id":"165cde64d485e-03a10a70743ec...
Next, turn this jsons list of strings into a Spark dataframe. This takes a few lower-level Spark APIs; here is the code:
scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._
scala> val jsonRdd = sc.parallelize(Seq(jsons))
jsonRdd: org.apache.spark.rdd.RDD[java.util.List[String]] = ParallelCollectionRDD[3] at parallelize at <console>:30
scala> val flatJsonRdd = jsonRdd.flatMap(x => x)
flatJsonRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[8] at flatMap at <console>:29
scala> val jsonDf = spark.read.json(flatJsonRdd)
warning: there was one deprecation warning; re-run with -deprecation for details
jsonDf: org.apache.spark.sql.DataFrame = [distinct_id: string, event: string ... 4 more fields]
scala>
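The deprecation warning above comes from the RDD[String] overload of spark.read.json, which since Spark 2.2 has been superseded by an overload that takes a Dataset[String]. Using it also removes the need to collect the rows to the driver in the first place. A minimal sketch of that shortcut (not run against this table):

import org.apache.spark.sql.Encoders

// Read the info column as a Dataset[String]: no collectAsList, no parallelize needed.
val jsonDs = infoDf.as(Encoders.STRING)
// spark.read.json(Dataset[String]) infers the schema directly from the json documents.
val jsonDf = spark.read.json(jsonDs)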
Either way, we end up with the jsonDf dataframe, and it holds the json data parsed into a structure:
scala> jsonDf.take(1)
res3: Array[org.apache.spark.sql.Row] = Array([SDK为每个用户生成的唯一ID,null,[PageView,[true,javascript,0.0.1,MacOSX10.1,667,375],事件发生的时间戳,当前产品的token,track],null,null,null])
That is the data inside the dataframe. Next, let's look at the structure of jsonDf:
scala> jsonDf.printSchema()
root
 |-- distinct_id: string (nullable = true)
 |-- event: string (nullable = true)
 |-- info: struct (nullable = true)
 |    |-- event: string (nullable = true)
 |    |-- fields: struct (nullable = true)
 |    |    |-- $ip: boolean (nullable = true)
 |    |    |-- $lib: string (nullable = true)
 |    |    |-- $lib_version: string (nullable = true)
 |    |    |-- $os: string (nullable = true)
 |    |    |-- $screen_height: long (nullable = true)
 |    |    |-- $screen_width: long (nullable = true)
 |    |-- timestamp: string (nullable = true)
 |    |-- token: string (nullable = true)
 |    |-- type: string (nullable = true)
 |-- ip: long (nullable = true)
 |-- properties: struct (nullable = true)
 |    |-- $browser: string (nullable = true)
 |    |-- $browser_version: double (nullable = true)
 |    |-- $ce_version: long (nullable = true)
 |    |-- $current_url: string (nullable = true)
 |    |-- $device: string (nullable = true)
 |    |-- $el_attr__href: boolean (nullable = true)
 |    |-- $el_text: string (nullable = true)
 |    |-- $elements: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- attr__aria-disabled: string (nullable = true)
 |    |    |    |-- attr__aria-selected: string (nullable = true)
 |    |    |    |-- attr__class: string (nullable = true)
 |    |    |    |-- attr__id: string (nullable = true)
 |    |    |    |-- attr__role: string (nullable = true)
 |    |    |    |-- attr__style: string (nullable = true)
 |    |    |    |-- classes: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- nth_child: long (nullable = true)
 |    |    |    |-- nth_of_type: long (nullable = true)
 |    |    |    |-- tag_name: string (nullable = true)
 |    |-- $event_type: string (nullable = true)
 |    |-- $host: string (nullable = true)
 |    |-- $initial_referrer: string (nullable = true)
 |    |-- $initial_referring_domain: string (nullable = true)
 |    |-- $lib: string (nullable = true)
 |    |-- $lib_version: string (nullable = true)
 |    |-- $os: string (nullable = true)
 |    |-- $pathname: string (nullable = true)
 |    |-- $screen_height: long (nullable = true)
 |    |-- $screen_width: long (nullable = true)
 |    |-- $title: string (nullable = true)
 |    |-- distinct_id: string (nullable = true)
 |    |-- yyks_browser: string (nullable = true)
 |    |-- yyks_page: string (nullable = true)
 |    |-- yyks_platform: string (nullable = true)
 |-- timestamp: long (nullable = true)
scala>
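The schema above was inferred automatically from the json documents themselves, which is why fields from both event formats appear side by side, with null wherever a document lacks a field. When the shape of the json is known up front, an explicit schema can be passed to from_json instead of relying on inference. A small sketch, where the schema is an assumption that only covers distinct_id:

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical minimal schema: parse only the top-level distinct_id field.
val infoSchema = StructType(Seq(StructField("distinct_id", StringType, nullable = true)))
val parsedDf = infoDf.select(from_json(col("info"), infoSchema).as("parsed"))
parsedDf.select("parsed.distinct_id").show(3)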
With the parsed structure in place, we can also use the SQL engine to analyze the data:
scala> jsonDf.createOrReplaceTempView("jsonTbl")
scala> spark.sql("select count() from jsonTbl")
res14: org.apache.spark.sql.DataFrame = [properties: struct<$browser: string, $browser_version: double ... 21 more fields>]
scala> spark.sql("select count(*) from jsonTbl").show()
+--------+
|count(1)|
+--------+
| 30|
+--------+
scala> spark.sql("select properties.distinct_id from jsonTbl").show()
+--------------------+
| distinct_id|
+--------------------+
| null|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
+--------------------+
only showing top 20 rows
scala>
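The same queries can also be written with the DataFrame API instead of SQL; a brief sketch using column names from the schema above:

import org.apache.spark.sql.functions.col

// Equivalent of the SQL query above, expressed through the DataFrame API.
jsonDf.select(col("properties.distinct_id")).show()
// Nested fields work in aggregations too, e.g. counting rows per info.event value.
jsonDf.groupBy(col("info.event")).count().show()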
As you can see, we can now work with the json data in a fully structured way. That is the whole analysis, end to end.