#! /usr/bin/env python3
# eurostat-fetcher -- Fetch series from Eurostat database
# By: Emmanuel Raviart <emmanuel.raviart@cepremap.org>
#
# https://git.nomics.world/dbnomics-fetchers/eurostat-fetcher
#
# eurostat-fetcher is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# eurostat-fetcher is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Fetch series from Eurostat using bulk download and SDMX formats.
http://ec.europa.eu/eurostat/data/database
EUROSTAT bulk download:
- http://ec.europa.eu/eurostat/fr/data/bulkdownload
- http://ec.europa.eu/eurostat/estat-navtree-portlet-prod/BulkDownloadListing
EUROSTAT SDMX documentation:
- http://ec.europa.eu/eurostat/web/sdmx-infospace/welcome
- http://ec.europa.eu/eurostat/web/sdmx-web-services/rest-sdmx-2.1
"""
import argparse
import logging
import os
import shutil
import subprocess
import sys
from pathlib import Path
from dbnomics_data_model.storages import StorageError
from dbnomics_data_model.storages import git as git_storage
from dulwich.errors import NotGitRepository
from dulwich.repo import Repo
from lxml import etree
log = logging.getLogger(__name__)
def iter_datasets_to_download(toc_element, old_toc_element=None):
"""Yield datasets. If old_toc_element is provided, yield only updated datasets."""
def iterate_datasets_elements(toc_element):
for element in toc_element.iterfind(".//nt:leaf", namespaces=nsmap):
if element.attrib["type"] in ("dataset", "table"):
# This is a dataset
yield element
    old_last_update_by_dataset_code = {}
    if old_toc_element is not None:
        # Index lastUpdate attributes of the old table_of_contents.xml.
        for element in iterate_datasets_elements(old_toc_element):
            dataset_code = element.findtext("nt:code", namespaces=nsmap)
            old_last_update = element.findtext("nt:lastUpdate", namespaces=nsmap)
            old_last_update_by_dataset_code[dataset_code] = old_last_update
for leaf_element in iterate_datasets_elements(toc_element):
dataset_code = leaf_element.findtext("nt:code", namespaces=nsmap)
old_last_update = old_last_update_by_dataset_code.get(dataset_code)
if old_last_update is None:
            # This dataset is absent from the old table_of_contents.xml: it is new.
yield leaf_element
else:
last_update = leaf_element.findtext("nt:lastUpdate", namespaces=nsmap)
if last_update != old_last_update:
yield leaf_element
parser.add_argument(
"target_dir",
type=Path,
help="path of target directory containing Eurostat series",
)
parser.add_argument(
"--full",
action="store_true",
default=os.getenv(FULL_ENV_VAR),
help="download all datasets; default behavior is to download what changed since last commit",
)
parser.add_argument("--log", default="INFO", help="level of logging messages")
parser.add_argument(
"--resume", action="store_true", help="keep existing files in target directory"
)
parser.add_argument(
"--datasets",
nargs="+",
metavar="DATASET_CODE",
help="convert only the given datasets (datasets codes, space separated)",
)
if not args.target_dir.is_dir():
parser.error("Could not find directory {!r}".format(str(args.target_dir)))
numeric_level = getattr(logging, args.log.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError("Invalid log level: {}".format(args.log))
logging.basicConfig(format="%(levelname)s:%(message)s", level=numeric_level)
parser = etree.XMLParser(remove_blank_text=True)
log.info("Mode: %s", "full" if args.full else "incremental")
if not (args.full or args.datasets):
        # Check that we can access the Git history (to compare table_of_contents.xml versions).
try:
repo = Repo(str(args.target_dir))
except NotGitRepository:
log.error(
"%s is not a Git repository. Use --full option to download all the files of the provider.",
args.target_dir,
)
sys.exit(1)
        try:
            tree = git_storage.get_commit_tree(repo, "HEAD", args.target_dir)
        except StorageError:
            log.error(
                "Could not read the HEAD commit of the Git repository %s. Use --full option to download all the files of the provider.",
                args.target_dir,
            )
            sys.exit(1)
table_of_contents_bytes = git_storage.load_text_blob(
repo, tree, table_of_contents_file_name, decode=False
)
old_toc_element = etree.fromstring(table_of_contents_bytes)
# Fetch new table of contents, abbreviated "toc".
xml_file_path = args.target_dir / table_of_contents_file_name
xml_url = "http://ec.europa.eu/eurostat/estat-navtree-portlet-prod/BulkDownloadListing?file={}".format(
table_of_contents_file_name
)
log.info("Fetching table of content {}".format(xml_url))
download(xml_url, xml_file_path)
toc_element = etree.parse(str(xml_file_path), parser=parser)
datastructures_dir.mkdir(exist_ok=True)
# Build set of datasets and set of datasets definitions to download.
datasets = set()
datastructures = set()
for leaf_element in iter_datasets_to_download(toc_element, old_toc_element):
dataset_code = leaf_element.findtext("./nt:code", namespaces=nsmap)
        if args.datasets and dataset_code not in args.datasets:
            log.debug(
                "Skipping dataset %r because it is not mentioned by --datasets option",
                dataset_code,
            )
            continue
dataset_url = leaf_element.findtext(
'./nt:downloadLink[@format="sdmx"]', namespaces=nsmap
)
if dataset_url:
datasets.add((dataset_code, dataset_url))
datastructure_url = leaf_element.findtext(
'./nt:metadata[@format="sdmx"]', namespaces=nsmap
)
datastructure_code = datastructure_url.rsplit("/", 1)[-1].split(".", 1)[0]
datastructures.add((datastructure_code, datastructure_url))
log.info(
"Downloading {} datasets and {} datastructures.".format(
len(datasets), len(datastructures)
)
)
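    # Fetch datasets.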
for index, (dataset_code, dataset_url) in enumerate(sorted(datasets), start=1):
dataset_dir = data_dir / dataset_code
dataset_xml_file_path = dataset_dir / "{}.sdmx.xml".format(dataset_code)
        if args.resume and dataset_xml_file_path.is_file():
            log.info(
                "Skipping existing dataset {}/{} {}".format(
                    index, len(datasets), dataset_url
                )
            )
            continue
        if dataset_dir.is_dir():
            shutil.rmtree(str(dataset_dir))
        dataset_dir.mkdir(exist_ok=True)
log.info(
"Fetching dataset {}/{} {}".format(index, len(datasets), dataset_url)
)
zip_file_path = dataset_dir / "{}.zip".format(dataset_code)
download(dataset_url, zip_file_path)
unzip(zip_file_path, dataset_dir)
zip_file_path.unlink()
# Fetch datasets definitions.
for index, (datastructure_code, datastructure_url) in enumerate(
sorted(datastructures), start=1
):
datastructure_dir = datastructures_dir / datastructure_code
datastructure_xml_file_path = datastructure_dir / "{}.sdmx.xml".format(
datastructure_code
)
# Handle particular case for rail_if_esms.
if datastructure_code == "rail_if_esms":
datastructure_xml_file_path = datastructure_dir / "{}.sdmx".format(
datastructure_code
)
        if args.resume and datastructure_xml_file_path.is_file():
            log.info(
                "Skipping existing data structure {}/{} {}".format(
                    index, len(datastructures), datastructure_url
                )
            )
            continue
        if datastructure_dir.is_dir():
            shutil.rmtree(str(datastructure_dir))
        datastructure_dir.mkdir(exist_ok=True)
log.info(
"Fetching data structure {}/{} {}".format(
index, len(datastructures), datastructure_url
)
)
zip_file_path = datastructure_dir / "{}.zip".format(datastructure_code)
download(datastructure_url, zip_file_path)
unzip(zip_file_path, datastructure_dir)
zip_file_path.unlink()
def download(url, output_file: Path):
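    """Download `url` to `output_file` using the external `wget` tool (assumed to be available on PATH)."""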
if output_file.is_file():
output_file.unlink()
return subprocess.run(["wget", "-c", "--no-verbose", url, "-O", str(output_file)])
def unzip(zip_file: Path, output_dir: Path):
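    """Extract `zip_file` into `output_dir` using the external `unzip` tool (assumed to be available on PATH)."""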
return subprocess.run(["unzip", "-d", str(output_dir), str(zip_file)])