girvan_newman_fast_version.py
from pyspark import SparkContext, StorageLevel
from pyspark.sql import SparkSession
import sys
import json
import csv
import itertools
from time import time
import math
import random
import os
import matplotlib
from matplotlib import pyplot as plt
import numpy as np
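
# Girvan-Newman community detection on a user graph built from (user_id, business_id)
# pairs: edge betweenness is computed with BFS credit propagation, the edges with the
# highest betweenness are removed iteratively, and the partition with the highest
# modularity Q is written out, along with the betweenness values themselves.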
def process(entry):
    revisedEntries = entry[0].split(',')
    return (revisedEntries[0], revisedEntries[1])

def convertValuesToTuple(entrySet):
    newEntrySet = []
    for entry in entrySet:
        newEntrySet += [(entry, 1)]
    return newEntrySet
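
# Build the user graph: two users are connected when they have reviewed at least
# `filter_threshold` businesses in common. The result maps each user to the set of its
# neighbours, e.g. (hypothetical ids) {'userA': {'userB', 'userC'}, 'userB': {'userA'}, ...}.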
def generate_community_users(user_business_map, filter_threshold):
    nearby_users_map = {}
    users = user_business_map.keys()
    for u1 in users:
        related_users = set()
        for u2 in users:
            if u1 != u2:
                u1_businesses = set(user_business_map.get(u1))
                u2_businesses = set(user_business_map.get(u2))
                common_businesses = u1_businesses.intersection(u2_businesses)
                if len(common_businesses) >= filter_threshold:
                    # non_common_businesses = set(u1_businesses.difference(u2_businesses)).union(set(u2_businesses.difference(u1_businesses)))
                    related_users.add(u2)
        if len(related_users) > 0:
            nearby_users_map.update({u1: related_users})
    return nearby_users_map
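
# Single-source edge betweenness (steps 1-3 of Girvan-Newman) from a given root:
#   1. BFS from `root`, recording each node's level and its parents in the BFS DAG.
#   2. Give every node a credit of 1.
#   3. Walk the levels bottom-up, splitting each node's accumulated credit among its
#      parents and recording each share as the betweenness of that (node, parent) edge.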
def generate_betweenness_map(root, nearby_users_map):
    # final result
    edges_betweenness_map = {}
    # map of node -> BFS level
    level_nodes_map = {}
    # map of child -> {parents}, used to split credit when multiple shortest paths exist
    child_parents_map = {}
    current_level = 0
    current_child_nodes = set([root])
    level_nodes_map.update({root: current_level})
    # traverse in BFS order - steps 1 & 2 of Girvan-Newman
    while len(current_child_nodes) > 0:
        current_level += 1
        # children for the next iteration
        children_for_next_iter = set()
        for child in current_child_nodes:
            # print("Current child: ", child)
            current_grandchildren = nearby_users_map.get(child)
            # print("Current grandchildren: ", current_grandchildren)
            # we need all the nodes at the next level, and one node can be reachable through multiple parents
            # children_for_next_iter = children_for_next_iter.union(current_grandchildren)
            for grandchild in current_grandchildren:
                # encountering this node for the first time
                if grandchild not in level_nodes_map:
                    level_nodes_map.update({grandchild: current_level})
                    child_parents_map.update({grandchild: set([child])})
                    children_for_next_iter.add(grandchild)
                # only record `child` as a parent when the neighbour sits exactly one level below it;
                # this excludes siblings, since nearby_users_map holds all neighbours, not just parents.
                # This branch is only reached after the node has already been added to the map by a parent.
                elif level_nodes_map.get(grandchild) == level_nodes_map.get(child) + 1:
                    # print("Current Parents: ", child_parents_map.get(grandchild))
                    child_parents_map.get(grandchild).add(child)
                    # print("Updated_Parents: ", child_parents_map.get(grandchild))
                    # child_parents_map.update({grandchild:new_parents})
        current_child_nodes = children_for_next_iter.difference(set([root]))
        # print("Nodes for next iteration: ", current_child_nodes)
    # traversal completed
    # Step 3 of Girvan-Newman
    # all nodes not occurring as parents in child_parents_map are leaf nodes
    # flatten all values and get unique
    # unique_parents = set([parent for parent_group in child_parents_map.values() for parent in parent_group])
    # all reached nodes, including the root
    all_nodes = set(level_nodes_map.keys())  # .difference(set([root]))
    # leaf_nodes = all_nodes.difference(unique_parents)
    # calculate partial credits bottom-up; every node starts with a credit of 1
    node_partial_credits_map = {}
    for node in all_nodes:
        node_partial_credits_map.update({node: 1.0})
    # print(current_level)
    # after the BFS, current_level is one past the deepest level that contains nodes,
    # so the first pass of this loop may find no nodes
    while current_level > 0:
        current_level_nodes = [k for k, v in level_nodes_map.items() if v == current_level]
        # each non-leaf node's credit ends up as 1 + the credits of the DAG edges from the level below
        for node in current_level_nodes:
            # divide the node's credit equally among its parents
            # (the full Girvan-Newman algorithm weights this split by shortest-path counts;
            # equal division assumes each parent contributes one shortest path)
            partial_credit = float(node_partial_credits_map.get(node)) / len(child_parents_map.get(node))
            parents_of_current_node = child_parents_map.get(node)
            for parent in parents_of_current_node:
                # add the node's share of credit to each of its parents
                updated_credit = node_partial_credits_map.get(parent) + partial_credit
                node_partial_credits_map.update({parent: updated_credit})
                # record the betweenness of the edge from this node to its parent; the node's
                # total credit is final at this point because of the bottom-up order
                edges_betweenness_map.update({(node, parent): partial_credit})
        current_level -= 1
    return edges_betweenness_map
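
# Total edge betweenness: sum the single-source results over every node taken as root,
# then halve each value, because every shortest path is counted once from each endpoint.
# Tiny hypothetical example: for a path graph A - B - C, both edges end up with a
# betweenness of 2.0 (edge A-B carries the shortest paths A-B and A-C).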
def generate_betweenness_result(nearby_users_map):
    all_nodes = nearby_users_map.keys()
    users_betweenness_map = {}
    final_result = set()
    for root in all_nodes:
        edges_betweenness_map = generate_betweenness_map(root, nearby_users_map)
        for edge, betweenness in edges_betweenness_map.items():
            # use a direction-independent (sorted) key so that contributions recorded as
            # (child, parent) and (parent, child) for the same edge accumulate together
            edge = tuple(sorted(edge))
            if edge in users_betweenness_map:
                updated_betweenness = users_betweenness_map.get(edge) + betweenness
                users_betweenness_map.update({edge: updated_betweenness})
            else:
                users_betweenness_map.update({edge: betweenness})
    for edge, betweenness in users_betweenness_map.items():
        # halve the totals: every shortest path was counted from both of its endpoints
        final_result.add((edge, float(betweenness) / 2))
    final_users_betweenness_map = sc.parallelize(final_result)\
        .sortBy(lambda entry: (-entry[1], entry[0][0]))\
        .collectAsMap()
    return final_users_betweenness_map
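
# A_ij of the adjacency matrix, keyed by the sorted (u1, u2) pair:
# 1.0 when the two users are connected in the filtered graph, 0.0 otherwise.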
def generate_adjacency_matrix(nearby_users_map):
    users = nearby_users_map.keys()
    adjacency_matrix = {}
    for u1 in users:
        for u2 in users:
            if u2 in nearby_users_map.get(u1):
                adjacency_matrix.update({tuple(sorted([u1, u2])): 1.0})
            else:
                adjacency_matrix.update({tuple(sorted([u1, u2])): 0.0})
    return adjacency_matrix
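
# k_i of every user: the number of neighbours in the filtered graph.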
def generate_degree_matrix(nearby_users_map):
    users = nearby_users_map.keys()
    degree_matrix = {}
    for u1 in users:
        degree_matrix.update({u1: len(nearby_users_map.get(u1))})
    return degree_matrix
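
# Connected components of the (possibly edge-pruned) graph, found by BFS from each
# not-yet-visited user; each component is returned as a sorted tuple of user ids.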
def generate_user_clusters(graph, vertices):
    # `vertices` is accepted by the callers but not used here
    adjacent_users_map_copy = graph.copy()
    clusters = set()
    unique_users = set(graph.keys())
    for user in unique_users:
        current_members = set()
        current_cluster = set()
        if user in adjacent_users_map_copy:
            current_members = adjacent_users_map_copy.get(user)
            current_cluster = set([user])
            # current_cluster = set()
        while len(current_members) > 0:
            members_for_next_iteration = set()
            for current_member in current_members:
                current_cluster.add(current_member)
                if current_member in adjacent_users_map_copy:
                    members_for_next_iteration = members_for_next_iteration.union(set(adjacent_users_map_copy.get(current_member)))
                    adjacent_users_map_copy.pop(current_member)
            current_members = members_for_next_iteration.difference(set([user]))
        if len(current_cluster) > 0:
            clusters.add(tuple(sorted(list(current_cluster))))
    return clusters
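
# Girvan-Newman main loop: compute modularity Q for the current partition, remove the
# edge(s) with the highest betweenness, recompute the connected components, and return
# the partition that maximised Q. Note that m and the betweenness values come from the
# original graph and are not recomputed after each removal, which appears to be the
# speed/accuracy trade-off behind this "fast version".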
def run_girvan_newman(graph, adjacency_matrix, degree_matrix, m, users_betweenness_map, nearby_users_map, vertices):
    number_of_clusters = len(graph)
    clusters = graph
    # modularity Q = (1 / (2m)) * sum over communities of sum over pairs (i, j) of [A_ij - k_i * k_j / (2m)]
    current_q = 0
    # previous_q = 0
    # available_edges = list(users_betweenness_map.keys())
    # num_available_edges = len(available_edges)
    max_q = float('-inf')
    max_clusters = {}
    # index = 0
    while users_betweenness_map:
        # previous_q = current_q
        # the clusters changed in the previous iteration (or this is the first pass), so recompute Q
        # if number_of_clusters != len(clusters) or index == 0:
        #     number_of_clusters = len(clusters)
        total_minus_expected = 0
        for cluster in clusters:
            cl = list(cluster)
            for u1 in cl:
                for u2 in cl:
                    if u1 < u2:
                        aij = adjacency_matrix.get((u1, u2))
                        ki = degree_matrix.get(u1)
                        kj = degree_matrix.get(u2)
                        mod_sum = aij - (float(ki * kj) / (2 * m))
                        total_minus_expected += mod_sum
        current_q = float(total_minus_expected) / (2 * m)
        if current_q > max_q:
            max_q = current_q
            max_clusters = clusters
        # if index == 0:
        #     previous_q = current_q
        # calculation of Q completed
        # find the edge(s) with the maximum betweenness; dropping them splits the graph into communities
        edges_to_drop = []
        max_betweenness = max(users_betweenness_map.values())
        for edge_to_drop in users_betweenness_map.keys():
            if users_betweenness_map.get(edge_to_drop) == max_betweenness:
                edges_to_drop.append(edge_to_drop)
                # edge_to_drop_betweenness = users_betweenness_map.get(edge_to_drop)
                # update the degree matrix to reflect the removal of this edge
                # adjacency_matrix.pop(edge_to_drop)
                if degree_matrix.get(edge_to_drop[0]) > 0:
                    degree_matrix.update({edge_to_drop[0]: (degree_matrix.get(edge_to_drop[0]) - 1)})
                if degree_matrix.get(edge_to_drop[1]) > 0:
                    degree_matrix.update({edge_to_drop[1]: (degree_matrix.get(edge_to_drop[1]) - 1)})
                # remove the two users from each other's neighbour sets
                updated_u1 = set(nearby_users_map.get(edge_to_drop[0])).difference(set([edge_to_drop[1]]))
                # print("Original node1 length: ", len(nearby_users_map.get(edge_to_drop[0])))
                nearby_users_map.update({edge_to_drop[0]: updated_u1})
                # print("Updated node1 length: ", len(nearby_users_map.get(edge_to_drop[0])))
                # print("Original node2 length: ", len(nearby_users_map.get(edge_to_drop[1])))
                updated_u2 = set(nearby_users_map.get(edge_to_drop[1])).difference(set([edge_to_drop[0]]))
                nearby_users_map.update({edge_to_drop[1]: updated_u2})
                # print("Updated node2 length: ", len(nearby_users_map.get(edge_to_drop[1])))
        for edge in edges_to_drop:
            del users_betweenness_map[edge]
        # users_betweenness_map = generate_betweenness_result(nearby_users_map)
        # print("Previous num of clusters: ", number_of_clusters)
        clusters = generate_user_clusters(nearby_users_map, vertices)
        # print("Current number of clusters: ", len(clusters))
        # m = len(users_betweenness_map.keys())
        # index += 1
    return max_clusters
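
# ################################# Driver #############################################
# Expected invocation (assumed from the argument parsing below):
#   spark-submit girvan_newman_fast_version.py <filter_threshold> <input_file_path> \
#       <betweenness_output_file_path> <community_output_file_path>
# The input is a CSV of user_id,business_id pairs with a header row.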
if len(sys.argv) != 5:
    print("Usage: spark-submit firstname_lastname_task2.py <filter_threshold> <input_file_path> <betweenness_output_file_path> <community_output_file_path>")
    exit(-1)
else:
    filter_threshold = int(sys.argv[1])
    input_file_path = sys.argv[2]
    betweenness_output_file_path = sys.argv[3]
    community_output_file_path = sys.argv[4]

result = []
SparkContext.setSystemProperty('spark.executor.memory', '4g')
SparkContext.setSystemProperty('spark.driver.memory', '4g')
SparkContext.setSystemProperty('spark.sql.shuffle.partitions', '4')
sc = SparkContext('local[*]', 'task2')
ss = SparkSession(sc)

# input_file_path = "C:/Users/mansi/PycharmProjects/Mansi_Ganatra_HW4/ub_sample_data.csv"
# betweenness_output_file_path = "C:/Users/mansi/PycharmProjects/Mansi_Ganatra_HW4//task2__result_1.csv"
# community_output_file_path = "C:/Users/mansi/PycharmProjects/Mansi_Ganatra_HW4//task2__result_2.csv"
# filter_threshold = 7

# N_values = [2, 5, 10, 15, 20, 25, 28, 30, 35, 40]
N_values = [28]
exec_times = []
for N in N_values:
    start = time()
    user_businessRdd = sc.textFile(input_file_path).map(lambda entry: entry.split('\n')).map(lambda entry: process(entry)).repartition(N).cache()
    headers = user_businessRdd.take(1)
    finalRdd = user_businessRdd.filter(lambda entry: entry[0] != headers[0][0]).repartition(N).cache()  # persist before cache
    user_business_map = finalRdd\
        .groupByKey(numPartitions=N)\
        .mapValues(lambda entry: list(set(entry)))\
        .cache()\
        .collectAsMap()

    # ############################## Betweenness Calculation ############################
    nearby_users_map = generate_community_users(user_business_map, filter_threshold)
    # print(nearby_users_map)
    users_betweenness_map = generate_betweenness_result(nearby_users_map)
    with open(betweenness_output_file_path, "w+") as fp:
        for edge, betweenness in users_betweenness_map.items():
            string_to_write = "(\'" + edge[0] + "\', \'" + edge[1] + "\'), " + str(betweenness)
            fp.write(string_to_write)
            fp.write("\n")

    # ################################# Community Detection #############################################
    # nearby_users_map = generate_community_users(user_business_map, filter_threshold)
    adjacency_matrix = generate_adjacency_matrix(nearby_users_map)
    degree_matrix = generate_degree_matrix(nearby_users_map)
    # m = number of edges in the filtered graph (each undirected edge appears once)
    m = len(users_betweenness_map.keys())
    # split the network into subgraphs by repeatedly removing the edge with the highest betweenness
    clusters = generate_user_clusters(nearby_users_map, nearby_users_map.keys())
    optimized_clusters = run_girvan_newman(clusters, adjacency_matrix, degree_matrix, m, users_betweenness_map, nearby_users_map, nearby_users_map.keys())

    # recompute modularity Q = (1 / (2m)) * sum over communities of sum over pairs (i, j) of [A_ij - k_i * k_j / (2m)]
    current_q = 0
    total_minus_expected = 0
    for cluster in optimized_clusters:
        cl = list(cluster)
        for u1 in cl:
            for u2 in cl:
                if u1 < u2:
                    aij = adjacency_matrix.get((u1, u2))
                    ki = degree_matrix.get(u1)
                    kj = degree_matrix.get(u2)
                    mod_sum = aij - (float(ki * kj) / (2 * m))
                    total_minus_expected += mod_sum
    current_q = float(total_minus_expected) / (2 * m)

    user_communities_rdd = sc.parallelize(optimized_clusters, numSlices=N)\
        .map(lambda entry: (sorted(list(entry), key=lambda x: x[0]), len(entry)))\
        .sortBy(lambda entry: (entry[1], entry[0]), numPartitions=N)\
        .map(lambda entry: entry[0]).repartition(N).cache()

    user_communities = user_communities_rdd.collect()
    with open(community_output_file_path, "w+") as fp:
        for community in user_communities:
            string_to_write = ""
            for user in community[:-1]:
                string_to_write += "\'" + user + "\', "
            string_to_write += "\'" + community[-1] + "\'"
            fp.write(string_to_write)
            fp.write("\n")
        fp.close()
    # print(optimized_clusters)

    end = time()
    # print("Duration: ", end-start)
    exec_times.append(end - start)
    print('Total execution time:', str(end - start) + 'sec')
    print('The modularity is:', current_q)

print(exec_times)
fig = plt.figure(figsize=(9, 11))
plt.plot(N_values, exec_times)
plt.xlabel("N")
plt.ylabel("Execution time [s]")
plt.title("Number of partitions used in RDDs vs Execution time of Girvan-Newman algorithm")
plt.savefig('plot_girvan_newman_after.png', bbox_inches='tight')