Commit bbbc5ed2 authored by Vincent Grande
initial commit

.gitignore 0 → 100644
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv
.python-version 0 → 100644
3.12
LICENSE 0 → 100644
MIT License
Copyright (c) 2024 Vincent Grande and Computational Network Science, RWTH Aachen University
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
README.md 0 → 100644
# Code for "Topological Trajectory Classification and Landmark Inference on Simplicial Complexes"
A sample run with illustrations of the proposed method can be found in the Jupyter Notebook `FindingHoles.ipynb`.
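A minimal end-to-end sketch of the pipeline on synthetic data (parameter values are illustrative, and this assumes the `learning_holes` package and its `util` helpers are importable from the repository root):

```python
import numpy as np
from sklearn.metrics import adjusted_rand_score

from learning_holes import generate_complex, generate_trajectories, find_holes, predict_classes_forest

# Random Delaunay complex on the unit square and synthetic trajectory classes.
network = generate_complex(n_points=1000)
data = generate_trajectories(network["G"], n_classes=5, n_train=5, n_test=5,
                             min_rel_length=0.6, fixed_length=True, points=network["points"])

# Search for cells ("holes") whose removal best separates the training classes.
result = find_holes(4, data["train_trajectories"], network["cell_dict"],
                    network["boundary_2"], network["boundary_2"],
                    data["train_targets"], network["edge_dict"])

# Classify held-out trajectories from their harmonic embeddings.
pred = predict_classes_forest(data["train_targets"], result["cell_indices"],
                              data["train_trajectories"], network["cell_dict"],
                              network["boundary_2"], network["boundary_2"],
                              data["test_trajectories"], joint_holes=False,
                              edge_dict=network["edge_dict"])
print("ARI:", adjusted_rand_score(data["test_targets"], pred))
```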
## Paper
If you find our work interesting, please cite our paper: Vincent P. Grande, Josef Hoppe, Florian Frantzen, Michael T. Schaub, "Topological Trajectory Classification and Landmark Inference on Simplicial Complexes", 2024 Asilomar Conference on Signals, Systems, and Computers, IEEE.
## License
This work is licensed under an MIT license; see the LICENSE file.
learning_holes/__init__.py 0 → 100644
from .embeddings import *
from .target_function import *
from .learning_holes import *
learning_holes/embeddings.py 0 → 100644
import numpy as np
#__all__ = ["embedding_distinctiveness", "mean_embedding_distinctiveness", "min_embedding_distance"]
def mean_embedding_distinctiveness(embeddings: np.ndarray, targets: np.ndarray) -> float:
"""Measure how well the different classes are separated in the embedding space.
Parameters
----------
embeddings : np.ndarray; shape = (n_samples, n_features)
The embeddings.
targets : np.ndarray; shape = (n_samples,)
The target class of each sample.
Returns
-------
float
The distinctiveness of the embeddings.
"""
    # All ordered index pairs (i, j), including self-pairs, which contribute
    # zero-distance terms to the within-class statistics.
    combinations = np.array(
        np.meshgrid(np.arange(embeddings.shape[0]), np.arange(embeddings.shape[0]))
    ).reshape(2, -1)
pairwise_distances = np.linalg.norm(
embeddings[combinations[0]] - embeddings[combinations[1]], axis=1
)
mean_between_distance = pairwise_distances[
targets[combinations[0]] != targets[combinations[1]]
].mean()
mean_within_distance = pairwise_distances[
targets[combinations[0]] == targets[combinations[1]]
].mean()
return mean_between_distance / mean_within_distance
def min_embedding_distance(embeddings: np.ndarray, targets: np.ndarray) -> float:
"""Measure how well the different classes are separated in the embedding space.
Parameters
----------
embeddings : np.ndarray; shape = (n_samples, n_features)
The embeddings.
targets : np.ndarray; shape = (n_samples,)
The target class of each sample.
    Returns
    -------
    float
        The minimum between-class embedding distance.
    """
combinations = np.array(
np.meshgrid(np.arange(embeddings.shape[0]), np.arange(embeddings.shape[0]))
).reshape(2, -1)
pairwise_distances = np.linalg.norm(
embeddings[combinations[0]] - embeddings[combinations[1]], axis=1
)
min_between_distance = pairwise_distances[
targets[combinations[0]] != targets[combinations[1]]
].min()
return min_between_distance
def min_embedding_distance_equal_clusters(embeddings: np.ndarray, targets: np.ndarray) -> float:
"""Measure how well the different classes are separated in the embedding space.
Parameters
----------
embeddings : np.ndarray; shape = (n_samples, n_features)
The embeddings.
targets : np.ndarray; shape = (n_samples,)
The target class of each sample.
    Returns
    -------
    float
        The minimum between-class embedding distance, scaled to favour
        balanced cluster sizes.
    """
    combinations = np.array(
        np.meshgrid(np.arange(embeddings.shape[0]), np.arange(embeddings.shape[0]))
    ).reshape(2, -1)
    pairwise_distances = np.linalg.norm(
        embeddings[combinations[0]] - embeddings[combinations[1]], axis=1
    )
    min_between_distance = pairwise_distances[
        targets[combinations[0]] != targets[combinations[1]]
    ].min()
    cluster_sizes = [np.sum(targets == i) for i in np.unique(targets)]
    min_number_per_cluster = min(cluster_sizes)
    std = np.std(cluster_sizes)
    total_number = len(targets)
    num_classes = len(np.unique(targets))
    # Favour solutions whose smallest cluster is large and whose cluster sizes are
    # balanced; the 1e-10 offset avoids an exactly zero score for singleton clusters.
    return min_between_distance * (min_number_per_cluster - 1 + 1e-10) / (std + total_number / num_classes)
def embedding_distinctiveness(embeddings: np.ndarray, targets: np.ndarray) -> float:
"""Measure how well the different classes are separated in the embedding space.
Parameters
----------
embeddings : np.ndarray; shape = (n_samples, n_features)
The embeddings.
targets : np.ndarray; shape = (n_samples,)
The target class of each sample.
Returns
-------
float
The distinctiveness of the embeddings.
"""
combinations = np.array(
np.meshgrid(np.arange(embeddings.shape[0]), np.arange(embeddings.shape[0]))
).reshape(2, -1)
pairwise_distances = np.linalg.norm(
embeddings[combinations[0]] - embeddings[combinations[1]], axis=1
)
min_between_distance = pairwise_distances[
targets[combinations[0]] != targets[combinations[1]]
].min()
max_within_distance = pairwise_distances[
targets[combinations[0]] == targets[combinations[1]]
].max()
return min_between_distance / max_within_distance
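# Small usage sketch (illustrative, not part of the original module): for two
# well-separated classes all three scores should be large.
if __name__ == "__main__":
    example_embeddings = np.array([[0.0, 0.0], [0.1, 0.0], [5.0, 0.0], [5.1, 0.0]])
    example_targets = np.array([0, 0, 1, 1])
    print(mean_embedding_distinctiveness(example_embeddings, example_targets))  # ~100.0
    print(min_embedding_distance(example_embeddings, example_targets))          # ~4.9
    print(embedding_distinctiveness(example_embeddings, example_targets))       # ~49.0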
learning_holes/learning_holes.py 0 → 100644
from random import shuffle

import cell_flower as cf
import networkx as nx
import numpy as np
import scipy.sparse
from scipy.spatial import Delaunay
from scipy.sparse.linalg import expm_multiply
from sklearn.cluster import KMeans
from sklearn.metrics import adjusted_rand_score
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
import time
import platform
import tqdm

# On macOS, use the dill-based "multiprocess" fork so that locally defined
# functions can be pickled for the worker pool.
if platform.system() == 'Darwin':
    import multiprocess as multiprocessing
else:
    import multiprocessing

from learning_holes.utils import target_function, target_function_unsupervised
from util import trajectory
def run_finding_holes_once(parameter_dict):
    np.random.seed()  # re-seed from OS entropy so each parallel worker gets a different stream
network_dict = generate_complex(n_points = parameter_dict["n_points"])
G = network_dict["G"]
edge_dict = network_dict["edge_dict"]
cell_dict = network_dict["cell_dict"]
boundary_2 = network_dict["boundary_2"]
trajectory_dict = generate_trajectories(G, n_classes = parameter_dict["n_classes"], n_train = parameter_dict["n_train"], n_test = parameter_dict["n_test"], min_rel_length = parameter_dict["min_rel_length"], fixed_length = parameter_dict["fixed_length"], points=network_dict["points"])
train_trajectories = trajectory_dict["train_trajectories"]
train_targets = trajectory_dict["train_targets"]
test_trajectories = trajectory_dict["test_trajectories"]
test_targets = trajectory_dict["test_targets"]
targets = np.concatenate([train_targets, test_targets])
start_time = time.time()
result_dict = find_holes(parameter_dict["num_holes"], train_trajectories, cell_dict, boundary_2, boundary_2, targets, edge_dict, num_init_cells = parameter_dict["num_init_cells"],
max_depth = parameter_dict["max_depth"], step_size = parameter_dict["step_size"], verbose = parameter_dict["verbose"], joint_holes = parameter_dict["joint_holes"], unsupervised=parameter_dict["unsupervised"],n_clusters = parameter_dict["n_classes"], smooth_trajectories = parameter_dict["smooth_trajectories"])
end_time = time.time()
#compute ARI
ARIscore = adjusted_rand_score(test_targets,predict_classes_forest(train_targets, result_dict["cell_indices"], train_trajectories, cell_dict, boundary_2, boundary_2, test_trajectories, joint_holes = False, unsupervised=parameter_dict["unsupervised"],n_clusters = parameter_dict["n_classes"],edge_dict = edge_dict))
return {"parameter_dict":parameter_dict, "ARI":ARIscore, "time":end_time-start_time}
def run_multi_holes(parameters, num_processes):
    results_list = []
    with multiprocessing.Pool(num_processes) as pool:
        # One single-item map_async per parameter set so tqdm can track progress.
        results = [pool.map_async(run_finding_holes_once, [param]) for param in parameters]
        for r in tqdm.tqdm(results):
            results_list.append(r.get())
    # Each map_async call returns a one-element list; unwrap it.
    holes_results = [result[0] for result in results_list]
    return holes_results
def remove_ith_column(matrix, i):
n = matrix.shape[1]
return matrix[:,list(range(i))+list(range(i+1, n))]
def remove_ith_columns(matrix, indices):
n = matrix.shape[1]
return matrix[:,[i for i in range(n) if i not in indices]]
def cell_indices_to_harmonic_vectors(removed_indices, cell_dict, boundary_2, weighted_boundary_2):
    # Indicator vectors of the removed 2-cells; their boundaries generate the new holes.
    ind_vectors_cell = np.zeros((len(cell_dict), len(removed_indices)))
    for i, index in enumerate(removed_indices):
        ind_vectors_cell[index, i] = 1
    generating_boundaries = boundary_2 @ ind_vectors_cell
    # Delete the cells from the boundary matrix, then subtract the least-squares
    # curl component so that (approximately) harmonic representatives remain.
    updated_weighted_boundary_2 = remove_ith_columns(weighted_boundary_2, removed_indices)
    curl_generators = np.array([
        scipy.sparse.linalg.lsmr(updated_weighted_boundary_2, generating_boundary)[0]
        for generating_boundary in generating_boundaries.T
    ])
    harmonic_vectors = generating_boundaries - updated_weighted_boundary_2 @ curl_generators.T
    normalised_harmonic_vectors = harmonic_vectors / np.linalg.norm(harmonic_vectors, axis=0)
    return normalised_harmonic_vectors
def shorten_trajectory(trajectory, min_rel_length, fixed_length = False):
min_length = int(np.ceil(len(trajectory) * min_rel_length))
if fixed_length:
new_length = min_length
else:
new_length = np.random.randint(min_length, len(trajectory))
new_start = np.random.randint(len(trajectory) - new_length+1)
return trajectory[new_start:new_start+new_length]
def trajectories_to_edges(trajectories):
result = []
for traj in trajectories:
result += nx.utils.pairwise(traj)
return result
def trajectory_to_oriented_edges(traj):
result = {}
edges = [tuple(e) for e in trajectories_to_edges([traj])]
for e in edges:
normalized = cf.normalize_cell(e)
result[normalized] = 1 if normalized == e else -1
return result
def trajectories_to_sparse_matrix(trajectories, edge_dict, normalise_trajectories = False):
    # Build an (n_trajectories x n_edges) sparse matrix of signed edge indicators.
    row_inds = []
    col_inds = []
    data = []
    for i, traj in enumerate(trajectories):
        oriented_edges = trajectory_to_oriented_edges(traj)
        num_edges = len(oriented_edges)
        for edge, orientation in oriented_edges.items():
            if normalise_trajectories:
                orientation = orientation / num_edges
            data.append(orientation)
            row_inds.append(i)
            col_inds.append(edge_dict[tuple(int(vert) for vert in edge)])
    return scipy.sparse.csc_matrix((data, (row_inds, col_inds)), shape=(len(trajectories), len(edge_dict)))
def get_adjacent_cells(cell_index_boundaries, k_hop_matrix):
cell_indicator_vector = scipy.sparse.csr_matrix((np.ones(len(cell_index_boundaries)),(cell_index_boundaries,np.zeros(len(cell_index_boundaries)))), shape = (k_hop_matrix.shape[0],len(cell_index_boundaries)))
adjacent_cells_vector = k_hop_matrix @ cell_indicator_vector
adjacent_cell_indices = adjacent_cells_vector.nonzero()[0]
return adjacent_cell_indices
def compute_loss_from_cells(test_cells, trajectories_matrix, targets, cell_dict, boundary_2):
harmonic_vectors = cell_indices_to_harmonic_vectors(test_cells, cell_dict, boundary_2, boundary_2)
harmonic_embeddings = trajectories_matrix @ harmonic_vectors
embedding_distinctiveness_score = target_function(harmonic_embeddings, targets)
return embedding_distinctiveness_score
def knn_accuracy_score(harmonic_embeddings, targets, n_splits = 5, epsilon = 0.01):
    # Mean 1-NN accuracy over several random splits; the epsilon * distinctiveness
    # term breaks ties between candidate holes with equal accuracy.
    knn = KNeighborsClassifier(n_neighbors=1)
scores = []
for i in range(n_splits):
X_train, X_test, y_train, y_test = train_test_split(harmonic_embeddings, targets, test_size=0.2,random_state=i)
knn.fit(X_train, y_train)
scores.append(knn.score(X_test, y_test))
return np.mean(scores)+epsilon*embedding_distinctiveness(harmonic_embeddings, targets)
def get_candidate_cell_tuples(cur_cell_indices, adjacent_cell_indices):
candidate_cell_tuples = []
for i_cell in range(len(cur_cell_indices)):
for new_cell_index in adjacent_cell_indices[i_cell]:
test_cells = np.array(cur_cell_indices)
test_cells[i_cell] = new_cell_index
candidate_cell_tuples.append(tuple(test_cells))
return candidate_cell_tuples
def find_holes(num_holes, trajectories, cell_dict, boundary_2, weighted_boundary_2, targets, edge_dict, num_init_cells = 100, max_depth = 10, step_size = 1, verbose = False, joint_holes = True, unsupervised=False,n_clusters = 0, normalise_trajectories = False, smooth_trajectories = 0):
"""
Finds cells to remove from SC to best classify trajectory classes.
Args:
num_holes (int): Number of holes to find.
trajectories (array-like): List of trajectories.
cell_dict (dict): Dictionary mapping cell indices to cell information.
boundary_2 (array-like): Boundary matrix.
targets (array-like): Target values for each trajectory.
num_init_cells (int, optional): Number of initial cells to consider. Defaults to 100.
max_depth (int, optional): Maximum depth of search. Defaults to 10.
step_size (int, optional): Step size for matrix power calculation. Defaults to 1.
Returns:
dict: A dictionary containing the following information:
- 'cell_indices': List of cell indices representing the found holes.
- 'embedding_distinctiveness': Embedding distinctiveness score of the found holes.
- 'known_losses': Dictionary mapping cell index tuples to their corresponding loss values.
- 'loss_over_time': List of loss values over time.
"""
    if not (weighted_boundary_2 != boundary_2).nnz == 0:
        raise ValueError("weighted_boundary_2 and boundary_2 must currently be equal; several helper functions assume boundary_2 == weighted_boundary_2.")
if targets is None:
targets = np.array(range(len(trajectories)))
if unsupervised and n_clusters == 0:
raise ValueError("n_clusters must be specified for unsupervised learning.")
known_losses = {}
used_cell_indices = []
loss_over_time = []
max_depth = min(max_depth, len(cell_dict)**num_holes)
trajectories_matrix = trajectories_to_sparse_matrix(trajectories, edge_dict=edge_dict, normalise_trajectories=normalise_trajectories)
up_laplacian = boundary_2 @ boundary_2.T
if smooth_trajectories > 0:
trajectories_matrix = expm_multiply(-smooth_trajectories*up_laplacian, trajectories_matrix.T).T
k_hop_matrix = scipy.sparse.linalg.matrix_power(np.abs(boundary_2.T) @ np.abs(boundary_2),step_size)
    cur_cell_indices = []
    known_harmonic_embeddings = {}
    # Initialise greedily: find the best single hole among num_init_cells random
    # candidates, then the best second hole with the first one fixed, etc.
for i in range(num_holes):
candidate_cells = np.random.randint(len(cell_dict),size = num_init_cells)
for cell_index in candidate_cells:
test_cell_indices = cur_cell_indices + [cell_index]
if joint_holes:
harmonic_vectors = cell_indices_to_harmonic_vectors(test_cell_indices, cell_dict, boundary_2, weighted_boundary_2)
harmonic_embeddings = trajectories_matrix @ harmonic_vectors
else:
if cell_index not in known_harmonic_embeddings:
new_harmonic_vector = cell_indices_to_harmonic_vectors([cell_index], cell_dict, boundary_2, weighted_boundary_2)
known_harmonic_embeddings[cell_index] = (trajectories_matrix @ new_harmonic_vector)[:,0]
harmonic_embeddings = [known_harmonic_embeddings[cur_cell_index] for cur_cell_index in test_cell_indices]
harmonic_embeddings = np.array(harmonic_embeddings).T
if unsupervised:
kmeans = KMeans(n_clusters=n_clusters, random_state=0).fit(harmonic_embeddings)
targets = kmeans.labels_
embedding_distinctiveness_score = target_function_unsupervised(harmonic_embeddings, targets)
else:
embedding_distinctiveness_score = target_function(harmonic_embeddings, targets)
known_losses[tuple(test_cell_indices)] = embedding_distinctiveness_score
best_candidate = candidate_cells[np.argmax([known_losses[tuple(cur_cell_indices + [cell_index])] for cell_index in candidate_cells])]
cur_cell_indices.append(best_candidate)
    adjacent_cell_indices = [get_adjacent_cells([cur_cell_index], k_hop_matrix) for cur_cell_index in cur_cell_indices]
loss_over_time.append(known_losses[tuple(cur_cell_indices)])
    # Local search: iteratively swap single holes for better adjacent candidates.
while True:
if verbose:
print(cur_cell_indices)
candidate_cell_tuples = get_candidate_cell_tuples(cur_cell_indices, adjacent_cell_indices)
shuffle(candidate_cell_tuples)
computed_cell_tuples = [tuple(cur_cell_indices)]
for i, test_cells in enumerate(candidate_cell_tuples[:max_depth]):
computed_cell_tuples.append(tuple(test_cells))
if tuple(test_cells) not in known_losses:
if verbose:
print("computing ", i+1, " of ", len(candidate_cell_tuples[:max_depth]))
if joint_holes:
known_losses[tuple(test_cells)] = compute_loss_from_cells(test_cells, trajectories_matrix, targets, cell_dict, boundary_2)
else:
for cell_index in test_cells:
if cell_index not in known_harmonic_embeddings:
new_harmonic_vector = cell_indices_to_harmonic_vectors([cell_index], cell_dict, boundary_2, weighted_boundary_2)
known_harmonic_embeddings[cell_index] = (trajectories_matrix @ new_harmonic_vector)[:,0]
harmonic_embeddings = [known_harmonic_embeddings[cur_cell_index] for cur_cell_index in test_cells]
harmonic_embeddings = np.array(harmonic_embeddings).T
if unsupervised:
kmeans = KMeans(n_clusters=n_clusters, random_state=0).fit(harmonic_embeddings)
targets = kmeans.labels_
known_losses[tuple(test_cells)] = target_function_unsupervised(harmonic_embeddings, targets)
else:
known_losses[tuple(test_cells)] = target_function(harmonic_embeddings, targets)
if known_losses[tuple(test_cells)] > known_losses[tuple(cur_cell_indices)]: #exit for loop and look around newly found cell
if verbose:
print("found better cell")
loss_over_time.append(known_losses[tuple(test_cells)])
cur_cell_indices = test_cells
used_cell_indices.append(cur_cell_indices)
adjacent_cell_indices = [get_adjacent_cells([cur_cell_index], k_hop_matrix) for cur_cell_index in cur_cell_indices]
break
        if cur_cell_indices != test_cells:  # no better cell was found in this round
            if len(computed_cell_tuples) < max_depth:  # fewer candidates than the budget: enlarge the search neighbourhood
if verbose:
print("expanding search space")
print("length computed_cell_tuples ", len(computed_cell_tuples))
print("max depth ", max_depth)
adjacent_cell_indices = [get_adjacent_cells(indices, k_hop_matrix) for indices in adjacent_cell_indices]
else: #if all cells have been checked or maximum depth has been reached
return {"cell_indices":cur_cell_indices, "embedding_distinctiveness":known_losses[tuple(cur_cell_indices)], "known_losses" : known_losses, "loss_over_time":loss_over_time, "used_cell_indices":used_cell_indices}
def generate_complex(n_points = 1000):
rng = np.random.default_rng()
points = rng.uniform(size=(n_points, 2))
triang = Delaunay(points)
triang_list = [tuple(s) for s in triang.simplices]
SC = cf.CellComplex(n_points, triang_list)
G = cf.cc_to_nx_graph(SC)
for a, b in G.edges():
G[a][b]['weight'] = np.linalg.norm(points[a] - points[b])
edge_dict = {tuple((int(vert) for vert in edge)):i for i, edge in enumerate(SC.get_cells(1))}
cell_dict = {tuple((int(vert) for vert in cell)):i for i, cell in enumerate(SC.get_cells(2))}
boundary_1 = SC.boundary_map(1)
boundary_2 = SC.boundary_map(2)
return_dict = {'G': G, 'edge_dict': edge_dict, 'cell_dict': cell_dict, 'boundary_1': boundary_1, 'boundary_2': boundary_2, 'points': points, 'SC': SC}
return return_dict
def generate_trajectories(G,
n_classes = 5,
n_train = 5,
n_test = 5,
min_rel_length = 0.6,
fixed_length = True,
points = None
):
    n_trajectories_per_class = n_train + n_test
    trajectories = []
    targets = []
    existing_locations = []
    for i in range(n_classes):
        departure, arrival, new_trajectories = trajectory.random_trajectory_class(G, points, n_trajectories_per_class, existing_locations = existing_locations, points_on_exterior = True, corridor_size = 1.0, min_distance = 0.2)
        # Remember this class's departure/arrival pair so later classes keep their distance.
        existing_locations.append((departure, arrival))
        shuffle(new_trajectories)
        trajectories += [shorten_trajectory(traj, min_rel_length=min_rel_length, fixed_length = fixed_length) for traj in new_trajectories]
        targets += [i] * n_trajectories_per_class
    # The first n_train trajectories of each class are training data, the rest test data.
    test_train = np.array(([0] * n_train + [1] * n_test)*n_classes)
    targets = np.array(targets)
    test_targets = targets[test_train == 1]
    train_targets = targets[test_train == 0]
    test_trajectories = [traj for traj, test in zip(trajectories, test_train) if test == 1]
    train_trajectories = [traj for traj, test in zip(trajectories, test_train) if test == 0]
    return_dict = {"train_trajectories":train_trajectories, "train_targets":train_targets, "test_trajectories":test_trajectories, "test_targets":test_targets}
return return_dict
def smooth_flow_list(flow_list, up_laplacian, t =5):
smoothed_flows = []
for flow in flow_list:
smoothed_flows.append(expm_multiply(-up_laplacian, flow,start=t, stop=t, num=1, endpoint=True)[0])
return smoothed_flows
def predict_classes(targets, hole_indices, trajectories, cell_dict, boundary_2, weighted_boundary_2, test_trajectories, joint_holes, edge_dict, unsupervised = False, n_clusters = 0):
trajectories_matrix = trajectories_to_sparse_matrix(trajectories, edge_dict)
test_trajectories_matrix = trajectories_to_sparse_matrix(test_trajectories, edge_dict)
if joint_holes:
harmonic_vectors = cell_indices_to_harmonic_vectors(hole_indices, cell_dict, boundary_2, weighted_boundary_2)
else:
harmonic_vectors = np.array([cell_indices_to_harmonic_vectors([hole_index] , cell_dict, boundary_2, weighted_boundary_2)[:,0] for hole_index in hole_indices]).T
harmonic_embeddings = trajectories_matrix @ harmonic_vectors
test_harmonic_embeddings = test_trajectories_matrix @ harmonic_vectors
knn = KNeighborsClassifier(n_neighbors=1)
if unsupervised:
kmeans = KMeans(n_clusters=n_clusters, random_state=0).fit(harmonic_embeddings)
targets = kmeans.labels_
knn.fit(harmonic_embeddings, targets)
return knn.predict(test_harmonic_embeddings)
def predict_classes_forest(targets, hole_indices, trajectories, cell_dict, boundary_2, weighted_boundary_2, test_trajectories, joint_holes, edge_dict, unsupervised=False, n_clusters = 0):
trajectories_matrix = trajectories_to_sparse_matrix(trajectories, edge_dict)
test_trajectories_matrix = trajectories_to_sparse_matrix(test_trajectories, edge_dict)
if joint_holes:
harmonic_vectors = cell_indices_to_harmonic_vectors(hole_indices, cell_dict, boundary_2, weighted_boundary_2)
else:
harmonic_vectors = np.array([cell_indices_to_harmonic_vectors([hole_index] , cell_dict, boundary_2, weighted_boundary_2)[:,0] for hole_index in hole_indices]).T
harmonic_embeddings = trajectories_matrix @ harmonic_vectors
test_harmonic_embeddings = test_trajectories_matrix @ harmonic_vectors
rfc = RandomForestClassifier(n_estimators=1000)
if unsupervised:
kmeans = KMeans(n_clusters=n_clusters, random_state=0).fit(harmonic_embeddings)
targets = kmeans.labels_
rfc.fit(harmonic_embeddings, targets)
return rfc.predict(test_harmonic_embeddings)
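# A small verification sketch (illustrative, not part of the original module):
# the vector returned for a removed cell is numerically a cycle (B1 @ h = 0) and
# curl-free with respect to the reduced complex (B2_reduced^T @ h = 0).
if __name__ == "__main__":
    net = generate_complex(n_points=50)
    h = cell_indices_to_harmonic_vectors([0], net["cell_dict"], net["boundary_2"], net["boundary_2"])
    B2_reduced = remove_ith_columns(net["boundary_2"], [0])
    print(np.abs(net["boundary_1"] @ h).max())  # ~0 up to floating point error
    print(np.abs(B2_reduced.T @ h).max())       # ~0 up to the lsmr solver tolerance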
learning_holes/target_function.py 0 → 100644
from learning_holes.utils import *
def target_function(harmonic_embeddings, targets):
    # Swap the returned metric to change the behaviour of find_holes. Available
    # choices: mean_embedding_distinctiveness, min_embedding_distance,
    # embedding_distinctiveness, knn_accuracy_score.
    return embedding_distinctiveness(harmonic_embeddings, targets)

def target_function_unsupervised(harmonic_embeddings, targets):
    # Unsupervised counterpart of target_function; see the choices listed above.
    return min_embedding_distance_equal_clusters(harmonic_embeddings, targets)
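# Hypothetical swap (sketch): to optimise 1-NN classification accuracy directly,
# the body of target_function could instead return
#
#     knn_accuracy_score(harmonic_embeddings, targets)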
overview_holes.png 0 → 100644 (binary image, 710 KiB)
pyproject.toml 0 → 100644
[project]
name = "publication-code-finding-holes"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"cell-flower>=1.0.1",
"imageio>=2.36.1",
"ipykernel>=6.29.5",
"ipympl>=0.9.4",
"matplotlib>=3.9.3",
"multiprocess>=0.70.17",
"nbformat==4.2",
"networkx>=3.4.2",
"numpy>=2.1.3",
"pandas>=2.2.3",
"pip>=24.3.1",
"plotly>=5.24.1",
"scikit-learn>=1.5.2",
"scipy>=1.14.1",
"seaborn>=0.13.2",
"shapely>=2.0.6",
"tqdm>=4.67.1",
]
smoothed_flows.gif 0 → 100644 (binary image, 1.78 MiB)
util/__init__.py 0 → 100644
from typing import List, Tuple, Union
import networkx as nx
import numpy as np
from shapely.affinity import rotate, scale
from shapely.geometry import Point
SEED = Union[int, np.random.RandomState, None]
def lexsort_rows(array: np.ndarray) -> np.ndarray:
array = np.array(array)
return array[np.lexsort(np.rot90(array))]
def make_ellipse(center: Point, semi_major: float, semi_minor: float, rotation: float):
circle = center.buffer(1.0)
ellipse_base = scale(circle, semi_major, semi_minor)
return rotate(ellipse_base, rotation)
def random_point_in_exterior(corridor_size: float = 0.2, *, seed=None) -> np.ndarray:
"""
Selects a random point in the exterior of the 2-dimensional space
[0, 1] ⨉ [0, 1].
"""
random_state = nx.utils.create_random_state(seed)
x = random_state.uniform(0.0, corridor_size)
y = random_state.uniform(0.0, 1.0)
    # With probability 1/2, swap the axes so the corridor can hug a horizontal
    # edge of the unit square instead of a vertical one.
    if random_state.uniform() < 0.5:
        x, y = y, x
if random_state.uniform() < 0.5:
x = 1.0 - x
y = 1.0 - y
return np.array((x, y))
def random_pair_in_exterior(corridor_size: float = 0.2, existing_locations: List[np.ndarray] = None, *, seed=None , min_distance: float = 0.3) -> Tuple[np.ndarray, np.ndarray]:
random_state = nx.utils.create_random_state(seed)
if existing_locations is None:
existing_locations = []
while True:
departure = random_point_in_exterior(corridor_size, seed=random_state)
arrival = random_point_in_exterior(corridor_size, seed=random_state)
# Ensure some distance between departure and arrival area. This should lead to more
# interesting trajectories.
if np.linalg.norm(departure - arrival) < 0.6:
continue
        # Ensure that the departure and arrival areas are not too similar to those of
        # another trajectory class. Otherwise the two classes would effectively be a
        # single class, which is particularly problematic if one of the classes is
        # the outlier.
found_similar_class = False
for existing in existing_locations:
if np.linalg.norm(existing[0] - departure) < min_distance and np.linalg.norm(existing[1] - arrival) < min_distance:
found_similar_class = True
break
if found_similar_class:
continue
# if this is reached, we found a good departure and arrival location pair
return departure, arrival
def random_pair_in_plane(corridor_size: float = 0.2, existing_locations: List[np.ndarray] = None, *, seed=None) -> Tuple[np.ndarray, np.ndarray]:
random_state = nx.utils.create_random_state(seed)
if existing_locations is None:
existing_locations = []
while True:
departure = random_state.uniform(0.0, 1.0, 2)
arrival = random_state.uniform(0.0, 1.0, 2)
# Ensure some distance between departure and arrival area. This should lead to more
# interesting trajectories.
if np.linalg.norm(departure - arrival) < 0.6:
continue
        # Ensure that the departure and arrival areas are not too similar to those of
        # another trajectory class. Otherwise the two classes would effectively be a
        # single class, which is particularly problematic if one of the classes is
        # the outlier.
found_similar_class = False
for existing in existing_locations:
if np.linalg.norm(existing[0] - departure) < 0.3 and np.linalg.norm(existing[1] - arrival) < 0.3:
found_similar_class = True
break
if found_similar_class:
continue
# if this is reached, we found a good departure and arrival location pair
return departure, arrival
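# Usage sketch (illustrative, not part of the original module): sample two
# departure/arrival pairs on the exterior corridor of the unit square, keeping
# the second pair away from the first.
if __name__ == "__main__":
    first = random_pair_in_exterior(corridor_size=0.2, seed=0)
    second = random_pair_in_exterior(corridor_size=0.2, existing_locations=[first], seed=1)
    print(first, second)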
util/graph.py 0 → 100644
from itertools import chain
from typing import Iterable, List, Optional, Set, Tuple, TypeVar
import networkx as nx
import numpy as np
from scipy.spatial import Delaunay
from shapely.geometry.point import Point
from util import make_ellipse, SEED
#from util.holes import random_holes  # required by random_graph below
V = TypeVar('V')
def edges_on_path(path: List[V]) -> Iterable[Tuple[V, V]]:
return zip(path, path[1:])
def point_to_closest_node(point: np.ndarray, graph: nx.Graph, node_positions: np.ndarray, *, ignore_nodes: Optional[Set[int]] = None) -> int:
    distances = np.linalg.norm(node_positions - point, axis=1)
    # Walk through the nodes by increasing distance and return the first admissible one.
    for index in distances.argsort():
        if ignore_nodes is None or index not in ignore_nodes:
            return index
    raise RuntimeError('Could not find a closest node in the graph. Is the graph empty or did you ignore all nodes?')
def random_triangle_graph(number_points: int, *, seed: SEED = None,
distance_weights: bool = False) -> Tuple[nx.Graph, np.ndarray]:
random_state: np.random.RandomState = nx.utils.create_random_state(seed)
points = random_state.uniform(0, 1, (number_points, 2))
tri = Delaunay(points)
nodes = set()
edges = set()
for u, v, w in tri.simplices:
nodes.update([u, v, w])
edges.add(tuple(sorted([u, v])))
edges.add(tuple(sorted([v, w])))
edges.add(tuple(sorted([w, u])))
graph = nx.Graph()
graph.add_nodes_from(sorted(nodes))
graph.add_edges_from(sorted(edges))
if distance_weights:
for a, b in graph.edges():
graph[a][b]['weight'] = np.linalg.norm(points[a] - points[b])
return graph, points
def random_graph(number_nodes: int, number_holes: int, *, seed=None) -> Tuple[nx.Graph, np.ndarray, List[List[int]], np.ndarray]:
random_state = nx.utils.create_random_state(seed)
graph, node_positions = random_triangle_graph(number_nodes, seed=random_state, distance_weights=True)
graph.graph['removed_nodes'] = set()
holes = []
hole_centers, axis_values, hole_rotations = random_holes(number_holes, seed=random_state)
for center, axis_value, hole_rotation in zip(hole_centers, axis_values, hole_rotations):
center = Point(center)
ellipse = make_ellipse(center, axis_value[1], axis_value[0], hole_rotation)
removed_points_round = set()
# TODO: this could be done way more efficiently
for node_id, point in enumerate(map(lambda p: Point(*p), node_positions)):
if ellipse.contains(point):
removed_points_round.add(node_id)
all_neighbors = set(chain.from_iterable(graph.neighbors(point) for point in removed_points_round))
graph.remove_nodes_from(removed_points_round)
graph.graph['removed_nodes'].update(removed_points_round)
# Sometimes a node on the border of the hole remains that has only one neighbor
# since all other neighbors have been deleted. We delete these nodes as well, as
# they otherwise cause problems later on.
hole = []
for node in all_neighbors - removed_points_round:
if graph.degree(node) > 1:
hole.append(node)
else:
graph.remove_node(node)
graph.graph['removed_nodes'].add(node)
holes.append(hole)
return graph, node_positions, holes, hole_centers
def sort_corners_of_hole_old(graph: nx.Graph, corners: List[int]) -> List[int]:
for i in range(0, len(corners) - 1):
neighbor_indexes = list(filter(lambda j: graph.has_edge(
corners[i], corners[j]), range(i, len(corners))))
# if there are two choices, use the one that only connects to the other
# neighbor and the current node itself.
if len(neighbor_indexes) == 1 or i == 0 or i == len(corners) - 2:
chosen_neighbor_index = neighbor_indexes[0]
elif len(neighbor_indexes) == 2:
neighbors_0 = set(filter(lambda j: graph.has_edge(
corners[neighbor_indexes[0]], corners[j]), range(i, len(corners))))
if neighbors_0 == {i, neighbor_indexes[1]}:
chosen_neighbor_index = neighbor_indexes[0]
else:
chosen_neighbor_index = neighbor_indexes[1]
else:
raise RuntimeError('Cannot order corners of polygon.')
corners[i + 1], corners[chosen_neighbor_index] = corners[chosen_neighbor_index], corners[i + 1]
return corners
def sort_corners_of_hole(graph: nx.Graph, corners: List[int]) -> List[int]:
def sort_corners_of_hole_inner(i: int, graph: nx.Graph, corners: List[int]) -> List[int]:
if i == len(corners) - 1:
if graph.has_edge(corners[0], corners[-1]):
return corners
raise RuntimeError('Solution is invalid.')
neighbor_indexes = list(filter(lambda j: graph.has_edge(
corners[i], corners[j]), range(i, len(corners))))
for possible_neighbor in neighbor_indexes:
corners[i + 1], corners[possible_neighbor] = corners[possible_neighbor], corners[i + 1]
try:
return sort_corners_of_hole_inner(i + 1, graph, corners)
except RuntimeError:
corners[i + 1], corners[possible_neighbor] = corners[possible_neighbor], corners[i + 1]
raise RuntimeError('Exceeded all possible choices.')
return sort_corners_of_hole_inner(0, graph, corners)
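# Usage sketch (illustrative, not part of the original module): build a random
# triangulated graph and order the corners of a quadrilateral hole.
if __name__ == "__main__":
    graph, points = random_triangle_graph(30, seed=0, distance_weights=True)
    print(graph.number_of_nodes(), graph.number_of_edges())
    square = nx.Graph([(0, 1), (1, 2), (2, 3), (3, 0)])
    print(sort_corners_of_hole(square, [0, 2, 1, 3]))  # -> [0, 1, 2, 3]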
util/trajectory.py 0 → 100644
from copy import deepcopy
from typing import Generator, List, Tuple
import networkx as nx
import numpy as np
from scipy.sparse import csr_matrix, linalg
from util import SEED, random_pair_in_exterior, random_pair_in_plane
from util.graph import edges_on_path, point_to_closest_node
def random_trajectory_class(graph: nx.Graph, node_positions: np.ndarray, number_trajectories: int,
*, existing_locations: List[np.ndarray] = None, seed: SEED = None, points_on_exterior: bool = True, corridor_size: float = 0.2, min_distance: float = 0.3) -> Tuple[np.ndarray, np.ndarray, List[List[int]]]:
random_state = nx.utils.create_random_state(seed)
# We operate on a copy since we will alter the edge attributes below after each trajectory generation.
# This change is only temporary and should not affect the other trajectory classes or the rest of the
# computation.
graph = deepcopy(graph)
if points_on_exterior:
departure_point, arrival_point = random_pair_in_exterior(existing_locations=existing_locations, seed=random_state, corridor_size=corridor_size, min_distance=min_distance)
else:
departure_point, arrival_point = random_pair_in_plane(existing_locations=existing_locations, seed=random_state)
departure_node = point_to_closest_node(departure_point, graph, node_positions)# , ignore_nodes=graph.graph['removed_nodes'])
arrival_node = point_to_closest_node(arrival_point, graph, node_positions)# , ignore_nodes=graph.graph['removed_nodes'])
possible_departures = list(graph.neighbors(departure_node)) + [departure_node]
possible_arrivals = list(graph.neighbors(arrival_node)) + [arrival_node]
trajectories = []
for _ in range(number_trajectories):
departure = random_state.choice(possible_departures)
arrival = random_state.choice(possible_arrivals)
trajectory = nx.shortest_path(graph, departure, arrival, weight='weight')
trajectories.append(trajectory)
# Increase the edge weights along the trajectory a bit. This should make it more
# likely that the shortest path is slightly different in the next iteration.
# This should generate distinct but similar trajectories between the departure and the
# arrival node.
for e0, e1 in edges_on_path(trajectory):
graph[e0][e1]['weight'] += 0.0001
return departure_point, arrival_point, trajectories
def interpolate_trajectory(trajectory: List[int], graph: nx.Graph) -> Generator[int, None, None]:
previous_index = trajectory[0]
yield previous_index
for current_index in trajectory[1:]:
if graph.has_edge(previous_index, current_index):
yield current_index
previous_index = current_index
else:
shortest_path: List[int] = nx.shortest_path(graph, previous_index, current_index)
yield from shortest_path[1:]
previous_index = shortest_path[-1]
def full_trajectory_matrix(graph: nx.Graph, mat_traj, elist, elist_dict) -> List[csr_matrix]:
mat_vec_e = []
for count, j in enumerate(mat_traj):
if len(j) == 0:
print(f"{count}: No Trajectory")
continue
data = []
row_ind = []
col_ind = []
for i, (x, y) in enumerate(j):
assert (x, y) in elist or (y, x) in elist # TODO
assert graph.has_edge(x, y)
if x < y:
idx = elist_dict[(x, y)]
row_ind.append(idx)
col_ind.append(i)
data.append(1)
if y < x:
idx = elist_dict[(y, x)]
row_ind.append(idx)
col_ind.append(i)
data.append(-1)
mat_temp = csr_matrix((data, (row_ind, col_ind)), shape=(
elist.shape[0], len(j)), dtype=np.int8)
mat_vec_e.append(mat_temp)
return mat_vec_e
def flatten_trajectory_matrix(H_full) -> np.ndarray:
# TODO: This should return a sparse matrix
flattened = map(lambda mat_tmp: mat_tmp.sum(axis=1), H_full)
return np.array(list(flattened)).squeeze()
def create_matrix_coordinates_trajectory_Hspace(H, M_full):
return [H @ mat for mat in M_full]
def harmonic_projection_matrix(L1: csr_matrix, number_of_holes: int) -> np.ndarray:
"""
Coputes the harmonic projection matrix for the simplicial complex with the
given Hodge-1 Laplacian.
Parameters
----------
L1 : csr_matrix of type float
number_of_holes : int
"""
_, v = linalg.eigsh(L1, k=number_of_holes,
v0=np.ones(L1.shape[0]), which='SM')
return v.T
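# Minimal sanity sketch (illustrative, not part of the original module): for a
# 4-cycle with no 2-cells, L1 = B1.T @ B1 has a one-dimensional harmonic space
# spanned by the cycle indicator.
if __name__ == "__main__":
    B1 = csr_matrix(np.array([
        [-1.0,  0.0,  0.0, -1.0],
        [ 1.0, -1.0,  0.0,  0.0],
        [ 0.0,  1.0, -1.0,  0.0],
        [ 0.0,  0.0,  1.0,  1.0],
    ]))
    L1 = (B1.T @ B1).tocsr()
    print(harmonic_projection_matrix(L1, 1))  # proportional to (1, 1, 1, -1) / 2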