#!/usr/bin/env python3
# eurostat-fetcher -- Fetch series from Eurostat database
# By: Emmanuel Raviart <emmanuel.raviart@cepremap.org>
#
# https://git.nomics.world/dbnomics-fetchers/eurostat-fetcher
#
# eurostat-fetcher is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# eurostat-fetcher is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Fetch series from Eurostat, the statistical office of the European Union, using bulk download and SDMX formats.
http://ec.europa.eu/eurostat/data/database
EUROSTAT bulk download:
- http://ec.europa.eu/eurostat/fr/data/bulkdownload
- http://ec.europa.eu/eurostat/estat-navtree-portlet-prod/BulkDownloadListing
EUROSTAT SDMX documentation:
- http://ec.europa.eu/eurostat/web/sdmx-infospace/welcome
- http://ec.europa.eu/eurostat/web/sdmx-web-services/rest-sdmx-2.1
"""
import argparse
import logging
import shutil
import subprocess
import sys
from pathlib import Path

from dulwich.errors import NotGitRepository
from dulwich.repo import Repo
from lxml import etree

from dbnomics_data_model.storages import StorageError
from dbnomics_data_model.storages import git as git_storage

log = logging.getLogger(__name__)

# XML namespace of Eurostat's table_of_contents.xml ("nt" prefix).
nsmap = {
    "nt": "urn:eu.europa.ec.eurostat.navtree",
}


def iter_datasets_to_download(toc_element, old_toc_element=None):
    """Yield dataset leaf elements. If old_toc_element is provided, yield only new or updated datasets."""
    # Index the lastUpdate values of the old table_of_contents.xml by dataset code.
    old_last_update_by_dataset_code = {}
    if old_toc_element is not None:
        for element in old_toc_element.iterfind('.//nt:leaf[@type="dataset"]', namespaces=nsmap):
            dataset_code = element.findtext("nt:code", namespaces=nsmap)
            old_last_update = element.findtext("nt:lastUpdate", namespaces=nsmap)
            old_last_update_by_dataset_code[dataset_code] = old_last_update
    for leaf_element in toc_element.iterfind('.//nt:leaf[@type="dataset"]', namespaces=nsmap):
        if old_toc_element is None:
            yield leaf_element
        else:
            dataset_code = leaf_element.findtext('nt:code', namespaces=nsmap)
            old_last_update = old_last_update_by_dataset_code.get(dataset_code)
            if old_last_update is None:
                # Dataset is absent from the old table_of_contents.xml: it is new.
                yield leaf_element
            else:
                last_update = leaf_element.findtext("nt:lastUpdate", namespaces=nsmap)
                if last_update != old_last_update:
                    yield leaf_element
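

# For reference, a dataset leaf of table_of_contents.xml looks roughly like the
# sketch below, inferred from the XPath queries used in this script; the element
# values are invented for illustration:
#
#     <nt:leaf type="dataset">
#         <nt:code>nama_10_gdp</nt:code>
#         <nt:lastUpdate>dd.mm.yyyy</nt:lastUpdate>
#         <nt:downloadLink format="sdmx">http://.../nama_10_gdp.sdmx.zip</nt:downloadLink>
#         <nt:metadata format="sdmx">http://.../nama_10_esms.sdmx.zip</nt:metadata>
#     </nt:leaf>
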
def main():
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('target_dir', type=Path, help='path of target directory containing Eurostat series')
    parser.add_argument('--full', action='store_true',
                        help='download all datasets; default behavior is to download what changed since the last commit')
    parser.add_argument('--log', default='INFO', help='level of logging messages')
    parser.add_argument('--resume', action='store_true', help='keep existing files in target directory')
    args = parser.parse_args()

    if not args.target_dir.is_dir():
        parser.error("Could not find directory {!r}".format(str(args.target_dir)))

    numeric_level = getattr(logging, args.log.upper(), None)
    if not isinstance(numeric_level, int):
        raise ValueError('Invalid log level: {}'.format(args.log))
    logging.basicConfig(format="%(levelname)s:%(message)s", level=numeric_level)

    # Use a distinct name for the XML parser to avoid shadowing the argparse parser.
    xml_parser = etree.XMLParser(remove_blank_text=True)
    log.info("Mode: %s", "full" if args.full else "incremental")
    table_of_contents_file_name = 'table_of_contents.xml'

    # In incremental mode, load the table of contents as of the last Git commit.
    old_toc_element = None
    if not args.full:
        try:
            repo = Repo(str(args.target_dir))
        except NotGitRepository:
            log.error("%s is not a Git repository. Use the --full option to download all the files of the provider.",
                      args.target_dir)
            sys.exit(1)
        try:
            tree = git_storage.get_commit_tree(repo, "HEAD", args.target_dir)
        except StorageError:
            log.error("Could not read the last commit of %s. Use the --full option to download all the files of the provider.",
                      args.target_dir)
            sys.exit(1)
        table_of_contents_bytes = git_storage.load_text_blob(repo, tree, table_of_contents_file_name, decode=False)
        old_toc_element = etree.fromstring(table_of_contents_bytes)

    # Fetch the new table of contents, abbreviated "toc".
    xml_file_path = args.target_dir / table_of_contents_file_name
    xml_url = 'http://ec.europa.eu/eurostat/estat-navtree-portlet-prod/BulkDownloadListing?file={}'.format(
        table_of_contents_file_name)
    log.info('Fetching table of contents {}'.format(xml_url))
    download(xml_url, xml_file_path)
    toc_element = etree.parse(str(xml_file_path), parser=xml_parser)
    # Create first-level directories.
    data_dir = args.target_dir / 'data'
    data_dir.mkdir(exist_ok=True)
    datastructures_dir = args.target_dir / 'datastructure'
    datastructures_dir.mkdir(exist_ok=True)

    # Build the set of datasets and the set of dataset definitions to download.
    datasets = set()
    datastructures = set()
    for leaf_element in iter_datasets_to_download(toc_element, old_toc_element):
        dataset_url = leaf_element.findtext('./nt:downloadLink[@format="sdmx"]', namespaces=nsmap)
        if dataset_url:
            dataset_code = leaf_element.findtext('./nt:code', namespaces=nsmap)
            datasets.add((dataset_code, dataset_url))
        datastructure_url = leaf_element.findtext('./nt:metadata[@format="sdmx"]', namespaces=nsmap)
        if datastructure_url:
            # Derive the code from the file name in the URL,
            # e.g. "http://.../foo_esms.sdmx.zip" -> "foo_esms" (illustrative URL).
            datastructure_code = datastructure_url.rsplit('/', 1)[-1].split('.', 1)[0]
            datastructures.add((datastructure_code, datastructure_url))

    log.info("Downloading {} datasets and {} datastructures.".format(len(datasets), len(datastructures)))
    # Fetch datasets.
    for index, (dataset_code, dataset_url) in enumerate(sorted(datasets), start=1):
        dataset_dir = data_dir / dataset_code
        dataset_xml_file_path = dataset_dir / "{}.sdmx.xml".format(dataset_code)
        if args.resume and dataset_xml_file_path.is_file():
            log.info('Skipping existing dataset {}/{} {}'.format(index, len(datasets), dataset_url))
            continue
        if dataset_dir.is_dir():
            shutil.rmtree(str(dataset_dir))
        dataset_dir.mkdir(exist_ok=True)
        log.info('Fetching dataset {}/{} {}'.format(index, len(datasets), dataset_url))
        zip_file_path = dataset_dir / "{}.zip".format(dataset_code)
        download(dataset_url, zip_file_path)
        unzip(zip_file_path, dataset_dir)
        zip_file_path.unlink()
    # Fetch dataset definitions.
    for index, (datastructure_code, datastructure_url) in enumerate(sorted(datastructures), start=1):
        datastructure_dir = datastructures_dir / datastructure_code
        datastructure_xml_file_path = datastructure_dir / "{}.sdmx.xml".format(datastructure_code)
        # Particular case: rail_if_esms is published as a ".sdmx" file, without the ".xml" extension.
        if datastructure_code == "rail_if_esms":
            datastructure_xml_file_path = datastructure_dir / "{}.sdmx".format(datastructure_code)
        if args.resume and datastructure_xml_file_path.is_file():
            log.info('Skipping existing data structure {}/{} {}'.format(index, len(datastructures), datastructure_url))
            continue
        if datastructure_dir.is_dir():
            shutil.rmtree(str(datastructure_dir))
        datastructure_dir.mkdir(exist_ok=True)
        log.info('Fetching data structure {}/{} {}'.format(index, len(datastructures), datastructure_url))
        zip_file_path = datastructure_dir / "{}.zip".format(datastructure_code)
        download(datastructure_url, zip_file_path)
        unzip(zip_file_path, datastructure_dir)
        zip_file_path.unlink()


def download(url, output_file: Path):
    """Download url to output_file using wget."""
    # Remove any leftover file so that wget writes a fresh copy.
    if output_file.is_file():
        output_file.unlink()
    return subprocess.run(["wget", "-c", "--no-verbose", url, "-O", str(output_file)])


def unzip(zip_file: Path, output_dir: Path):
    """Extract zip_file into output_dir using unzip."""
    return subprocess.run(["unzip", "-d", str(output_dir), str(zip_file)])

if __name__ == '__main__':
    sys.exit(main())