# Source code for bmi_wavewatch3.cli

import inspect
import itertools
import os
import pathlib
import sys
import textwrap
import urllib
from collections import namedtuple
from functools import partial
from multiprocessing import Pool, RLock

import click
import matplotlib.pyplot as plt
from tqdm.auto import tqdm

from .downloader import WaveWatch3Downloader
from .errors import ChoiceError, DateValueError
from .source import SOURCES
from .wavewatch3 import WaveWatch3

# Status messages: bold text written to stderr.
out = partial(click.secho, bold=True, file=sys.stderr)
# Error messages: red text written to stderr.
err = partial(click.secho, fg="red", file=sys.stderr)


# Outcome of a single download attempt: the remote URL, the local file path,
# whether the attempt succeeded, and a human-readable status message.
DownloadResult = namedtuple("DownloadResult", ["remote", "local", "success", "status"])


def validate_date(ctx, param, value):
    """Click callback that checks dates against the selected data source.

    The data source is looked up from the parent command's ``source``
    parameter. *value* may be a single date string or an iterable of date
    strings; either way it is returned unchanged when every date passes.

    Raises
    ------
    click.BadParameter
        If the source's ``validate_date`` raises ``DateValueError``.
    """
    source_cls = SOURCES[ctx.parent.params["source"]]

    # Treat a lone string as a one-element collection so both the
    # single-argument and the ``nargs=-1`` forms share one code path.
    if isinstance(value, str):
        candidates = [value]
    else:
        candidates = value

    try:
        for candidate in candidates:
            source_cls.validate_date(candidate)
    except DateValueError as error:
        raise click.BadParameter(error)

    return value
def validate_quantity(ctx, param, value):
    """Click callback that checks quantities against the selected data source.

    When no quantity is given, the sorted list of all of the source's
    ``QUANTITIES`` is returned instead. Otherwise *value* (a string or an
    iterable of strings) is validated and returned unchanged.

    Raises
    ------
    click.BadParameter
        If the source's ``validate_quantity`` raises ``ChoiceError``.
    """
    source_cls = SOURCES[ctx.parent.params["source"]]

    # Nothing requested: fall back to every quantity the source offers.
    if not value:
        return sorted(source_cls.QUANTITIES)

    requested = (value,) if isinstance(value, str) else value
    try:
        for name in requested:
            source_cls.validate_quantity(name)
    except ChoiceError as error:
        raise click.BadParameter(error)

    return value
def validate_data_var(ctx, param, value):
    """Click callback that checks a data variable name.

    The name must be one of the known data variables, and the quantity it
    maps to must be accepted by the selected data source. On success *value*
    is returned unchanged.

    Raises
    ------
    click.BadParameter
        If *value* is not a known data variable, or if the source's
        ``validate_quantity`` raises ``ChoiceError`` for the mapped quantity.
    """
    source_cls = SOURCES[ctx.parent.params["source"]]

    # Data variable name -> name of the quantity that provides it.
    quantity_for = dict(
        dirpw="dp",
        swh="hs",
        perpw="tp",
        u="wind",
        v="wind",
        swdir="pdir",
        swell="phs",
        swper="ptp",
    )

    try:
        quantity = quantity_for[value]
    except KeyError:
        raise click.BadParameter(ChoiceError(value, list(quantity_for)))

    try:
        source_cls.validate_quantity(quantity)
    except ChoiceError as error:
        raise click.BadParameter(error)

    return value
def validate_grid(ctx, param, value):
    """Click callback that checks a grid name against the selected data source.

    When no grid is given, the default of the ``grid`` parameter in the
    source's call signature is returned. Otherwise *value* is validated and
    returned unchanged.

    Raises
    ------
    click.BadParameter
        If the source's ``validate_grid`` raises ``ChoiceError``.
    """
    source_cls = SOURCES[ctx.parent.params["source"]]

    if value:
        try:
            source_cls.validate_grid(value)
        except ChoiceError as error:
            raise click.BadParameter(error)
        else:
            return value

    # No grid requested: use whatever default the source itself declares.
    grid_parameter = inspect.signature(source_cls).parameters["grid"]
    return grid_parameter.default
@click.group(chain=True)
@click.version_option()
@click.option(
    "--cd",
    default=".",
    type=click.Path(exists=True, file_okay=False, dir_okay=True, readable=True),
    help="change to directory, then execute",
)
@click.option(
    "-s",
    "--silent",
    is_flag=True,
    help="Suppress status messages, including the progress bar.",
)
@click.option(
    "-v", "--verbose", is_flag=True, help="Also emit status messages to stderr."
)
@click.option(
    "--source",
    type=click.Choice(sorted(SOURCES)),
    default="multigrid",
    help="WAVEWATCH III data source",
)
def ww3(cd, silent, verbose, source) -> None:
    """Download WAVEWATCH III data.

    \b
    Examples:

      Download WAVEWATCH III data by date,

        $ ww3 fetch 2010-05-22 2010-05-22
    """
    # The group only changes directory; ``silent``, ``verbose`` and ``source``
    # are read by the subcommands through ``ctx.parent.params``.
    os.chdir(cd)


@ww3.command()
@click.option("--all", is_flag=True, help="info on all sources")
@click.pass_context
def info(ctx, all):
    """Print information on WAVEWATCH III data sources."""
    # ``all`` shadows the builtin, but the name is fixed by the ``--all`` option.
    selected = ctx.parent.params["source"]
    sources = SOURCES if all else {selected: SOURCES[selected]}

    sections = []
    for name, source in sources.items():
        endpoint = urllib.parse.urlunparse(
            [source.SCHEME, source.NETLOC, source.PREFIX, "", "", ""]
        )
        sections.append(
            textwrap.dedent(
                f"""
                [wavewatch3.sources.{name}]
                grids = {sorted(source.GRIDS)!r}
                quantities = {sorted(source.QUANTITIES)!r}
                min_date = {source.MIN_DATE!r}
                max_date = {source.MAX_DATE!r}
                endpoint = {endpoint!r}"""
            ).lstrip()
        )

    # One blank line between the per-source sections.
    print((2 * os.linesep).join(sections))


@ww3.command()
@click.argument("date", nargs=-1, callback=validate_date)
@click.option("--grid", default=None, help="Grid to download", callback=validate_grid)
@click.option(
    "--quantity",
    "-q",
    multiple=True,
    help="Quantity to download",
    callback=validate_quantity,
)
@click.pass_context
def url(ctx, date, grid, quantity):
    """Construct URLs from which to download WAVEWATCH III data."""
    Source = SOURCES[ctx.parent.params["source"]]

    for d in date:
        for q in quantity:
            print(Source(d, q, grid=grid))


@ww3.command()
@click.argument("date", nargs=-1, callback=validate_date)
@click.option("--dry-run", is_flag=True, help="do not actually download data")
@click.option(
    "--force",
    "-f",
    is_flag=True,
    help="force download even if local file already exists",
)
@click.option("--file", type=click.File("r", lazy=False), help="read dates from a file")
@click.option("--grid", default=None, help="Grid to download", callback=validate_grid)
@click.option(
    "--quantity",
    "-q",
    multiple=True,
    help="Quantity to download",
    callback=validate_quantity,
)
@click.pass_context
def fetch(ctx, date, dry_run, force, file, grid, quantity):
    """Download WAVEWATCH III data by date."""
    verbose = ctx.parent.params["verbose"]
    silent = ctx.parent.params["silent"]
    Source = SOURCES[ctx.parent.params["source"]]

    if file:
        # ``date`` arrives from click as a tuple (``nargs=-1``), so a list
        # cannot be added to it directly. Blank lines in the file are skipped.
        dates_from_file = tuple(
            line.strip() for line in file.read().splitlines() if line.strip()
        )
        date = tuple(date) + dates_from_file

    urls = [str(Source(d, q, grid=grid)) for q in quantity for d in date]

    if not silent and verbose:
        for remote in urls:
            out(remote)

    if dry_run:
        return

    results = _retreive_urls(urls, disable=silent, force=force)

    if not silent:
        # Report every success first, then every failure.
        for result in results:
            if result.success and result.status:
                out(f"{result.status}: {result.local}")
        for result in results:
            if not result.success:
                err(f"{result.status}: {result.remote}")

    # The local paths go to stdout (even when silent) so they can be piped.
    for result in results:
        if result.success:
            print(result.local)


@ww3.command()
@click.option("--dry-run", is_flag=True, help="only display what would have been done")
@click.option(
    "--cache-dir",
    type=click.Path(file_okay=False, path_type=pathlib.Path),
    help="cache folder to clean",
    default="~/.wavewatch3/data",
)
@click.option("--yes", is_flag=True, help="remove files without prompting")
@click.pass_context
def clean(ctx, dry_run, cache_dir, yes):
    """Remove cached date files."""
    verbose = ctx.parent.params["verbose"]
    silent = ctx.parent.params["silent"]

    source = "multi_*"
    grid = "*"
    quantity = "*"
    date = "*"

    cache_dir = cache_dir.expanduser()

    cache_files = list(
        itertools.chain(
            cache_dir.glob(f"{source}.{grid}.{quantity}.{date}.grb2"),
            cache_dir.glob(f"{source}.{grid}.{quantity}.{date}.grb2.gz"),
            cache_dir.glob(f"{source}.{grid}.{quantity}.{date}.grb2.*.idx"),
            cache_dir.glob(f"{grid}.{quantity}.{date}.grb"),
            cache_dir.glob(f"{grid}.{quantity}.{date}.grb.*.idx"),
        )
    )

    total_bytes = sum(cache_file.stat().st_size for cache_file in cache_files)

    if not silent and not dry_run:
        for cache_file in cache_files:
            out(f"{cache_file}")
        out(f"Total size: {total_bytes // 2**20} MB")

    if not dry_run and cache_files and not yes:
        # ``abort=True`` makes click raise ``Abort`` when the user declines,
        # so execution only continues past this line on confirmation.
        click.confirm("Are you sure you want to remove all files?", abort=True)

    for cache_file in cache_files:
        if dry_run:
            out(f"rm {cache_file}")
        else:
            cache_file.unlink()

    if not dry_run and (verbose and not silent):
        out(f"Removed {len(cache_files)} files ({total_bytes} bytes)")


@ww3.command()
@click.argument("date", callback=validate_date)
@click.option("--grid", default=None, help="Grid to download", callback=validate_grid)
@click.option(
    "--data-var",
    help="Data variable to plot",
    default="swh",
    callback=validate_data_var,
)
@click.pass_context
def plot(ctx, date, grid, data_var):
    """Plot WAVEWATCH III data by date."""
    verbose = ctx.parent.params["verbose"]
    silent = ctx.parent.params["silent"]
    source = ctx.parent.params["source"]

    # Data variable name -> name of the quantity that provides it.
    data_var_to_quantity = {
        "dirpw": "dp",
        "swh": "hs",
        "perpw": "tp",
        "u": "wind",
        "v": "wind",
        "swdir": "pdir",
        "swell": "phs",
        "swper": "ptp",
    }
    quantity = data_var_to_quantity[data_var]

    if not silent:
        out(f"source: {source}")
        out(f"grid: {grid}")
        out(f"date: {date}")
        out(f"data_var: {data_var} ({quantity})")

    # Named ``dataset`` rather than ``ww3`` to avoid shadowing the command group.
    dataset = WaveWatch3(date, source=source, grid=grid)

    if not silent and verbose:
        for remote in dataset._urls:
            out(f"source file: {remote}")

    # Bare attribute access kept from the original; presumably it triggers
    # loading (and downloading) the data before the cache files are listed --
    # TODO confirm against WaveWatch3.data.
    dataset.data

    if not silent and verbose:
        for remote in dataset._urls:
            out(f"cache file: {dataset._cache / remote.filename}")

    dataset.data[data_var][dataset.step, :, :].plot()
    plt.gca().set_aspect(1)
    plt.show()


def _retreive_urls(urls, disable=False, force=False):
    """Download *urls* in parallel and return one ``DownloadResult`` per URL.

    Parameters
    ----------
    urls : list of str
        Remote URLs to download.
    disable : bool, optional
        Passed to the progress bars; when true they are disabled.
    force : bool, optional
        Download even if the local file already exists.

    Returns
    -------
    list of DownloadResult
        Results in the same order as *urls*.
    """
    if not urls:
        # Nothing to download: do not spin up worker processes.
        return []

    # Share one lock between the workers so their progress bars do not
    # garble each other's output.
    tqdm.set_lock(RLock())

    # The context manager shuts the pool down when done; ``map`` blocks until
    # every result is available, so no work is lost on exit.
    with Pool(initializer=tqdm.set_lock, initargs=(tqdm.get_lock(),)) as pool:
        return pool.map(
            partial(_retreive, disable=disable, force=force), list(enumerate(urls))
        )


def _retreive(position_and_url, disable=False, force=False):
    """Download a single URL into the current directory.

    Parameters
    ----------
    position_and_url : tuple of (int, str)
        Progress-bar position and the URL to download.
    disable : bool, optional
        When true, the progress bar is disabled.
    force : bool, optional
        Download even if a file of the same name already exists locally.

    Returns
    -------
    DownloadResult
        The remote URL, the absolute local path, a success flag and a status
        message (``"cached"`` when the existing local file was reused).
    """
    position, url = position_and_url
    # The local file is named after the last component of the URL's path.
    name = pathlib.Path(urllib.parse.urlparse(url).path).name
    local = pathlib.Path(name)

    if not local.is_file() or force:
        with TqdmUpTo(
            unit="B",
            unit_scale=True,
            unit_divisor=1024,
            miniters=1,
            desc=name,
            position=position,
            disable=disable,
            leave=False,
        ) as t:
            try:
                WaveWatch3Downloader.retreive(
                    url, filename=name, reporthook=t.update_to, force=force
                )
            except (urllib.error.HTTPError, urllib.error.URLError) as error:
                # Report the failure instead of raising, so one bad URL does
                # not abort the other downloads.
                success, status = False, str(error)
            else:
                t.total = t.n
                success, status = True, f"downloaded {t.total} bytes"
    else:
        success, status = True, "cached"

    return DownloadResult(
        local=local.absolute(), remote=url, success=success, status=status
    )
class TqdmUpTo(tqdm):
    """Progress bar that can be driven by absolute progress reports."""

    def update_to(self, b=1, bsize=1, tsize=None):
        """Advance the bar so that its position becomes ``b * bsize``.

        Parameters
        ----------
        b : int, optional
            Presumably the number of blocks transferred so far -- the
            signature matches a ``reporthook`` callback; verify against caller.
        bsize : int, optional
            Presumably the size of each block.
        tsize : int, optional
            If not ``None``, stored as the bar's ``total``.

        Returns
        -------
        Whatever ``tqdm.update`` returns.
        """
        if tsize is not None:
            self.total = tsize

        # ``update`` takes an increment, so convert the absolute position
        # into the distance from the current one; this also leaves
        # ``self.n == b * bsize``.
        increment = b * bsize - self.n
        return self.update(increment)