Commit f9c047a8 authored by Christophe Benz

Rename args, clarify steps

parent 0ce1f52a
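
The commit flips and renames the command-line flags: incremental download is now the default, --full replaces --incremental with the opposite meaning, --keep-files becomes --resume, and target_dir is parsed as a pathlib.Path. A minimal sketch of the new argument surface with an illustrative invocation (the directory name is a placeholder, not taken from the commit):

    # Sketch of the CLI after this commit; argument definitions copied from the diff below.
    import argparse
    from pathlib import Path

    parser = argparse.ArgumentParser()
    parser.add_argument('target_dir', type=Path, help='path of target directory containing Eurostat series')
    parser.add_argument('--full', action='store_true',
                        help='download all datasets; default behavior is to download what changed since last commit')
    parser.add_argument('--resume', action='store_true', help='keep existing files in target directory')
    parser.add_argument('-v', '--verbose', action='store_true', default=False, help='display debug logging messages')

    # Placeholder invocation: resume an interrupted incremental run.
    args = parser.parse_args(['eurostat-source-data', '--resume'])
    print(args.target_dir, args.full, args.resume)  # eurostat-source-data False True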
@@ -38,12 +38,12 @@ EUROSTAT SDMX documentation:
 import argparse
 import io
 import logging
-import os
 import re
 import shutil
 import subprocess
 import sys
 import zipfile
+from pathlib import Path
 
 import requests
 from dulwich.repo import Repo
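
Beyond the renamed flags, most of the diff swaps os.path string handling for pathlib.Path objects, keeping str(...) where an API still expects a plain string (dulwich's Repo, open, zipfile extraction). A small illustration of the equivalence, using a placeholder directory name:

    import os.path
    from pathlib import Path

    target_dir = Path('eurostat-source-data')  # placeholder directory name
    toc_name = 'table_of_contents.xml'

    # Old style: string manipulation through os.path.
    old_path = os.path.join(str(target_dir), toc_name)
    old_exists = os.path.exists(old_path)

    # New style: the / operator joins path segments, methods replace os.path calls.
    new_path = target_dir / toc_name
    new_exists = new_path.exists()

    assert old_path == str(new_path)  # str() is still needed for string-only APIs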
@@ -58,7 +58,7 @@ nsmap = dict(
 prepared_element_re = re.compile('<Prepared>.+</Prepared>')
 
 
-def iter_datasets(xml_element, old_xml_element=None):
+def iter_datasets_to_download(xml_element, old_xml_element=None):
     """Yield datasets. If old_xml_element is provided, yield only updated datasets."""
     old_last_update_by_dataset_code = {}
     if old_xml_element is not None:
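
The rename from iter_datasets to iter_datasets_to_download spells out what the generator is used for: when the previous table of contents is available, it yields only the datasets whose last-update stamp changed. Most of the function body lies outside this hunk; the sketch below only illustrates the comparison pattern the visible lines imply, and the leaf/lastUpdate element names and the namespace URI are assumptions, not taken from the diff:

    # Assumed namespace mapping; the real nsmap is defined outside this hunk.
    nsmap = dict(nt='urn:eu.europa.ec.eurostat.navtree')

    def iter_datasets_to_download(xml_element, old_xml_element=None):
        """Yield datasets. If old_xml_element is provided, yield only updated datasets."""
        old_last_update_by_dataset_code = {}
        if old_xml_element is not None:
            # Remember the last-update stamp of each dataset from the previous table of contents.
            for leaf in old_xml_element.iterfind('.//nt:leaf', namespaces=nsmap):
                code = leaf.findtext('./nt:code', namespaces=nsmap)
                old_last_update_by_dataset_code[code] = leaf.findtext('./nt:lastUpdate', namespaces=nsmap)
        for leaf in xml_element.iterfind('.//nt:leaf', namespaces=nsmap):
            code = leaf.findtext('./nt:code', namespaces=nsmap)
            last_update = leaf.findtext('./nt:lastUpdate', namespaces=nsmap)
            # Yield new datasets and datasets whose stamp changed since the last commit.
            if old_last_update_by_dataset_code.get(code) != last_update:
                yield leaf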
@@ -85,117 +85,123 @@ def iter_datasets(xml_element, old_xml_element=None):
 def main():
     parser = argparse.ArgumentParser()
-    parser.add_argument('target_dir', help='path of target directory containing Eurostat series')
-    parser.add_argument('--incremental', action='store_true',
-                        help='download only datasets that changed since the last commit')
-    parser.add_argument('--keep-files', action='store_true', help='keep existing files in target directory')
+    parser.add_argument('target_dir', type=Path, help='path of target directory containing Eurostat series')
+    parser.add_argument('--full', action='store_true',
+                        help='download all datasets; default behavior is to download what changed since last commit')
+    parser.add_argument('--resume', action='store_true', help='keep existing files in target directory')
     parser.add_argument('-v', '--verbose', action='store_true', default=False, help='display debug logging messages')
     args = parser.parse_args()
     logging.basicConfig(
-        format="%(levelname)s: %(message)s",
+        format="%(levelname)s:%(asctime)s:%(message)s",
         level=logging.DEBUG if args.verbose else logging.INFO,
         stream=sys.stdout
     )
-    assert os.path.exists(args.target_dir)
+    assert args.target_dir.exists()
+    log.info("Mode: {}".format("full" if args.full else "incremental"))
     old_xml_element = None
     table_of_contents_file_name = 'table_of_contents.xml'
-    if args.incremental:
-        repo = Repo(args.target_dir)
+    if not args.full:
+        repo = Repo(str(args.target_dir))
         tree = git_storage.get_latest_commit_tree(repo)
         if tree is None:
             log.error("Incremental mode can't be used when source data repository has no commit.")
-        old_xml_element = etree.fromstring(git_storage.load_text_blob(repo, tree, table_of_contents_file_name))
+        table_of_contents_bytes = git_storage.load_text_blob(repo, tree, table_of_contents_file_name, decode=False)
+        old_xml_element = etree.fromstring(table_of_contents_bytes)
     # Fetch list of datasets.
-    xml_file_path = os.path.join(args.target_dir, table_of_contents_file_name)
+    xml_file_path = args.target_dir / table_of_contents_file_name
     parser = etree.XMLParser(remove_blank_text=True)
-    if args.keep_files and os.path.exists(xml_file_path):
+    if args.resume and args.full and xml_file_path.exists():
         log.info("Skipping existing file {}".format(table_of_contents_file_name))
-        xml_element = etree.parse(xml_file_path, parser=parser)
+        xml_element = etree.parse(str(xml_file_path), parser=parser)
     else:
         xml_url = 'http://ec.europa.eu/eurostat/estat-navtree-portlet-prod/BulkDownloadListing?file={}'.format(
             table_of_contents_file_name)
         log.info('Fetching table of contents {}'.format(xml_url))
         response = requests.get(xml_url)
         xml_element = etree.fromstring(response.content, parser=parser)
-        with open(xml_file_path, 'wb') as xml_file:
+        with open(str(xml_file_path), 'wb') as xml_file:
             etree.ElementTree(xml_element).write(xml_file, encoding='utf-8', pretty_print=True, xml_declaration=True)
+    # Create first-level directories.
+    data_dir = args.target_dir / 'data'
+    data_dir.mkdir(exist_ok=True)
+    datastructures_dir = args.target_dir / 'datastructure'
+    datastructures_dir.mkdir(exist_ok=True)
+    # Build set of datasets and set of datasets definitions to download.
+    datasets = set()
+    datastructures = set()
+    for leaf_element in iter_datasets_to_download(xml_element, old_xml_element):
+        dataset_code = leaf_element.findtext('./nt:code', namespaces=nsmap)
+        dataset_url = leaf_element.findtext('./nt:downloadLink[@format="sdmx"]', namespaces=nsmap)
+        if dataset_url:
+            datasets.add((dataset_code, dataset_url))
+        datastructure_url = leaf_element.findtext('./nt:metadata[@format="sdmx"]', namespaces=nsmap)
+        if datastructure_url:
+            datastructures.add(datastructure_url)
+    log.info("Downloading {} datasets and {} datastructures.".format(len(datasets), len(datastructures)))
+    # Delete directories of datasets and datasets definitions to download.
+    if not args.resume:
+        for dataset_code, dataset_url in datasets:
+            dataset_dir = data_dir / dataset_code
+            if dataset_dir.exists():
+                shutil.rmtree(str(dataset_dir))
+        for datastructure_url in datastructures:
+            datastructure_code = datastructure_url.rsplit('/', 1)[-1].split('.', 1)[0]
+            datastructure_dir = datastructures_dir / datastructure_code
+            if datastructure_dir.exists():
+                shutil.rmtree(str(datastructure_dir))
     # Fetch datasets.
-    data_dir = os.path.join(args.target_dir, 'data')
-    if os.path.exists(data_dir):
-        if not args.keep_files and not args.incremental:
-            for node_name in os.listdir(data_dir):
-                node_path = os.path.join(data_dir, node_name)
-                if os.path.isdir(node_path):
-                    shutil.rmtree(node_path)
-                else:
-                    os.remove(node_path)
-    else:
-        os.mkdir(data_dir)
-    data_urls = set()
-    metadata_urls = set()
-    for leaf_element in iter_datasets(xml_element, old_xml_element):
-        data_url = leaf_element.findtext('./nt:downloadLink[@format="sdmx"]', namespaces=nsmap)
-        if data_url:
-            data_urls.add(data_url)
-        metadata_url = leaf_element.findtext('./nt:metadata[@format="sdmx"]', namespaces=nsmap)
-        if metadata_url:
-            metadata_urls.add(metadata_url)
-    for index, data_url in enumerate(data_urls, start=1):
-        dataset_dir = os.path.join(data_dir, data_url.rsplit('/', 1)[-1].split('.', 1)[0])
-        if os.path.exists(dataset_dir):
-            log.info('Skipping existing dataset {}'.format(data_url))
+    for index, (dataset_code, dataset_url) in enumerate(sorted(datasets), start=1):
+        dataset_dir = data_dir / dataset_code
+        if args.resume and dataset_dir.exists() and list(dataset_dir.iterdir()):
+            log.info('Skipping existing dataset {}/{} {}'.format(index, len(datasets), dataset_url))
         else:
-            os.mkdir(dataset_dir)
-            log.info('Fetching dataset {}/{} {}'.format(index, len(data_urls), data_url))
-            response = requests.get(data_url)
+            dataset_dir.mkdir(exist_ok=True)
+            log.info('Fetching dataset {}/{} {}'.format(index, len(datasets), dataset_url))
+            response = requests.get(dataset_url)
             response.raise_for_status()
             data_zip_file = zipfile.ZipFile(io.BytesIO(response.content))
            for data_zip_info in data_zip_file.infolist():
                 if data_zip_info.filename.endswith('.xml'):
                     with data_zip_file.open(data_zip_info) as data_file:
-                        xml_file_path = os.path.join(dataset_dir, data_zip_info.filename)
+                        xml_file_path = dataset_dir / data_zip_info.filename
                         write_normalized_xml_file(xml_file_path, data_file)
                 else:
-                    data_zip_file.extract(data_zip_info, dataset_dir)
+                    data_zip_file.extract(data_zip_info, str(dataset_dir))
     # Fetch datasets definitions.
-    data_structures_dir = os.path.join(args.target_dir, 'datastructure')
-    if os.path.exists(data_structures_dir):
-        if not args.keep_files and not args.incremental:
-            for node_name in os.listdir(data_structures_dir):
-                node_path = os.path.join(data_structures_dir, node_name)
-                if os.path.isdir(node_path):
-                    shutil.rmtree(node_path)
-                else:
-                    os.remove(node_path)
-    else:
-        os.mkdir(data_structures_dir)
-    for index, metadata_url in enumerate(metadata_urls, start=1):
-        metadata_dir = os.path.join(data_structures_dir, metadata_url.rsplit('/', 1)[-1].split('.', 1)[0])
-        if os.path.exists(metadata_dir):
-            log.info('Skipping existing data structure {}'.format(metadata_url))
+    for index, datastructure_url in enumerate(sorted(datastructures), start=1):
+        datastructure_code = datastructure_url.rsplit('/', 1)[-1].split('.', 1)[0]
+        datastructure_dir = datastructures_dir / datastructure_code
+        if args.resume and datastructure_dir.exists() and list(datastructure_dir.iterdir()):
+            log.info('Skipping existing data structure {}/{} {}'.format(index, len(datastructures), datastructure_url))
         else:
-            os.mkdir(metadata_dir)
-            log.info('Fetching data structure {}/{} {}'.format(index, len(metadata_urls), metadata_url))
-            response = requests.get(metadata_url)
+            datastructure_dir.mkdir(exist_ok=True)
+            log.info('Fetching data structure {}/{} {}'.format(index, len(datastructures), datastructure_url))
+            response = requests.get(datastructure_url)
             response.raise_for_status()
             metadata_zip_file = zipfile.ZipFile(io.BytesIO(response.content))
             for metadata_zip_info in metadata_zip_file.infolist():
                 if metadata_zip_info.filename.endswith('.xml'):
                     with metadata_zip_file.open(metadata_zip_info) as metadata_file:
-                        xml_file_path = os.path.join(metadata_dir, metadata_zip_info.filename)
+                        xml_file_path = datastructure_dir / metadata_zip_info.filename
                         write_normalized_xml_file(xml_file_path, metadata_file)
                 else:
-                    metadata_zip_file.extract(metadata_zip_info, metadata_dir)
+                    metadata_zip_file.extract(metadata_zip_info, str(datastructure_dir))
     return 0
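
Both download loops above share one pattern: with --resume, skip any dataset or data structure whose directory already has content; otherwise fetch the SDMX zip into memory, route .xml members through write_normalized_xml_file and extract everything else as-is. A condensed sketch of that shared pattern (the helper is stubbed here; the real one appears in the next hunk):

    import io
    import zipfile
    from pathlib import Path

    import requests

    def write_normalized_xml_file(xml_file_path, source_file):
        # Stub standing in for the script's helper shown in the next hunk.
        Path(xml_file_path).write_bytes(source_file.read())

    def fetch_sdmx_zip(url, target_dir, resume=False):
        """Download an SDMX zip and unpack it into target_dir, normalizing XML members."""
        target_dir = Path(target_dir)
        if resume and target_dir.exists() and list(target_dir.iterdir()):
            return  # --resume: directory already populated, skip the download
        target_dir.mkdir(exist_ok=True)
        response = requests.get(url)
        response.raise_for_status()
        zip_file = zipfile.ZipFile(io.BytesIO(response.content))
        for zip_info in zip_file.infolist():
            if zip_info.filename.endswith('.xml'):
                with zip_file.open(zip_info) as member:
                    write_normalized_xml_file(target_dir / zip_info.filename, member)
            else:
                zip_file.extract(zip_info, str(target_dir))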
@@ -208,7 +214,7 @@ def write_normalized_xml_file(xml_file_path, source_file):
     """
     global prepared_element_re
     xml_str = source_file.read().decode('utf-8')
-    with open(xml_file_path, mode="w") as xml_file:
+    with open(str(xml_file_path), mode="w") as xml_file:
         xml_file.write(prepared_element_re.sub("<Prepared>1111-11-11T11:11:11</Prepared>", xml_str, 1))
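
write_normalized_xml_file rewrites the first <Prepared> element, which carries the generation timestamp of an SDMX message, to a constant value; since the target directory is a git repository, this presumably keeps re-downloaded files byte-identical when the data itself has not changed, so commits only record real updates. A self-contained illustration of the substitution:

    import re

    # Same pattern and placeholder timestamp as in the script.
    prepared_element_re = re.compile('<Prepared>.+</Prepared>')

    sdmx_fragment = '<Header><Prepared>2017-06-01T14:03:27</Prepared></Header>'  # made-up timestamp
    normalized = prepared_element_re.sub('<Prepared>1111-11-11T11:11:11</Prepared>', sdmx_fragment, 1)
    print(normalized)  # <Header><Prepared>1111-11-11T11:11:11</Prepared></Header>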