#! /usr/bin/env python3


# eurostat-fetcher -- Fetch series from Eurostat database
# By: Emmanuel Raviart <emmanuel.raviart@cepremap.org>
#
# Copyright (C) 2017-2018 Cepremap
# https://git.nomics.world/dbnomics-fetchers/eurostat-fetcher
#
# eurostat-fetcher is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# eurostat-fetcher is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.


"""Fetch series from Eurostat, the statistical office of the European Union, using bulk download and SDMX formats.

http://ec.europa.eu/eurostat/data/database

EUROSTAT bulk download:
- http://ec.europa.eu/eurostat/fr/data/bulkdownload
- http://ec.europa.eu/eurostat/estat-navtree-portlet-prod/BulkDownloadListing

EUROSTAT SDMX documentation:
- http://ec.europa.eu/eurostat/web/sdmx-infospace/welcome
- http://ec.europa.eu/eurostat/web/sdmx-web-services/rest-sdmx-2.1
"""


import argparse
import logging
import shutil
import subprocess
import sys
from pathlib import Path

from dulwich.errors import NotGitRepository
from dulwich.repo import Repo
from lxml import etree

from dbnomics_data_model.storages import StorageError
from dbnomics_data_model.storages import git as git_storage
log = logging.getLogger(__name__)
Emmanuel Raviart's avatar
Emmanuel Raviart committed
    nt='urn:eu.europa.ec.eurostat.navtree',
Christophe Benz's avatar
Christophe Benz committed
)
def iter_datasets_to_download(toc_element, old_toc_element=None):
    """Yield datasets. If old_toc_element is provided, yield only updated datasets."""

    def iterate_datasets_elements(toc_element):
        for element in toc_element.iterfind('.//nt:leaf', namespaces=nsmap):
            if element.attrib["type"] in ("dataset", "table"):
                # This is a dataset
                yield element

    old_last_update_by_dataset_code = {}
    if old_toc_element is not None:
        # Index lastUpdate attributes in old table_of_contents.xml.
        for element in iterate_datasets_elements(old_toc_element):
            dataset_code = element.findtext("nt:code", namespaces=nsmap)
            old_last_update = element.findtext("nt:lastUpdate", namespaces=nsmap)
            old_last_update_by_dataset_code[dataset_code] = old_last_update

    for leaf_element in iterate_datasets_elements(toc_element):
        if old_toc_element is None:
            yield leaf_element
        else:
            dataset_code = leaf_element.findtext('nt:code', namespaces=nsmap)
            old_last_update = old_last_update_by_dataset_code.get(dataset_code)
            if old_last_update is None:
                # This leaf_element is newer in this version of table_of_contents.xml
                yield leaf_element
            else:
                last_update = leaf_element.findtext("nt:lastUpdate", namespaces=nsmap)
                if last_update != old_last_update:
                    yield leaf_element


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('target_dir', type=Path, help='path of target directory containing Eurostat series')
    parser.add_argument('--full', action='store_true',
                        help='download all datasets; default behavior is to download what changed since last commit')
    parser.add_argument('--log', default='INFO', help='level of logging messages')
    parser.add_argument('--resume', action='store_true', help='keep existing files in target directory')
    parser.add_argument('--datasets', nargs='+', metavar='DATASET_CODE', help='convert only the given datasets (datasets codes, space separated)')
Emmanuel Raviart's avatar
Emmanuel Raviart committed
    args = parser.parse_args()
    if not args.target_dir.is_dir():
        parser.error("Could not find directory {!r}".format(str(args.target_dir)))

    numeric_level = getattr(logging, args.log.upper(), None)
    if not isinstance(numeric_level, int):
        raise ValueError('Invalid log level: {}'.format(args.log))
    logging.basicConfig(format="%(levelname)s:%(message)s", level=numeric_level)
    parser = etree.XMLParser(remove_blank_text=True)

    log.info("Mode: %s", "full" if args.full else "incremental")
    # Read old table of contents.
    table_of_contents_file_name = 'table_of_contents.xml'

    old_toc_element = None
    if not (args.full or args.datasets):
        # let's check that we can access to Git history (to compare table_of_contents.xml versions)
        try:
            repo = Repo(str(args.target_dir))
        except NotGitRepository:
            log.error("%s is not a Git repository. Use --full option to download all the files of the provider.",
                      args.target_dir)
            sys.exit(1)
        try:
            tree = git_storage.get_commit_tree(repo, "HEAD", args.target_dir)
        except StorageError:
            log.error("%s is not a Git repository. Use --full option to download all the files of the provider.",
                      args.target_dir)
            sys.exit(1)
        table_of_contents_bytes = git_storage.load_text_blob(repo, tree, table_of_contents_file_name, decode=False)
        old_toc_element = etree.fromstring(table_of_contents_bytes)
    # Fetch new table of contents, abbreviated "toc".
    xml_file_path = args.target_dir / table_of_contents_file_name
    xml_url = 'http://ec.europa.eu/eurostat/estat-navtree-portlet-prod/BulkDownloadListing?file={}'.format(
        table_of_contents_file_name)
    log.info('Fetching table of content {}'.format(xml_url))
    download(xml_url, xml_file_path)
    toc_element = etree.parse(str(xml_file_path), parser=parser)
    # Create first-level directories.

    data_dir = args.target_dir / 'data'
    data_dir.mkdir(exist_ok=True)
    datastructures_dir = args.target_dir / 'datastructure'
    datastructures_dir.mkdir(exist_ok=True)

    # Build set of datasets and set of datasets definitions to download.

    datasets = set()
    datastructures = set()
    for leaf_element in iter_datasets_to_download(toc_element, old_toc_element):
        dataset_code = leaf_element.findtext('./nt:code', namespaces=nsmap)
        if args.datasets and dataset_code not in args.datasets:
            log.debug("Skipping dataset %r because it is not mentioned by --datasets option", dataset_code)
            continue
        dataset_url = leaf_element.findtext('./nt:downloadLink[@format="sdmx"]', namespaces=nsmap)
        if dataset_url:
            datasets.add((dataset_code, dataset_url))
        datastructure_url = leaf_element.findtext('./nt:metadata[@format="sdmx"]', namespaces=nsmap)
        if datastructure_url:
            datastructure_code = datastructure_url.rsplit('/', 1)[-1].split('.', 1)[0]
            datastructures.add((datastructure_code, datastructure_url))

    log.info("Downloading {} datasets and {} datastructures.".format(len(datasets), len(datastructures)))

    for index, (dataset_code, dataset_url) in enumerate(sorted(datasets), start=1):
        dataset_dir = data_dir / dataset_code
        dataset_xml_file_path = dataset_dir / "{}.sdmx.xml".format(dataset_code)
        if args.resume and dataset_xml_file_path.is_file():
            log.info('Skipping existing dataset {}/{} {}'.format(index, len(datasets), dataset_url))
            if dataset_dir.is_dir():
                shutil.rmtree(str(dataset_dir))
            dataset_dir.mkdir(exist_ok=True)
            log.info('Fetching dataset {}/{} {}'.format(index, len(datasets), dataset_url))
            zip_file_path = dataset_dir / "{}.zip".format(dataset_code)
            download(dataset_url, zip_file_path)
            unzip(zip_file_path, dataset_dir)
            zip_file_path.unlink()
    # Fetch datasets definitions.
    for index, (datastructure_code, datastructure_url) in enumerate(sorted(datastructures), start=1):
        datastructure_dir = datastructures_dir / datastructure_code
        datastructure_xml_file_path = datastructure_dir / "{}.sdmx.xml".format(datastructure_code)
        # Handle particular case for rail_if_esms.
        if datastructure_code == "rail_if_esms":
            datastructure_xml_file_path = datastructure_dir / "{}.sdmx".format(datastructure_code)
        if args.resume and datastructure_xml_file_path.is_file():
            log.info('Skipping existing data structure {}/{} {}'.format(index, len(datastructures), datastructure_url))
Emmanuel Raviart's avatar
Emmanuel Raviart committed
        else:
            if datastructure_dir.is_dir():
                shutil.rmtree(str(datastructure_dir))
            datastructure_dir.mkdir(exist_ok=True)
            log.info('Fetching data structure {}/{} {}'.format(index, len(datastructures), datastructure_url))
            zip_file_path = datastructure_dir / "{}.zip".format(datastructure_code)
            download(datastructure_url, zip_file_path)
            unzip(zip_file_path, datastructure_dir)
            zip_file_path.unlink()
def download(url, output_file: Path):
    if output_file.is_file():
        output_file.unlink()
    return subprocess.run(["wget", "-c", "--no-verbose", url, "-O", str(output_file)])


def unzip(zip_file: Path, output_dir: Path):
    return subprocess.run(["unzip", "-d", str(output_dir), str(zip_file)])


if __name__ == '__main__':
    sys.exit(main())