#! /usr/bin/env python3


# eurostat-fetcher -- Fetch series from Eurostat database
# By: Emmanuel Raviart <emmanuel.raviart@cepremap.org>
#
# Copyright (C) 2017-2018 Cepremap
# https://git.nomics.world/dbnomics-fetchers/eurostat-fetcher
#
# eurostat-fetcher is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# eurostat-fetcher is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.


"""Fetch series from Eurostat using bulk download and SDMX formats.

http://ec.europa.eu/eurostat/data/database

EUROSTAT bulk download:
- http://ec.europa.eu/eurostat/fr/data/bulkdownload
- http://ec.europa.eu/eurostat/estat-navtree-portlet-prod/BulkDownloadListing

EUROSTAT SDMX documentation:
- http://ec.europa.eu/eurostat/web/sdmx-infospace/welcome
- http://ec.europa.eu/eurostat/web/sdmx-web-services/rest-sdmx-2.1
"""


import argparse
import logging
import os
import shutil
import subprocess
import sys
from pathlib import Path

from dbnomics_data_model.storages import StorageError
from dbnomics_data_model.storages import git as git_storage
from dulwich.errors import NotGitRepository
from dulwich.repo import Repo
from lxml import etree

# Name of the environment variable whose value is used as the default of the --full option.
FULL_ENV_VAR = "FULL"

log = logging.getLogger(__name__)

# Namespace prefixes used by the XPath queries run against table_of_contents.xml.
nsmap = dict(
    nt="urn:eu.europa.ec.eurostat.navtree",
)
def iter_datasets_to_download(toc_element, old_toc_element=None):
    """Yield the dataset leaf elements of a table of contents that must be downloaded.

    A leaf is considered a dataset when its ``type`` attribute is ``"dataset"``
    or ``"table"``.

    :param toc_element: root of the current ``table_of_contents.xml`` tree.
    :param old_toc_element: root of a previous version of the table of
        contents, or ``None``. When provided, only leaves whose ``nt:code`` is
        absent from the old tree, or whose ``nt:lastUpdate`` text differs from
        the old one, are yielded. When ``None``, every dataset leaf is yielded.
    """

    def iter_dataset_elements(root):
        for element in root.iterfind(".//nt:leaf", namespaces=nsmap):
            # Use .get() so that a leaf without "type" attribute is skipped
            # instead of raising KeyError.
            if element.get("type") in ("dataset", "table"):
                yield element

    if old_toc_element is None:
        # Nothing to compare with: every dataset has to be downloaded.
        yield from iter_dataset_elements(toc_element)
        return

    # Index lastUpdate texts of the old table_of_contents.xml by dataset code.
    old_last_update_by_dataset_code = {
        element.findtext("nt:code", namespaces=nsmap): element.findtext(
            "nt:lastUpdate", namespaces=nsmap
        )
        for element in iter_dataset_elements(old_toc_element)
    }

    for leaf_element in iter_dataset_elements(toc_element):
        dataset_code = leaf_element.findtext("nt:code", namespaces=nsmap)
        old_last_update = old_last_update_by_dataset_code.get(dataset_code)
        last_update = leaf_element.findtext("nt:lastUpdate", namespaces=nsmap)
        # A missing old value means the dataset is new in this version of the
        # table of contents; a different value means it was updated.
        if old_last_update is None or last_update != old_last_update:
            yield leaf_element


def main():
    """Download Eurostat datasets and data structures into a target directory.

    Parses the command line, downloads the current ``table_of_contents.xml``
    into the target directory, then downloads and unzips the SDMX archive of
    every selected dataset under ``<target_dir>/data/<code>`` and of every
    selected data structure under ``<target_dir>/datastructure/<code>``.

    Unless ``--full`` or ``--datasets`` is given, the previous table of
    contents is loaded from the Git repository of the target directory and
    only new or updated datasets are selected.

    Calls ``sys.exit(1)`` when the previous table of contents is needed but
    the target directory is not a Git repository, or when
    ``git_storage.get_commit_tree`` raises ``StorageError``.

    :raises ValueError: if ``--log`` is not the name of a logging level.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "target_dir",
        type=Path,
        help="path of target directory containing Eurostat series",
    )
    parser.add_argument(
        "--full",
        action="store_true",
        default=os.getenv(FULL_ENV_VAR),
        help="download all datasets; default behavior is to download what changed since last commit",
    )
    parser.add_argument("--log", default="INFO", help="level of logging messages")
    parser.add_argument(
        "--resume", action="store_true", help="keep existing files in target directory"
    )
    parser.add_argument(
        "--datasets",
        nargs="+",
        metavar="DATASET_CODE",
        help="convert only the given datasets (datasets codes, space separated)",
    )
    args = parser.parse_args()
    if not args.target_dir.is_dir():
        parser.error("Could not find directory {!r}".format(str(args.target_dir)))

    numeric_level = getattr(logging, args.log.upper(), None)
    if not isinstance(numeric_level, int):
        raise ValueError("Invalid log level: {}".format(args.log))
    logging.basicConfig(format="%(levelname)s:%(message)s", level=numeric_level)

    log.info("Mode: %s", "full" if args.full else "incremental")

    # Read old table of contents.
    table_of_contents_file_name = "table_of_contents.xml"
    old_toc_element = None
    if not (args.full or args.datasets):
        # Let's check that we can access Git history (to compare
        # table_of_contents.xml versions).
        try:
            repo = Repo(str(args.target_dir))
        except NotGitRepository:
            log.error(
                "%s is not a Git repository. Use --full option to download all the files of the provider.",
                args.target_dir,
            )
            sys.exit(1)
        try:
            tree = git_storage.get_commit_tree(repo, "HEAD", args.target_dir)
        except StorageError:
            log.error(
                "%s is not a Git repository. Use --full option to download all the files of the provider.",
                args.target_dir,
            )
            sys.exit(1)
        table_of_contents_bytes = git_storage.load_text_blob(
            repo, tree, table_of_contents_file_name, decode=False
        )
        old_toc_element = etree.fromstring(table_of_contents_bytes)

    # Fetch new table of contents, abbreviated "toc".
    xml_file_path = args.target_dir / table_of_contents_file_name
    xml_url = "http://ec.europa.eu/eurostat/estat-navtree-portlet-prod/BulkDownloadListing?file={}".format(
        table_of_contents_file_name
    )
    log.info("Fetching table of content %s", xml_url)
    download(xml_url, xml_file_path)
    # Use a dedicated name: do not rebind the argparse `parser`.
    xml_parser = etree.XMLParser(remove_blank_text=True)
    toc_element = etree.parse(str(xml_file_path), parser=xml_parser)

    # Create first-level directories.
    data_dir = args.target_dir / "data"
    data_dir.mkdir(exist_ok=True)
    datastructures_dir = args.target_dir / "datastructure"
    datastructures_dir.mkdir(exist_ok=True)

    # Build set of datasets and set of datasets definitions to download.
    # Sets are used so that a dataset or data structure referenced by several
    # leaves is downloaded only once.
    selected_dataset_codes = set(args.datasets) if args.datasets else None
    datasets = set()
    datastructures = set()
    for leaf_element in iter_datasets_to_download(toc_element, old_toc_element):
        dataset_code = leaf_element.findtext("./nt:code", namespaces=nsmap)
        if (
            selected_dataset_codes is not None
            and dataset_code not in selected_dataset_codes
        ):
            log.debug(
                "Skipping dataset %r because it is not mentioned by --datasets option",
                dataset_code,
            )
            # Without this `continue` the --datasets filter would have no effect.
            continue
        dataset_url = leaf_element.findtext(
            './nt:downloadLink[@format="sdmx"]', namespaces=nsmap
        )
        if dataset_url:
            datasets.add((dataset_code, dataset_url))
        datastructure_url = leaf_element.findtext(
            './nt:metadata[@format="sdmx"]', namespaces=nsmap
        )
        if datastructure_url:
            # The data structure code is the file name of the URL, up to its first dot.
            datastructure_code = datastructure_url.rsplit("/", 1)[-1].split(".", 1)[0]
            datastructures.add((datastructure_code, datastructure_url))

    log.info(
        "Downloading %s datasets and %s datastructures.",
        len(datasets),
        len(datastructures),
    )

    # Fetch datasets.
    for index, (dataset_code, dataset_url) in enumerate(sorted(datasets), start=1):
        dataset_dir = data_dir / dataset_code
        dataset_xml_file_path = dataset_dir / "{}.sdmx.xml".format(dataset_code)
        if args.resume and dataset_xml_file_path.is_file():
            log.info(
                "Skipping existing dataset %s/%s %s", index, len(datasets), dataset_url
            )
        else:
            # Same structure as the data structures loop below: only
            # non-skipped datasets are removed and downloaded again.
            log.info("Fetching dataset %s/%s %s", index, len(datasets), dataset_url)
            _fetch_and_unzip(dataset_url, dataset_code, dataset_dir)

    # Fetch datasets definitions.
    for index, (datastructure_code, datastructure_url) in enumerate(
        sorted(datastructures), start=1
    ):
        datastructure_dir = datastructures_dir / datastructure_code
        datastructure_xml_file_path = datastructure_dir / "{}.sdmx.xml".format(
            datastructure_code
        )
        # Handle particular case for rail_if_esms.
        if datastructure_code == "rail_if_esms":
            datastructure_xml_file_path = datastructure_dir / "{}.sdmx".format(
                datastructure_code
            )
        if args.resume and datastructure_xml_file_path.is_file():
            log.info(
                "Skipping existing data structure %s/%s %s",
                index,
                len(datastructures),
                datastructure_url,
            )
        else:
            log.info(
                "Fetching data structure %s/%s %s",
                index,
                len(datastructures),
                datastructure_url,
            )
            _fetch_and_unzip(datastructure_url, datastructure_code, datastructure_dir)


def _fetch_and_unzip(url, code, directory: Path):
    """Replace *directory* with the unzipped content of the ZIP archive at *url*.

    The archive is downloaded to ``<directory>/<code>.zip``, extracted into
    *directory*, then deleted.
    """
    if directory.is_dir():
        shutil.rmtree(str(directory))
    directory.mkdir(exist_ok=True)
    zip_file_path = directory / "{}.zip".format(code)
    download(url, zip_file_path)
    unzip(zip_file_path, directory)
    zip_file_path.unlink()


def download(url, output_file: Path):
    """Download *url* to *output_file* with the ``wget`` command-line tool.

    An existing regular file at *output_file* is removed first.

    :return: the result of ``subprocess.run``; the exit status of ``wget``
        is not checked here.
    """
    already_there = output_file.is_file()
    if already_there:
        output_file.unlink()
    command = ["wget", "-c", "--no-verbose", url, "-O", str(output_file)]
    return subprocess.run(command)


def unzip(zip_file: Path, output_dir: Path):
    """Extract *zip_file* into *output_dir* with the ``unzip`` command-line tool.

    :return: the result of ``subprocess.run``; the exit status of ``unzip``
        is not checked here.
    """
    destination = str(output_dir)
    archive = str(zip_file)
    command = ["unzip", "-d", destination, archive]
    return subprocess.run(command)
# Run the fetcher when this file is executed as a script; the value returned
# by main() is passed to sys.exit().
if __name__ == "__main__":
    sys.exit(main())