Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: merging new architecture of certtools #9

Open
wants to merge 2 commits into
base: sparsity_update
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion _scripts/generate_sparsity_results.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import matplotlib

from _scripts.run_clique_study import run_hierarchy_study
from _scripts.run_examples import run_example
from _scripts.run_tightness_study import (
plot_accuracy_study_all,
Expand Down
Empty file added _test/test_admm.py
Empty file.
51 changes: 10 additions & 41 deletions _test/test_cliques.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
import numpy as np
from cert_tools.hom_qcqp import HomQCQP
from cert_tools.test_tools import constraints_test, cost_test

from decomposition.generate_cliques import create_clique_list_loc
from lifters.matweight_lifter import MatWeightLocLifter
from lifters.range_only_lifters import RangeOnlyLocLifter
from poly_matrix.poly_matrix import PolyMatrix
from ro_certs.cert_matrix import get_cost_matrices
from ro_certs.generate_cliques import generate_clique_list
from ro_certs.problem import Reg


def test_clique_cost():
def test_clique_decomp():
n_params = 4
lifters = [
MatWeightLocLifter(n_landmarks=8, n_poses=n_params),
Expand All @@ -29,45 +27,16 @@ def test_clique_cost():
lifter.generate_random_setup()
lifter.simulate_y(noise=noise, sparsity=sparsity)

Q, _ = lifter.get_Q()
Constraints = [(lifter.get_A0(), 1.0)] + [
(A, 0.0) for A in lifter.get_A_learned_simple()
]
# test problem without redundant constraints
clique_list = create_clique_list_loc(
lifter,
use_known=True,
use_autotemplate=False,
add_redundant=True,
verbose=False,
)
Q_test = PolyMatrix(symmetric=False)
for c in clique_list:
Ci, __ = PolyMatrix.init_from_sparse(c.Q, var_dict=c.var_dict)
Q_test += Ci
np.testing.assert_allclose(Q.toarray(), Q_test.toarray(lifter.var_dict))
problem = HomQCQP.init_from_lifter(lifter)

# doing get_asg just to suppress a warning.
problem.get_asg()
problem.clique_decomposition()

def test_clique_creation():
new_lifter = RangeOnlyLocLifter(
n_landmarks=10, n_positions=5, d=3, reg=Reg.CONSTANT_VELOCITY
)
clique_list = create_clique_list_loc(
new_lifter, use_known=True, use_autotemplate=False
)
cost_matrices = get_cost_matrices(new_lifter.prob)
clique_list_new = generate_clique_list(new_lifter.prob, cost_matrices)
for c1, c2 in zip(clique_list_new, clique_list):
ii1, jj1 = c1.Q.nonzero()
ii2, jj2 = c2.Q.nonzero()
np.testing.assert_allclose(ii1, ii2)
np.testing.assert_allclose(jj1, jj2)
np.testing.assert_allclose(c1.Q.data, c2.Q.data)
for i, (A1, A2) in enumerate(zip(c1.A_list, c2.A_list)):
np.testing.assert_allclose(A1.toarray(), A2.toarray())
constraints_test(problem)
cost_test(problem)


if __name__ == "__main__":
test_clique_creation()
test_clique_cost()
test_clique_decomp()
print("done")
File renamed without changes.
112 changes: 87 additions & 25 deletions _test/test_sparse.py → _test/test_sdp_solvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

import numpy as np
from cert_tools.admm_solvers import solve_alternating
from cert_tools.hom_qcqp import HomQCQP
from cert_tools.linalg_tools import rank_project
from cert_tools.sdp_solvers import solve_sdp
from cert_tools.sparse_solvers import solve_oneshot
from cert_tools.sdp_solvers import solve_sdp, solve_sdp_homqcqp
from cert_tools.sparse_solvers import solve_dsdp, solve_oneshot

from decomposition.generate_cliques import create_clique_list_loc
from decomposition.sim_experiments import extract_solution, get_relative_gap
from lifters.matweight_lifter import MatWeightLocLifter
from lifters.range_only_lifters import RangeOnlyLocLifter, Reg
Expand All @@ -23,6 +23,44 @@
VERBOSE = False


def test_matrices():
def test_equal(Q_list, Q_list_test):
for Q, Q_test in zip(Q_list, Q_list_test):
np.testing.assert_allclose(Q.toarray(), Q_test.toarray())

n_params = 4
lifters = [
MatWeightLocLifter(n_landmarks=8, n_poses=n_params),
RangeOnlyLocLifter(
n_landmarks=8,
n_positions=n_params,
reg=Reg.CONSTANT_VELOCITY,
d=2,
level="no",
),
]
noises = [1.0, 1e-4]
sparsity = 1.0
seed = 0
for lifter, noise in zip(lifters, noises):

np.random.seed(seed)
lifter.generate_random_setup()
lifter.simulate_y(noise=noise, sparsity=sparsity)

Q = lifter.get_Q_from_y(lifter.y_)
A_list = lifter.get_A_learned_simple()

problem = HomQCQP.init_from_lifter(lifter)
test_equal([problem.C.get_matrix(lifter.var_dict)], [Q])
test_equal([A.get_matrix(lifter.var_dict) for A in problem.As], A_list)

Q_test, Constraints_list_test = problem.get_problem_matrices()
A_list_test = [C[0] for C in Constraints_list_test]
test_equal([Q], [Q_test])
test_equal(A_list, A_list_test)


def test_highlevel():
n_params = 4
lifters = [
Expand Down Expand Up @@ -68,7 +106,7 @@ def test_highlevel():
).items():
assert val < TEST_OPT, f"error {key} failed for local"

## =================== solve with SDP =================== ##
## =================== solve with SDP the old way =================== ##
Q, _ = lifter.get_Q()
Constraints = [(lifter.get_A0(), 1.0)] + [
(A, 0.0) for A in lifter.get_A_learned_simple()
Expand All @@ -82,56 +120,79 @@ def test_highlevel():
tol=TOL_SDP,
verbose=VERBOSE,
)
rdg = get_relative_gap(info["cost"], cost_gt)

rdg = get_relative_gap(info["cost"], cost_gt)
x_SDP, info_rank = rank_project(X, p=1)
theta_SDP = lifter.get_theta_from_x(x=x_SDP)
for key, val in lifter.get_error(
theta_hat=theta_SDP, theta_gt=theta_opt
).items():
assert val < TEST_SDP, f"error {key} failed for SDP"

assert info_rank["EVR"] > EVR_THRESH
print(f"SDP converged to cost={info['cost']}")

# test problem without redundant constraints
clique_list = create_clique_list_loc(
lifter,
use_known=True,
use_autotemplate=False,
add_redundant=True,
## =================== solve with SDP from HomQCQP =================== ##
problem = HomQCQP.init_from_lifter(lifter)
np.testing.assert_allclose(
problem.C.get_matrix_dense(lifter.var_dict), Q.toarray()
)
problem.get_asg(var_list=lifter.var_dict)
# clique_data = HomQCQP.get_chain_clique_data(fixed="h", variables=["x", "z"])
problem.clique_decomposition()

Q_homqcqp, Constraints_homqcqp = problem.get_problem_matrices()
X, info = solve_sdp(
Q_homqcqp,
Constraints_homqcqp,
adjust=True,
primal=True,
use_fusion=True,
tol=TOL_SDP,
verbose=VERBOSE,
)
Q_test = PolyMatrix(symmetric=False)
for c in clique_list:
Ci, __ = PolyMatrix.init_from_sparse(c.Q, var_dict=c.var_dict)
Q_test += Ci
np.testing.assert_allclose(Q.toarray(), Q_test.toarray(lifter.var_dict))

rdg = get_relative_gap(info["cost"], cost_gt)
x_SDP, info_rank = rank_project(X, p=1)
theta_SDP = lifter.get_theta_from_x(x=x_SDP)
for key, val in lifter.get_error(
theta_hat=theta_SDP, theta_gt=theta_opt
).items():
assert val < TEST_SDP, f"error {key} failed for SDP"
assert info_rank["EVR"] > EVR_THRESH
print(f"SDP converged to cost={info['cost']}")

## =================== solve with dSDP =================== ##
X_list, info = solve_oneshot(
clique_list,

X_list, info = solve_dsdp(
problem,
use_primal=True,
use_fusion=True,
verbose=VERBOSE,
tol=TOL_DSDP,
)

(
x_dSDP,
ranks,
factors,
) = problem.get_mr_completion(X_list)

rdg = get_relative_gap(info["cost"], cost_gt)
assert rdg < TEST_RDG
x_dSDP, evr_mean = extract_solution(lifter, X_list)
theta_dSDP = lifter.get_theta_from_x(x=np.hstack([1.0, x_dSDP.flatten()]))

theta_dSDP = lifter.get_theta_from_x(x=x_dSDP)
if not isinstance(lifter, RangeOnlyLocLifter):
for key, val in lifter.get_error(
theta_hat=theta_dSDP, theta_gt=theta_opt
).items():
assert val < TEST_OPT, f"error {key} failed for dSDP"
assert evr_mean > EVR_THRESH
np.testing.assert_allclose(ranks, 1.0)
print(f"dSDP converged to cost={info['cost']}")

return
## =================== solve with ADMM =================== ##
if lifter.ADMM_INIT_XHAT:
X0 = []
for c in clique_list:
for c in problem.cliques:
x_clique = lifter.get_x(theta=theta_est, var_subset=c.var_dict)
X0.append(np.outer(x_clique, x_clique))
else:
Expand All @@ -140,7 +201,7 @@ def test_highlevel():
lifter.ADMM_OPTIONS["maxiter"] = 30
lifter.ADMM_OPTIONS["early_stop"] = True
X_list, info = solve_alternating(
deepcopy(clique_list),
deepcopy(problem),
X0=X0,
verbose=False,
**lifter.ADMM_OPTIONS,
Expand All @@ -158,4 +219,5 @@ def test_highlevel():


if __name__ == "__main__":
# test_matrices()
test_highlevel()
1 change: 1 addition & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dependencies:
- autograd>=1.6.2
- pandas==1.5.3
- spatialmath-python=1.1.11
- plotly

- pip:
- -r requirements.txt
Expand Down
4 changes: 3 additions & 1 deletion lifters/range_only_lifters.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,9 @@ def get_Q_from_y(
continue
Q_poly[f"x_{n-1}", f"x_{n}"] += self.prob.get_R_nm(n)

if save:
if save and output_poly:
self.Q_fixed = Q_poly
elif save and not output_poly:
self.Q_fixed = Q_poly.get_matrix(self.var_dict)

if output_poly:
Expand Down