forked from eliewolfe/mDAG-analysis
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathadjmat_class.py
150 lines (111 loc) · 4.92 KB
/
adjmat_class.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
from __future__ import absolute_import
import numpy as np
from sys import hexversion
if hexversion >= 0x3080000:
from functools import cached_property
elif hexversion >= 0x3060000:
from backports.cached_property import cached_property
else:
cached_property = property
# Acknowledgement to https://digitalcommons.njit.edu/cgi/viewcontent.cgi?article=2820&context=theses
class AdjMat:
@staticmethod
def adjacency_plus_method(adjmat):
return np.bitwise_or(np.asarray(adjmat, dtype=bool), np.identity(len(adjmat), dtype=bool))
@staticmethod
def adjacency_minus_method(adjmat):
return np.bitwise_and(np.asarray(adjmat, dtype=bool), np.invert(np.identity(len(adjmat), dtype=bool)))
@staticmethod
def _children_list(adjmat):
return list(map(frozenset, map(np.flatnonzero, adjmat)))
def _parents_list(self, adjmat):
return self._children_list(np.transpose(adjmat))
@staticmethod
def _children_of(adjmat, X_indices):
if hasattr(X_indices, '__iter__'):
return np.flatnonzero(np.any(adjmat[X_indices], axis=0))
else:
return np.flatnonzero(adjmat[X_indices])
def _parents_of(self, adjmat, X_indices):
return self._children_of(np.transpose(adjmat), X_indices)
def __init__(self, adjmat):
self.adjacency_plus = self.adjacency_plus_method(adjmat)
self.adjacency_minus = self.adjacency_minus_method(adjmat)
self.childrenplus_list = self._children_list(self.adjacency_plus)
self.parentsplus_list = self._parents_list(self.adjacency_plus)
self.children_list = self._children_list(self.adjacency_minus)
self.parents_list = self._parents_list(self.adjacency_minus)
self.len = len(adjmat)
@cached_property
def numeric_edges(self):
return [(parent, child) for parent in range(self.len) for child in self.children_list[parent]]
def childrenplus_of(self, X_indices):
return self._children_of(self.adjacency_plus, X_indices)
def parentsplus_of(self, X_indices):
return self._parents_of(self.adjacency_plus, X_indices)
def children_of(self, X_indices):
return self._children_of(self.adjacency_minus, X_indices)
def parents_of(self, X_indices):
return self._parents_of(self.adjacency_minus, X_indices)
@cached_property
def transitive_closure_plus(self):
n = self.len
closure_mat = self.adjacency_plus.copy()
while n > 0:
n = np.floor_divide(n, 2)
next_closure_mat = np.matmul(closure_mat, closure_mat)
if np.array_equal(closure_mat, next_closure_mat):
break
else:
closure_mat = next_closure_mat
return closure_mat
@cached_property
def transitive_closure(self):
return self.adjacency_minus_method(self.transitive_closure_plus)
@cached_property
def transitive_reduction(self):
return np.bitwise_and(self.transitive_closure, np.invert(np.matmul(self.transitive_closure, self.transitive_closure)))
@cached_property
def descendantsplus_list(self):
return self._children_list(self.transitive_closure_plus)
@cached_property
def ancestorsplus_list(self):
return self._parents_list(self.transitive_closure_plus)
@cached_property
def descendants_list(self):
return self._children_list(self.transitive_closure)
@cached_property
def ancestors_list(self):
return self._parents_list(self.transitive_closure)
@cached_property
def closure_numeric_edges(self):
return [(parent, child) for parent in range(self.len) for child in self.descendants_list[parent]]
def descendantsplus_of(self, X_indices):
return self._children_of(self.transitive_closure_plus, X_indices)
def ancestorsplus_of(self, X_indices):
return self._parents_of(self.transitive_closure_plus, X_indices)
def descendants_of(self, X_indices):
return self._children_of(self.transitive_closure, X_indices)
def ancestors_of(self, X_indices):
return self._parents_of(self.transitive_closure, X_indices)
def subadjmat(self, X_indices):
subindices = np.ix_(X_indices, X_indices)
submat = np.zeros((self.len, self.len), dtype=bool)
submat[subindices] = self.adjacency_minus[subindices]
return submat
def subhypmat(hypmat, X_indices):
submat = np.zeros_like(hypmat)
submat[:, X_indices] = hypmat[:, X_indices]
return submat
if __name__ == '__main__':
adjmat = np.asarray(np.array(
[[1, 0, 1, 1, 0, 0],
[0, 1, 0, 0, 1, 1],
[0, 1, 1, 0, 0, 0],
[0, 0, 0, 1, 0, 0],
[0, 0, 0, 0, 1, 0],
[0, 0, 0, 0, 0, 1]], dtype=int), dtype=bool) + np.identity(6, dtype=bool)
test = AdjMat(adjmat)
print(test.transitive_closure.astype(int))
print(test.transitive_reduction.astype(int))
# print(np.linalg.matrix_power(adjmat, 2).astype(np.int_))