A First Look at the Connected Component Algorithm on Graphs

Data Science


Preface

"Many practical computing problems concern large graphs."

At work you occasionally run into problems that, directly or indirectly, can only be solved with a Connected Component algorithm. Three examples from my own experience:

1. Base-station clustering: automatically cluster cell-site sectors with frequent handovers between them, so that dedicated staff can focus on maintaining the stations within each cluster
2. Device communities: from equipment outage and fault data, infer the tree structure of upstream/downstream relationships in the power grid
3. Internet-usage statistics: two online sessions more than two hours apart count as two separate sessions; otherwise they are merged into one

All three scenarios were solved with the Connected Component algorithm. Intuitively, the three problems belong to the following areas:

1. Community detection
2. Hierarchical clustering
3. SQL group-by statistics

In your own work or research, in what guise might the Connected Component algorithm appear?
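To make scenario 3 concrete: the "more than two hours apart counts as a new session" rule is itself a connected-components computation, where each record is a vertex and consecutive records within two hours share an edge. A minimal sketch (the timestamps and the function name are made up for illustration):

```python
# Sketch: scenario 3 as connected components on a line of records.
# Each timestamp is a vertex; consecutive timestamps within two hours
# are joined by an edge, so each session is one connected component.
# Timestamps are hypothetical, in seconds.

TWO_HOURS = 2 * 60 * 60

def group_sessions(timestamps):
    """Group sorted timestamps into sessions (connected components)."""
    sessions = []
    for t in sorted(timestamps):
        if sessions and t - sessions[-1][-1] <= TWO_HOURS:
            sessions[-1].append(t)   # same component as the previous record
        else:
            sessions.append([t])     # gap too large: start a new component
    return sessions

print(group_sessions([0, 3600, 7200, 20000, 21000]))
# [[0, 3600, 7200], [20000, 21000]]
```

In SQL the same grouping is usually done with window functions, but the underlying structure is the same chain of components.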

Definition of a Connected Component

As shown in the figure above, the graph contains three subgraphs. Every vertex in each subgraph is connected to at least one other vertex of the same subgraph, and there are no edges between the subgraphs. These three subgraphs are the three connected components of the graph.

Common Algorithms for Connected Components

DFS

DFS (depth-first search) is one of the most straightforward algorithms. Its steps:

1) Initialize all vertices as not visited.
2) Do following for every vertex 'v'.
       (a) If 'v' is not visited before, call DFSUtil(v)
       (b) Print new line character

DFSUtil(v)
1) Mark 'v' as visited.
2) Print 'v'
3) Do following for every adjacent 'u' of 'v'.
     If 'u' is not visited, then recursively call DFSUtil(u)

For the graph above, one possible DFS output is:

0 1 2
3 4
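The pseudocode above translates almost line-for-line into runnable code. Here is a Python sketch; the adjacency list is a made-up graph chosen to match the "0 1 2 / 3 4" output:

```python
# The DFS pseudocode above as runnable Python. The graph is an adjacency
# list; each outer dfs() call discovers one connected component.

def connected_components(adj):
    visited = set()
    components = []

    def dfs(v, comp):
        visited.add(v)           # 1) mark v as visited
        comp.append(v)           # 2) "print" v (collect it instead)
        for u in adj[v]:         # 3) recurse on unvisited neighbours
            if u not in visited:
                dfs(u, comp)

    for v in adj:                # one DFSUtil call per unvisited vertex
        if v not in visited:
            comp = []
            dfs(v, comp)
            components.append(comp)
    return components

# A hypothetical graph that produces the "0 1 2 / 3 4" grouping:
adj = {0: [1, 2], 1: [0, 2], 2: [0, 1], 3: [4], 4: [3]}
print(connected_components(adj))  # [[0, 1, 2], [3, 4]]
```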

BFS

BFS (breadth-first search) is equally straightforward, so I won't go through it in detail here.
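For completeness, a BFS sketch in the same style (same hypothetical adjacency-list input as the DFS example; only the visiting order changes, from a recursion to a queue):

```python
# BFS variant: a queue replaces the DFS recursion. The component
# grouping is identical; only the order within a component differs.
from collections import deque

def connected_components_bfs(adj):
    visited = set()
    components = []
    for s in adj:
        if s in visited:
            continue
        comp = []
        queue = deque([s])
        visited.add(s)
        while queue:
            v = queue.popleft()
            comp.append(v)
            for u in adj[v]:
                if u not in visited:
                    visited.add(u)
                    queue.append(u)
        components.append(comp)
    return components
```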

Pregel

"Many practical computing problems concern large graphs." The Pregel paper opens with this observation; perhaps it was a lesson Google's engineers learned the hard way.

Programs are expressed as a sequence of iterations, in each of which a vertex can receive messages sent in the previous iteration, send messages to other vertices, and modify its own state and that of its outgoing edges or mutate graph topology.

Pregel is an iterative process. In each iteration, a vertex updates its value from its current value and the messages it receives, and sends messages to the vertices it points to. The paper illustrates this with an algorithm that finds the maximum value among the vertices of a subgraph:

  • Each Superstep in the figure above is one iteration
  • The grey vertices are in the halt state
  • Within an iteration, a vertex enters the halt state when the messages it receives would not change its value

For a more concrete feel for Pregel, here is another maximum-value example on a subgraph:

The message-sending rules and the vertex-value update rules are both user-definable.
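The superstep mechanics described above can be simulated in a few lines. This is a toy single-machine sketch of the maximum-value example, not the distributed system; the graph and initial values are made up:

```python
# Toy single-machine simulation of Pregel supersteps for the paper's
# maximum-value example: each vertex repeatedly sends its value to its
# neighbours and updates to the max of its value and incoming messages.
# A vertex halts (goes "grey") when no message changes its value.

def pregel_max(adj, values):
    values = dict(values)
    active = set(values)                     # all vertices start active
    while active:
        # message passing: every active vertex sends its value out
        inbox = {v: [] for v in values}
        for v in active:
            for u in adj[v]:
                inbox[u].append(values[v])
        # vertex program: take the max; halt if nothing changed
        active = set()
        for v, msgs in inbox.items():
            if msgs and max(msgs) > values[v]:
                values[v] = max(msgs)
                active.add(v)                # value changed, stays active
    return values

# A small chain 3 - 6 - 2 - 1, similar in spirit to the paper's figure
adj = {0: [1], 1: [0, 2], 2: [1, 3], 3: [2]}
print(pregel_max(adj, {0: 3, 1: 6, 2: 2, 3: 1}))  # every vertex ends at 6
```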

Pregel as Implemented in Spark GraphX

* Implements a Pregel-like bulk-synchronous message-passing API.
*
* Unlike the original Pregel API, the GraphX Pregel API factors the sendMessage
* computation over edges, enables the message sending computation to read both
* vertex attributes, and constrains messages to the graph structure.  These changes
* allow for substantially more efficient distributed execution while also exposing
* greater flexibility for graph-based computation.
...
* Execute a Pregel-like iterative vertex-parallel abstraction.  The
* user-defined vertex-program `vprog` is executed in parallel on
* each vertex receiving any inbound messages and computing a new
* value for the vertex.  The `sendMsg` function is then invoked on
* all out-edges and is used to compute an optional message to the
* destination vertex. The `mergeMsg` function is a commutative
* associative function used to combine messages destined to the
* same vertex.
...
* @param vprog the user-defined vertex program which runs on each
* vertex and receives the inbound message and computes a new vertex
* value.  On the first iteration the vertex program is invoked on
* all vertices and is passed the default message.  On subsequent
* iterations the vertex program is only invoked on those vertices
* that receive messages.
*
* @param sendMsg a user supplied function that is applied to out
* edges of vertices that received messages in the current
* iteration
*
* @param mergeMsg a user supplied function that takes two incoming
* messages of type A and merges them into a single message of type
* A.  ''This function must be commutative and associative and
* ideally the size of A should not increase.''
...
def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
     (graph: Graph[VD, ED],
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either)
     (vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED] =
  {
  ...
  }

Spark GraphX optimizes on top of Pregel: vertex values are read and messages are passed along edges rather than at vertices. The benefit is that while a message is being sent, the vertex attributes are read at the same time, and messages are constrained to the structure of the graph. The two main parameters of the API are vprog and sendMsg, corresponding to Pregel's message reception plus value update, and message sending, respectively. mergeMsg combines incoming messages, e.g. turning N received messages into one; the user-defined rule must be commutative and associative, i.e. the received messages carry no meaningful order, and any set of them can be collapsed into a single message.

Connected Components in Spark
object ConnectedComponents {
/*
* Compute the connected component membership of each vertex and return a graph
* with the vertex value containing the lowest vertex id in the connected component
* containing that vertex.
*
* @tparam VD the vertex attribute type (discarded in the computation)
* @tparam ED the edge attribute type (preserved in the computation)
* @param graph the graph for which to compute the connected components
* @param maxIterations the maximum number of iterations to run for
* @return a graph with vertex attributes containing the smallest vertex in each
* connected component
*/
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
                                     maxIterations: Int): Graph[VertexId, ED] = {
    require(maxIterations > 0, s"Maximum of iterations must be greater than 0," +
      s" but got ${maxIterations}")

    val ccGraph = graph.mapVertices { case (vid, _) => vid }
    def sendMessage(edge: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, VertexId)] = {
      if (edge.srcAttr < edge.dstAttr) {
        Iterator((edge.dstId, edge.srcAttr))
      } else if (edge.srcAttr > edge.dstAttr) {
        Iterator((edge.srcId, edge.dstAttr))
      } else {
        Iterator.empty
      }
    }
    val initialMessage = Long.MaxValue
    val pregelGraph = Pregel(ccGraph, initialMessage,
      maxIterations, EdgeDirection.Either)(
      vprog = (id, attr, msg) => math.min(attr, msg),
      sendMsg = sendMessage,
      mergeMsg = (a, b) => math.min(a, b))
    ccGraph.unpersist()
    pregelGraph
  } // end of connectedComponents
}

vprog updates a vertex's attribute by comparing the received message with its current attribute value; sendMsg compares the attribute values at the two ends of an edge to decide the direction and content of the message. mergeMsg (min) satisfies the commutativity and associativity requirements mentioned above.
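To see the three pieces working together, here is a single-machine Python sketch mirroring the GraphX logic above (plain dictionaries stand in for the distributed graph; `pregel_cc` is a hypothetical name, not Spark API):

```python
# Single-machine sketch of the GraphX connected-components logic:
# each vertex attribute starts as its own id (ccGraph), sendMessage
# pushes the smaller attribute across an edge, mergeMsg is min, and
# vprog is min(attr, msg).

INITIAL = float('inf')   # stands in for Long.MaxValue

def pregel_cc(vertices, edges, max_iterations=100):
    attr = {v: v for v in vertices}            # mapVertices { (vid, _) => vid }
    for _ in range(max_iterations):
        inbox = {}
        for (src, dst) in edges:               # sendMessage over each edge triplet
            if attr[src] < attr[dst]:
                inbox[dst] = min(inbox.get(dst, INITIAL), attr[src])  # mergeMsg
            elif attr[src] > attr[dst]:
                inbox[src] = min(inbox.get(src, INITIAL), attr[dst])
        if not inbox:                          # no messages: converged
            break
        for v, msg in inbox.items():           # vprog: min(attr, msg)
            attr[v] = min(attr[v], msg)
    return attr

# Two components: {0, 1, 2} labelled 0, and {3, 4} labelled 3
edges = [(0, 1), (1, 2), (3, 4)]
print(pregel_cc(range(5), edges))  # {0: 0, 1: 0, 2: 0, 3: 3, 4: 3}
```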

Connected Components in MapReduce and Beyond

Pregel is a distributed answer to the limits of a single machine; this "Beyond" algorithm is an answer for extremely large graphs:

Although, GraphX implementation of the algorithm works reasonably well on smaller graphs (we tested up to ~10 million nodes and ~100 million edges), but its performance quickly degraded as we tried to scale to higher numbers. Our cluster consisted of 30 nodes, each node with 24 vcores and 150GB memory, and 20Gbps of network connectivity.

When the graph is extremely large, data skew becomes the main performance bottleneck:

Data Skew:
  • Most datasets are heavy tailed
  • Naive data distribution can be disastrous
  • In synchronous environments must wait for slowest shard
  • "Curse of the last reducer"

Your situation determines your choice:

Want to optimize for very large graphs:
  • Billions of nodes, 100s of billions of edges
  • Typically sparse
  • Do not fit in memory (10s+ TBs)

Components of the Beyond Algorithm

The core idea of the Beyond algorithm is to transform the graph into star structures. It has two operators: LargeStar and SmallStar.

LargeStar

For each vertex V, connect every neighbor whose ID is greater than V's ID to the smallest-ID vertex among V and its neighbors:

An example walkthrough:

Initial state:

Centered at 8:

Centered at 7:

Centered at 5:

Centered at 1:

Centered at 9:

Final state:

Why this transformation is correct

Only a proof-by-example is given here:

SmallStar

For each vertex V, connect V and every neighbor whose ID is smaller than V's to the smallest-ID vertex among them:

Process (omitted)

Final state:

The Final Algorithm

Repeat until convergence
  Repeat until convergence
    LargeStar
  SmallStar

The Improved Algorithm

Instead of waiting for the inner LargeStar loop to converge, the two operators are simply alternated, which reduces the number of rounds from O((log n)^2) to O(log n):

Repeat Until Convergence:
  SmallStar
  LargeStar
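The alternation above can be sketched on a plain edge list. This is a toy single-machine version under my reading of the two operators; the real algorithm runs each operator as a distributed MapReduce/Spark round, and the function names here are made up:

```python
# Toy single-machine sketch of alternating LargeStar / SmallStar rounds.
# Undirected edges are stored as (smaller_id, larger_id) pairs; at
# convergence each component is a star rooted at its minimum id.

def neighbours(edges):
    nbrs = {}
    for a, b in edges:
        nbrs.setdefault(a, set()).add(b)
        nbrs.setdefault(b, set()).add(a)
    return nbrs

def large_star(edges):
    # For each vertex u: connect every neighbour larger than u
    # to the minimum of u and its neighbours.
    out = set()
    for u, ns in neighbours(edges).items():
        m = min(ns | {u})
        for v in ns:
            if v > u:
                out.add((min(m, v), max(m, v)))
    return out

def small_star(edges):
    # For each vertex u: connect u and its smaller neighbours
    # to the smallest of them.
    out = set()
    for u, ns in neighbours(edges).items():
        smaller = {v for v in ns if v <= u}
        if not smaller:
            continue
        m = min(smaller)
        for v in smaller | {u}:
            if v != m:
                out.add((m, v))
    return out

def beyond_cc(edges, max_rounds=100):
    edges = {(min(a, b), max(a, b)) for a, b in edges if a != b}
    for _ in range(max_rounds):
        new = small_star(large_star(edges))
        if new == edges:
            break                  # converged: stars no longer change
        edges = new
    return edges

# Path 0-1-2-3-4 plus a separate pair 5-6 collapse into two stars
print(sorted(beyond_cc([(1, 0), (1, 2), (2, 3), (3, 4), (5, 6)])))
# [(0, 1), (0, 2), (0, 3), (0, 4), (5, 6)]
```

At convergence, every vertex's component label can be read off as the root of its star (the minimum id in its component).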

Closing Remarks

Happy New Year, everyone!
Comments and corrections are welcome~
The references below contain further detail.

References

  1. https://www.geeksforgeeks.org/connected-components-in-an-undirected-graph/
  2. https://www.linkedin.com/pulse/connected-component-using-map-reduce-apache-spark-shirish-kumar
  3. https://blog.acolyer.org/2015/05/26/pregel-a-system-for-large-scale-graph-processing/
  4. https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
  5. https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
  6. https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
  7. https://kowshik.github.io/JPregel/pregel_paper.pdf
  8. https://spark-summit.org/wp-content/uploads/2015/03/SSE15-23-Ankur-Dave.pdf
  9. https://spark.apache.org/docs/latest/graphx-programming-guide.html
  10. https://en.wikipedia.org/wiki/Connected_component_(graph_theory)