Source code for icenet.data.producers

from abc import abstractmethod, ABCMeta

import collections
import datetime as dt
import glob
import logging
import os
import re

import pandas as pd

from icenet.utils import Hemisphere, HemisphereMixin


class DataCollection(HemisphereMixin, metaclass=ABCMeta):
    """An abstract base class with a common interface for data collection classes.

    Attributes:
        _identifier: The identifier of the data collection.
        _path: The base path of the data collection.
        _hemisphere: The hemisphere(s) of the data collection.
    """

    @abstractmethod
    def __init__(self,
                 *args,
                 identifier: object = None,
                 north: bool = True,
                 south: bool = False,
                 path: str = os.path.join(".", "data"),
                 **kwargs) -> None:
        """Initialises the DataCollection class.

        Args:
            identifier: An identifier/label for the data collection.
                Defaults to None.
            north (optional): A flag indicating if the data collection is in
                the northern hemisphere. Defaults to True.
            south (optional): A flag indicating if the data collection is in
                the southern hemisphere. Defaults to False.
            path (optional): The base path of the data collection.
                Defaults to `./data`.

        Raises:
            AssertionError: Raised if identifier is not specified, or no
                hemispheres are selected.
        """
        self._identifier: object = identifier
        self._path: str = os.path.join(path, identifier)
        self._hemisphere: Hemisphere = \
            (Hemisphere.NORTH if north else Hemisphere.NONE) | \
            (Hemisphere.SOUTH if south else Hemisphere.NONE)

        assert self._identifier, "No identifier supplied"
        assert self._hemisphere != Hemisphere.NONE, "No hemispheres selected"

    @property
    def base_path(self) -> str:
        """The base path of the data collection."""
        return self._path

    @base_path.setter
    def base_path(self, path: str) -> None:
        self._path = path

    @property
    def identifier(self) -> object:
        """The identifier (label) for this data collection."""
        return self._identifier
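

# Illustrative sketch (an assumption, not part of the icenet module): a
# minimal concrete DataCollection subclass. "ExampleCollection" and the
# identifier "example" are hypothetical names, used only to show that the
# identifier becomes the final component of the base path and that at least
# one hemisphere flag must be set.
class ExampleCollection(DataCollection):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)


def _example_collection_usage() -> None:
    # With the default path, base_path resolves to ./data/example
    collection = ExampleCollection(identifier="example", north=True)
    logging.info("Collection {} located at {}".format(
        collection.identifier, collection.base_path))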


class DataProducer(DataCollection):
    """Manages the creation and organisation of data files.

    Attributes:
        dry: Flag specifying whether the data producer should be in dry run
            mode or not.
        overwrite: Flag specifying whether existing files should be
            overwritten or not.
    """

    def __init__(self,
                 *args,
                 dry: bool = False,
                 overwrite: bool = False,
                 **kwargs) -> None:
        """Initialises the DataProducer instance.

        Creates the base path of the data collection if it does not exist.

        Args:
            dry (optional): Flag specifying whether the data producer should
                be in dry run mode or not. Defaults to False.
            overwrite (optional): Flag specifying whether existing files
                should be overwritten or not. Defaults to False.
        """
        super(DataProducer, self).__init__(*args, **kwargs)

        self.dry = dry
        self.overwrite = overwrite

        if os.path.exists(self._path):
            logging.debug("{} already exists".format(self._path))
        else:
            if not os.path.islink(self._path):
                logging.info("Creating path: {}".format(self._path))
                os.makedirs(self._path, exist_ok=True)
            else:
                logging.info("Skipping creation for symlink: {}".format(
                    self._path))

        # NOTE: specific limitation for the DataProducers, they'll only do one
        # hemisphere per instance
        assert self._hemisphere != Hemisphere.BOTH, "Both hemispheres selected"

    def get_data_var_folder(self,
                            var: str,
                            append: object = None,
                            hemisphere: object = None,
                            missing_error: bool = False) -> str:
        """Returns the path for a specific data variable.

        Appends additional folders to the path if specified in the `append`
        parameter.

        Args:
            var: The data variable.
            append (optional): Additional folders to append to the path.
                Defaults to None.
            hemisphere (optional): The hemisphere. Defaults to None.
            missing_error (optional): Flag to specify if missing directories
                should be treated as an error. Defaults to False.

        Returns:
            str: The path for the specific data variable.
        """
        if not append:
            append = []

        if not hemisphere:
            # We can make the assumption because this implementation is
            # limited to a single hemisphere
            hemisphere = self.hemisphere_str[0]

        data_var_path = os.path.join(self.base_path,
                                     *[hemisphere, var, *append])

        if not os.path.exists(data_var_path):
            if not missing_error:
                os.makedirs(data_var_path, exist_ok=True)
            else:
                raise OSError("Directory {} is missing and this is "
                              "flagged as an error!".format(data_var_path))

        return data_var_path
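

# Illustrative sketch (an assumption, not part of the icenet module): shows
# the folder layout produced by get_data_var_folder, namely
# <base_path>/<hemisphere>/<var>/<append...>. The identifier "example", the
# variable name "siconca" and the "2020" subfolder are hypothetical. Note
# that missing directories are created unless missing_error is set.
def _example_producer_folders() -> None:
    producer = DataProducer(identifier="example", north=True, dry=True)
    # e.g. ./data/example/north/siconca/2020
    var_folder = producer.get_data_var_folder("siconca", append=["2020"])
    logging.debug("Variable folder: {}".format(var_folder))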


class Downloader(DataProducer):
    """Abstract base class for a downloader."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    @abstractmethod
    def download(self):
        """Abstract download method for this downloader: Must be implemented by subclasses."""
        raise NotImplementedError("{}.download is abstract".format(
            __class__.__name__))
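

# Illustrative sketch (an assumption, not part of the icenet module): a
# minimal concrete Downloader. "ExampleDownloader" and the variable name
# "siconca" are hypothetical; a real implementation would fetch files into
# the folders returned by get_data_var_folder rather than only logging.
class ExampleDownloader(Downloader):

    def download(self):
        if self.dry:
            logging.info("Dry run: no files will be written")
            return
        target = self.get_data_var_folder("siconca")
        logging.info("Would download files into {}".format(target))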


class Generator(DataProducer):
    """Abstract base class for a generator."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    @abstractmethod
    def generate(self):
        """Abstract generate method for this generator: Must be implemented by subclasses."""
        raise NotImplementedError("{}.generate is abstract".format(
            __class__.__name__))


class Processor(DataProducer):
    """An abstract base class for data processing classes.

    Provides methods for initialising source data, processing the data, and
    saving the processed data to standard netCDF files.

    Attributes:
        _file_filters: List of file filters to exclude certain files during
            data processing.
        _lead_time: Forecast/lead time used in the data processing.
        source_data: Path to the source data directory.
        _var_files: Dictionary storing variable files organised by variable
            name.
        _processed_files: Dictionary storing the processed files organised by
            variable name.
        _dates: Named tuple that stores the dates used for training,
            validation, and testing.
    """

    def __init__(self,
                 identifier: str,
                 source_data: object,
                 *args,
                 file_filters: object = (),
                 lead_time: int = 93,
                 test_dates: object = (),
                 train_dates: object = (),
                 val_dates: object = (),
                 **kwargs) -> None:
        """Initialises the Processor class.

        Args:
            identifier: The identifier for the processor.
            source_data: The source data directory.
            *args: Additional positional arguments.
            file_filters (optional): List of file filters to exclude certain
                files during data processing. Defaults to ().
            lead_time (optional): The forecast/lead time used in the data
                processing. Defaults to 93.
            test_dates (optional): Dates used for testing. Defaults to ().
            train_dates (optional): Dates used for training. Defaults to ().
            val_dates (optional): Dates used for validation. Defaults to ().
            **kwargs: Additional keyword arguments.
        """
        super().__init__(*args, identifier=identifier, **kwargs)

        self._file_filters = list(file_filters)
        self._lead_time = lead_time
        self._source_data = os.path.join(source_data, identifier,
                                         self.hemisphere_str[0])
        self._var_files = dict()
        self._processed_files = dict()

        # TODO: better as a mixin? or maybe a Python data class instead?
        Dates = collections.namedtuple("Dates", ["train", "val", "test"])
        self._dates = Dates(train=list(train_dates),
                            val=list(val_dates),
                            test=list(test_dates))
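
    # Illustrative constructor usage (an assumption, not part of the module).
    # "MyProcessor" is a hypothetical concrete subclass and the identifier
    # "era5" and dates are hypothetical values:
    #
    #     proc = MyProcessor("era5", "./source",
    #                        train_dates=[dt.date(2020, 1, 1)],
    #                        north=True)
    #     # proc.source_data -> ./source/era5/north
    #     # proc.dates       -> Dates(train=[...], val=[], test=[])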

    def init_source_data(self, lag_days: object = None) -> None:
        """Initialises source data by globbing the files and organising based on date.

        Adds the previous `lag_days` days for each date, if not already in
        `self._dates`, when `lag_days` > 0. Adds the next `self._lead_time`
        days for each date, if not already in `self._dates`, when
        `self._lead_time` > 0.

        Args:
            lag_days: The number of lag days to include in the data
                processing.

        Returns:
            None. The method updates the `_var_files` attribute of the
            `Processor` object.

        Raises:
            OSError: If the source data directory does not exist.
        """

        if not os.path.exists(self.source_data):
            raise OSError("Source data directory {} does not exist".format(
                self.source_data))

        var_files = {}

        for date_category in ["train", "val", "test"]:
            dates = sorted(getattr(self._dates, date_category))

            if dates:
                logging.info("Processing {} dates for {} category".format(
                    len(dates), date_category))
            else:
                logging.info(
                    "No {} dates for this processor".format(date_category))
                continue

            # TODO: ProcessPool for this (avoid the GIL for globbing)
            # FIXME: needs to deal with a lack of continuity in the date ranges
            if lag_days:
                logging.info("Including lag of {} days".format(lag_days))

                additional_lag_dates = []

                for date in dates:
                    for day in range(lag_days):
                        lag_date = date - dt.timedelta(days=day + 1)
                        if lag_date not in dates:
                            additional_lag_dates.append(lag_date)
                dates += list(set(additional_lag_dates))

            # FIXME: this is conveniently supplied for siconca_abs on
            # training with OSISAF data, but are we exploiting the
            # convenient usage of this data for linear trends?
            if self._lead_time:
                logging.info("Including lead of {} days".format(
                    self._lead_time))

                additional_lead_dates = []

                for date in dates:
                    for day in range(self._lead_time):
                        lead_day = date + dt.timedelta(days=day + 1)
                        if lead_day not in dates:
                            additional_lead_dates.append(lead_day)
                dates += list(set(additional_lead_dates))

            globstr = "{}/**/[12]*.nc".format(self.source_data)

            logging.debug("Globbing {} from {}".format(date_category, globstr))
            dfs = glob.glob(globstr, recursive=True)
            logging.debug("Globbed {} files".format(len(dfs)))

            # FIXME: using hyphens broadly no?
            data_dates = [
                df.split(os.sep)[-1][:-3].replace("_", "-") for df in dfs
            ]
            dt_series = pd.Series(dfs, index=data_dates)
            logging.debug("Create structure of {} files".format(
                len(dt_series)))

            # Ensure we're ordered, it has repercussions for xarray
            for date in sorted(dates):
                try:
                    match_dfs = dt_series[date.strftime("%Y")]
                    if type(match_dfs) == str:
                        match_dfs = [match_dfs]
                except KeyError:
                    logging.info("No data found for {}, outside data boundary "
                                 "perhaps?".format(date.strftime("%Y-%m-%d")))
                    match_dfs = []

                for df in match_dfs:
                    if any([
                            flt in os.path.split(df)[1]
                            for flt in self._file_filters
                    ]):
                        continue

                    path_comps = str(os.path.split(df)[0]).split(os.sep)
                    var = path_comps[-1]

                    # The year is in the path, fall back one further
                    if re.match(r'^\d{4}$', var):
                        var = path_comps[-2]

                    if var not in var_files.keys():
                        var_files[var] = list()

                    if df not in var_files[var]:
                        var_files[var].append(df)

        # TODO: allow option to ditch dates from train/val/test for missing
        # var files
        self._var_files = {
            var: var_files[var] for var in sorted(var_files.keys())
        }
        for var in self._var_files.keys():
            logging.info("Got {} files for {}".format(
                len(self._var_files[var]), var))
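
    # Illustrative note (an assumption inferred from the glob pattern and
    # filename parsing above, not a guarantee from the module):
    # init_source_data expects per-variable folders below source_data, with
    # netCDF filenames beginning with the year (the "[12]*.nc" glob matches
    # names starting with 1 or 2). The variable name is taken from the parent
    # directory, skipping a bare-year directory level if present, e.g.
    # (hypothetical paths):
    #
    #     ./source/era5/north/tas/2020.nc
    #     ./source/era5/north/tas/2020/2020_01_01.nc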

    @abstractmethod
    def process(self):
        """Abstract method defining data processing: Must be implemented by subclasses."""
        raise NotImplementedError("{}.process is abstract".format(
            __class__.__name__))

    def save_processed_file(self, var_name: str, name: str, data: object,
                            **kwargs) -> str:
        """Save processed data to a netCDF file.

        Args:
            var_name: The name of the variable.
            name: The name of the file.
            data: The data to be saved.
            **kwargs: Additional keyword arguments to be passed to the
                `get_data_var_folder` method.

        Returns:
            The path of the saved netCDF file.
        """
        file_path = os.path.join(self.get_data_var_folder(var_name, **kwargs),
                                 name)
        data.to_netcdf(file_path)

        if var_name not in self._processed_files.keys():
            self._processed_files[var_name] = list()

        if file_path not in self._processed_files[var_name]:
            logging.debug("Adding {} file: {}".format(var_name, file_path))
            self._processed_files[var_name].append(file_path)
        else:
            logging.warning("{} already exists in {} processed list".format(
                file_path, var_name))
        return file_path
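
    # Illustrative usage sketch (an assumption, not part of the module): data
    # is expected to provide a to_netcdf method, e.g. an xarray Dataset or
    # DataArray. The variable and file names here are hypothetical:
    #
    #     import xarray as xr
    #     da = xr.DataArray([1.0, 2.0], dims=("time",), name="tas")
    #     path = proc.save_processed_file("tas", "tas_2020.nc", da)
    #     # path -> <base_path>/<hemisphere>/tas/tas_2020.nc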

    @property
    def dates(self) -> object:
        """The dates used for training, validation, and testing in this class as a collections.namedtuple."""
        return self._dates

    @property
    def lead_time(self) -> int:
        """The lead time used in the data processing."""
        return self._lead_time

    @property
    def processed_files(self) -> dict:
        """A dict with the processed files organised by variable name."""
        return self._processed_files

    @property
    def source_data(self) -> str:
        """The source data directory as a string."""
        return self._source_data
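

# Illustrative end-to-end sketch (an assumption, not part of the icenet
# module): a minimal concrete Processor that simply re-saves whatever it
# finds. "ExampleProcessor", the identifier "example", the "./source"
# directory and the dates are hypothetical; xarray is assumed to be
# available, since processed files are written via Dataset.to_netcdf.
class ExampleProcessor(Processor):

    def process(self):
        import xarray as xr

        for var, files in self._var_files.items():
            for path in files:
                with xr.open_dataset(path) as ds:
                    # Re-save each source file into the processed folder for
                    # its variable, keeping the original filename.
                    self.save_processed_file(var, os.path.basename(path),
                                             ds.load())


def _example_processing_run() -> None:
    proc = ExampleProcessor("example", "./source",
                            train_dates=[dt.date(2020, 1, 1)],
                            north=True)
    proc.init_source_data(lag_days=1)
    proc.process()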