Source code for icenet.data.interfaces.downloader

import concurrent
import logging
import os
import re
import shutil
import tempfile

from abc import abstractmethod
from concurrent.futures import ThreadPoolExecutor
from itertools import product

from icenet.data.sic.mask import Masks
from icenet.data.sic.utils import SIC_HEMI_STR
from icenet.data.producers import Downloader
from icenet.data.utils import assign_lat_lon_coord_system, \
    gridcell_angles_from_dim_coords, \
    invert_gridcell_angles, \
    rotate_grid_vectors
from icenet.data.interfaces.utils import batch_requested_dates
from icenet.utils import run_command

import iris
import iris.analysis
import iris.cube
import iris.exceptions
import numpy as np
import pandas as pd
import xarray as xr
"""

"""


def filter_dates_on_data(latlon_path: str,
                         regridded_name: str,
                         req_dates: object,
                         check_latlon: bool = True,
                         check_regridded: bool = True,
                         drop_vars: list = None):
    """Reduces request dates and target files based on existing data

    To avoid what is potentially significant resource expense downloading
    extant data, downloaders should call this method to reduce the request
    dates only to that data not already present. This is a fairly naive
    implementation, in that if the data is present in either the latlon
    intermediate file OR the target regridded file, we'll not bother
    downloading again. This can be overridden via the method arguments.

    :param latlon_path:
    :param regridded_name:
    :param req_dates:
    :param check_latlon:
    :param check_regridded:
    :param drop_vars:
    :return: req_dates(list)
    """
    latlon_dates = list()
    regridded_dates = list()
    drop_vars = list() if drop_vars is None else drop_vars

    # Latlon files should in theory be aggregated and singular arrays
    # meaning we can naively open and interrogate the dates
    if check_latlon and os.path.exists(latlon_path):
        try:
            latlon_dates = xr.open_dataset(
                latlon_path, drop_variables=drop_vars).time.values
            logging.debug("{} latlon dates already available in {}".format(
                len(latlon_dates), latlon_path))
        except ValueError:
            logging.warning("Latlon {} dates not readable, "
                            "ignoring file".format(latlon_path))

    if check_regridded and os.path.exists(regridded_name):
        regridded_dates = xr.open_dataset(
            regridded_name, drop_variables=drop_vars).time.values
        logging.debug("{} regridded dates already available in {}".format(
            len(regridded_dates), regridded_name))

    exclude_dates = list(set(latlon_dates).union(set(regridded_dates)))
    logging.debug("Excluding {} dates already existing from {} dates "
                  "requested.".format(len(exclude_dates), len(req_dates)))

    return sorted(
        list(
            pd.to_datetime(req_dates).difference(
                pd.to_datetime(exclude_dates))))
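A minimal usage sketch, assuming the module-level imports above; the paths and date range below are hypothetical:

# Illustrative only, not part of the module: prune a year's worth of
# requested dates against an existing intermediate/regridded file pair.
example_dates = pd.date_range("2020-01-01", "2020-12-31", freq="D")
remaining = filter_dates_on_data(
    latlon_path="data/era5/north/tas/latlon_2020.nc",
    regridded_name="data/era5/north/tas/2020.nc",
    req_dates=example_dates)
# "remaining" contains only the dates absent from both files, sorted ascending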
def merge_files(new_datafile: str,
                other_datafile: str,
                drop_variables: object = None):
    """Merges a new datafile with pre-existing data, deduplicating on time

    :param new_datafile:
    :param other_datafile:
    :param drop_variables:
    """
    drop_variables = list() if drop_variables is None else drop_variables

    if other_datafile is not None:
        (datafile_path, new_filename) = os.path.split(new_datafile)
        moved_new_datafile = \
            os.path.join(datafile_path, "new.{}".format(new_filename))
        os.rename(new_datafile, moved_new_datafile)
        d1 = xr.open_dataarray(moved_new_datafile,
                               drop_variables=drop_variables)

        logging.info(
            "Concatenating with previous data {}".format(other_datafile))
        d2 = xr.open_dataarray(other_datafile,
                               drop_variables=drop_variables)
        new_ds = xr.concat([d1, d2], dim="time").\
            sortby("time").\
            drop_duplicates("time", keep="first")

        logging.info("Saving merged data to {}... ".format(new_datafile))
        new_ds.to_netcdf(new_datafile)
        os.unlink(other_datafile)
        os.unlink(moved_new_datafile)
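For reference, a hedged sketch of the call that regrid() makes further down; the file paths are hypothetical and must exist for the call to succeed:

# Illustrative only: merge a freshly regridded file with a previously
# regridded file that _batch_regrid moved aside. On success, the merged,
# time-deduplicated result lands at "2020.nc", while "moved.2020.nc" and
# the temporary "new.2020.nc" are deleted.
merge_files("data/era5/north/tas/2020.nc",
            "data/era5/north/tas/moved.2020.nc")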
class ClimateDownloader(Downloader):
    """Climate downloader base class

    :param dates:
    :param delete_tempfiles:
    :param download:
    :param drop_vars:
    :param group_dates_by:
    :param max_threads:
    :param postprocess:
    :param pregrid_prefix:
    :param levels:
    :param var_name_idx:
    :param var_names:
    """

    def __init__(self,
                 *args,
                 dates: object = (),
                 delete_tempfiles: bool = True,
                 download: bool = True,
                 drop_vars: list = None,
                 group_dates_by: str = "year",
                 levels: object = (),
                 max_threads: int = 1,
                 postprocess: bool = True,
                 pregrid_prefix: str = "latlon_",
                 var_name_idx: int = -1,
                 var_names: object = (),
                 **kwargs):
        super().__init__(*args, **kwargs)

        self._dates = list(dates)
        self._delete = delete_tempfiles
        self._download = download
        self._drop_vars = list() if drop_vars is None else drop_vars
        self._files_downloaded = []
        self._group_dates_by = group_dates_by
        self._levels = list(levels)
        self._masks = Masks(north=self.north, south=self.south)
        self._max_threads = max_threads
        self._postprocess = postprocess
        self._pregrid_prefix = pregrid_prefix
        self._rotatable_files = []
        self._sic_ease_cubes = dict()
        self._var_name_idx = var_name_idx
        self._var_names = list(var_names)

        assert len(self._var_names), "No variables requested"
        assert len(self._levels) == len(self._var_names), \
            "# of levels must match # vars"

        if not self._delete:
            logging.warning("!!! Deletions of temp files are switched off: be "
                            "careful with this, you need to manage your "
                            "files manually")

        self._download_method = None

        self._validate_config()

    def _validate_config(self):
        """ """
        if self.hemisphere_str in os.path.split(self.base_path):
            raise RuntimeError("Don't include hemisphere string {} in "
                               "base path".format(self.hemisphere_str))
    def download(self):
        """Handles concurrent (threaded) downloading for variables

        This takes dates, variables and levels as configured, batches them
        into requests and submits those via a ThreadPoolExecutor for
        concurrent downloading. Returns nothing, relies on _single_download
        to implement appropriate updates to this object to record state
        changes arising from downloading.
        """
        logging.info("Building request(s), downloading and daily averaging "
                     "from {} API".format(self.identifier.upper()))

        requests = list()

        for idx, var_name in enumerate(self.var_names):
            levels = [None] if not self.levels[idx] else self.levels[idx]

            dates_per_request = \
                batch_requested_dates(self._dates,
                                      attribute=self._group_dates_by)

            for var_prefix, level, req_date in \
                    product([var_name], levels, dates_per_request):
                requests.append((var_prefix, level, req_date))

        with ThreadPoolExecutor(max_workers=min(len(requests),
                                                self._max_threads)) \
                as executor:
            futures = []

            for var_prefix, level, req_date in requests:
                future = executor.submit(self._single_download, var_prefix,
                                         level, req_date)
                futures.append(future)

            for future in concurrent.futures.as_completed(futures):
                try:
                    future.result()
                except Exception as e:
                    logging.exception("Thread failure: {}".format(e))

        logging.info("{} daily files downloaded".format(
            len(self._files_downloaded)))
    def _single_download(self, var_prefix: str, level: object,
                         req_dates: object):
        """Implements a single download based on configured download_method

        This allows delegation of downloading logic in a consistent manner to
        the configured download_method, ensuring a guarantee of adherence to
        naming and processing flow within ClimateDownloader implementations
        (an illustrative subclass sketch follows the class definition below).

        :param var_prefix: the icenet variable name
        :param level: the height to download
        :param req_dates: the request dates
        """

        logging.info(
            "Processing single download for {} @ {} with {} dates".format(
                var_prefix, level, len(req_dates)))
        var = var_prefix if not level else \
            "{}{}".format(var_prefix, level)
        var_folder = self.get_data_var_folder(var)

        latlon_path, regridded_name = \
            self.get_req_filenames(var_folder, req_dates[0])

        req_dates = filter_dates_on_data(latlon_path,
                                         regridded_name,
                                         req_dates,
                                         drop_vars=self._drop_vars)

        if len(req_dates):
            if self._download:
                with tempfile.TemporaryDirectory() as tmpdir:
                    tmp_latlon_path = os.path.join(
                        tmpdir,
                        os.path.basename("{}.download".format(latlon_path)))

                    self.download_method(var, level, req_dates,
                                         tmp_latlon_path)

                    if os.path.exists(latlon_path):
                        (ll_path, ll_file) = os.path.split(latlon_path)
                        rename_latlon_path = os.path.join(
                            ll_path,
                            "{}_old{}".format(*os.path.splitext(ll_file)))
                        os.rename(latlon_path, rename_latlon_path)
                        old_da = xr.open_dataarray(
                            rename_latlon_path,
                            drop_variables=self._drop_vars)
                        tmp_da = xr.open_dataarray(
                            tmp_latlon_path,
                            drop_variables=self._drop_vars)

                        logging.debug("Input (old): \n{}".format(old_da))
                        logging.debug("Input (dl): \n{}".format(tmp_da))

                        da = xr.concat([old_da, tmp_da], dim="time")
                        logging.debug("Output: \n{}".format(da))

                        da.to_netcdf(latlon_path)
                        old_da.close()
                        tmp_da.close()
                        os.unlink(rename_latlon_path)
                    else:
                        shutil.move(tmp_latlon_path, latlon_path)

                logging.info("Downloaded to {}".format(latlon_path))
            else:
                logging.info(
                    "Skipping actual download to {}".format(latlon_path))
        else:
            logging.info("No requested dates remain, likely already present")

        if self._postprocess and os.path.exists(latlon_path):
            self.postprocess(var, latlon_path)

        if os.path.exists(latlon_path):
            self._files_downloaded.append(latlon_path)
    def postprocess(self, var, download_path):
        logging.debug("No postprocessing in place for {}: {}".format(
            var, download_path))
    def save_temporal_files(self, var, da, date_format=None, freq=None):
        """

        :param var:
        :param da:
        :param date_format:
        :param freq:
        """
        var_folder = self.get_data_var_folder(var)
        group_by = "time.{}".format(self._group_dates_by) \
            if not freq else freq

        for dt, dt_da in da.groupby(group_by):
            req_date = pd.to_datetime(dt_da.time.values[0])
            latlon_path, regridded_name = \
                self.get_req_filenames(var_folder,
                                       req_date,
                                       date_format=date_format)

            logging.info("Retrieving and saving {}".format(latlon_path))
            dt_da.compute()
            dt_da.to_netcdf(latlon_path)

            if not os.path.exists(regridded_name):
                self._files_downloaded.append(latlon_path)
    @property
    def sic_ease_cube(self):
        """

        :return sic_cube:
        """
        if self._hemisphere not in self._sic_ease_cubes:
            sic_day_fname = 'ice_conc_{}_ease2-250_cdr-v2p0_197901021200.nc'. \
                format(SIC_HEMI_STR[self.hemisphere_str[0]])
            sic_day_path = os.path.join(self.get_data_var_folder("siconca"),
                                        sic_day_fname)
            if not os.path.exists(sic_day_path):
                logging.info("Downloading single daily SIC netCDF file for "
                             "regridding ERA5 data to EASE grid...")

                retrieve_sic_day_cmd = 'wget -m -nH --cut-dirs=6 -P {} ' \
                                       'ftp://osisaf.met.no/reprocessed/ice/' \
                                       'conc/v2p0/1979/01/{}'.\
                    format(self.get_data_var_folder("siconca"), sic_day_fname)

                run_command(retrieve_sic_day_cmd)

            # Load a single SIC map to obtain the EASE grid for
            # regridding ERA data
            self._sic_ease_cubes[self._hemisphere] = \
                iris.load_cube(sic_day_path, 'sea_ice_area_fraction')

            # Convert EASE coord units to metres for regridding
            self._sic_ease_cubes[self._hemisphere].coord(
                'projection_x_coordinate').convert_units('meters')
            self._sic_ease_cubes[self._hemisphere].coord(
                'projection_y_coordinate').convert_units('meters')
        return self._sic_ease_cubes[self._hemisphere]
    def regrid(self, files: object = None, rotate_wind: bool = True):
        """

        :param files:
        :param rotate_wind:
        """
        filelist = self._files_downloaded if not files else files
        batches = [filelist[b:b + 1000] for b in range(0, len(filelist), 1000)]

        max_workers = min(len(batches), self._max_threads)
        regrid_results = list()

        if max_workers > 0:
            with ThreadPoolExecutor(max_workers=max_workers) \
                    as executor:
                futures = []

                for files in batches:
                    future = executor.submit(self._batch_regrid, files)
                    futures.append(future)

                for future in concurrent.futures.as_completed(futures):
                    try:
                        fut_results = future.result()

                        for res in fut_results:
                            logging.debug(
                                "Future result -> regrid_results: {}".format(
                                    res))
                            regrid_results.append(res)
                    except Exception as e:
                        logging.exception("Thread failure: {}".format(e))
        else:
            logging.info("No regrid batches to process, moving on...")

        if rotate_wind:
            logging.info("Rotating wind data prior to merging")
            self.rotate_wind_data()

        for new_datafile, moved_datafile in regrid_results:
            merge_files(new_datafile, moved_datafile, self._drop_vars)
    def _batch_regrid(self, files: object):
        """

        :param files:
        """
        results = list()

        for datafile in files:
            (datafile_path, datafile_name) = os.path.split(datafile)

            new_filename = re.sub(r'^{}'.format(self.pregrid_prefix), '',
                                  datafile_name)
            new_datafile = os.path.join(datafile_path, new_filename)

            moved_datafile = None

            if os.path.exists(new_datafile):
                moved_filename = "moved.{}".format(new_filename)
                moved_datafile = os.path.join(datafile_path, moved_filename)
                os.rename(new_datafile, moved_datafile)

                logging.info("{} already existed, moved to {}".format(
                    new_filename, moved_filename))

            logging.debug("Regridding {}".format(datafile))

            try:
                cube = iris.load_cube(datafile)
                cube = self.convert_cube(cube)

                cube_ease = cube.regrid(self.sic_ease_cube,
                                        iris.analysis.Linear())
            except iris.exceptions.CoordinateNotFoundError:
                logging.warning(
                    "{} has no coordinates...".format(datafile_name))
                if self.delete:
                    logging.debug(
                        "Deleting failed file {}...".format(datafile_name))
                    os.unlink(datafile)
                continue

            self.additional_regrid_processing(datafile, cube_ease)

            logging.info("Saving regridded data to {}... ".format(
                new_datafile))
            iris.save(cube_ease, new_datafile, fill_value=np.nan)
            results.append((new_datafile, moved_datafile))

            if self.delete:
                logging.info("Removing {}".format(datafile))
                os.remove(datafile)

        return results
    def convert_cube(self, cube: object):
        """Converts Iris cube to be fit for regrid

        :param cube: the cube requiring alteration
        :return cube: the altered cube
        """

        cube = assign_lat_lon_coord_system(cube)
        return cube
    @abstractmethod
    def additional_regrid_processing(self, datafile: str, cube_ease: object):
        """

        :param datafile:
        :param cube_ease:
        """
        pass
    def rotate_wind_data(self,
                         apply_to: object = ("uas", "vas"),
                         manual_files: object = None):
        """

        :param apply_to:
        :param manual_files:
        """
        assert len(apply_to) == 2, "Too many wind variables supplied: {}, " \
                                   "there should only be two.".\
            format(", ".join(apply_to))

        angles = gridcell_angles_from_dim_coords(self.sic_ease_cube)
        invert_gridcell_angles(angles)

        logging.info("Rotating wind data in {}".format(" ".join(
            [self.get_data_var_folder(v) for v in apply_to])))

        wind_files = {}

        for var in apply_to:
            source = self.get_data_var_folder(var)

            file_source = self._files_downloaded \
                if not manual_files else manual_files

            latlon_files = [df for df in file_source if source in df]
            wind_files[var] = sorted(
                [
                    re.sub(r'{}'.format(self.pregrid_prefix), '', df)
                    for df in latlon_files
                    if os.path.dirname(df).split(os.sep)[self._var_name_idx]
                    == var
                ],
                key=lambda x: int(
                    re.search(r'^(?:\w+_)?(\d+).nc',
                              os.path.basename(x)).group(1)))
            logging.info("{} files for {}".format(len(wind_files[var]), var))

        # NOTE: we're relying on apply_to having equal datasets
        assert len(wind_files[apply_to[0]]) == len(wind_files[apply_to[1]]), \
            "The wind file datasets are unequal in length"

        # validation
        for idx, wind_file_0 in enumerate(wind_files[apply_to[0]]):
            wind_file_1 = wind_files[apply_to[1]][idx]

            wd0 = re.sub(r'^{}_'.format(apply_to[0]), '',
                         os.path.basename(wind_file_0))

            if not wind_file_1.endswith(wd0):
                logging.error("Wind file array is not valid: {}".format(
                    zip(wind_files)))
                raise RuntimeError("{} is not at the end of {}, something is "
                                   "wrong".format(wd0, wind_file_1))

        for idx, wind_file_0 in enumerate(wind_files[apply_to[0]]):
            wind_file_1 = wind_files[apply_to[1]][idx]

            logging.info("Rotating {} and {}".format(wind_file_0,
                                                     wind_file_1))

            wind_cubes = dict()
            wind_cubes_r = dict()

            wind_cubes[apply_to[0]] = iris.load_cube(wind_file_0)
            wind_cubes[apply_to[1]] = iris.load_cube(wind_file_1)

            try:
                wind_cubes_r[apply_to[0]], wind_cubes_r[apply_to[1]] = \
                    rotate_grid_vectors(
                        wind_cubes[apply_to[0]],
                        wind_cubes[apply_to[1]],
                        angles,
                    )
            except iris.exceptions.CoordinateNotFoundError:
                logging.exception("Failure to rotate due to coordinate "
                                  "issues, moving onto next file")
                continue

            # Original implementation is in danger of lost updates
            # due to potential lazy loading
            for i, name in enumerate([wind_file_0, wind_file_1]):
                # NOTE: implementation with tempfile caused problems on NFS
                # mounted filesystem, so avoiding in place of letting iris
                # do it
                temp_name = os.path.join(
                    os.path.split(name)[0],
                    "temp.{}".format(os.path.basename(name)))
                logging.debug("Writing {}".format(temp_name))

                iris.save(wind_cubes_r[apply_to[i]], temp_name)
                os.replace(temp_name, name)
                logging.debug("Overwritten {}".format(name))
    def get_req_filenames(self,
                          var_folder: str,
                          req_date: object,
                          date_format: str = None):
        """Resolves the latlon and regridded filenames for a request date

        A worked example of the naming convention follows the class
        definition below.

        :param var_folder:
        :param req_date:
        :param date_format:
        :return:
        """
        filename_date = getattr(req_date, self._group_dates_by) \
            if not date_format else req_date.strftime(date_format)

        latlon_path = os.path.join(
            var_folder, "{}{}.nc".format(self.pregrid_prefix, filename_date))
        regridded_name = os.path.join(var_folder,
                                      "{}.nc".format(filename_date))

        logging.debug("Got {} filenames: {} and {}".format(
            self._group_dates_by, latlon_path, regridded_name))

        return latlon_path, regridded_name
    @property
    def dates(self):
        return self._dates

    @property
    def delete(self):
        return self._delete

    @property
    def download_method(self) -> callable:
        if not self._download_method:
            raise RuntimeError("Downloader has no method set, "
                               "implementation error")
        return self._download_method

    @download_method.setter
    def download_method(self, method: callable):
        self._download_method = method

    @property
    def group_dates_by(self):
        return self._group_dates_by

    @property
    def levels(self):
        return self._levels

    @property
    def pregrid_prefix(self):
        return self._pregrid_prefix

    @property
    def var_names(self):
        return self._var_names
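As referenced in _single_download, a minimal sketch of the delegation pattern: a hypothetical subclass (not an actual icenet interface) wires download_method to its own retrieval routine, matching the (var, level, req_dates, download_path) call made by _single_download.

class ExampleAPIDownloader(ClimateDownloader):
    """Hypothetical subclass illustrating download_method delegation."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # _single_download invokes this with (var, level, req_dates, path)
        self.download_method = self._fetch_from_api

    def _fetch_from_api(self, var, level, req_dates, download_path):
        # A real implementation would retrieve 'var' at 'level' for
        # 'req_dates' from its data source and write a NetCDF file to
        # 'download_path'; left unimplemented in this sketch
        raise NotImplementedError

    def additional_regrid_processing(self, datafile, cube_ease):
        # No source-specific post-regrid handling in this sketch
        pass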
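A worked example of the filename convention used by get_req_filenames, assuming the defaults group_dates_by="year" and pregrid_prefix="latlon_"; the variable folder is hypothetical.

# Illustrative only: a request date in 2020 yields, within the variable
# folder, "latlon_2020.nc" (pre-regrid lat/lon intermediate) and "2020.nc"
# (regridded target)
filename_date = getattr(pd.Timestamp("2020-03-01"), "year")  # -> 2020
print(os.path.join("data/era5/north/tas",
                   "latlon_{}.nc".format(filename_date)))
print(os.path.join("data/era5/north/tas", "{}.nc".format(filename_date)))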