Commit 97abf87e authored by Julia310

add ml toolkits

parent b141ee17
# Changelog
## [0.0.*] - 2024-05-14
### Added
- `training` module:
- Trainer classes for `sklearn`, `PyTorch`, and `TensorFlow` to improve usability.
- `pytorch_distributed.py` for distributed machine learning (originally `distributed_training.py`).
- `datasets` module:
- `XrDataset` for smaller `xarray` data manageable in memory.
- `LargeScaleXrDataset` for `PyTorch` and `TensorFlow` to handle large datasets by iterating over (batches of) chunks.
- Enabled batch training (partial fit) for the `training.sklearn` trainer class using either `LargeScaleXrDataset` (from `datasets.pytorch_xr`) or `XrDataset`, integrated with a `PyTorch` data loader (see the sketch after this list).
- Configuration options for `XrDataset` and both `LargeScaleXrDataset` variants to select training data.
- `prepare_dataloader` method for `PyTorch` and `prepare_dataset` for `TensorFlow` to configure data processing during training (e.g., batch size, distributed training).
- `preprocessing.py` module with methods:
- `apply_filter` for filtering training data based on a filter variable contained in the dataset.
- `drop_nan_values` to remove data points containing any NaN values.
- Published the `ml4xcube` package (v0.0.6) on PyPI and Conda-forge.
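
A minimal sketch of the batch-training idea above, shown with a plain scikit-learn estimator and a PyTorch `DataLoader`; the `ml4xcube` sklearn trainer wraps this pattern, and all names and data below are illustrative placeholders rather than the trainer's actual API.

```python
# Illustrative only: the partial-fit loop that the training.sklearn trainer automates.
import numpy as np
import torch
from torch.utils.data import DataLoader, TensorDataset
from sklearn.linear_model import SGDRegressor

X = np.random.rand(1000, 1).astype(np.float32)   # placeholder features
y = np.random.rand(1000).astype(np.float32)      # placeholder target
loader = DataLoader(TensorDataset(torch.tensor(X), torch.tensor(y)), batch_size=64)

model = SGDRegressor()
for X_batch, y_batch in loader:                  # iterate over batches of the (chunked) data
    model.partial_fit(X_batch.numpy(), y_batch.numpy())
```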
### Changed
- Renamed `mltools` to `ml4xcube` due to name conflict.
- Updated `gapfilling` module:
- Added a progress bar to visualize progress during gap filling.
- Updated final print statement to show the location of gap-filled data.
- Renamed `rand` method to `assign_rand_split` in `data_assignment` module and unified its usage with `assign_block_split` to improve usability.
- Updated the use cases within the `Examples` directory to demonstrate new and updated functionality.
- Renamed the `distributed_training` module to `pytorch_distributed`.
### Removed
- Removed the `torch_training` module, including its `train_one_epoch` and `test` methods.
## [Unreleased] - 2024-04-11
### Added
- New functionality for gap filling in datasets, implemented in the `gap_dataset.py` and `gap_filling.py` scripts within the `mltools` package. This includes:
- The `GapDataset` class for handling datasets with data gaps, enabling the slicing of specific dimensions and the addition of artificial gaps for testing gap-filling algorithms (usage sketch after this list).
- The `EarthSystemDataCubeS3` subclass to facilitate operations on ESDC.
- The `Gapfiller` class to fill gaps using machine learning models, with a focus on Support Vector Regression (SVR). This class supports various hyperparameter search methods and predictor strategies for gap filling.
- Methods in `gap_dataset.py` for retrieving additional data matrices as predictors and processing the actual data matrix for gap analysis.
- Example script (`gapfilling_process.py`) and detailed usage instructions for the new gap-filling functionality, demonstrating how to apply these methods to real-world datasets.
- New `geo_plots.py` script within the `mltools` package, which includes:
- `plot_geo_data` function for creating geographical plots of data from a DataFrame, complete with customizable visual features and the ability to save figures.
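
A rough usage sketch for the gap-filling data preparation, following the `GapDataset` constructor and `get_data` method added here; the import path, cube name, and gap sizes are assumptions for illustration.

```python
# Sketch only: prepare a sliced cube with artificial gaps for the gap-filling algorithm.
import datetime
import xarray as xr
from mltools.gap_dataset import GapDataset   # assumed module path

cube = xr.open_zarr("cube.zarr")["land_surface_temperature"]   # placeholder cube
gap_ds = GapDataset(
    cube,
    ds_name="lst_demo",
    dimensions={"lat": (54, 48), "lon": (6, 15),
                "times": (datetime.date(2008, 11, 1), datetime.date(2008, 12, 31))},
    artificial_gaps=[0.01, 0.05],   # insert 1% and 5% artificial gaps
    actual_matrix="Random",
)
gap_ds.get_data()   # slices the cube, fetches predictor data, writes the gap arrays to disk
```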
### Changed
- The `data_processing` module of the `mltools` package has been renamed to `statistics` to more accurately reflect its purpose and functionality.
- Updated use cases in the `Examples` directory now utilize the `plot_geo_data` function for enhanced visualization of geographic data.
- Revision of Examples: Updated Python examples for `sklearn`, `PyTorch`, and `TensorFlow` to be compatible with the current versions of these packages.
### Improved
- Streamlined the Conda (and pip) package creation process to facilitate faster and more efficient package builds with Miniconda.
## [Unreleased] - 2024-03-13
### Added
- Transformed `mltools.py` into a Python package with modules: `cube_utilities.py`, `data_processing.py`, `torch_training.py`, `sampling.py`.
- New functions integrated into the corresponding modules based on functionality.
- Two new methods in `cube_utilities.py` (see the sketch after this list):
- `get_chunk_by_index` for retrieving data cube chunks by index.
- `rechunk_cube` for re-chunking the data cube.
- `distributed_training.py` added to the `mltools` package for PyTorch distributed training.
- Corresponding `distributed_training.py` example added to the `Examples` directory for practical use.
- Pip packaging support with the `pip_env` directory, including `setup.py` and `requirements.txt`.
- Conda packaging option with the `conda_recipe` directory containing `meta.yaml`.
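
A small sketch of the two new `cube_utilities` helpers, following the signatures introduced here; the package path, Zarr store names, and chunk sizes are placeholders.

```python
# Sketch only: retrieve one chunk by index and write a re-chunked copy of a variable.
import xarray as xr
from mltools.cube_utilities import get_chunk_by_index, rechunk_cube   # assumed module path

ds = xr.open_zarr("cube.zarr")              # placeholder store
first_chunk = get_chunk_by_index(ds, 0)     # dict: variable name -> numpy array
rechunk_cube(ds["air_temperature_2m"],      # desired chunk sizes per dimension
             {"time": 10, "lat": 100, "lon": 100},
             "rechunked.zarr")
```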
### Changed
- Organized the initial `mltools.py` functions by functionality and expanded the collection with new functions.
- Improved `get_statistics` (`data_processing.py`) method performance by utilizing Dask's parallel computation capabilities.
### Fixed
- Removed semicolons in use cases in the `Examples` directory to maintain Python syntax.
import torch
import numpy as np
import xarray as xr
import dask.array as da
from torch import nn
from global_land_mask import globe
from torch.utils.data import TensorDataset
from xcube.core.store import new_data_store
from ml4xcube.datasets.xr_dataset import XrDataset
from ml4xcube.cube_utilities import get_chunk_sizes
from ml4xcube.data_split import assign_block_split
from ml4xcube.preprocessing import get_statistics, standardize
from ml4xcube.datasets.pytorch import prepare_dataloader
from ml4xcube.training.pytorch_distributed import ddp_init, dist_train, Trainer
# To utilize ml4xcube for distributed training, use the following command with torchrun to initiate the process:
#
# torchrun --standalone --nproc_per_node=<number_of_processes> distributed_training.py <epochs>
#
# Replace `<number_of_processes>` with the number of processes you wish to run per node,
# and `<epochs>` with the total number of training epochs.
def preprocess_data(ds: xr.Dataset):
ds = XrDataset(ds, num_chunks=3).get_dataset()
at_stat = get_statistics(ds, 'air_temperature_2m')
lst_stat = get_statistics(ds, 'land_surface_temperature')
X = standardize(ds['air_temperature_2m'], *at_stat)
y = standardize(ds['land_surface_temperature'], *lst_stat)
# Split the data based on the 'split' attribute
X_train, X_test = X[ds['split'] == True], X[ds['split'] == False]
y_train, y_test = y[ds['split'] == True], y[ds['split'] == False]
X_train = X_train.reshape(-1, 1) # Making it [num_samples, 1]
y_train = y_train.reshape(-1, 1)
X_test = X_test.reshape(-1, 1)
y_test = y_test.reshape(-1, 1)
return X_train, X_test, y_train, y_test
def load_train_objs():
"""Load and preprocess dataset, returning prepared training and testing sets, model, and optimizer."""
# Load dataset from storage
data_store = new_data_store("s3", root="esdl-esdc-v2.1.1", storage_options=dict(anon=True))
dataset = data_store.open_data('esdc-8d-0.083deg-184x270x270-2.1.1.zarr')
ds = dataset[['land_surface_temperature', 'air_temperature_2m']]
# Create a land mask
lon_grid, lat_grid = np.meshgrid(ds.lon, ds.lat)
lm0 = da.from_array(globe.is_land(lat_grid, lon_grid))
lm = da.stack([lm0 for _ in range(ds.dims['time'])], axis=0)
# Assign land mask to the dataset and split data into blocks
ds = ds.assign(land_mask=(['time', 'lat', 'lon'], lm.rechunk(chunks=([v for _, v in get_chunk_sizes(ds)]))))
ds = assign_block_split(ds, block_size=[("time", 10), ("lat", 100), ("lon", 100)], split=0.8)
# Preprocess data and split into training and testing sets
X_train, X_test, y_train, y_test = preprocess_data(ds)
# Create TensorDataset objects for both training and testing sets
train_set = TensorDataset(torch.tensor(X_train), torch.tensor(y_train))
test_set = TensorDataset(torch.tensor(X_test), torch.tensor(y_test))
# Initialize model and optimizer
model = nn.Linear(in_features=1, out_features=1, bias=True)
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
return train_set, test_set, model, optimizer
def main(save_every: int, total_epochs: int, batch_size: int, snapshot_path: str = "snapshot.pt", best_model_path: str = 'Best_Model.pt'):
"""Main function to run distributed training process."""
# Initialize distributed data parallel training
ddp_init()
# Load training objects and prepare data loaders
train_set, test_set, model, optimizer = load_train_objs()
train_loader = prepare_dataloader(train_set, batch_size, num_workers=5, parallel=True)
test_loader = prepare_dataloader(test_set, batch_size, num_workers=5, parallel=True)
# Initialize the trainer and start training
trainer = Trainer(model, train_loader, test_loader, optimizer, save_every, best_model_path, snapshot_path, task_type='supervised')
dist_train(trainer, total_epochs)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='simple distributed training job')
parser.add_argument('total_epochs', type=int, help='Total epochs to train the model')
parser.add_argument('--save_every', default=10, type=int, help='How often to save a snapshot')
parser.add_argument('--batch_size', default=7, type=int, help='Input batch size on each device (default: 7)')
args = parser.parse_args()
main(args.save_every, args.total_epochs, args.batch_size)
%% Cell type:markdown id:bcdc0bb5518a402e tags:
# 3D Visualization of Earth System Data Cubes using Lexcube
## Introduction
This notebook demonstrates how to visualize Earth System Data Cubes (ESDC) in 3D using the Lexcube library. The focus is on providing an enhanced visual representation of the data used in three different machine learning notebooks that utilize PyTorch, TensorFlow, and Scikit-learn for predicting missing land surface temperature values from air temperature values. The datacubes involved in these analyses are:
- land_surface_temperature
- air_temperature_2m
## Purpose
The primary goal of this notebook is to offer a comprehensive and interactive 3D visualization of the datacubes, facilitating a better understanding of the spatial and temporal relationships within the data. This notebook complements the following machine learning examples:
- ML on ESDC using PyTorch: Demonstrates linear regression for predicting missing land surface temperature values from air temperature values using PyTorch.
- ML on ESDC using TensorFlow: Showcases a similar predictive model implemented in TensorFlow.
- ML on ESDC using Scikit-learn: Uses Scikit-learn to achieve the same predictive goal.
%% Cell type:code id:initial_id tags:
``` python
import lexcube
from xcube.core.store import new_data_store
```
%% Cell type:code id:e968359d-db06-4700-8223-d1b256d1727f tags:
``` python
data_store = new_data_store("s3", root="esdl-esdc-v2.1.1", storage_options=dict(anon=True))
dataset = data_store.open_data('esdc-8d-0.083deg-184x270x270-2.1.1.zarr')
# Smaller cube for demo case
start_time = "2002-05-21"
end_time = "2002-05-29"
dataset = dataset[["land_surface_temperature", "air_temperature_2m"]].sel(time=slice(start_time, end_time))
```
%% Cell type:markdown id:2620846ab4ac4610 tags:
The following visualization shows the air temperature data cube from which the missing land surface temperature values are predicted.
%% Cell type:code id:6cc8b086-7c8f-4b9c-80b5-f95a975dcda8 tags:
``` python
at_ds = dataset['air_temperature_2m']
w1 = lexcube.Cube3DWidget(at_ds, cmap="thermal")
w1
```
%% Output
Cube3DWidget(api_metadata={'/api': {'status': 'ok', 'api_version': 5}, '/api/datasets': [{'id': 'default', 'sh…
%% Cell type:code id:70e4dd2f-7c2d-42e0-b634-93102010fbb3 tags:
``` python
w1.show_sliders()
```
%% Output
Sliders(children=(HBox(children=(IntRangeSlider(value=(0, 4319), description='lon:', max=4319), Label(value='-…
%% Cell type:markdown id:9b4301a2-df45-43eb-9150-46c0d2519507 tags:
The data cube displayed next corresponds to the land surface temperature.
%% Cell type:code id:99561855-c80a-4bff-929d-f3c66aeda73e tags:
``` python
lst_ds = dataset['land_surface_temperature']
w2 = lexcube.Cube3DWidget(lst_ds, cmap="thermal")
w2
```
%% Output
Cube3DWidget(api_metadata={'/api': {'status': 'ok', 'api_version': 5}, '/api/datasets': [{'id': 'default', 'sh…
%% Cell type:code id:2f0dedf4-95d4-42ac-b5ec-c2fb31b082a8 tags:
``` python
w2.show_sliders()
```
%% Output
Sliders(children=(HBox(children=(IntRangeSlider(value=(0, 4319), description='lon:', max=4319), Label(value='-…
include LICENSE
include README.md
include ml4xcube/gapfilling/helper/*
{% set name = "ml4xcube" %}
{% set version = "0.0.6" %}
package:
name: {{ name|lower }}
version: {{ version }}
source:
url: https://pypi.io/packages/source/{{ name[0] }}/{{ name }}/ml4xcube-{{ version }}.tar.gz
sha256: 3f58ed6e4babf54cf935bc881348ceea99c8207c24e878d2e793bb8bb9221903
build:
noarch: python
script: {{ PYTHON }} -m pip install . -vv --no-deps --no-build-isolation
number: 0
requirements:
host:
- python >=3.8
- pip
run:
- python >=3.8
- conda-forge::xcube
- bokeh >=2.4.3
- dask-core >=2023.2.0
- dask >=2023.2.0
- numpy >=1.24
- pandas >=2.2
- scikit-learn >1.3.1
- xarray >2023.8.0
- zarr >2.11
- rechunker >=0.5.1
- sentinelhub
test:
imports:
- ml4xcube
commands:
- pip check
requires:
- pip
about:
home: https://pypi.org/project/ml4xcube/
license: MIT
license_family: MIT
license_file: LICENSE
summary: 'ML package for data cubes'
description: 'ML package for data cubes'
doc_url: https://deepesdl.readthedocs.io/en/latest/ml-toolkit/introduction/
dev_url: https://github.com/deepesdl/ML-Toolkits/tree/master/mltools/ml4xcube
extra:
recipe-maintainers:
- Julia310
import itertools
import rechunker
import numpy as np
import xarray as xr
from typing import Sequence, Tuple, Iterator, Dict, List, Optional, Union
def get_chunk_sizes(ds: xr.Dataset) -> Sequence[Tuple[str, int]]:
"""Determine maximum chunk sizes of all data variables of dataset *ds*.
Helper function.
"""
chunk_sizes = {}
for var in ds.data_vars.values():
if var.chunks:
chunks = tuple(max(*c) if len(c) > 1 else c[0]
for c in var.chunks)
for dim_name, chunk_size in zip(var.dims, chunks):
chunk_sizes[dim_name] = max(chunk_size,
chunk_sizes.get(dim_name, 0))
return [(str(k), v) for k, v in chunk_sizes.items()]
def iter_data_var_blocks(ds: xr.Dataset,
block_sizes: Sequence[Tuple[str, int]] = None) \
-> Iterator[Dict[str, np.ndarray]]:
"""Create an iterator that will provide all data blocks of all data
variables of given dataset *ds*.
The data blocks' order and shapes are prescribed
by the *block_sizes* argument, which is a sequence of
dimension name and block size pairs. If *block_sizes* is not given,
the chunk sizes of the data variables are used instead.
"""
block_sizes = get_chunk_sizes(ds) if block_sizes is None else block_sizes
dim_ranges = []
for dim_name, chunk_size in block_sizes:
dim_size = ds.dims[dim_name]
dim_ranges.append(range(0, dim_size, chunk_size))
for offsets in itertools.product(*dim_ranges):
dim_slices = {block_size[0]: slice(offset, offset + block_size[1])
for block_size, offset in zip(block_sizes, offsets)}
var_blocks = {}
for var_name, var in ds.data_vars.items():
indexers = {dim_name: dim_slice
for dim_name, dim_slice in dim_slices.items()
if dim_name in var.dims}
var_blocks[var_name] = var.isel(indexers).values
yield var_blocks
def calculate_total_chunks(ds: xr.Dataset, block_sizes: Sequence[Tuple[str, int]] = None) -> int:
"""Calculate the total number of chunks for the dataset based on maximum chunk sizes."""
default_block_sizes = get_chunk_sizes(ds)
if block_sizes is not None:
# Replace the sizes which are not None
for i, (dim, size) in enumerate(block_sizes):
if size is not None:
default_block_sizes[i] = (dim, size)
block_sizes = default_block_sizes
total_chunks = np.prod([
len(range(0, ds.dims[dim_name], size))
for dim_name, size in block_sizes
])
return total_chunks
def get_chunk_by_index(ds: xr.Dataset, index: int, block_sizes: Sequence[Tuple[str, int]] = None) -> Dict[
str, np.ndarray]:
"""
Returns a specific data chunk from an xarray.Dataset by index.
Parameters:
- ds: The xarray.Dataset from which to retrieve a chunk.
- index: The linear index of the chunk to retrieve.
- block_sizes: An optional sequence of tuples specifying the block size for each dimension.
Each tuple should contain a dimension name and a block size for that dimension.
If not provided, the function will attempt to use the dataset's chunk sizes.
Returns:
A dictionary where keys are variable names and values are the chunk data as numpy arrays.
"""
# Get the default chunk sizes from the dataset
default_block_sizes = get_chunk_sizes(ds)
if block_sizes is not None:
# Replace the sizes which are not None
for i, (dim, size) in enumerate(block_sizes):
if size is not None:
default_block_sizes[i] = (dim, size)
block_sizes = default_block_sizes
# Calculate the total number of chunks along each dimension
dim_chunks = [range(0, ds.dims[dim_name], size) for dim_name, size in block_sizes]
total_chunks_per_dim = [len(list(chunks)) for chunks in dim_chunks]
# Convert the linear index to a multi-dimensional index
multi_dim_index = np.unravel_index(index, total_chunks_per_dim)
# Calculate the slice for each dimension based on the multi-dimensional index
dim_slices = {}
for dim_idx, (dim_name, block_size) in enumerate(block_sizes):
start = multi_dim_index[dim_idx] * block_size
end = min(start + block_size, ds.dims[dim_name])
dim_slices[dim_name] = slice(start, end)
# Extract the chunk for each variable
var_blocks = {}
for var_name, var in ds.data_vars.items():
# Determine the slices applicable to this variable
indexers = {dim_name: dim_slice for dim_name, dim_slice in dim_slices.items() if dim_name in var.dims}
# Extract the chunk using variable-specific indexers
var_blocks[var_name] = var.isel(indexers).values
return var_blocks
def rechunk_cube(source_cube: xr.DataArray, target_chunks: Union[dict, tuple, list], target_path: str):
"""
Rechunks an xarray DataArray to a new chunking scheme.
Parameters:
- source_cube: xr.DataArray
The input DataArray that you want to rechunk.
- target_chunks: dict | tuple | list
The desired chunk sizes for the rechunking operation.
If a dict, specify sizes for each named dimension, e.g., {'lon': 60, 'lat': 1, 'time': 100}.
If a tuple or list, specify sizes by order, corresponding to the array's dimensions.
- target_path: str
The path where the rechunked DataArray should be stored, typically a path to a Zarr store.
Returns:
Nothing, but prints a message upon successful completion.
"""
# Validate target_chunks input
if not isinstance(target_chunks, (dict, tuple, list)):
raise ValueError("target_chunks must be a dictionary, tuple, or list")
# Create a rechunking plan
rechunk_plan = rechunker.rechunk(source_cube, target_chunks, target_path, temp_store=None)
# Execute the rechunking
rechunk_plan.execute()
print("Rechunking completed successfully.")
def split_chunk(chunk: Dict[str, np.ndarray], point_indices: List[Tuple[str, int]],
overlap: Optional[List[Tuple[str, int]]] = None) -> Dict[str, np.ndarray]:
"""
Split a chunk into points based on provided indices.
Parameters:
- chunk: The chunk to split.
- point_indices: Sequence of (dimension name, window size) pairs used to extract data points.
- overlap: Optional sequence of (dimension name, overlap fraction) pairs applied between consecutive windows.
Returns:
A dictionary where keys are variable names and values are the extracted points as numpy arrays.
"""
# Extract the step sizes from the point_indices
step_sizes = [step for _, step in point_indices]
# Handle overlaps
if overlap:
overlap_steps = [
int(step * overlap_frac) if step > 1 else 0
for (_, step), (_, overlap_frac) in zip(point_indices, overlap)
]
else:
overlap_steps = [0] * len(step_sizes)
# Extract the shapes of the arrays in the chunk
shape = next(iter(chunk.values())).shape # Assuming all arrays have the same shape
# Calculate the number of splits for each dimension
num_splits = [
(shape[i] - step_sizes[i]) // (step_sizes[i] - overlap_steps[i]) + 1
for i in range(len(step_sizes))
]
# Calculate the total number of resulting points
total_points = np.prod(num_splits)
# Initialize the result dictionary with the expected shape
result = {}
for key in chunk.keys():
if step_sizes[0] == 1:
result[key] = np.zeros((total_points, *step_sizes[1:]), dtype=chunk[key].dtype)
else:
result[key] = np.zeros((total_points, *step_sizes), dtype=chunk[key].dtype)
# Iterate through all possible splits
point_idx = 0
for time_idx in range(0, shape[0], step_sizes[0] - overlap_steps[0]):
if time_idx + step_sizes[0] > shape[0]: continue
for lat_idx in range(0, shape[1], step_sizes[1] - overlap_steps[1]):
if lat_idx + step_sizes[1] > shape[1]: continue
for lon_idx in range(0, shape[2], step_sizes[2] - overlap_steps[2]):
if lon_idx + step_sizes[2] > shape[2]: continue
for key in chunk.keys():
result[key][point_idx] = chunk[key][time_idx:time_idx + step_sizes[0],
lat_idx:lat_idx + step_sizes[1],
lon_idx:lon_idx + step_sizes[2]]
#print(result[key][point_idx])
point_idx += 1
return result
import random
import warnings
import xarray as xr
import dask.array as da
from typing import (Sequence, Tuple)
from ml4xcube.cube_utilities import get_chunk_sizes
warnings.filterwarnings('ignore')
def assign_rand_split(ds: xr.Dataset, split: float = 0.8):
"""Assign random split using random sampling."""
seed = 32 # Consistent seed for reproducibility
random.seed(seed)
# Ensure the random array matches the dimensions and chunk sizes of the input dataset
dimensions = list(ds.dims) # Gets the dimensions from the dataset
chunk_sizes = {dim: ds.chunks[dim] for dim in ds.dims} # Assumes the dataset is chunked
# Generate a random array with the same shape and chunking as the dataset
random_split = da.random.random(size=tuple(ds.dims[dim] for dim in dimensions), chunks=tuple(chunk_sizes[dim] for dim in dimensions)) < split
# Assign the new data array to the dataset under the variable name 'split'
return ds.assign(split=(dimensions, random_split))
### dask block sampling
def cantor_pairing(x: int, y: int):
"""unique assignment of a pair (x,y) to a natural number, bijectiv """
return int((x + y) * (x + y + 1) / 2 + y)
# for random seed generation
def cantor_tuple(index_list: list):
"""unique assignment of a tuple to a natural number, generalization of cantor pairing to tuples"""
t = index_list[0]
for x in index_list[1:]:
t = cantor_pairing(t, x)
return t
def assign_block_split(ds: xr.Dataset, block_size: Sequence[Tuple[str, int]] = None, split: float = 0.8):
"""Block sampling: add a variable "split" to xarray x, that contains blocks filled with 0 or 1 with frequency split
Usage:
xds = xr.open_zarr("***.zarr")
cube_with_split = assign_block_split(xds, block_size=[("time", 10), ("lat", 20), ("lon", 20)], split=0.5)"""
if block_size is None:
block_size = get_chunk_sizes(ds)
def role_dice(x, block_id=None):
if block_id is not None:
seed=cantor_tuple(block_id)
random.seed(seed)
return x + (random.random() < split)
def block_rand(x):
block_ind_array = da.zeros(
(list(x.dims.values())), chunks=([v for k, v in block_size])
)
mapped = block_ind_array.map_blocks(role_dice)
return ("time", "lat", "lon"), mapped
return ds.assign({"split": block_rand})
import zarr
import math
import random
import numpy as np
import xarray as xr
from multiprocessing import Pool
from typing import Tuple, Optional, Dict, List
from ml4xcube.cube_utilities import split_chunk
from ml4xcube.preprocessing import apply_filter, drop_nan_values
from ml4xcube.cube_utilities import get_chunk_by_index, calculate_total_chunks
def worker_preprocess_chunk(args):
chunk, point_indices, overlap, filter_var, callback_fn, data_split = args
if point_indices is not None:
cf = {x: chunk[x] for x in chunk.keys()}
cf = split_chunk(cf, point_indices, overlap=overlap)
else:
cf = {x: chunk[x].ravel() for x in chunk.keys()}
# Apply filtering based on the specified variable, if provided
if filter_var is not None:
cft = apply_filter(cf, filter_var)
else:
cft = cf
vars = list(cft.keys())
cft = drop_nan_values(cft, vars)
valid_chunk = all(np.nan_to_num(cft[var]).sum() > 0 for var in cf)
if valid_chunk:
if callback_fn:
cft = callback_fn(cft)
num_samples = cft[list(cft.keys())[0]].shape[0]
indices = list(range(num_samples))
random.shuffle(indices)
split_idx = int(num_samples * data_split)
train_indices = indices[:split_idx]
test_indices = indices[split_idx:]
return cft, train_indices, test_indices
return None, None, None
class MultiProcSampler():
def __init__(self, ds: xr.Dataset, filter_var: str = 'land_mask',
data_fraq: float = 0.03, data_split: float = 0.8,
nproc: int = 4, callback_fn = None,
block_sizes: Optional[Dict[str, Optional[int]]] = None,
point_indices: Optional[List[Tuple[str, int]]] = None,
overlap: Optional[List[Tuple[str, int]]] = None):
"""
Initialize the MultiProcSampler with the given dataset and parameters.
Args:
ds (xr.Dataset): The input dataset.
filter_var (str): The variable to use for filtering. Defaults to 'land_mask'.
If None, no filtering is applied.
data_fraq (float): The fraction of data to process.
data_split (float): The fraction of data to use for training. The rest is used for testing.
nproc (int): Number of processes to use for parallel processing.
callback_fn (function): Optional callback function to apply to each chunk after preprocessing.
block_sizes (Optional[Dict[str, Optional[int]]]): Optional dictionary specifying the block sizes for each dimension.
point_indices (Optional[List[Tuple[str, int]]]): List of tuples specifying the dimensions and their respective step sizes.
overlap (Optional[List[Tuple[str, int]]]): List of tuples specifying the dimensions and their respective overlap sizes.
Returns:
None
"""
self.ds = ds
self.filter_var = filter_var
self.data_fraq = data_fraq
self.data_split = data_split
self.block_sizes = block_sizes
self.point_indices = point_indices
self.overlap = overlap
self.total_chunks = int(calculate_total_chunks(self.ds, self.block_sizes) * self.data_fraq)
self.chunks = None
self.nproc = nproc
self.callback_fn = callback_fn
self.train_store = zarr.open('train_cube.zarr')
self.test_store = zarr.open('test_cube.zarr')
self.chunk_size = tuple(dim[1] for dim in self.point_indices) if self.point_indices else (1000,)
self.create_cubes()
def store_chunks(self, processed_chunks):
"""
Store the processed chunks into the training and testing Zarr cubes.
Args:
processed_chunks (List[Tuple[Dict[str, np.ndarray], List[int], List[int]]]):
List of tuples containing the processed chunk, training indices, and testing indices.
Returns:
None
"""
for chunk, train_indices, test_indices in processed_chunks:
if chunk is None:
continue
for var in chunk.keys():
train_data = chunk[var][train_indices]
test_data = chunk[var][test_indices]
print(f"Processing variable: {var}")
print(f"Train data shape: {train_data.shape}")
print(f"Test data shape: {test_data.shape}")
if var not in self.train_store:
self.train_store.create_dataset(var, data=train_data, chunks=self.chunk_size, append_dim=0)
self.test_store.create_dataset(var, data=test_data, chunks=self.chunk_size, append_dim=0)
else:
self.train_store[var].append(train_data)
self.test_store[var].append(test_data)
def create_cubes(self):
"""
Create the training and testing cubes by processing chunks of data from the dataset.
This method retrieves specific chunks of data, preprocesses them in parallel, and stores the results in Zarr cubes.
Returns:
None
"""
chunk_indices = list(range(self.total_chunks))
chunks = [get_chunk_by_index(self.ds, idx) for idx in chunk_indices]
with Pool(processes=self.nproc) as pool:
for i in range(0, self.total_chunks, math.ceil(self.nproc*0.2)):
batch_chunks = chunks[i:i + math.ceil(self.nproc*0.2)]
processed_chunks = pool.map(worker_preprocess_chunk, [
(chunk, self.point_indices, self.overlap, self.filter_var, self.callback_fn, self.data_split)
for chunk in batch_chunks
])
self.store_chunks(processed_chunks)
def get_datasets(self):
"""
Retrieve the training and testing Zarr datasets.
Returns:
Tuple[zarr.Group, zarr.Group]: The training and testing Zarr datasets.
"""
return self.train_store, self.test_store
import torch
import random
import xarray as xr
from ml4xcube.cube_utilities import split_chunk
from torch.utils.data import Dataset, DataLoader
from typing import Tuple, Optional, Dict, List, Callable
from ml4xcube.preprocessing import apply_filter, drop_nan_values
from ml4xcube.cube_utilities import calculate_total_chunks, get_chunk_by_index
class LargeScaleXrDataset(Dataset):
def __init__(self, xr_dataset: xr.Dataset, chunk_indices: list = None, num_chunks: int = None,
rand_chunk: bool = True, drop_nan: bool = True, filter_var: str = 'land_mask',
block_sizes: Optional[Dict[str, Optional[int]]] = None,
point_indices: Optional[List[Tuple[str, int]]] = None,
overlap: Optional[List[Tuple[str, int]]] = None):
"""
Initialize the dataset to manage large datasets efficiently.
Args:
xr_dataset (xr.Dataset): The xarray dataset.
chunk_indices (list): List of indices of chunks to load.
num_chunks (int): Number of chunks to process dynamically.
rand_chunk (bool): Whether to select chunks randomly.
drop_nan (bool): Whether to drop NaN values.
filter_var (str): Filtering variable name, default 'land_mask'.
block_sizes (Optional[Dict[str, Optional[int]]]): Optional block sizes per dimension.
point_indices (Optional[List[Tuple[str, int]]]): Dimensions and their respective step sizes for point extraction.
overlap (Optional[List[Tuple[str, int]]]): Dimensions and their respective overlap fractions.
"""
self.ds = xr_dataset
self.num_chunks = num_chunks
self.rand_chunk = rand_chunk
self.drop_nan = drop_nan
self.filter_var = filter_var
self.block_sizes = block_sizes
self.point_indices = point_indices
self.overlap = overlap
self.total_chunks = calculate_total_chunks(xr_dataset)
if chunk_indices is not None:
self.chunk_indices = chunk_indices
elif num_chunks is not None and self.total_chunks >= num_chunks:
self.chunk_indices = random.sample(range(self.total_chunks), num_chunks)
else:
self.chunk_indices = range(self.total_chunks)
def __len__(self):
return len(self.chunk_indices)
def __getitem__(self, idx):
chunk_index = self.chunk_indices[idx]
chunk = get_chunk_by_index(self.ds, chunk_index)
# Process the chunk
if self.point_indices is not None:
cf = {x: chunk[x] for x in chunk.keys()}
cf = split_chunk(cf, self.point_indices, overlap=self.overlap)
else:
cf = {x: chunk[x].ravel() for x in chunk.keys()}
if self.filter_var is not None:
cft = apply_filter(cf, self.filter_var)
else:
cft = cf
if self.drop_nan:
cft = drop_nan_values(cft, list(cft.keys()))
return cft # Return the processed chunk
def prepare_dataloader(dataset: Dataset, batch_size: int = 1, callback_fn: Callable = None, num_workers: int = 0, parallel: bool = False, shuffle = True) -> DataLoader:
"""
Prepares a DataLoader.
Parameters:
- dataset: The pytorch dataset from which to load the data.
- batch_size: How many samples per batch to load.
- callback_fn: A function used to collate data into batches.
- num_workers: How many subprocesses to use for data loading.
- parallel: Specifies if distributed training is performed.
- shuffle: Whether to shuffle the data (set to False automatically when a DistributedSampler is used).
Returns:
A DataLoader object.
"""
sampler = None
if parallel:
from torch.utils.data.distributed import DistributedSampler
sampler = DistributedSampler(dataset)
shuffle = False
return DataLoader(
dataset,
batch_size=batch_size,
num_workers=num_workers,
pin_memory=torch.cuda.is_available(), # Pin memory only when a GPU is available
shuffle=shuffle,
collate_fn=callback_fn,
sampler=sampler,
drop_last=True
)
import random
import xarray as xr
import tensorflow as tf
from typing import Tuple, Optional, Dict, List
from ml4xcube.cube_utilities import split_chunk
from ml4xcube.preprocessing import apply_filter, drop_nan_values
from ml4xcube.cube_utilities import get_chunk_by_index, calculate_total_chunks
class LargeScaleXrDataset:
def __init__(self, xr_dataset: xr.Dataset, chunk_indices: list = None, num_chunks: int = None,
rand_chunk: bool = True, drop_nan: bool = True, filter_var: str = 'land_mask', callback_fn=None,
block_sizes: Optional[Dict[str, Optional[int]]] = None,
point_indices: Optional[List[Tuple[str, int]]] = None,
overlap: Optional[List[Tuple[str, int]]] = None):
"""
Initialize the dataset for TensorFlow, managing large datasets efficiently.
Args:
xr_dataset (xr.Dataset): The xarray dataset.
chunk_indices (list): List of indices of chunks to load.
num_chunks (int): Number of chunks to process dynamically.
rand_chunk (bool): Whether to select chunks randomly.
drop_nan (bool): Whether to drop NaN values.
filter_var (str): Filtering variable name, default 'land_mask'.
callback_fn (Callable): Optional callback applied to each chunk after preprocessing.
block_sizes (Optional[Dict[str, Optional[int]]]): Optional block sizes per dimension.
point_indices (Optional[List[Tuple[str, int]]]): Dimensions and their respective step sizes for point extraction.
overlap (Optional[List[Tuple[str, int]]]): Dimensions and their respective overlap fractions.
"""
self.chunk_indices = None
self.ds = xr_dataset
self.num_chunks = num_chunks
self.rand_chunk = rand_chunk
self.callback_fn = callback_fn
self.drop_nan = drop_nan
self.filter_var = filter_var
self.block_sizes = block_sizes
self.point_indices = point_indices
self.overlap = overlap
self.total_chunks = calculate_total_chunks(xr_dataset)
if chunk_indices is not None:
self.chunk_indices = chunk_indices
elif num_chunks is not None and self.total_chunks >= num_chunks:
self.chunk_indices = random.sample(range(self.total_chunks), num_chunks)
else:
self.chunk_indices = list(range(self.total_chunks))
def __len__(self):
return len(self.chunk_indices)
def generate(self):
"""
Generator function to yield chunks.
Yields:
Processed chunk as a dictionary of NumPy arrays.
"""
for idx in self.chunk_indices:
chunk = get_chunk_by_index(self.ds, idx)
if self.point_indices is not None:
cf = {x: chunk[x] for x in chunk.keys()}
cf = split_chunk(cf, self.point_indices, overlap=self.overlap)
else:
cf = {x: chunk[x].ravel() for x in chunk.keys()}
if self.filter_var is not None:
cft = apply_filter(cf, self.filter_var)
else:
cft = cf
if self.drop_nan:
cft = drop_nan_values(cft, list(cft.keys()))
if self.callback_fn:
cft = self.callback_fn(cft)
yield cft
def get_dataset(self):
"""
Creates a TensorFlow dataset from the generator.
Returns:
tf.data.Dataset: The TensorFlow Dataset object.
"""
example_chunk = next(self.generate())
if self.callback_fn is None:
output_signature = {name: tf.TensorSpec(shape=(None,), dtype=tf.float32)
for name in example_chunk.keys()}
else:
output_signature = (
tf.TensorSpec(shape=(None,None), dtype=tf.float32),
tf.TensorSpec(shape=(None,None), dtype=tf.float32)
)
return tf.data.Dataset.from_generator(
self.generate,
output_signature=output_signature
)
def prepare_dataset(dataset: tf.data.Dataset, batch_size: int, shuffle: bool = True, num_parallel_calls: int = None, distributed: bool = False) -> tf.data.Dataset:
"""
Prepares a TensorFlow dataset for training or evaluation.
Parameters:
- dataset: The TensorFlow dataset from which to load the data.
- batch_size: How many samples per batch to load.
- shuffle: Whether to shuffle the dataset.
- num_parallel_calls: How many threads to use for parallel processing of data loading.
- distributed: Specifies if distributed training is performed.
Returns:
A TensorFlow Dataset object ready for iteration.
"""
if distributed:
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)
if shuffle:
# Shuffle the data, ideally the buffer size should be the size of the dataset, but it can be reduced if memory is limited
dataset = dataset.shuffle(buffer_size=10000) # Adjust buffer_size according to your dataset size and memory availability
# Batching the dataset
dataset = dataset.batch(batch_size)
# Prefetch batches to overlap data preparation with training; defaults to tf.data AUTOTUNE
if num_parallel_calls is None:
num_parallel_calls = tf.data.experimental.AUTOTUNE
dataset = dataset.prefetch(buffer_size=num_parallel_calls)
return dataset
import random
import numpy as np
import xarray as xr
from typing import Tuple, Optional, Dict, List
from ml4xcube.cube_utilities import split_chunk
from ml4xcube.preprocessing import apply_filter, drop_nan_values
from ml4xcube.cube_utilities import get_chunk_by_index, calculate_total_chunks
class XrDataset():
def __init__(self, ds: xr.Dataset, chunk_indices: list = None, num_chunks: int = None, rand_chunk: bool = True,
drop_nan: bool = True, strict_nan: bool = False, filter_var: str = 'land_mask', patience: int = 500,
block_sizes: Optional[Dict[str, Optional[int]]] = None,
point_indices: Optional[List[Tuple[str, int]]] = None,
overlap: Optional[List[Tuple[str, int]]] = None):
"""
Initialize xarray dataset.
Args:
ds (xr.Dataset): The input dataset.
chunk_indices (list): Specific chunk indices to process; overrides num_chunks if given.
num_chunks (int): The number of unique chunks to process.
rand_chunk (bool): If true, chunks are chosen randomly.
drop_nan (bool): If true, NaN values are dropped.
strict_nan (bool): If true, discard the entire chunk if any NaN is found in any variable.
filter_var (str): The variable to use for filtering. Defaults to 'land_mask'.
If None, no filtering is applied.
patience (int): The number of consecutive iterations without a valid chunk before stopping.
block_sizes (Optional[Dict[str, Optional[int]]]): Optional block sizes per dimension.
point_indices (Optional[List[Tuple[str, int]]]): Dimensions and their respective step sizes for point extraction.
overlap (Optional[List[Tuple[str, int]]]): Dimensions and their respective overlap fractions.
"""
self.ds = ds
self.rand_chunk = rand_chunk
self.drop_nan = drop_nan
self.strict_nan = strict_nan
self.filter_var = filter_var
self.patience = patience
self.block_sizes = block_sizes
self.point_indices = point_indices
self.overlap = overlap
self.total_chunks = calculate_total_chunks(self.ds, self.block_sizes)
self.chunks = None
self.chunk_indices = chunk_indices
if self.chunk_indices is not None:
self.num_chunks = len(self.chunk_indices)
elif num_chunks is not None and self.total_chunks >= num_chunks:
self.num_chunks = num_chunks
else:
self.num_chunks = self.total_chunks
self.chunks = self.get_chunks()
self.dataset = self.concatenate_chunks()
def get_dataset(self):
return self.dataset
def concatenate_chunks(self):
concatenated_chunks = {}
# Get the keys of the first dictionary in self.chunks
keys = list(self.chunks[0].keys())
# Loop over the keys and concatenate the arrays along the time dimension
for key in keys:
concatenated_chunks[key] = np.concatenate([chunk[key] for chunk in self.chunks], axis=0)
return concatenated_chunks
def preprocess_chunk(self, chunk):
# Flatten the data and select only land values, then drop NaN values
if self.point_indices is not None:
cf = {x: chunk[x] for x in chunk.keys()}
cf = split_chunk(cf, self.point_indices, overlap=self.overlap)
else:
cf = {x: chunk[x].ravel() for x in chunk.keys()}
# Apply filtering based on the specified variable, if provided
if self.filter_var is not None:
cft = apply_filter(cf, self.filter_var)
else:
cft = cf
valid_chunk = True
#
if self.drop_nan:
vars = list(cft.keys())
cft = drop_nan_values(cft, vars)
valid_chunk = all(np.nan_to_num(cft[var]).sum() > 0 for var in cf)
if self.strict_nan:
valid_chunk = any(np.nan_to_num(cft[var]).sum() > 0 for var in cf)
return cft, valid_chunk
def get_chunks(self):
"""
Retrieve specific chunks of data from a dataset.
Returns:
list: A list of processed data chunks.
"""
chunks_idx = list()
chunks_list = []
chunk_index = 0
no_valid_chunk_count = 0
iterations = 0
# Process chunks until num_chunks unique chunks have been collected or patience runs out
while len(chunks_idx) < self.num_chunks:
iterations += 1
if no_valid_chunk_count >= self.patience:
print("Patience threshold reached, returning collected chunks.")
break
if self.rand_chunk and self.chunk_indices is None:
chunk_index = random.randint(0, self.total_chunks - 1) # Select a random chunk index
if chunk_index in chunks_idx:
continue # Skip if this chunk has already been processed
# Retrieve the chunk by its index
if self.chunk_indices is None:
chunk = get_chunk_by_index(self.ds, chunk_index)
else:
chunk = get_chunk_by_index(self.ds, self.chunk_indices[chunk_index])
cft, valid_chunk = self.preprocess_chunk(chunk)
if valid_chunk:
chunks_idx.append(chunk_index)
chunks_list.append(cft)
no_valid_chunk_count = 0 # reset the patience counter after finding a valid chunk
else:
no_valid_chunk_count += 1 # increment the patience counter if no valid chunk is found
chunk_index += 1
return chunks_list
import os
import time
import shutil
import datetime
import numpy as np
import xarray as xr
"""
In this file we prepare the data for the gap-filling algorithm.
We slice specific dimensions (e.g. an area and a time range) from the data cube for the specified variable.
If requested, artificial gaps can be inserted into the array.
"""
class GapDataset:
"""
Represents a dataset for handling data gaps.
Attributes:
ds (xarray.DataArray): The original unsliced dataset.
ds_name (str): The name of the dataset.
dimensions (dict): Dict containing dimension ranges (e.g. lat, lon, times); sample values are used if no dimensions are specified.
artificial_gaps (list): List of artificial gap sizes; None if no artificial gaps should be created
actual_matrix (str or datetime.date): Specifies the actual data matrix or 'Random' for random selection.
directory (str): The directory where data will be stored.
extra_data: Additional data used as predictors (e.g. Land Cover Classes).
sliced_ds: The sliced dataset.
"""
def __init__(self, ds, ds_name='Test123',
dimensions=None,
artificial_gaps=None, actual_matrix='Random'):
self.ds = ds
self.ds_name = ds_name
self.dimensions = dimensions
self.artificial_gaps = artificial_gaps
self.actual_matrix = actual_matrix
self.directory = os.path.dirname(os.getcwd()) + '/application_results/' + ds_name + "/" if \
os.getcwd().split('/')[-1] != 'gapfilling' else 'application_results/' + ds_name + "/"
self.extra_data = None
self.sliced_ds = None
def get_data(self):
"""
Retrieve and process (area-)specific data.
This method performs the following tasks:
- Creates a directory or cleans it if it already exists.
- Slices the dimensions from a global dataset.
- Retrieves additional data (e.g., land cover classes) for use as predictors.
- Processes the data and optionally creates artificial data gaps for gap filling.
"""
start_time = time.time()
# Create a directory or clean it if it already exists
shutil.rmtree(self.directory, ignore_errors=True)
os.makedirs(self.directory, exist_ok=True)
# Slice the dimensions from a global dataset
self.slice_dataset()
# Retrieve land cover data or other extra matrix to use them as predictors
self.get_extra_matrix()
# If requested, create artificial data gaps whose values will be estimated later on
self.process_actual_matrix()
print("runtime:", round(time.time() - start_time, 2))
def slice_dataset(self):
"""
Slice the dataset to extract the specific area, latitude, longitude, and time range.
This method slices the dataset based on the specified dimensions (lat, lon, and times) and if no dimensions
are specified, sample values will be used.
Returns:
None
"""
# Create a data store for accessing global data and open the dataset
# Slice the dataset to extract the specific area, latitude, longitude, and time range
if self.dimensions is None:
self.dimensions = {'lat': (54, 48),
'lon': (6, 15),
'times': (datetime.date(2008, 11, 1), datetime.date(2008, 12, 31))}
sliced_ds = self.ds.sel(lat=slice(self.dimensions['lat'][0], self.dimensions['lat'][1]),
lon=slice(self.dimensions['lon'][0], self.dimensions['lon'][1]),
time=slice(self.dimensions['times'][0], self.dimensions['times'][1]))
print(self.ds_name, sliced_ds.sizes.mapping)
# To avoid exceptions due to type errors, save the dataset and load it again
sliced_ds.to_netcdf(self.directory + "cube.nc")
self.sliced_ds = xr.open_dataset(self.directory + "cube.nc")[sliced_ds.name]
def get_extra_matrix(self):
"""
Retrieve Land Cover Classes (LCC) for use as predictors.
This method opens a NetCDF file containing global LCC data, selects and slices the LCC data
based on the specified latitude and longitude range, and saves it as an extra data matrix
for use in gap filling.
Returns:
None
"""
# Open the LCCS dataset from a NetCDF file with the global LCC data
current_dir = os.path.dirname(os.path.abspath(__file__))
data_file = os.path.join(current_dir, 'helper', 'global_lcc.nc')
# Check if the file exists at the specified path
if not os.path.exists(data_file):
raise FileNotFoundError(f"File not found: {data_file}")
# Open the LCCS dataset from the NetCDF file
lcc_dataset = xr.open_dataset(data_file)['lccs_class']
# Select and slice the LCCS data based on the specified latitude and longitude range
self.extra_data = lcc_dataset.sel(lat=slice(self.dimensions['lat'][0], self.dimensions['lat'][1]),
lon=slice(self.dimensions['lon'][0], self.dimensions['lon'][1]))
self.extra_data.to_netcdf(self.directory + "extra_matrix_lcc.nc")
def process_actual_matrix(self):
"""
Process the actual data matrix.
This method processes the actual data matrix, including selecting a random or specified date,
calculating the real gap size percentage, and creating artificial data gaps if requested.
Returns:
None
"""
# Select the relevant/random date with the gaps to be filled and slice the corresponding array
dates = self.sliced_ds.coords["time"].values
actual_date = np.random.choice(dates) if self.actual_matrix == 'Random' else np.datetime64(self.actual_matrix)
actual_matrix = self.sliced_ds.sel(time=actual_date)
# Calculate the real gap size percentage and print it with the relevant date to give insights
real_gap_size = round(np.isnan(actual_matrix).sum().item() / actual_matrix.size * 100)
actual_date = np.datetime_as_string(actual_date, unit='D')
print("date:", actual_date)
print("real gap size: ", real_gap_size, "%")
# Save the original array
actual_matrix.to_netcdf(self.directory + "actual.nc")
# If requested, create artificial data gaps whose values will be estimated later on
if self.artificial_gaps:
self.create_gaps(actual_matrix, actual_date)
def create_gaps(self, actual_matrix, actual_date):
"""
Create artificial data gaps.
This method creates artificial data gaps in the desired array based on the specified gap sizes.
Args:
actual_matrix (xarray.DataArray): Original array in which the gaps are created.
actual_date (str): The date of the array.
Returns:
None
"""
# Find the indices of non-NaN values in the original data
non_nan_indices = np.argwhere(~np.isnan(actual_matrix.values))
# Define a new directory for saving data with artificial gaps
new_directory = self.directory + "GapImitation/"
shutil.rmtree(new_directory, ignore_errors=True)
os.makedirs(new_directory, exist_ok=True)
# Iterate through different gap sizes to create them in the original array
gap_creation_count = 0
for gap_size in self.artificial_gaps:
# Calculate the absolute gap size based on the original data
gap_size_absolute = round(gap_size * actual_matrix.size)
# Check if there are enough non-NaN values to create the desired gap size; skip this gap size if not
if len(non_nan_indices) < gap_size_absolute:
continue
# Create a copy of the original data to insert the artificial gaps
array_with_gaps = actual_matrix.copy()
# Randomly select indices for creating artificial gaps
selected_indices = np.random.choice(non_nan_indices.shape[0], gap_size_absolute, replace=False)
selected_indices = non_nan_indices[selected_indices]
# Loop through each of these selected indices and insert -100 as a value
for index in selected_indices:
array_with_gaps[index[0], index[1]] = -100
# Save the data with artificial gaps in the GapImitation directory
array_with_gaps.to_netcdf(new_directory + actual_date + "_" + str(gap_size) + ".nc")
gap_creation_count += 1
# Format the different gap sizes for printing
formatted_gaps = [f"{float(element * 100)}%" for element in self.artificial_gaps]
print(gap_creation_count, "arrays with the following gaps were created:", ', '.join(formatted_gaps[:gap_creation_count]))
print(f"These arrays are saved in /{self.directory}")
if gap_creation_count < len(formatted_gaps):
print("However, the original array doesn't contain enough non-Nan values to imitate the following gap sizes:", ', '.join(formatted_gaps[gap_creation_count:]))