Skip to content
Snippets Groups Projects
Commit d67d6859 authored by Jan Habscheid's avatar Jan Habscheid
Browse files

rm old files

parent e230d9f1
No related branches found
No related tags found
1 merge request!2Project3
This diff is collapsed.
%% Cell type:code id: tags:
``` python
import numpy as np
import matplotlib.pyplot as plt
```
%% Cell type:code id: tags:
``` python
x = np.linspace(0, 10, 101)
number_points = 4
```
%% Cell type:code id: tags:
``` python
positions = np.linspace(x[0], x[-1], number_points)
index_in_x = [np.argmin(np.abs(x - pos)) for pos in positions]
```
%% Cell type:code id: tags:
``` python
positions, index_in_x, x[index_in_x]
```
%% Output
(array([ 0. , 3.33333333, 6.66666667, 10. ]),
[np.int64(0), np.int64(33), np.int64(67), np.int64(100)],
array([ 0. , 3.3, 6.7, 10. ]))
%% Cell type:code id: tags:
``` python
```
import matplotlib.pyplot as plt
import numpy as np
import scipy.sparse as sparse
import scipy.io
class System:
def __init__(
self,
x0:float, x1:float,
NTime:int,
k_i:int=11,
k_mu=1, k_sigma=.25,
datafile = 'data/TrueData_FluxF.mat'
):
'''
Constructor for the System class, enhanced with MCMC capabilities
'''
# Existing initialization code from your original implementation
mat = scipy.io.loadmat(datafile)
self.mat = mat
self.Nx = int(mat['N'])
self.T_end = float(mat['Tend'])
self.NT_obs = int(mat['NT_obs'])
self.times_observed = mat['time_obs'][0]
self.n_extraction_points = 4
self.x_observed = [float(mat[f'X_{i}']) for i in range(1, self.n_extraction_points+1)]
self.u_true = np.array([mat[f'u_true_X{i}'] for i in range(1, self.n_extraction_points+1)])
self.u_true = self.u_true[:,:,0]
self.x0 = x0
self.x1 = x1
self.dx = (x1 - x0) / self.Nx
self.x = np.linspace(x0, x1, self.Nx)
self.NTime = NTime
self.dt = self.T_end / NTime
# MCMC parameters
self.add_unc = 0.03 # noise in observation data
self.error_val = 100 # lower limit in Loss function
self.kk_tolerance = 1000 # number of steps in MCMC algorithm
self.tau = 20 # extract "theta^k" with step size "tau"
self.beta = 0.05
# Parameter for k(x)
self.k_i = k_i
self.k_mu = k_mu
self.k_sigma = k_sigma
# MCMC tracking variables
self.Loss = np.ones(self.kk_tolerance + 1) * -1
self.theta_history = []
self.phi_history = []
def simulate_g(
self,
initial_ensemble_kx,
initial_true,
K=0,
weights=[1,1,1,1]
):
'''
Simulate the forward model for MCMC
'''
# Use the simulate_g function definition provided
Ant_Init = initial_true.shape[1]
Nt = len(self.times_observed)
It = np.arange(0, Nt)
JX = self.times_observed <= self.T_end
# Create placeholders for results
ensembleOfPredictedObs_X1_0toT3 = np.zeros((Nt * Ant_Init, 1))
ensembleOfPredictedObs_X2_0toT3 = np.zeros((Nt * Ant_Init, 1))
ensembleOfPredictedObs_X3_0toT3 = np.zeros((Nt * Ant_Init, 1))
ensembleOfPredictedObs_X4_0toT3 = np.zeros((Nt * Ant_Init, 1))
# Positions of observation points
X_positions = self.x_observed
# Create u_X to store results for each observation point
u_X = [np.zeros((len(self.times_observed), Ant_Init)) for _ in range(4)]
# Interpolation of k(x)
x_k = np.linspace(self.x0, self.x1, self.k_i)
# Loop through initial conditions
for K_0 in range(Ant_Init):
# Solve using your existing solver with the current k(x)
u_X[0][:, K_0], u_X[1][:, K_0], u_X[2][:, K_0], u_X[3][:, K_0], _ = self.my_solver(
N=self.Nx,
x=self.x,
xh=None, # You might need to define this if used
Ldom=self.x1-self.x0,
Tend=self.T_end,
k_vector=initial_ensemble_kx[:, K],
u0=initial_true[:, K_0],
time_obs=self.times_observed,
x_obs=X_positions,
x_k=x_k
)
# Store results for each observation point
ensembleOfPredictedObs_X1_0toT3[It + (K_0) * Nt, K] = u_X[0][JX, K_0]
ensembleOfPredictedObs_X2_0toT3[It + (K_0) * Nt, K] = u_X[1][JX, K_0]
ensembleOfPredictedObs_X3_0toT3[It + (K_0) * Nt, K] = u_X[2][JX, K_0]
ensembleOfPredictedObs_X4_0toT3[It + (K_0) * Nt, K] = u_X[3][JX, K_0]
# Combine predictions with weights
ensembleOfPredictedObservations = np.concatenate([
weights[0] * ensembleOfPredictedObs_X1_0toT3,
weights[1] * ensembleOfPredictedObs_X2_0toT3,
weights[2] * ensembleOfPredictedObs_X3_0toT3,
weights[3] * ensembleOfPredictedObs_X4_0toT3
])
return ensembleOfPredictedObservations
def my_solver(
self,
N, x, xh, Ldom, Tend,
k_vector, u0,
time_obs, x_obs, x_k
):
'''
Wrapper for your existing solver to match the interface of simulate_g
This is a placeholder - you'll need to implement the actual solver
based on your conservation law solver
'''
# Implement your conservation law solver here
# This is just a skeleton - replace with actual implementation
# Temporary placeholder: return dummy values
u_X1 = np.zeros_like(time_obs)
u_X2 = np.zeros_like(time_obs)
u_X3 = np.zeros_like(time_obs)
u_X4 = np.zeros_like(time_obs)
y_k = np.ones_like(x_k)
return u_X1, u_X2, u_X3, u_X4, y_k
def gen_measurements(self, u_list, weights=[1,1,1,1]):
'''
Generate noisy measurements
'''
measurement = []
for i, u in enumerate(u_list):
m = u + self.add_unc * np.random.normal(size=len(u))
m[m < 0] = 0
m[m > 1] = 1
measurement.append(weights[i] * m)
return np.concatenate(measurement)
def run_mcmc(self, initial_true):
'''
Run Markov Chain Monte Carlo to identify k(x)
'''
# Initial parameter setup
Mr = self.k_i - 2 # Number of unknown points (excluding boundary points)
# Initial guess for k(x)
theta_1 = np.random.normal(loc=self.k_mu, scale=self.k_sigma, size=Mr)
theta_1 = np.clip(theta_1, 0.95, 1.3)
# Pad k(x) with boundary values
y_k_initial = np.concatenate([[1], theta_1, [1]])
# Prepare initial ensemble
initial_ensemble_kx = np.zeros((self.k_i, 1))
initial_ensemble_kx[1:-1, 0] = theta_1
initial_ensemble_kx[0, 0] = 1
initial_ensemble_kx[-1, 0] = 1
# Generate true measurements
u_true_list = [
self.u_true[0],
self.u_true[1],
self.u_true[2],
self.u_true[3]
]
measurement = self.gen_measurements(u_true_list)
# Initial simulation to get first loss
ensembleOfPredictedObservations = self.simulate_g(
initial_ensemble_kx,
initial_true
)
# Compute initial loss
measIndex = np.arange(len(measurement))
weight_obs = sparse.diags(1 / (self.add_unc * np.ones(measIndex.shape[0])) ** 2)
meas_diff = ensembleOfPredictedObservations[measIndex, 0] - measurement[measIndex]
Phi_theta_1 = meas_diff.T @ weight_obs @ meas_diff
# Store initial values
self.Loss[0] = Phi_theta_1
self.theta_history.append(theta_1)
# MCMC loop
kk = 0
while self.Loss[kk] > self.error_val and kk < self.kk_tolerance:
# Generate perturbed parameter vector
phi_1 = np.sqrt(1 - self.beta**2) * theta_1 + self.beta * self.k_sigma * np.random.normal(size=Mr)
phi_1 = np.clip(phi_1, 0.25, 1.75)
# Update ensemble with perturbed parameters
initial_ensemble_kx[1:-1, 0] = phi_1
# Simulate with perturbed parameters
ensembleOfPredictedObservations = self.simulate_g(
initial_ensemble_kx,
initial_true
)
# Compute loss for perturbed parameters
meas_diff = ensembleOfPredictedObservations[measIndex, 0] - measurement[measIndex]
Phi_phi_1 = meas_diff.T @ weight_obs @ meas_diff
# Compute acceptance probability
a_theta_1_phi_1 = min(1, np.exp(Phi_theta_1 - Phi_phi_1))
# Accept/reject step
choice = np.random.uniform(0, 1)
if choice <= a_theta_1_phi_1:
theta_1 = phi_1.copy()
Phi_theta_1 = Phi_phi_1
# Update iteration
kk += 1
self.Loss[kk] = Phi_theta_1
self.theta_history.append(theta_1)
# Optional: Print progress
print(f"Iteration: {kk}, Loss: {self.Loss[kk]}")
return theta_1
def plot_results(self):
'''
Plot MCMC results
'''
# Plot loss function
plt.figure()
to_plot = self.Loss[self.Loss >= 0]
plt.plot(to_plot)
plt.title("Loss Function")
plt.xlabel("Iteration")
plt.ylabel("Loss")
plt.grid(True)
plt.show()
# Plot identified k(x)
plt.figure()
x_k = np.linspace(self.x0, self.x1, self.k_i)
y_k_ident = np.concatenate([[1], self.theta_history[-1], [1]])
plt.plot(x_k, y_k_ident, '-b')
plt.title("Identified k(x) Function")
plt.xlabel("x")
plt.ylabel("k(x)")
plt.grid(True)
plt.show()
# Example usage
def main():
# Example initialization and usage
system = System(x0=0, x1=1, NTime=100)
# Load initial condition (example)
initial_true = np.random.rand(system.Nx, 1)
# Run MCMC
identified_k = system.run_mcmc(initial_true)
# Plot results
system.plot_results()
if __name__ == "__main__":
main()
\ No newline at end of file
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment