Skip to content
Snippets Groups Projects
download.py 9.30 KiB
#! /usr/bin/env python3


# eurostat-fetcher -- Fetch series from Eurostat database
# By: Emmanuel Raviart <emmanuel.raviart@cepremap.org>
#
# Copyright (C) 2017-2018 Cepremap
# https://git.nomics.world/dbnomics-fetchers/eurostat-fetcher
#
# eurostat-fetcher is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# eurostat-fetcher is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.


"""Fetch series from Eurostat using bulk download and SDMX formats.

http://ec.europa.eu/eurostat/data/database

EUROSTAT bulk download:
- http://ec.europa.eu/eurostat/fr/data/bulkdownload
- http://ec.europa.eu/eurostat/estat-navtree-portlet-prod/BulkDownloadListing

EUROSTAT SDMX documentation:
- http://ec.europa.eu/eurostat/web/sdmx-infospace/welcome
- http://ec.europa.eu/eurostat/web/sdmx-web-services/rest-sdmx-2.1
"""


import argparse
import logging
import os
import shutil
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from xml.etree.ElementTree import Element

from lxml import etree

# Module-level logger shared by every function of this script.
log = logging.getLogger(__name__)

# Prefix -> namespace URI mapping passed as `namespaces=` to the find/iterfind
# calls made on the table of contents.
nsmap = {"nt": "urn:eu.europa.ec.eurostat.navtree"}


def main():
    """Download the Eurostat table of contents, then the datasets and data structures it lists.

    Command-line arguments select the target directory, an optional minimum update
    datetime (--from-datetime or FROM_DATETIME), an optional list of dataset codes
    (--datasets or DATASETS), the log level and whether existing files are kept (--resume).

    Returns:
        0 when the run completes (individual dataset failures are logged, not fatal),
        1 when the table of contents itself could not be downloaded.

    Raises:
        ValueError: if --log is not a valid logging level name.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "target_dir",
        type=Path,
        help="path of target directory containing Eurostat series",
    )
    parser.add_argument(
        "--from-datetime",
        type=datetime_with_timezone,
        default=os.getenv("FROM_DATETIME"),
        help="download only the datasets that changed since the provided datetime",
    )
    parser.add_argument("--log", default="INFO", help="level of logging messages")
    parser.add_argument("--resume", action="store_true", help="keep existing files in target directory")
    parser.add_argument(
        "--datasets",
        default=os.getenv("DATASETS"),
        help="download only the given datasets (datasets codes, comma separated)",
    )
    args = parser.parse_args()

    if not args.target_dir.is_dir():
        parser.error("Could not find directory {!r}".format(str(args.target_dir)))

    numeric_level = getattr(logging, args.log.upper(), None)
    if not isinstance(numeric_level, int):
        raise ValueError("Invalid log level: {}".format(args.log))
    logging.basicConfig(format="%(levelname)s:%(message)s", level=numeric_level)

    log.info("Command-line arguments: %r", args)

    if args.from_datetime is None:
        log.info(
            "Argument --from-datetime was not provided: "
            "the requested datasets will be downloaded without taking their update date into account"
        )
    else:
        log.info(
            "Argument --from-datetime=%r: only datasets that changed since that datetime will be downloaded",
            args.from_datetime.isoformat(),
        )

    # Normalize the dataset filter: tolerate spaces around codes and ignore empty
    # items, so that an empty DATASETS environment variable means "no filter"
    # instead of silently excluding every dataset.
    requested_codes = set()
    if args.datasets is not None:
        requested_codes = {code.strip() for code in args.datasets.split(",") if code.strip()}
        if not requested_codes:
            log.info("Argument --datasets is empty: no filtering on dataset codes will be applied")

    toc_file_name = "table_of_contents.xml"
    toc_file = args.target_dir / toc_file_name
    if args.resume and toc_file.is_file():
        log.debug("Skip downloading table of contents because of --resume and file already exists")
    else:
        toc_url = "http://ec.europa.eu/eurostat/estat-navtree-portlet-prod/BulkDownloadListing?file={}".format(
            toc_file_name
        )
        log.info("Downloading table of content from %r", toc_url)
        toc_result = download(toc_url, toc_file)
        if toc_result.returncode != 0:
            log.error(
                "Could not download table of contents from %r (exit code %d)",
                toc_url,
                toc_result.returncode,
            )
            # Do not leave a partial file behind: a later run with --resume would reuse it.
            if toc_file.is_file():
                toc_file.unlink()
            return 1
    toc_element = parse_table_of_contents(toc_file)

    # Create first-level directories.

    data_dir = args.target_dir / "data"
    data_dir.mkdir(exist_ok=True)
    datastructures_dir = args.target_dir / "datastructure"
    datastructures_dir.mkdir(exist_ok=True)

    # Build set of datasets and set of datasets definitions to download.

    datasets = set()
    datastructures = set()
    for leaf_element in iter_datasets_to_download(toc_element, args.from_datetime):
        dataset_code = leaf_element.findtext("./nt:code", namespaces=nsmap)
        if requested_codes and dataset_code not in requested_codes:
            log.debug(
                "Skipping dataset %r because it is not mentioned by --datasets option",
                dataset_code,
            )
            continue
        dataset_url = leaf_element.findtext('./nt:downloadLink[@format="sdmx"]', namespaces=nsmap)
        if dataset_url:
            datasets.add((dataset_code, dataset_url))
        datastructure_url = leaf_element.findtext('./nt:metadata[@format="sdmx"]', namespaces=nsmap)
        if datastructure_url:
            # The data structure code is the file name of the URL, without its extensions.
            datastructure_code = datastructure_url.rsplit("/", 1)[-1].split(".", 1)[0]
            datastructures.add((datastructure_code, datastructure_url))

    log.info("Downloading %d datasets and %d datastructures.", len(datasets), len(datastructures))

    failure_count = 0

    # Fetch datasets.

    for index, (dataset_code, dataset_url) in enumerate(sorted(datasets), start=1):
        dataset_dir = data_dir / dataset_code
        dataset_xml_file_path = dataset_dir / "{}.sdmx.xml".format(dataset_code)
        if args.resume and dataset_xml_file_path.is_file():
            log.debug("Skipping existing dataset %d/%d %s", index, len(datasets), dataset_url)
            continue
        log.info("Fetching dataset %d/%d %s", index, len(datasets), dataset_url)
        if not _fetch_archive(dataset_url, dataset_code, dataset_dir):
            failure_count += 1

    # Fetch datasets definitions.

    for index, (datastructure_code, datastructure_url) in enumerate(sorted(datastructures), start=1):
        datastructure_dir = datastructures_dir / datastructure_code
        datastructure_xml_file_path = datastructure_dir / "{}.sdmx.xml".format(datastructure_code)
        # Handle particular case for rail_if_esms.
        if datastructure_code == "rail_if_esms":
            datastructure_xml_file_path = datastructure_dir / "{}.sdmx".format(datastructure_code)
        if args.resume and datastructure_xml_file_path.is_file():
            log.debug("Skipping existing data structure %d/%d %s", index, len(datastructures), datastructure_url)
            continue
        log.info("Fetching data structure %d/%d %s", index, len(datastructures), datastructure_url)
        if not _fetch_archive(datastructure_url, datastructure_code, datastructure_dir):
            failure_count += 1

    if failure_count:
        log.error("%d archive(s) could not be downloaded or extracted", failure_count)

    return 0


def _fetch_archive(url, code, target_dir: Path) -> bool:
    """Download the zip archive at url into a fresh target_dir and extract it there.

    Any existing target_dir is removed first. The downloaded zip file is always
    deleted afterwards. Return True when both download and extraction exited with
    status 0, False otherwise (the failure is logged).
    """
    if target_dir.is_dir():
        shutil.rmtree(str(target_dir))
    target_dir.mkdir(exist_ok=True)
    zip_file_path = target_dir / "{}.zip".format(code)
    try:
        download_result = download(url, zip_file_path)
        if download_result.returncode != 0:
            log.error("Could not download %r (exit code %d)", url, download_result.returncode)
            return False
        unzip_result = unzip(zip_file_path, target_dir)
        if unzip_result.returncode != 0:
            log.error("Could not extract %r (exit code %d)", str(zip_file_path), unzip_result.returncode)
            return False
        return True
    finally:
        # The archive is only a transport format: never keep it in the target directory.
        if zip_file_path.is_file():
            zip_file_path.unlink()


def datetime_with_timezone(s: str) -> datetime:
    """Parse an ISO 8601 string into a timezone-aware datetime.

    Raises ValueError when the string cannot be parsed by datetime.fromisoformat
    or when the parsed value carries no timezone information.
    """
    parsed = datetime.fromisoformat(s)
    if parsed.tzinfo is not None:
        return parsed
    raise ValueError(f"Datetime must be provided with a timezone. Received {s!r}")


def download(url, output_file: Path):
    """Download url to output_file by running the wget command.

    An existing regular file at output_file is removed before wget is started.
    The object returned by subprocess.run is handed back unchanged; the exit
    status of wget is not checked here.
    """
    if output_file.is_file():
        output_file.unlink()
    command = ["wget", "-c", "--no-verbose", url, "-O", str(output_file)]
    return subprocess.run(command)


def parse_table_of_contents(file: Path) -> Element:
    """Parse the table of contents XML file and return the result of etree.parse.

    The parser is created with remove_blank_text=True.

    NOTE(review): lxml's etree.parse presumably returns a tree object rather than
    a bare Element; the return annotation is kept as it was -- confirm.
    """
    return etree.parse(str(file), parser=etree.XMLParser(remove_blank_text=True))


def iter_datasets_to_download(toc_element, from_datetime: Optional[datetime] = None):
    """Yield datasets elements for datasets to be downloaded.

    Only the nt:leaf elements whose "type" attribute is "dataset" or "table" are
    considered; leaves without a "type" attribute are ignored.

    If from_datetime is provided, yield only the datasets that changed since that
    datetime. Datasets without a nt:lastUpdate value are always yielded.

    The table of contents only gives the *day* of the last update (example:
    18.03.2020), so the comparison is done on calendar dates, from_datetime being
    converted to UTC first: a dataset updated on the same day as from_datetime is
    yielded, because it may have changed after the requested time of day.

    Raises:
        ValueError: if from_datetime is provided and a dataset has no nt:code,
            or if a nt:lastUpdate value does not match the DD.MM.YYYY format.
    """
    if from_datetime is not None:
        from_date = from_datetime.astimezone(timezone.utc).date()

    for leaf_element in toc_element.iterfind(".//nt:leaf", namespaces=nsmap):
        if leaf_element.attrib.get("type") not in ("dataset", "table"):
            continue

        if from_datetime is None:
            yield leaf_element
            continue

        dataset_code = leaf_element.findtext("nt:code", namespaces=nsmap)
        if not dataset_code:
            # Explicit check instead of assert: assertions are stripped with -O.
            raise ValueError("Dataset element without code found in table of contents")

        last_update_str = (leaf_element.findtext("nt:lastUpdate", namespaces=nsmap) or "").strip()
        if not last_update_str:
            log.debug("Dataset %r has no lastUpdate attribute: it will be downloaded", dataset_code)
            yield leaf_element
            continue

        # Example: <nt:lastUpdate>18.03.2020</nt:lastUpdate>
        last_update_date = datetime.strptime(last_update_str, "%d.%m.%Y").date()
        if last_update_date >= from_date:
            log.debug(
                "Dataset %r lastUpdate is more recent than %r: it will be downloaded",
                dataset_code,
                from_datetime.isoformat(),
            )
            yield leaf_element


def unzip(zip_file: Path, output_dir: Path):
    """Extract zip_file into output_dir by running the unzip command with -q.

    The object returned by subprocess.run is handed back unchanged; the exit
    status of unzip is not checked here.
    """
    command = ["unzip", "-q", "-d", str(output_dir), str(zip_file)]
    return subprocess.run(command)


# Run main() only when the file is executed as a script, and use its return
# value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())