Wednesday, November 4, 2015

Applying Spark GraphX to detect influence in corporate networks

Introduction




Graphs are among the most ubiquitous structures, both in nature and in science. The first scientific paper on graph theory is generally considered to be the solution of the historical Seven Bridges of Königsberg problem, written by the Swiss mathematician Leonhard Euler in 1736. The seven bridges problem started what is now known as graph theory, and its models and algorithms are now applied across a wide range of scientific and engineering disciplines.
There are many examples of graph usage in modern science. For example, graphs are used to describe the relative positions of the atoms and molecules in chemical compounds. The laws of atomic connections in physics are also described using graphs. In biology, the evolutionary tree is actually a special form of graph. In linguistics, graphs are used to model semantic rules and syntax trees. Network analysis is based on graph theory as well, with applications in traffic modeling, telecommunications, and topology.
In computer science, many problems are modeled as graphs: social networks, access control lists, network topologies, taxonomy hierarchies and recommendation engines, to name a few.


What is a Graph?


A graph is a set of nodes or vertices {V} and a set of edges or lines {E}. If an edge {A,B} exists, then we can say that nodes A and B are related to each other. The edges themselves can be unordered pairs of nodes or, in a directed graph (digraph), ordered pairs of nodes where each edge has a direction; such an edge is sometimes called an arc. Graphs are (generally) non-reflexive: nodes are not related to themselves.
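
The definition above can be tried out on plain Scala collections. This is only a minimal sketch: the node names A, B, C and the helpers Arc, related and hasArc are hypothetical and exist purely for illustration.

```scala
// Hypothetical mini-graph; the names A, B, C are illustrative only.
final case class Arc(src: String, dst: String) // directed edge (ordered pair)

val vertices = Set("A", "B", "C")
val arcs = Set(Arc("A", "B"), Arc("B", "C"))

// Undirected view: each edge becomes an unordered pair of nodes.
val undirected: Set[Set[String]] = arcs.map(a => Set(a.src, a.dst))

// Two nodes are related if some edge joins them, in either direction.
def related(x: String, y: String): Boolean = undirected.contains(Set(x, y))

// In the digraph, direction matters.
def hasArc(x: String, y: String): Boolean = arcs.contains(Arc(x, y))

println(related("B", "A")) // true
println(hasArc("B", "A"))  // false
```

The same pair of nodes is "related" in the undirected view regardless of order, while the directed view only recognises the arc in the direction it was declared.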
 
Applicability to the Social Network of “Corporate Members”



The interrelations of corporate boards can easily be mapped to a social network, so most social network algorithms can be applied to board member networks.
Board relations (and, for that matter, any human capital relations) can be viewed as a “big data” problem, because their volume, layers and dimensionality pose technical challenges if one wants to mine patterns in such networks. Applying graph algorithms is a natural choice for handling such challenges. Consider the following example, quite often mentioned in the social network analysis literature. In 15th century Florence the Medici family emerged as the most powerful family and ended up dominating trade in the area. However, to start with the family was less powerful than many of the other important families, so how did they achieve so much? It has been proposed [1] that it was their position in the Florentine social network that propelled their success.
Here I attempt a “thought experiment” based on a dataset of corporate boards, as used by “LittleSig.org”, in order to establish the “most powerful” board members.

Getting the Data

 




The data comes in the form of an SQL dump obtained from the website "TheyRule.net" (http://www.theyrule.net), which is an effort to provide some public transparency about the list of powerful board members. Disclaimer: this is very old data (from 2004), which is stale and serves here only as an illustration. However, the approach can easily be adapted to any hierarchical social network. The data comes as SQL inserts; describing how to create the database or import the data is out of scope. After importing into MySQL, the schemas look like this:


mysql> describe us2004_companies;
+-------------+--------------+------+-----+---------+----------------+
| Field       | Type         | Null | Key | Default | Extra          |
+-------------+--------------+------+-----+---------+----------------+
| id          | int(11)      | NO   | PRI | NULL    | auto_increment |
| companyname | varchar(128) | NO   |     |         |                |
| bod         | varchar(255) | NO   |     |         |                |
| url         | varchar(255) | NO   |     |         |                |
| objects     | mediumtext   | NO   |     | NULL    |                |
| BOD_URL     | varchar(255) | NO   |     |         |                |
| symbol      | varchar(50)  | NO   |     |         |                |
+-------------+--------------+------+-----+---------+----------------+
7 rows in set (0.01 sec)

mysql> describe us2004_directors;
+--------------+--------------+------+-----+---------+----------------+
| Field        | Type         | Null | Key | Default | Extra          |
+--------------+--------------+------+-----+---------+----------------+
| id           | int(11)      | NO   | PRI | NULL    | auto_increment |
| firstname    | varchar(128) | NO   |     |         |                |
| lastname     | varchar(128) | NO   |     |         |                |
| gender       | char(1)      | NO   |     | m       |                |
| boards       | varchar(255) | NO   |     |         |                |
| url          | varchar(255) | NO   |     |         |                |
| institutions | varchar(255) | NO   |     |         |                |
+--------------+--------------+------+-----+---------+----------------+
7 rows in set (0.00 sec)


Loading MySQL Data into Spark DataFrames

 

Start the Spark shell. Make sure that you include the relevant JDBC driver in the classpath. In this case I used the MySQL JDBC driver:
$SPARK_HOME/bin/spark-shell --jars mysql-connector-java-5.1.27.jar 
After your Spark shell starts, import the following:
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import scala.util.MurmurHash
import org.apache.spark.{SparkConf, SparkContext}

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._ 
Next we create a DataFrame over JDBC. The Spark Scala API defines, among others, the following method:
def jdbc(url: String, table: String, predicates: Array[String],
connectionProperties: Properties): DataFrame
It constructs a DataFrame representing the database table named table, accessible via the JDBC URL url, using the given connection properties.

A convenient way is to pass a Map with the necessary connection details for each table, as follows:
val compMap = Map ("url" ->"jdbc:mysql://localhost:3306/sec",
"driver"->"com.mysql.jdbc.Driver",
"dbtable"->"us2004_companies" , 
"user" ->"DBUSER" , 
"password"->"MYSQL PSWD")
val compDF=sqlContext.read.format("jdbc").options(compMap).load()
From the resulting DataFrame we select only the columns of interest:
val comps=compDF.select(compDF("id"),
compDF("companyname"),compDF("bod"))
The result is:
comps: org.apache.spark.sql.DataFrame = 
[id: int, companyname: string, bod: string]

Transforming the Data




When working with DataFrames it is a good idea to create custom classes to deal with data transformation and manipulation. In this case I created two such classes, Director and Company, and three auxiliary methods to facilitate the mapping. Here the MurmurHash library is used to generate an integer from the provided String; it is treated as unique, although hash collisions are possible in principle.


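// One row per (company, board member) pair
case class Company(companyid: Int, companyName: String, boardMember: Int)
// One row per (director, board) pair
case class Director(id: Int, Name: String, memberOf: Int)
// bod is a single id taken from a comma-separated list; trim it before parsing
def toCompany(id: Int, name: String, bod: String): Company =
  Company(id, name, bod.trim.toInt)
def toDirector(id: Int, name: String, bod: String): Director =
  Director(id, name, bod.trim.toInt)
// The edge points from the company to the director; its attribute repeats the director hash
def toEdge(name: String, company: String): Edge[Int] =
  Edge(MurmurHash.stringHash(company), MurmurHash.stringHash(name),
    MurmurHash.stringHash(name))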
After that we apply a chain of transformations to the DataFrame comps:
val compRDD1 = comps.map(t =>
  (t.getAs[Int]("id"),
   t.getAs[String]("companyname"),
   t.getAs[String]("bod").split(",")))
// compRDD1: org.apache.spark.rdd.RDD[(Int, String, Array[String])]

val compMapDF = compRDD1.flatMap(comp =>
  comp._3.map(toCompany(comp._1, comp._2, _))).toDF
// flatMap yields an org.apache.spark.rdd.RDD[Company]
// and toDF turns it into:
// compMapDF: org.apache.spark.sql.DataFrame =
// [companyid: int, companyName: string, boardMember: int]
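
The split-and-flatten step can also be tried without Spark, on plain Scala collections. The following is a minimal sketch under stated assumptions: the sample rows and the CompanyRow class are hypothetical stand-ins for the real table contents and for the Company class.

```scala
// Hypothetical sample rows shaped like (id, companyname, bod); the values are made up.
final case class CompanyRow(companyid: Int, companyName: String, boardMember: Int)

val rows = Seq((1, "Acme", "10, 11,12"), (2, "Globex", "11"))

// Split the comma-separated id list, then emit one row per (company, member) pair.
val flattened: Seq[CompanyRow] = rows.flatMap { case (id, name, bod) =>
  bod.split(",").toSeq.map(m => CompanyRow(id, name, m.trim.toInt))
}

flattened.foreach(println)
```

Each input row fans out into as many output rows as there are ids in its list, which is the shape the later join relies on.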


What is going on in the above code? We need to be aware of several things when dealing with DataFrames. One is that any time we use map or flatMap, the transformation returns an RDD (it returns a new RDD by applying a function to all rows of the DataFrame), and therefore we need to call toDF to get back to a DataFrame. Another is the way fields or columns of a DataFrame are accessed: a DataFrame consists of spark.sql.Row objects, and when accessing a field we need to convert it explicitly to the expected type.
We follow the same steps for the us2004_directors table:
val options = Map("url" -> "jdbc:mysql://localhost:3306/sec",
  "driver" -> "com.mysql.jdbc.Driver",
  "dbtable" -> "us2004_directors",
  "user" -> "USER",
  "password" -> "PSWD")
val dirDF = sqlContext.read.format("jdbc").options(options).load()
val dirnames = dirDF.select(dirDF("id"), dirDF("firstname"),
  dirDF("lastname"), dirDF("boards"))
// Keep only directors who sit on more than one board
val directorsRDD = dirnames.map(t => (t.getAs[Int]("id"),
  t.getAs[String]("lastname") + " " + t.getAs[String]("firstname"),
  t.getAs[String]("boards").split(","))).
  filter(_._3.length > 1)
val dirMapDF = directorsRDD.
  flatMap(dir => dir._3.map(toDirector(dir._1, dir._2, _))).
  toDF


Having created the above DataFrames, we register them as tables so that we can run queries on them. First we combine the data using an inner join, in a fashion very similar to a typical SQL join query:

compMapDF.registerTempTable("companies")
dirMapDF.registerTempTable("directors")
// Triple quotes let the SQL string span several lines
val joined = sqlContext.sql("""SELECT DISTINCT Name, companyName, memberOf
  FROM companies c, directors d
  WHERE d.memberOf = c.companyid""")
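
To see what the inner join keeps and what it drops, here is a small sketch of the same logic on plain Scala collections. The rows and the names companyRows, directorRows and joinedRows are hypothetical; the real query runs against the registered tables.

```scala
// Hypothetical rows; the ids and names are made up for illustration.
val companyRows = Seq((1, "Acme"), (2, "Globex")) // (companyid, companyName)
val directorRows = Seq(("Doe Jane", 1), ("Doe Jane", 2), ("Roe Rick", 2), ("Poe Pat", 9)) // (Name, memberOf)

// Inner join on memberOf = companyid; rows without a match (board 9) drop out.
val joinedRows = (for {
  (name, memberOf) <- directorRows
  (companyid, companyName) <- companyRows
  if memberOf == companyid
} yield (name, companyName, memberOf)).distinct

joinedRows.foreach(println)
```

A director whose board id has no matching company simply disappears from the result, which is worth keeping in mind when the source data is incomplete.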
We will also need to do some cleanup and remove duplicates so that we work with clean data:
val companiessdistinct=joined.select(joined("companyName")).distinct
val namesdistinct=joined.select(joined("Name")).distinct
val edges=joined.map(rec=>
(toEdge(rec.getAs[String]("Name") , 
rec.getAs[String]("companyName") ) ) )


In GraphX each node is identified by a unique numerical value, the VertexId, and carries some attribute data, typically a String. Here we need to convert the names above into unique numbers so that we can represent them as vertex RDDs. We use Scala's MurmurHash library to do that, as follows:

val dirVertices:RDD[(VertexId, String)]=
namesdistinct.map(
v=>
(MurmurHash.stringHash(v.getAs[String]("Name")),
v.getAs[String]("Name")))

val compVertices:RDD[(VertexId, String)]=
companiessdistinct.map(
v=>
(MurmurHash.stringHash(v.getAs[String]("companyName")),
v.getAs[String]("companyName")))
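
Because the vertex ids are 32-bit hashes of names, two different names could in principle map to the same id. The sketch below shows one way to check for that on a list of labels. It is an illustration only: it assumes scala.util.hashing.MurmurHash3 (the non-deprecated successor of scala.util.MurmurHash, whose hash values may differ from the ones used above), and the labels are hypothetical.

```scala
import scala.util.hashing.MurmurHash3

// Hypothetical labels; in the article these would be director and company names.
val labels = Seq("Doe Jane", "Roe Rick", "Acme", "Globex")

// Group labels by their 32-bit hash, widened to Long as GraphX's VertexId is.
val byId: Map[Long, Seq[String]] =
  labels.distinct.groupBy(l => MurmurHash3.stringHash(l).toLong)

// Any bucket with more than one label is a collision: two vertices sharing one id.
val collisions = byId.filter { case (_, ls) => ls.size > 1 }

println(s"${labels.distinct.size} labels, ${byId.size} ids, ${collisions.size} colliding ids")
```

If the check reports collisions on real data, a safer alternative would be to assign ids explicitly instead of hashing.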

Next we combine them so that we have all vertices:
val allVertices=dirVertices.union(compVertices)

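Now we are ready to build the graph. The GraphX Graph constructor takes a default vertex attribute, which is assigned to vertices that are referenced by an edge but missing from the vertex RDD. It must match the attribute type of the other vertices (in our case String):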
val default= ("NoName")
val graph = Graph(allVertices,edges, default)
graph.persist()

Now we can run some analytics and find the individuals with the most incoming connections, i.e. their "popularity":
val influential=graph.inDegrees.join(allVertices).
sortBy(_._2._1, ascending=false).
take(1)
println("POPULAR:"+influential.mkString(","))
The result is something like:
influential:Array[(org.apache.spark.graphx.VertexId,(Int, String))]=
Array((56914485,(8,Gray, III William H.)))
POPULAR:(56914485,(8,Gray, III William H.))
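
The in-degree ranking can be reproduced on a toy edge list with plain Scala collections. This is a minimal sketch with hypothetical names; the edges point from company to director, in the same direction that toEdge builds them.

```scala
// Hypothetical (company -> director) edges, in the same direction as toEdge builds them.
val sampleEdges = Seq(("Acme", "Doe Jane"), ("Globex", "Doe Jane"), ("Globex", "Roe Rick"))

// In-degree = number of edges arriving at each destination vertex.
val inDegree: Map[String, Int] =
  sampleEdges.groupBy(_._2).map { case (dst, es) => (dst, es.size) }

// Highest in-degree first, mirroring sortBy(..., ascending = false) followed by take(1).
val mostPopular = inDegree.toSeq.sortBy { case (_, d) => -d }.head

println(mostPopular) // (Doe Jane,2)
```

Since every edge ends at a director, a director's in-degree here is simply the number of boards they sit on.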


However, popularity does not always mean influence. To measure the real impact or clout of a node in a connected graph, two brainy guys, Larry Page and Sergey Brin, came up with an algorithm called "PageRank" back in the day. We can apply PageRank to find which board member has the biggest "clout", based on the calculated PageRank score. To use PageRank in Spark one needs to specify a tolerance, which is the threshold at which the computation is considered to have converged. We used 0.0001:

val ranks = graph.pageRank(0.0001).vertices
This calculation might take a while, depending on how big the graph is. Once the result is obtained, we can join it with the set of all vertices and sort in descending order to obtain the "biggest" influencer:
val ranksDirectors = ranks.join(allVertices).
sortBy(_._2._1,ascending=false).map(_._2._2)
scala> ranksDirectors.take(1)
res5: Array[String] = Array(Ward Jackie M.)
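
To get a feel for what the tolerance controls, here is a simplified PageRank iteration on a toy graph in plain Scala. It is a sketch under stated assumptions, not GraphX's implementation: the edges are hypothetical, the reset probability of 0.15 is assumed, ranks are left un-normalized, and the loop stops once no rank changes by more than the tolerance.

```scala
// Hypothetical (company -> director) edges; the names are made up.
val prEdges = Seq(("Acme", "Doe Jane"), ("Globex", "Doe Jane"), ("Globex", "Roe Rick"))
val nodes = prEdges.flatMap { case (s, d) => Seq(s, d) }.distinct
val outDeg = prEdges.groupBy(_._1).map { case (s, es) => (s, es.size) }

val resetProb = 0.15 // assumed reset (teleport) probability
val tol = 0.0001     // same tolerance value as passed to graph.pageRank above

// One update: PR(v) = resetProb + (1 - resetProb) * sum over in-neighbours u of PR(u) / outDeg(u)
def step(pr: Map[String, Double]): Map[String, Double] =
  nodes.map { v =>
    val incoming = prEdges.collect { case (u, d) if d == v => pr(u) / outDeg(u) }.sum
    v -> (resetProb + (1 - resetProb) * incoming)
  }.toMap

// Iterate until no rank moves by more than tol.
def iterate(pr: Map[String, Double]): Map[String, Double] = {
  val next = step(pr)
  if (nodes.forall(v => math.abs(next(v) - pr(v)) < tol)) next else iterate(next)
}

val localRanks = iterate(nodes.map(_ -> 1.0).toMap)
val top = localRanks.maxBy(_._2)._1
println(top)
```

A smaller tolerance means more iterations before the ranks are accepted as stable, which is why the computation on the full graph can take a while.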