Commit 68847e94 authored by Dennis Noll
Browse files

[tasks] sync: new Upload workflow for sync with ext. groups

parent 3e9f2480
......@@ -12,6 +12,7 @@ from law.task.base import ExternalTask
import luigi
import boost_histogram as bh
import hist
from tqdm import tqdm
from tasks.base import BaseTask, ConfigTask
from tasks.mixins import PGroupMixin, RecipeMixin
......@@ -19,18 +20,35 @@ from tasks.coffea import CoffeaProcessor
from tasks.plotting import PlotHistsBase
class SyncTask(RecipeMixin, ConfigTask):
class SyncSelectionUpload(RecipeMixin, ConfigTask):
upload_identifier = luigi.Parameter(default="")
def requires(self):
return CoffeaProcessor.req(self, processor="SyncSelectionExporter", debug=True)
class SyncSelectionUpload(SyncTask):
def output(self):
return self.wlcg_target("sync.root", fs="wlcg_fs_public")
return self.wlcg_target(f"{self.upload_identifier}sync.root", fs="wlcg_fs_public")
@cached_property
def lookup_set(self):
    """Replacement table taken from the ``"sync"`` aux entry of the config.

    The value is whatever ``self.config_inst.get_aux`` returns for ``"sync"``;
    an empty dict is passed as the default argument.
    """
    replacements = self.config_inst.get_aux("sync", {})
    return replacements
def lookup(self, old_key):
    """Return ``old_key`` with every configured substring replacement applied.

    The replacements come from ``self.lookup_set`` and are applied in
    iteration order, each one operating on the result of the previous one.

    Args:
        old_key: The original key (a ``str``).

    Returns:
        The key after all ``str.replace(old, new)`` substitutions.
    """
    table = self.lookup_set
    # Iterating a dict yields only its keys, so (old, new) pairs have to come
    # from .items(); any other iterable is expected to hold the pairs itself.
    pairs = table.items() if isinstance(table, dict) else table
    key = old_key
    for old, new in pairs:
        key = key.replace(old, new)
    return key
def run(self):
intree = uproot.open(self.input().path)
with self.output().localize("w") as target:
self.input().copy_to(target)
with uproot.recreate(target.path) as file:
outtree = {k: intree["tree"][k].array() for k in intree["tree"].keys()}
for k_old in list(outtree.keys()):
k_new = self.lookup(k_old.decode())
outtree[k_new] = outtree.pop(k_old)
file["tree"] = uproot.newtree({n: v.dtype for n, v in outtree.items()})
file["tree"].extend(outtree)
class SyncFile(ExternalTask):
......@@ -50,7 +68,9 @@ def reduce_along_axis(func, axis, h):
def _kstest(arr):
    """Return the Kolmogorov-Smirnov statistic between the two rows of ``arr``.

    The samples compared are ``arr[0, :]`` and ``arr[1, :]``, which matches the
    ``"(2,n)->()"`` signature this function is vectorized with in ``kstest``.

    Args:
        arr: Array indexed as ``arr[row, entry]`` with two rows.

    Returns:
        The ``statistic`` attribute of the ``scipy.stats.kstest`` result.
    """
    # Suppress numpy's "invalid value" floating-point warnings while testing.
    with np.errstate(invalid="ignore"):
        return scipy.stats.kstest(arr[0, :], arr[1, :]).statistic
def kstest(h1, h2, compare_axis, feature_axis):
......@@ -81,7 +101,7 @@ def kstest(h1, h2, compare_axis, feature_axis):
# perform test
kstest = np.vectorize(_kstest, signature="(2,n)->()")
hnew.view()[:] = kstest(s)
hnew.view()[:] = kstest(s["value"])
return hnew
......@@ -104,7 +124,11 @@ class SyncSelection(PlotHistsBase, ConfigTask):
req = {}
if self.own:
req.update(
{"own": CoffeaProcessor.req(self, processor="SyncSelectionExporter", debug=True)}
{
"own": CoffeaProcessor.req(
self, processor="SyncSelectionExporter", explorative=True
)
}
)
req.update({name: SyncFile.req(self, filepath=file) for name, file in self._files.items()})
return req
......@@ -115,7 +139,7 @@ class SyncSelection(PlotHistsBase, ConfigTask):
assert len(inps) > 0, "No inputs defined, use `--own` and/or `--files`"
out = {}
for key, target in inps.items():
for key, target in tqdm(inps.items(), desc="load data"):
tree = uproot.open(target.path)["tree"]
keys, size = tree.keys(), len(tree["eventnr_eventnr"].array())
dtype = [(k.decode(), "i4" if self.is_mask_key(k.decode()) else "f4") for k in keys]
......@@ -150,7 +174,7 @@ class SyncSelection(PlotHistsBase, ConfigTask):
def run(self):
statistical_tests = {}
for mask_key in self.mask_keys:
for mask_key in tqdm(self.mask_keys, desc="categories"):
for feature_key in self.feature_keys:
# extract name + binning from variables
var_name = self.lut.get(feature_key, feature_key)
......@@ -159,9 +183,10 @@ class SyncSelection(PlotHistsBase, ConfigTask):
var_name = var.name
binning = var.binning
except ValueError:
# default binning
binning = (100, -400, +400)
compare = hist.Hist(
hist.axis.StrCategory(self.data.keys(), name="group"),
hist.axis.StrCategory(list(self.data.keys()), name="group"),
hist.axis.Regular(*binning, name="variable"),
storage=hist.storage.Weight(),
)
......@@ -169,23 +194,23 @@ class SyncSelection(PlotHistsBase, ConfigTask):
mask = dat[mask_key].astype(bool)
values = dat[feature_key][mask]
compare.fill(group=key, variable=values)
self.plot(
targets=[self.local_target(f"plots/{mask_key}/{feature_key}.pdf")],
stack=compare[:1, :],
lines=compare[1:, :],
ratio="lines",
)
# plot histograms
self.plot(compare)
with self.plot_silencer():
# ratio plot still wrong due to https://github.com/scikit-hep/boost-histogram/issues/621
self.plot(
targets=[self.local_target(f"plots/{mask_key}/{feature_key}.pdf")],
stack=compare[:1, :],
lines=compare[1:, :],
ratio="lines",
)
# statistical tests
hnew = kstest(compare[:1, :], compare[1:, :], 0, -1)
statistical_tests[f"{mask_key}_{feature_key}"] = hnew.view()[0]
self.output()["statistical_tests"].dump(statistical_tests)
class SyncSelectionWrapper(SyncTask):
def requires(self):
return {
"upload": SyncSelectionUpload.req(self),
"plots": SyncSelectionPlots.req(self),
}
# class SyncSelectionWrapper(SyncTask):
# def requires(self):
# return {
# "upload": SyncSelectionUpload.req(self),
# "plots": SyncSelectionPlots.req(self),
# }
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment