在Spark中读取PostgreSQL的json数据,并进行结构化处理

Spark支持对json的结构化读取,并转化为它自己可以理解的schema;PostgreSQL支持json的数据类型。把这二者结合在一起,可以形成一个强大的数据分析架构。本文介绍在Spark中读取PostgreSQL的json数据,并进行结构化处理的方法。

关于PostgreSQL的json数据类型,可以查看这篇文档:

我们可以创建一个数据库表,它的字段类型是json的数据类型。下面是例子:

$ psql astro
psql (9.6.2)
Type "help" for help.
astro=# \d items;
                                     Table "public.items"
  Column   |            Type             |                     Modifiers
-----------+-----------------------------+----------------------------------------------------
 id        | integer                     | not null default nextval('items_id_seq'::regclass)
 timestamp | timestamp without time zone | not null default now()
 info      | json                        | not null
Indexes:
    "items_pkey" PRIMARY KEY, btree (id)

可以看到,其中info字段是json类型的。这样类型的字段,PostgreSQL是可以结构化读取的。下面是样例数据:

astro=# select info from items limit 1;
-[ RECORD 1 ]----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
info | {"distinct_id":"SDK为每个用户生成的唯一ID","info":{"token":"当前产品的token","timestamp":"事件发生的时间戳","type":"track","event":"PageView","fields":{"$lib":"javascript","$lib_version":"0.0.1","$screen_width":375,"$screen_height":667,"$os":"MacOSX10.1","$ip":true}}}

可以看到info字段里面的json数据,接下来是结构化读取里面的内容:

astro=# select info -> 'distinct_id' from items limit 1;
-[ RECORD 1 ]-------------------------
?column? | "SDK为每个用户生成的唯一ID"

astro=#

这是postgresql提供的json数据的支持能力,它在内部实际上使用树的结构来保存json数据,所以查询效率会非常高效。

Spark这边需要的是字串形式的json数据。关于从数据库读取数据进spark,可以查看之前写的文章:

按照上面的文章,把PostgreSQL数据库里面的数据读取进来:

$ spark-shell --driver-class-path postgresql-42.2.5.jar --jars /usr/local/lib/postgresql-42.2.5.jar
2018-09-23 19:16:19 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2018-09-23 19:16:23 WARN  Utils:66 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Spark context Web UI available at http://ovpn-12-118.pek2.redhat.com:4041
Spark context available as 'sc' (master = local[*], app id = local-1537701383911).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_151)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val url = "jdbc:postgresql://localhost:5432/astro?user=weli"
url: String = jdbc:postgresql://localhost:5432/astro?user=weli

scala> val driver = "org.postgresql.Driver"
driver: String = org.postgresql.Driver
scala> val dbDataFrame = spark.read.format("jdbc").option("url", url).option("dbname", "astro").option("dbtable", "items").option("driver",  driver).load()
2018-09-23 19:16:59 WARN  ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
dbDataFrame: org.apache.spark.sql.DataFrame = [id: int, timestamp: timestamp ... 1 more field]
scala> val infoDf = dbDataFrame.select("info")
infoDf: org.apache.spark.sql.DataFrame = [info: string]

scala>

以上生成了infoDf的dataFrame,但是这个dataFrame里面,json数据是做为字符串的Row格式存在的:

scala> infoDf.take(1)
res0: Array[org.apache.spark.sql.Row] = Array([{"distinct_id":"SDK为每个用户生成的唯一ID","info":{"token":"当前产品的token","timestamp":"事件发生的时间戳","type":"track","event":"PageView","fields":{"$lib":"javascript","$lib_version":"0.0.1","$screen_width":375,"$screen_height":667,"$os":"MacOSX10.1","$ip":true}}}])
scala> jsonDf.count()
res21: Long = 30

这样的数据并没有解析里面的json结构。我们需要把它转化成Spark的json结构。下面是一个初步探讨的路子,后续可能有更优化的解法。

首先是把数据从Row的列表转成字串的列表,下面是代码:

scala> import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoders

scala> val jsons = infoDf.as(Encoders.STRING).collectAsList();
jsons: java.util.List[String] = [{"distinct_id":"SDK为每个用户生成的唯一ID","info":{"token":"当前产品的token","timestamp":"事件发生的时间戳","type":"track","event":"PageView","fields":{"$lib":"javascript","$lib_version":"0.0.1","$screen_width":375,"$screen_height":667,"$os":"MacOSX10.1","$ip":true}}}, {"event":"$web_event","properties":{"$os":"Mac OS X","$browser":"Safari","$current_url":"http://astro.hianalyst.com/","$browser_version":11.1,"$screen_height":1050,"$screen_width":1680,"$lib":"web","$lib_version":"0.0.1","distinct_id":"165cde64d485e-03a10a70743ec88-49183707-1aeaa0-165cde64d49b48","$initial_referrer":"$direct","$initial_referring_domain":"$direct","$title":"Astrology","$event_type":"pageview","$ce_version":1,"$host":"astro.hianalyst.com","$pathname":"/"},"distinct_id":"165cde64d485e-03a10a70743ec...

接下来是把这个jsons的字串列表转化成spark的dataframe。需要用到一些Spark的底层API,下面是代码:

scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._

scala> val jsonRdd = sc.parallelize(Seq(jsons))
jsonRdd: org.apache.spark.rdd.RDD[java.util.List[String]] = ParallelCollectionRDD[3] at parallelize at <console>:30

scala> val flatJsonRdd = jsonRdd.flatMap(x => x)
flatJsonRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[8] at flatMap at <console>:29

scala> val jsonDf = spark.read.json(flatJsonRdd)
warning: there was one deprecation warning; re-run with -deprecation for details
jsonDf: org.apache.spark.sql.DataFrame = [distinct_id: string, event: string ... 4 more fields]

scala>

通过上面的代码,我们最终得到了jsonDf这个dataframe,这个里面有结构化处理后的json数据:

scala> jsonDf.take(1)
res3: Array[org.apache.spark.sql.Row] = Array([SDK为每个用户生成的唯一ID,null,[PageView,[true,javascript,0.0.1,MacOSX10.1,667,375],事件发生的时间戳,当前产品的token,track],null,null,null])

上面是dataframe里面的数据。接下来可以看看jsonDf的结构:

scala> jsonDf.printSchema()
root
 |-- distinct_id: string (nullable = true)
 |-- event: string (nullable = true)
 |-- info: struct (nullable = true)
 |    |-- event: string (nullable = true)
 |    |-- fields: struct (nullable = true)
 |    |    |-- $ip: boolean (nullable = true)
 |    |    |-- $lib: string (nullable = true)
 |    |    |-- $lib_version: string (nullable = true)
 |    |    |-- $os: string (nullable = true)
 |    |    |-- $screen_height: long (nullable = true)
 |    |    |-- $screen_width: long (nullable = true)
 |    |-- timestamp: string (nullable = true)
 |    |-- token: string (nullable = true)
 |    |-- type: string (nullable = true)
 |-- ip: long (nullable = true)
 |-- properties: struct (nullable = true)
 |    |-- $browser: string (nullable = true)
 |    |-- $browser_version: double (nullable = true)
 |    |-- $ce_version: long (nullable = true)
 |    |-- $current_url: string (nullable = true)
 |    |-- $device: string (nullable = true)
 |    |-- $el_attr__href: boolean (nullable = true)
 |    |-- $el_text: string (nullable = true)
 |    |-- $elements: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- attr__aria-disabled: string (nullable = true)
 |    |    |    |-- attr__aria-selected: string (nullable = true)
 |    |    |    |-- attr__class: string (nullable = true)
 |    |    |    |-- attr__id: string (nullable = true)
 |    |    |    |-- attr__role: string (nullable = true)
 |    |    |    |-- attr__style: string (nullable = true)
 |    |    |    |-- classes: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- nth_child: long (nullable = true)
 |    |    |    |-- nth_of_type: long (nullable = true)
 |    |    |    |-- tag_name: string (nullable = true)
 |    |-- $event_type: string (nullable = true)
 |    |-- $host: string (nullable = true)
 |    |-- $initial_referrer: string (nullable = true)
 |    |-- $initial_referring_domain: string (nullable = true)
 |    |-- $lib: string (nullable = true)
 |    |-- $lib_version: string (nullable = true)
 |    |-- $os: string (nullable = true)
 |    |-- $pathname: string (nullable = true)
 |    |-- $screen_height: long (nullable = true)
 |    |-- $screen_width: long (nullable = true)
 |    |-- $title: string (nullable = true)
 |    |-- distinct_id: string (nullable = true)
 |    |-- yyks_browser: string (nullable = true)
 |    |-- yyks_page: string (nullable = true)
 |    |-- yyks_platform: string (nullable = true)
 |-- timestamp: long (nullable = true)


scala>

有了解析后的结构,我们还可以用sql引擎来分析数据:

scala> jsonDf.createOrReplaceTempView("jsonTbl")
scala> spark.sql("select count() from jsonTbl")
res14: org.apache.spark.sql.DataFrame = [properties: struct<$browser: string, $browser_version: double ... 21 more fields>]
scala> spark.sql("select count(*) from jsonTbl").show()
+--------+
|count(1)|
+--------+
|      30|
+--------+
scala> spark.sql("select properties.distinct_id from jsonTbl").show()
+--------------------+
|         distinct_id|
+--------------------+
|                null|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
|165cde64d485e-03a...|
+--------------------+
only showing top 20 rows


scala>

可以看到,我们已经可以结构化处理json数据了。以上就是整个分析的全过程。