Spark GraphX

Learning Objectives

  • Understand the composition of a graph in Spark GraphX.
  • Be able to create a graph.
  • Be able to use the built-in graph algorithms.

In this section we begin by creating a graph of patients and diagnostic codes. Later we will show how to run graph algorithms on the graph you create.

Basic concept

Spark GraphX abstracts a graph as a Property Graph, which means that each edge and vertex is associated with properties. The Graph class has the following definition

class Graph[VD, ED] {
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
}

Here VD and ED define the property types of each vertex and edge respectively. We can regard VertexRDD[VD] as an RDD of (VertexId, VD) tuples and EdgeRDD[ED] as an RDD of (VertexId, VertexId, ED) triples, where the two vertex ids are the source and destination of the edge.
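As a quick illustration of these types, here is a minimal, self-contained sketch; the two vertices and the edge are made up purely for illustration:

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// a graph with String vertex properties and an Int edge property
val toyVertices: RDD[(VertexId, String)] =
  sc.parallelize(Seq((1L, "patient"), (2L, "diagnostic")))
val toyEdges: RDD[Edge[Int]] =
  sc.parallelize(Seq(Edge(1L, 2L, 3))) // src id, dst id, edge property
val toyGraph: Graph[String, Int] = Graph(toyVertices, toyEdges)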

Graph construction

Let's create a graph of patients and diagnostic codes. For each patient we assign the patient id as the vertex property, and for each diagnostic code we use the code itself as the vertex property. For the edge between a patient and a diagnostic code, we use the number of times the patient is diagnosed with the given disease as the edge property.

Define classes

Let's first define the necessary imports and data structures

import org.apache.spark.SparkContext._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

abstract class VertexProperty extends Serializable

case class PatientProperty(patientId: String) extends VertexProperty

case class DiagnosticProperty(icd9code: String) extends VertexProperty

case class PatientEvent(patientId: String, eventName: String, date: Int, value: Double)

Load raw data

Load the patient event data, keeping only the diagnostic-related events

val allEvents = sc.textFile("data/").
  map(_.split(",")).
  map(splits => PatientEvent(splits(0), splits(1), splits(2).toInt, splits(3).toDouble))

// get and cache diagnosticEvents as we will reuse
val diagnosticEvents = allEvents.
  filter(_.eventName.startsWith("DIAG")).cache()
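For reference, the parsing above assumes that each input line has four comma-separated fields in the order patientId, eventName, date, value. A hypothetical row (made up purely for illustration) would look like:

patient1,DIAG4019,720,1.0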

Create vertices

Patient vertices

Let's create the patient vertices

// create patient vertex
val patientVertexIdRDD = diagnosticEvents.
  map(_.patientId).
  distinct.      // get distinct patient ids
  zipWithIndex     // assign an index as vertex id

val patient2VertexId = patientVertexIdRDD.collect.toMap
val patientVertex = patientVertexIdRDD.
  map{case(patientId, index) => (index, PatientProperty(patientId))}.
  asInstanceOf[RDD[(VertexId, VertexProperty)]]

In order to use the newly created vertex ids later, we finally collect the patient-to-VertexId mapping to the driver.

WARNING

Collecting an RDD to the driver is generally not an efficient practice. One can instead guarantee unique ids by computing each id directly with a hash, as sketched below.
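A minimal sketch of that hash-based alternative, assuming Guava is available on the classpath (it ships with standard Spark distributions); the "PATIENT" prefix is our own convention to keep patient and diagnostic ids from colliding:

import com.google.common.hash.Hashing
import java.nio.charset.StandardCharsets

// derive a 64-bit vertex id from a string key; MD5 makes collisions
// unlikely but not impossible, so treat this as a sketch, not a guarantee
def hashId(key: String): Long =
  Hashing.md5().hashString(key, StandardCharsets.UTF_8).asLong()

val hashedPatientVertex = diagnosticEvents.
  map(_.patientId).
  distinct.
  map(patientId => (hashId("PATIENT" + patientId), PatientProperty(patientId)))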

Diagnostic code vertices

Similar to the patient vertices, we can create the diagnostic code vertices with

// create diagnostic code vertex
val startIndex = patient2VertexId.size
val diagnosticVertexIdRDD = diagnosticEvents.
  map(_.eventName).
  distinct.
  zipWithIndex.
  map{case(icd9code, zeroBasedIndex) =>
    (icd9code, zeroBasedIndex + startIndex)} // make sure no conflict with patient vertex id

val diagnostic2VertexId = diagnosticVertexIdRDD.collect.toMap

val diagnosticVertex = diagnosticVertexIdRDD.
  map{case(icd9code, index) => (index, DiagnosticProperty(icd9code))}.
  asInstanceOf[RDD[(VertexId, VertexProperty)]]

Here we assign vertex ids by adding an offset, obtained from the number of patient vertices, to the result of zipWithIndex, to avoid id conflicts between patients and diagnostic codes.

Create edges

In order to create the edges, we need to know the vertex ids of the vertices we just created.

val bcPatient2VertexId = sc.broadcast(patient2VertexId)
val bcDiagnostic2VertexId = sc.broadcast(diagnostic2VertexId)

val edges = diagnosticEvents.
  map(event => ((event.patientId, event.eventName), 1)).
  reduceByKey(_ + _). // count (patient, code) co-occurrences
  map{case((patientId, icd9code), count) => Edge(
    bcPatient2VertexId.value(patientId),   // src id
    bcDiagnostic2VertexId.value(icd9code), // dst id
    count                                  // edge property
  )}

We first broadcast the patient and diagnostic code to vertex id mappings. Broadcasting avoids unnecessary copying in a distributed setting and is thus more efficient. Then we count occurrences of (patientId, icd9code) pairs with map and reduceByKey, and finally translate them into Edges with the proper VertexIds.

Assemble vertex and edge

We need to put the vertices and edges together to create the graph

val vertices = sc.union(patientVertex, diagnosticVertex)
val graph = Graph(vertices, edges)
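As a quick sanity check (a sketch; the actual counts depend on your data), the graph's size should match what we constructed:

// should equal the number of distinct patients plus distinct diagnostic codes
println(graph.numVertices)
// should equal the number of distinct (patient, diagnostic code) pairs
println(graph.numEdges)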

Graph operations

Given the graph we created, we can run some basic graph operations.

Connected components

Connected components can help find disconnected subgraphs. GraphX provides an API to compute connected components as below

val connectedComponents = graph.connectedComponents

The result is a new graph in which each vertex's property holds the component assignment of the corresponding vertex in the original graph. For example

scala> connectedComponents.vertices.take(5)
Array[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.VertexId)] = 
Array((2556,0), (1260,0), (1410,0), (324,0), (180,0))

The first element of each tuple is the VertexId from the original graph. The second element is the connected component, represented by the lowest-numbered VertexId in that component. In the above example, all five vertices belong to the same component.

We can easily get the number of connected components using ordinary RDD operations, as below.

scala> connectedComponents.vertices.map(_._2).distinct.collect
Array[org.apache.spark.graphx.VertexId] = Array(0, 169, 239)
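Similarly, a minimal sketch for computing the size of each component:

// map each vertex to its component id, then count vertices per component
val componentSizes = connectedComponents.vertices.
  map{case(_, componentId) => componentId}.
  countByValue()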

Degree

The property graph abstraction of GraphX is a directed graph. It provides computation of in-degree, out-degree, and total degree. For example, we can get the degrees as

val inDegrees = graph.inDegrees
val outDegrees = graph.outDegrees
val totalDegrees = graph.degrees
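For instance, a sketch that uses the total degrees to find the most connected vertex:

// keep the (VertexId, degree) pair with the highest total degree
val maxDegreeVertex = totalDegrees.reduce((a, b) => if (a._2 > b._2) a else b)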

PageRank

GraphX also provides an implementation of the famous PageRank algorithm, which computes the 'importance' of each vertex. The graph we generated above is bipartite and not well suited for PageRank. To give an example of PageRank, we randomly generate a graph and run a fixed number of PageRank iterations on it.

import org.apache.spark.graphx.util.GraphGenerators

val randomGraph: Graph[Long, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100)

val pagerank = randomGraph.staticPageRank(20)

Alternatively, we can run PageRank until convergence with a tolerance of 0.01 using randomGraph.pageRank(0.01).
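To inspect the result, a short sketch that lists the five highest-ranked vertices:

// pagerank.vertices is an RDD of (VertexId, rank); take 5 by descending rank
val top5Ranked = pagerank.vertices.takeOrdered(5)(scala.Ordering.by(-_._2))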

Application

Next, we show how we can utilize the graph operations to solve some practical problems in the healthcare domain.

Explore comorbidities

A comorbidity is an additional disorder co-occurring with a primary disease. Since we know all the case patients have heart failure, we can explore possible comorbidities as below (see the comments for more explanation).

// get all the case patients
val casePatients = allEvents.
  filter(event => event.eventName == "heartfailure" && event.value == 1.0).
  map(_.patientId).
  collect.
  toSet

// broadcast
val scCasePatients = sc.broadcast(casePatients)

// filter the graph with the subgraph operation
val filteredGraph = graph.subgraph(vpred = {case(id, attr) =>
    val isPatient = attr.isInstanceOf[PatientProperty]
    val patient = if(isPatient) attr.asInstanceOf[PatientProperty] else null
    // keep the vertex iff it is not a patient or is a case patient
    !isPatient || (scCasePatients.value contains patient.patientId)
  })

// calculate in-degrees and take the top 5 vertices
val top5ComorbidityVertices = filteredGraph.inDegrees.
    takeOrdered(5)(scala.Ordering.by(-_._2))

We have

top5ComorbidityVertices: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((3129,86), (335,63), (857,58), (2048,49), (669,48))

And we can check that the vertex with id 3129 in the original graph is

scala> graph.vertices.filter(_._1 == 3129).collect
Array[(org.apache.spark.graphx.VertexId, VertexProperty)] = 
Array((3129,DiagnosticProperty(DIAG4019)))

The 4019 code corresponds to hypertension, which is reasonable.
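To map all five top vertices back to their diagnostic codes at once, a minimal sketch:

// look up the properties of the top comorbidity vertices in the original graph
val topIds = top5ComorbidityVertices.map(_._1).toSet
graph.vertices.filter{case(id, _) => topIds.contains(id)}.collect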

Similar patients

Given the patient-diagnostic graph, we can also find similar patients. One of the most straightforward approaches is computing shortest paths on the graph.

// compute single-source shortest (hop) distances from the patient at vertex 0
val sssp = graph.
  mapVertices((id, _) => if (id == 0L) 0.0 else Double.PositiveInfinity).
  pregel(Double.PositiveInfinity)(
    (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
    triplet => {  // Send Message
      var msg: Iterator[(org.apache.spark.graphx.VertexId, Double)] = Iterator.empty
      // propagate a shorter distance in either direction along the edge
      if (triplet.srcAttr + 1 < triplet.dstAttr) {
        msg = msg ++ Iterator((triplet.dstId, triplet.srcAttr + 1))
      }
      if (triplet.dstAttr + 1 < triplet.srcAttr) {
        msg = msg ++ Iterator((triplet.srcId, triplet.dstAttr + 1))
      }
      msg
    },
    (a, b) => math.min(a, b) // Merge Message
  )

// get the top 5 most similar patients: smallest distance first, excluding
// the source vertex itself and anything unreachable
sssp.vertices.
    filter{case(id, dist) => id != 0L && dist < Double.PositiveInfinity}.
    filter(_._1 < 300). // keep patient vertices only (ids below the diagnostic offset)
    takeOrdered(5)(scala.Ordering.by(_._2))
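To recover the actual patient ids rather than raw vertex ids, a sketch that joins the distances back to the original vertex properties (this also avoids hard-coding the patient id range):

// keep reachable vertices other than the source, join with the original
// properties, and retain only patient vertices via pattern matching
val top5SimilarPatients = sssp.vertices.
  filter{case(id, dist) => id != 0L && dist < Double.PositiveInfinity}.
  join(graph.vertices).
  collect{case(id, (dist, PatientProperty(patientId))) => (patientId, dist)}.
  takeOrdered(5)(scala.Ordering.by(_._2))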