Source code for bmi_wavewatch3.wavewatch3

import contextlib
import datetime
import os
import pathlib
from functools import partial
from multiprocessing import Pool

import numpy as np
import xarray as xr
from dateutil.relativedelta import relativedelta

from .downloader import WaveWatch3Downloader
from .errors import ChoiceError
from .source import SOURCES


[docs]class WaveWatch3:
[docs] def __init__( self, date, grid="glo_30m", cache="~/.wavewatch3/data", lazy=True, source="multigrid", ): """Advance through WAVEWATCH III data, downloading new data as needed. Parameters ---------- date : str Date as an isoformatted string ("YYYY-MM-DD"). grid : str, optional WAVEWATCH III grid region. cache : str or path-like, optional Folder into which to cache downloaded data. lazy : bool, optional If ``True``, wait to download data until the xarray Dataset is first accessed. source : str, optional Source from which to download data from. """ try: Source = SOURCES[source] except KeyError: raise ChoiceError(source, SOURCES) self._source = source self._cache = pathlib.Path(cache).expanduser() self._lazy = lazy self._data = None self._date = None self._step = 0 self._urls = [ Source(date, quantity=quantity, grid=grid) for quantity in Source.QUANTITIES ] self.date = date if not lazy: self._load_data()
@property def data(self): """Current WAVEWATCH III data as an xarray.Dataset.""" if self._data is None: self._load_data() return self._data @property def step(self): return self._step @step.setter def step(self, step): if step < 0: raise ValueError("step must be non-negative") if step < len(self.data.step): self._step = step self.date = str( (self.data.time + self.data.step[step]).values.astype("datetime64[h]") ) else: remaining = step - len(self._data.step) self.inc() self.step = remaining @property def source(self): """Source from which data will be downloaded.""" return self._source @property def grid(self): """The WAVEWATCH III grid region.""" return self._urls[0].grid @property def date(self): """Current date as an isoformatted string.""" return self._date.isoformat(timespec="hours") @date.setter def date(self, date): new_date = datetime.datetime.fromisoformat(date) if new_date != self._date: self._date = new_date for url in self._urls: url.month = new_date.month url.year = new_date.year self._data = None if not self._lazy: self._load_data() @property def year(self): """The current year.""" return self._date.year @property def month(self): """The current month.""" return self._date.month @property def day(self): return self._date.day @property def hour(self): return self._date.hour
[docs] def inc(self, months=1): """Increment to current date by some number of months. Parameters ---------- months : int, optional Number of months to increment the date by. """ self.date = (self._date + relativedelta(months=months)).isoformat()
def _load_data(self): """Load the current data into an xarray Dataset.""" self._fetch_data() self._data = xr.open_mfdataset( [self._cache / url.filename for url in self._urls], engine="cfgrib", parallel=False, ) self._step = np.searchsorted( self._data.step, np.datetime64(self.date, "ns") - self._data.time ) def _fetch_data(self): """Download data in parallel.""" self._cache.mkdir(parents=True, exist_ok=True) with as_cwd(self._cache): Pool().map(WaveWatch3Downloader.retreive, [str(url) for url in self._urls])
[docs] def __repr__(self): """String representation of a WaveWatch3 instance.""" return f"WaveWatch3({self.date!r}, grid={self.grid!r}, source={self.source!r})"
[docs] def __eq__(self, other): """Test if two WaveWatch3 instances refer to the same data.""" return ( self.month == other.month and self.year == other.year and self.grid == other.grid and self.source == other.source )
[docs] @staticmethod def fetch(date, folder=".", force=False, grid="glo_30m", source="multigrid"): """Fetch WAVEWATCH III data by date. Parameters ---------- date : str or iterable of str Date or list of dates isoformat strings ("YYYY-MM-DD"). folder : str or path-like, optional Destination folder into which to download data. force : bool, optional If ``True`` download the data even if the file to be downloaded already exists in the destination folder. grid : str, optional The WAVEWATCH III grid to download. Returns ------- list of path-like The downloaded (or cached) data files. """ dates = [date] if isinstance(date, str) else date folder = pathlib.Path(folder) try: Source = SOURCES[source] except KeyError: raise ChoiceError(source, SOURCES) urls = [] for date in dates: urls += [ Source(date, quantity=quantity, grid=grid) for quantity in Source.QUANTITIES ] with as_cwd(folder): Pool().map( partial(WaveWatch3Downloader.retreive, force=force), [str(url) for url in urls], ) return sorted(folder / url.filename for url in urls)
[docs]@contextlib.contextmanager def as_cwd(path): """Change directory context. Args: path (str): Path-like object to a directory. """ prev_cwd = pathlib.Path.cwd() os.chdir(path) yield prev_cwd os.chdir(prev_cwd)