Commit 28b27e41 authored by DavidWalz's avatar DavidWalz
Browse files

first version

parent b1c193b9
This diff is collapsed.
from __future__ import division
import numpy as np
def rand_gumbel(lgE, A, size=None, model='EPOS-LHC'):
"""
Random Xmax values for given energy E [EeV] and mass number A
See Manlio De Domenico et al., JCAP07(2013)050, doi:10.1088/1475-7516/2013/07/050
Args:
lgE (array): energy log10(E/eV)
A (array): mass number
model (string): hadronic interaction model
size (int, optional): number of xmax values to create
Returns:
array of random Xmax values in [g/cm^2]
"""
lE = lgE - 19
lnA = np.log(A)
D = np.array([np.ones_like(A), lnA, lnA**2])
params = {
'QGSJetII': {
'mu': ((758.444, -10.692, -1.253), (48.892, 0.02, 0.179), (-2.346, 0.348, -0.086)),
'sigma': ((39.033, 7.452, -2.176), (4.390, -1.688, 0.170)),
'lambda': ((0.857, 0.686, -0.040), (0.179, 0.076, -0.0130))},
'QGSJetII-04': {
'mu': ((761.383, -11.719, -1.372), (57.344, -1.731, 0.309), (-0.355, 0.273, -0.137)),
'sigma': ((35.221, 12.335, -2.889), (0.307, -1.147, 0.271)),
'lambda': ((0.673, 0.694, -0.007), (0.060, -0.019, 0.017))},
'Sibyll2.1': {
'mu': ((770.104, -15.873, -0.960), (58.668, -0.124, -0.023), (-1.423, 0.977, -0.191)),
'sigma': ((31.717, 1.335, -0.601), (-1.912, 0.007, 0.086)),
'lambda': ((0.683, 0.278, 0.012), (0.008, 0.051, 0.003))},
'EPOS1.99': {
'mu': ((780.013, -11.488, -1.906), (61.911, -0.098, 0.038), (-0.405, 0.163, -0.095)),
'sigma': ((28.853, 8.104, -1.924), (-0.083, -0.961, 0.215)),
'lambda': ((0.538, 0.524, 0.047), (0.009, 0.023, 0.010))},
'EPOS-LHC': {
'mu': ((775.589, -7.047, -2.427), (57.589, -0.743, 0.214), (-0.820, -0.169, -0.027)),
'sigma': ((29.403, 13.553, -3.154), (0.096, -0.961, 0.150)),
'lambda': ((0.563, 0.711, 0.058), (0.039, 0.067, -0.004))}}
param = params[model]
p0, p1, p2 = np.dot(param['mu'], D)
mu = p0 + p1 * lE + p2 * lE**2
p0, p1 = np.dot(param['sigma'], D)
sigma = p0 + p1 * lE
p0, p1 = np.dot(param['lambda'], D)
lambd = p0 + p1 * lE
return mu - sigma * np.log(np.random.gamma(lambd, 1. / lambd, size=size))
""" Plotting routines
"""
from __future__ import division
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.collections import PatchCollection
from matplotlib.colors import Normalize
from utils import distance2showeraxis
def plot_array(v_stations, values, label='', vmin=None, vmax=None):
"""Plot a map *values* for an detector array specified by *v_stations*.
"""
xd, yd, zd = v_stations.T / 1000 # in [km]
circles = [plt.Circle((x, y), 0.650) for x, y in zip(xd.flat, yd.flat)]
fig, ax = plt.subplots(1)
fig.subplots_adjust(left=0, bottom=0.13)
coll = PatchCollection(circles, norm=Normalize(vmin=vmin, vmax=vmax))
coll.set_array(values)
coll.cmap.set_under('#d3d3d3')
ax.add_collection(coll)
ax.set(aspect='equal')
ax.grid()
cbar = fig.colorbar(coll)
cbar.set_label(label)
ax.set_xlim(-8, 8)
ax.set_ylim(-8, 8)
ax.set_xlabel('x [km]')
ax.set_ylabel('y [km]')
# def plot_traces_of_array_for_one_event(Smu, Sem, v_axis, v_stations, arraysize = 5):
# '''plot time traces of tanks around the tank with the smallest distance to shower core
# Sem = EM-SigmalTrace, Smu = Muon-SignalTrace, arraysize = length of array'''
# t = np.arange(0, 2001, 25)
# fig, axes = subplots(arraysize, arraysize, sharex=True, figsize=(29, 16), dpi=80, facecolor='w', edgecolor='k')
# t = np.arange(12.5, 2001, 25)
# tight_layout()
# coordVcore = np.argmin(distance2showeraxis(v_stations, v_axis))
# coordX = int(coordVcore / 11)
# coordY = coordVcore % 11
# coords = np.array(0)
# for j in range(arraysize):
# for k in range(arraysize):
# coords = np.append(coords, (coordX-2+j)*11 + (coordY-2+k))
# coords = coords[1:arraysizearraysize+1]
# for i, ax in enumerate(axes.flat):
# try:
# h1, h2 = Smu[coords[i]], Sem[coords[i]]
# except TypeError:
# h2 = np.histogram(0, bins=t)[0].astype(float)
# ax.step(t, h1 + h2, c='k', where='mid')
# ax.step(t, h1, label='$\mu$', where='mid')
# ax.step(t, h2, label='$e\gamma$', where='mid')
# ax.legend(title='r=%i' % coords[i], fontsize='x-small')
# ax.set_xlim(0, 1500)
# ax.grid(True)
# for k in range(arraysize):
# for l in range(arraysize):
# axes[k, l].set_xlabel('$t$ / ns')
# axes[k, l].set_ylabel('$S$ / VEM')
# savefig('trace_VEM.png')#, bbox_inches='tight')
if __name__ == '__main__':
d = np.load('showers.npz')
logE = d['logE']
mass = d['mass']
Xmax = d['Xmax']
v_core = d['showercore']
v_axis = d['showeraxis']
v_max = d['showermax']
v_stations = d['detector']
T = d['time']
S = d['signal']
# plot example event
plot_array(v_stations, T[0] * 1E6, label='time [mu s]', vmin=-8, vmax=8)
plt.savefig('time.png')
logS = np.log10(S.sum(axis=-1))
plot_array(v_stations, logS[0], label='log(signal)', vmin=0, vmax=5)
plt.savefig('signal.png')
from __future__ import division
import numpy as np
from utils import *
import atmosphere
import gumbel
atm = atmosphere.Atmosphere()
def station_response(r, dX, logE=19, A=1):
""" Simulate time trace f(t) for a given SD station and shower.
Args:
r (float): radial distance to shower axis in [m]
dX (float): distance to Xmax in [g/cm^2]
logE (float): log10(E/eV)
A (float): mass number
Returns:
time traces f_muon(t) and f_em(t) in [VEM] for t = 0 - 2000 ns in bins of 25 ns
"""
# strength of muon and em signal
# scaling with energy
S0 = 1200 * 10**(logE - 19)
# scaling with mass number
a = A**0.15
S1 = S0 * a / (a + 1)
S2 = S0 * 1 / (a + 1)
# TODO: add scaling with zenith angle (CIC)
# ...
# scaling with distance of station to shower axis
S1 *= np.minimum((r / 1000)**-4.3, 1000)
S2 *= np.minimum((r / 1000)**-5.5, 1000)
# scaling with traversed atmosphere to station
S1 *= np.minimum((dX / 100)**-0.1, 10)
S2 *= np.minimum((dX / 100)**-0.8, 10)
# limit total signal, otherwise we get memory problems when drawing that many samples from distribution
Stot = S1 + S2
Smax = 1E6
if Stot > Smax:
S1 *= Smax / Stot
S2 *= Smax / Stot
# draw number of detected muons and em particles
N1 = np.maximum(int(S1 + S1**.5 * np.random.randn()), 0) # number of muons
N2 = np.maximum(int(S2 + S2**.5 * np.random.randn()), 0) # number of em particles
# parameters of log-normal distributions
mean1 = np.log(200 * (r / 750)**1.2 * (1 - 0.2 * dX / 1000.))
mean2 = np.log(320 * (r / 750)**1.2 * (1 - 0.1 * dX / 1000.))
sigma1 = 0.7
sigma2 = 0.7
# draw samples from distributions and create histograms
shift = np.exp(mean2) - np.exp(mean1)
bins = np.arange(0, 2001, 25) # time bins [ns]
h1 = np.histogram(np.random.lognormal(mean1, sigma1, size=N1), bins=bins)[0]
h2 = np.histogram(np.random.lognormal(mean2, sigma2, size=N2) + shift, bins=bins)[0]
# total signal (simplify: 1 particle = 1 VEM)
return h1, h2
def detector_response(logE, mass, v_axis, v_max, v_stations):
""" Simulate the detector response for all SD stations and one event
"""
r = distance2showeraxis(v_stations, v_axis) # radial distance to shower axis
phi, zen = vec2ang(v_max - v_stations) # direction of shower maximum relative to stations
# distance to shower maximum in [g/cm^2]
dX = atm.get_atmosphere(
zen, # zenith angle of shower maximum seen from each station
h_low=v_stations[:, 2], # height of stations
h_up=v_max[2]) # height of shower maximum
# time traces for each station
n = len(v_stations)
S1 = np.zeros((n, 80)) # muon traces
S2 = np.zeros((n, 80)) # em traces
for j in range(n):
h1, h2 = station_response(r=r[j], dX=dX[j], logE=logE, A=mass)
S1[j] = h1
S2[j] = h2
return S1, S2
def rand_shower_geometry(N, logE, mass):
""" Generate random shower geometries: Xmax, axis, core, maximum. """
N = len(logE)
# shower axis
phi = 2 * np.pi * (np.random.rand(N) - 0.5)
zenith = rand_zenith(N)
v_axis = ang2vec(phi, zenith)
# shower core on ground (random offset w.r.t. grid origin)
v_core = SPACING * (np.random.rand(N, 3) - 0.5)
v_core[:, 2] = HEIGHT # core is always at ground level
# shower maximum, require h > hmin
Xmax = np.empty(N)
hmax = np.empty(N)
i = 0
while i < N:
Xmax[i] = gumbel.rand_gumbel(logE[i], mass[i])
hmax[i] = atm.get_vertical_height(zenith[i], Xmax[i])
if hmax[i] < HEIGHT + 200:
continue # shower maximum too low, repeat
i += 1
# position of shower maximum
dmax = (hmax - HEIGHT) / np.cos(zenith) # distance to Xmax
v_max = v_core + v_axis * dmax[:, np.newaxis]
return Xmax, v_axis, v_core, v_max
if __name__ == '__main__':
# detector array
nb_stations = 11**2 # number of stations
v_stations = triangular_array(nb_stations**.5) # x,y,z coordinates of SD stations
# showers
nb_events = 2
logE = 18.5 + 1.5 * np.random.rand(nb_events)
mass = 1 * np.ones(nb_events)
Xmax, v_axis, v_core, v_max = rand_shower_geometry(nb_events, logE, mass)
# detector response for each event
T = np.zeros((nb_events, nb_stations))
S = np.zeros((nb_events, nb_stations, 80))
for i in range(nb_events):
print(i)
s1, s2 = detector_response(logE[i], mass[i], v_axis[i], v_max[i], v_stations)
S[i] = s1 + s2
T[i] = arrival_time_planar(v_stations, v_core[i], v_axis[i])
# add per ton noise on arrival time
T += 20E-9 * np.random.randn(*T.shape)
# add time offset per event (to account for core position)
T += 100E-9 * (np.random.rand(nb_events, 1) - 0.5)
# add relative noise to signal pulse
S += 0.02 * S * np.random.randn(*S.shape)
# add absolute noise to signal pulse
S += 0.5 + 0.2 * np.random.randn(*S.shape)
# trigger threshold: use only stations with sufficient signal-to-noise
c = S.sum(axis=-1) < 80 * 0.55
T[c] = np.NaN
S[c] = np.NaN
# TODO: apply array trigger (3T5)
# save
np.savez_compressed(
'showers.npz',
logE=logE,
mass=mass,
Xmax=Xmax,
time=T,
signal=S,
showercore=v_core,
showeraxis=v_axis,
showermax=v_max,
detector=v_stations)
from pylab import *
from scipy.stats import lognorm
# scipy.stats.lognorm
# ----------------------------------------------
# p(x) = 1 / (s*x*sqrt(2*pi)) * exp(-1/2*(ln(x)/s)**2)
# lognorm takes s as a shape parameter.
# The probability density above is defined in the “standardized” form.
# To shift and/or scale the distribution use the loc and scale parameters.
# lognorm.pdf(x, s, loc, scale) = lognorm.pdf(y, s) / scale with y = (x - loc) / scale.
# A common parametrization is in terms of mu and sigma, of the unique normally distributed random variable X such that exp(X) = Y.
# This parametrization corresponds to setting s = sigma and scale = exp(mu).
# np.random.lognormal
# ---------------------------------------------
# p(x) = 1 / (sigma*x*sqrt(2*pi) * exp(-1/2*(ln(x)-mu)**2)/sigma**2)
def julie(mu, sigma, bins=linspace(0, 20, 101)):
figure()
N = 10000
hist(np.random.lognormal(mean=mu, sigma=sigma, size=N), bins=bins, alpha=0.6)
hist(lognorm.rvs(sigma, loc=mu, scale=exp(mu), size=N) - mu, bins=bins, alpha=0.6)
text(0.05, 0.9, '$\mu$=%.2f, $\sigma$=%.2f' % (mu, sigma), transform=gca().transAxes)
julie(1, 1)
# julie(1, 0.5)
# julie(1.5, 1)
# julie(2, 1)
# julie(2.5, 1)
# julie(2.5, 0.5)
julie(log(250), 0.7, bins=linspace(0, 1000, 101))
show()
from pylab import *
close('all')
# r = linspace(100, 3000)
# figure()
# plot(r, 220 * (r / 750)**1.2)
# plot(r, 420 * (r / 750)**1.2)
# plot([750, 850, 1250], [220, 260, 500], 'C0+')
# plot([750, 850, 1250], [420, 500, 1000], 'C1+')
# xlabel('$r$ [m]')
# ylabel('$\Delta t$ [ns]')
# grid()
# for r in [750, 850, 1250]:
# mean1 = np.log(200 * (r / 750)**1.2) # * (1 - 0.2 * dX / 1000.)
# mean2 = np.log(320 * (r / 750)**1.2) # * (1 - 0.1 * dX / 1000.)
# sigma1 = 0.7 # * (r / 1000.)**0.15 * (1 - 0.2 * dX / 1000.)
# sigma2 = 0.7 # * (r / 1000.)**0.15
# figure()
# bins = np.arange(0, 2001, 25) # time bins [ns]
# axvline(exp(mean1), color='C0')
# axvline(exp(mean2), color='C1')
# hist(np.random.lognormal(mean1, sigma1, size=10000), bins=bins, alpha=0.7)
# hist(np.random.lognormal(mean2, sigma2, size=10000) + exp(mean2)-exp(mean1), bins=bins, alpha=0.7)
# xlim(0, 1500)
# show()
r = 750
for dX in [300, 600, 900]:
mean1 = np.log(200 * (r / 750)**1.2 * (1 - 0.2 * dX / 1000.))
mean2 = np.log(320 * (r / 750)**1.2 * (1 - 0.1 * dX / 1000.))
sigma1 = 0.7 # * (r / 1000.)**0.15 * (1 - 0.2 * dX / 1000.)
sigma2 = 0.7 # * (r / 1000.)**0.15
figure()
bins = np.arange(0, 2001, 25) # time bins [ns]
axvline(exp(mean1), color='C0')
axvline(exp(mean2), color='C1')
shift = exp(mean2)-exp(mean1)
hist(np.random.lognormal(mean1, sigma1, size=10000), bins=bins, alpha=0.7)
hist(np.random.lognormal(mean2, sigma2, size=10000) + shift, bins=bins, alpha=0.7)
xlim(0, 1500)
show()
from __future__ import division
import numpy as np
HEIGHT = 1400 # detector height in [m]
SPACING = 1500 # detector spacing in [m]
def rectangular_array(n=11):
""" Coordinates for rectangular array with n^2 stations and given spacing.
Returns: n^2 x 3 array of x,y,z coordinates for each station.
"""
n0 = (n - 1) / 2
x, y = (np.mgrid[0:n, 0:n].astype(float) - n0) * SPACING
z = np.ones_like(x) * HEIGHT # z-position
return np.dstack([x, y, z]).reshape(-1, 3)
def triangular_array(n=11, offset=True):
""" Coordinates for triangular array with n^2 stations and given spacing.
Returns: n^2 x 3 array of x,y,z coordinates for each station.
"""
n0 = (n - 1) / 2
x, y = np.mgrid[0:n, 0:n].astype(float) - n0
if offset: # offset coordinates
x += 0.5 * (y % 2)
else: # axial coordinates
x += 0.5 * y
x *= SPACING
y *= np.sin(np.pi / 3) * SPACING
z = np.ones_like(x) * HEIGHT # z-position
return np.dstack([x, y, z]).reshape(-1, 3)
def rand_zenith(N=1, zmax=np.pi / 3):
""" Sample random zenith angles z for air shower surface detector.
Returns zenith angles z sampled pdf f(z) ~ sin(z) cos(z) in range [0, zmax].
"""
b = 1 / (1 - np.cos(zmax)**2)
r = np.random.rand(N)
return np.arccos(np.sqrt(1 - r / b))
def ang2vec(phi, zenith):
""" Get 3-vector from spherical angles.
Args:
phi (array): azimuth (pi, -pi), 0 points in x-direction, pi/2 in y-direction
zenith (array): zenith (0, pi), 0 points in z-direction
Returns:
array of 3-vectors
"""
x = np.sin(zenith) * np.cos(phi)
y = np.sin(zenith) * np.sin(phi)
z = np.cos(zenith)
return np.array([x, y, z]).T
def vec2ang(v):
""" Get spherical angles phi and zenith from 3-vector
Args:
array of 3-vectors
Returns:
phi, zenith
phi (array): azimuth (pi, -pi), 0 points in x-direction, pi/2 in y-direction
zenith (array): zenith (0, pi), 0 points in z-direction
"""
x, y, z = v.T
phi = np.arctan2(y, x)
zenith = np.pi / 2 - np.arctan2(z, (x**2 + y**2)**.5)
return phi, zenith
def distance2showerplane(v, va):
""" Get shortest distance to shower plane
Args:
v (Nx3 array): array of positions
va (3 array): shower axis = normal vector of the shower plane
"""
return np.dot(v, va)
def distance2showeraxis(v, va):
""" Shortest distance to shower axis.
Args:
v (Nx3 array): array of positions
va (3 array): shower axis
"""
d = distance2showerplane(v, va)
vp = v - np.outer(d, va)
return np.linalg.norm(vp, axis=-1)
def distance2showermaximum(v, vm):
""" Shortest distance to shower maximum
Args:
v (Nx3 array): array of vectors
vm (3 array): position of the shower maximum
"""
return np.linalg.norm(v - vm, axis=-1)
def arrival_time_planar(v, vc, va):
""" Get arrival times for a planar wavefront.
Note: The shower core is not considered here and as it only adds a constant time offset to all stations.
Args:
v (N x 3 array)
vc (3 array): shower core
va (3 array): shower axis, pointing upwards
Return:
array: arrival times [s]
"""
d = distance2showerplane(v - vc, -va)
return d / 3E8
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment