#! /usr/bin/env python3 # eurostat-fetcher -- Fetch series from Eurostat database # By: Christophe Benz <christophe.benz@cepremap.org> # # Copyright (C) 2017-2018 Cepremap # https://git.nomics.world/dbnomics-fetchers/eurostat-fetcher # # eurostat-fetcher is free software; you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # eurostat-fetcher is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <https://www.gnu.org/licenses/>. """Convert Eurostat provider, categories, datasets and time series to DBnomics JSON and TSV files.""" import argparse import logging import re import shutil import subprocess import sys import time from collections import OrderedDict from io import StringIO from pathlib import Path import humanize from lxml import etree from toolz import get_in, valmap import ujson as json from dbnomics_data_model import observations provider_code = 'Eurostat' provider_json = { "code": provider_code, "name": provider_code, "region": "EU", "terms_of_use": "http://ec.europa.eu/eurostat/about/policies/copyright", "website": "http://ec.europa.eu/eurostat/home", } args = None # Will be defined by main(). datasets_dir_name = "data" log = logging.getLogger(__name__) namespace_url_by_name = {"xml": "http://www.w3.org/XML/1998/namespace"} timings = None DAILY_PERIOD_RE = re.compile(r"(?P<year>\d{4})(?P<month>\d{2})(?P<day>\d{2})") def convert_sdmx_element(element, dataset_json, dataset_context, dsd_infos, series_jsonl_file): global timings # Due to event=end, given to iterparse, we receive <Obs> then <Series> elements, in this order. if element.tag.endswith("Series"): # Ignore some specific XML element attributes corresponding to series SDMX attributes, # because series SDMX attributes do not exist in DBnomics. series_element_attributes = OrderedDict([ (attribute_key, attribute_value) for attribute_key, attribute_value in element.attrib.items() if attribute_key not in {"TIME_FORMAT"} # Redundant with FREQ. ]) dimensions_codes_order = list(series_element_attributes.keys()) if dataset_json["dimensions_codes_order"] is None: dataset_json["dimensions_codes_order"] = dimensions_codes_order else: # dimensions_codes_order must not change between series. assert dataset_json["dimensions_codes_order"] == dimensions_codes_order, \ (dataset_json["dimensions_codes_order"], dimensions_codes_order) # Fill series dimensions labels in dataset.json. t0 = time.time() for dimension_code, dimension_value_code in series_element_attributes.items(): if dimension_code not in dataset_json["dimensions_labels"]: dimension_label = dsd_infos["concepts"].get(dimension_code) if dimension_label and dimension_code not in dataset_json["dimensions_labels"]: # Some dimensions labels are an empty string: e.g. bs_bs12_04.sdmx.xml dataset_json["dimensions_labels"][dimension_code] = dimension_label if dimension_code in dataset_json["dimensions_values_labels"] and \ dimension_value_code in dataset_json["dimensions_values_labels"][dimension_code]: continue codelist_code = dsd_infos["codelist_by_concept"][dimension_code] dimension_value_label = get_in([codelist_code, dimension_value_code], dsd_infos["codelists"]) if dimension_value_label: dataset_json["dimensions_values_labels"].setdefault( dimension_code, {})[dimension_value_code] = dimension_value_label timings["series_labels"] += time.time() - t0 # Series code is not defined by provider: create it from dimensions values codes. series_code = ".".join( series_element_attributes[dimension_code] for dimension_code in dimensions_codes_order ) # Write series JSON to file. t0 = time.time() observations_header = [["PERIOD", "VALUE"] + dsd_infos["attributes"]] frequency = series_element_attributes["FREQ"] observations_body = list(iter_normalized_observations(dataset_context["current_series_observations"], frequency)) \ if frequency == "D" \ else dataset_context["current_series_observations"] series_json = { "code": series_code, "dimensions": [ series_element_attributes[dimension_code] # Every dimension MUST be defined for each series. for dimension_code in dimensions_codes_order ], "observations": observations_header + observations_body, } json.dump(series_json, series_jsonl_file, ensure_ascii=False, sort_keys=True) series_jsonl_file.write("\n") timings["series_file"] += time.time() - t0 # Reset context for next series. dataset_context["current_series_observations"] = [] elif element.tag.endswith("Obs"): # Fill observations attributes labels in dataset.json. t0 = time.time() for attribute_code, attribute_value_code in element.attrib.items(): # Ignore period and value observations XML attributes, because they don't need labels. if attribute_code in ["TIME_PERIOD", "OBS_VALUE"]: continue attribute_label = dsd_infos["concepts"].get(attribute_code) if attribute_label and attribute_code not in dataset_json["attributes_labels"]: dataset_json["attributes_labels"][attribute_code] = attribute_label # Some attributes values codes are multi-valued and concatenated into the same string. attribute_value_codes = list(attribute_value_code) \ if attribute_code == "OBS_STATUS" \ else [attribute_value_code] for attribute_value_code in attribute_value_codes: if attribute_code in dataset_json["attributes_values_labels"] and \ attribute_value_code in dataset_json["attributes_values_labels"][attribute_code]: continue codelist_code = dsd_infos["codelist_by_concept"][attribute_code] attribute_value_label = get_in([codelist_code, attribute_value_code], dsd_infos["codelists"]) if attribute_value_label: dataset_json["attributes_values_labels"].setdefault( attribute_code, {})[attribute_value_code] = attribute_value_label timings["observations_labels"] += time.time() - t0 obs_value = observations.value_to_float(element.attrib.get("OBS_VALUE")) dataset_context["current_series_observations"].append([ element.attrib["TIME_PERIOD"], # Will be normalized later, if needed. obs_value, ] + [ element.attrib.get(attribute_name, "") for attribute_name in dsd_infos["attributes"] ]) elif element.tag.endswith("Extracted"): dataset_json["updated_at"] = element.text + "Z" # Assume the value is UTC time. def convert_sdmx_file(dataset_json_stub, sdmx_file: Path, dataset_dir: Path): global timings timings = { k: 0 for k in {"series_labels", "series_file", "observations_labels", "dsd_infos"} } assert dataset_json_stub.get("name"), dataset_json_stub assert dataset_dir.is_dir(), dataset_dir dataset_code = dataset_json_stub["code"] # Load DSD dsd_file_path = args.source_dir / datasets_dir_name / dataset_code / "{}.dsd.xml".format(dataset_code) dsd_element = etree.parse(str(dsd_file_path)).getroot() # Initialize dataset.json data dataset_json = { "attributes_labels": {}, # Will be defined by each series. "attributes_values_labels": {}, # Will be defined by each series. "dimensions_codes_order": None, # Will be defined by first series. "dimensions_labels": {}, # Will be defined by each series. "dimensions_values_labels": {}, # Will be defined by each series. } dataset_json.update(dataset_json_stub) t0 = time.time() dsd_infos = { "attributes": [ element.attrib["conceptRef"] for element in dsd_element.iterfind(".//{*}Attribute[@attachmentLevel='Observation']") ], "codelists": { element.attrib["id"]: { code_element.attrib["value"]: code_element.findtext( "./{*}Description[@xml:lang='en']", namespaces=namespace_url_by_name) for code_element in element.iterfind("./{*}Code") } for element in dsd_element.iterfind('.//{*}CodeList') }, "concepts": { element.attrib["id"]: element.findtext("./{*}Name[@xml:lang='en']", namespaces=namespace_url_by_name) for element in dsd_element.iterfind('.//{*}Concept') }, "codelist_by_concept": { element.attrib["conceptRef"]: element.attrib["codelist"] for element in dsd_element.find(".//{*}Components") if "conceptRef" in element.attrib and "codelist" in element.attrib }, } timings["dsd_infos"] += time.time() - t0 with (dataset_dir / "series.jsonl").open("w") as series_jsonl_file: dataset_context = { "current_series_observations": [], } # Side-effects: mutate dataset_context, write files. context = etree.iterparse(str(sdmx_file), events=["end"]) for event, element in context: convert_sdmx_element(element, dataset_json, dataset_context, dsd_infos, series_jsonl_file) if event == "end": # Inspired from fast_iter, cf https://www.ibm.com/developerworks/xml/library/x-hiperfparse/ element.clear() while element.getprevious() is not None: del element.getparent()[0] continue del context write_json_file(dataset_dir / "dataset.json", without_falsy_values(dataset_json)) log.debug("timings: {} total: {:.3f}".format(valmap("{:.3f}".format, timings), sum(timings.values()))) def iter_normalized_observations(observations, frequency): for observation in observations: period = observation[0] if frequency == "D": period = normalize_period(period, frequency) yield [period] + observation[1:] def normalize_period(s, frequency): if frequency == "D": m = DAILY_PERIOD_RE.match(s) if m: return '{}-{}-{}'.format(m.group('year'), m.group('month'), m.group('day')) return s def toc_to_category_tree(xml_element, toc_dataset_json_stub_by_code): """Walk recursively xml_element (table_of_contents.xml) and return category_tree_json. Side-effects: fill toc_dataset_json_stub_by_code. """ xml_element_tag = xml_element.tag[len("urn:eu.europa.ec.eurostat.navtree") + 2:] if xml_element_tag == "tree": return list(filter(None, ( toc_to_category_tree(child_element, toc_dataset_json_stub_by_code) for child_element in xml_element ))) elif xml_element_tag == "branch": children = list(filter(None, ( toc_to_category_tree(child_element, toc_dataset_json_stub_by_code) for child_element in xml_element.iterfind("{*}children/*") ))) return without_falsy_values({ "code": xml_element.findtext("{*}code"), "name": xml_element.findtext("{*}title[@language='en']"), "children": children, }) if children else None elif xml_element_tag == "leaf" and xml_element.attrib["type"] == "dataset": dataset_code = xml_element.findtext("{*}code") dataset_name = xml_element.findtext("{*}title[@language='en']") if dataset_code not in toc_dataset_json_stub_by_code: toc_dataset_json_stub_by_code[dataset_code] = { "code": dataset_code, "name": dataset_name, "description": xml_element.findtext("{*}shortDescription[@language='en']") or None, "doc_href": xml_element.findtext("{*}metadata[@format='html']") or None, } return { "code": dataset_code, "name": dataset_name, } def main(): global args global timings parser = argparse.ArgumentParser() parser.add_argument('source_dir', type=Path, help='path of source directory containing Eurostat series in source format') parser.add_argument('target_dir', type=Path, help='path of target directory containing datasets & ' 'series in DBnomics JSON and TSV formats') parser.add_argument('--datasets', nargs='+', metavar='DATASET_CODE', help='convert only the given datasets') parser.add_argument('--full', action='store_true', help='convert all datasets; default behavior is to convert what changed since last commit') parser.add_argument('--log', default='INFO', help='level of logging messages') parser.add_argument('--resume', action='store_true', help='do not process already written datasets') parser.add_argument('--start-from', metavar='DATASET_CODE', help='start indexing from dataset code') args = parser.parse_args() if not args.source_dir.is_dir(): parser.error("Could not find directory {!r}".format(str(args.source_dir))) if not args.target_dir.is_dir(): parser.error("Could not find directory {!r}".format(str(args.target_dir))) numeric_level = getattr(logging, args.log.upper(), None) if not isinstance(numeric_level, int): raise ValueError('Invalid log level: {}'.format(args.log)) logging.basicConfig(format="%(levelname)s:%(message)s", level=numeric_level) # Ask Git which datasets directories were modified in latest commit in source-data repository. if not args.full: try: output = subprocess.check_output(["git", "diff", "--name-status", "HEAD^", datasets_dir_name], cwd=str(args.source_dir), universal_newlines=True) except subprocess.CalledProcessError: args.full = True else: modified_datasets_codes = set() deleted_datasets_codes = set() for line in StringIO(output): action, file_path = line.strip().split() try: dataset_code = Path(file_path).parent.relative_to(datasets_dir_name).name except ValueError: continue if action in {"A", "M"}: modified_datasets_codes.add(dataset_code) else: assert action == "D", action deleted_datasets_codes.add(dataset_code) log.info("%d datasets were modified and %d were deleted by last download", len(modified_datasets_codes), len(deleted_datasets_codes)) log.info("Mode: %s", "full" if args.full else "incremental") # Parse "table_of_contents", abbreviated "toc". toc_element = etree.parse(str(args.source_dir / "table_of_contents.xml")).getroot() # Walk recursively table_of_contents.xml and return category_tree_json. # Side-effects: fill toc_dataset_json_stub_by_code. toc_dataset_json_stub_by_code = {} category_tree_json = toc_to_category_tree(toc_element, toc_dataset_json_stub_by_code) datasets_codes_to_convert = set() for dataset_code in sorted(toc_dataset_json_stub_by_code): if args.datasets and dataset_code not in args.datasets: log.debug("Skipping dataset %r because it is not mentioned by --datasets option", dataset_code) continue if not args.full and dataset_code not in modified_datasets_codes: log.debug("Skipping dataset %r because it was not modified by last download (due to incremental mode)", dataset_code) continue if args.start_from is not None and dataset_code < args.start_from: log.debug("Skipping dataset %r because of --start-from option", dataset_code) continue source_dataset_dir = args.source_dir / datasets_dir_name / dataset_code if not source_dataset_dir.is_dir(): log.error("Skipping dataset %s because source directory %s is missing", dataset_code, str(source_dataset_dir)) continue sdmx_file = source_dataset_dir / "{}.sdmx.xml".format(dataset_code) if not sdmx_file.is_file(): log.error("Skipping dataset %s because SDMX file %s is missing", dataset_code, str(sdmx_file)) continue dataset_dir = args.target_dir / dataset_code if args.resume and (dataset_dir / "dataset.json").is_file(): log.debug("Skipping dataset %r because it already exists (due to --resume option)", dataset_code) continue datasets_codes_to_convert.add(dataset_code) log.info("Converting %d datasets...", len(datasets_codes_to_convert)) # Remove directories of datasets to be converted before converting. if not args.resume: log.info("Removing directories of deleted datasets and datasets to be converted...") datasets_codes_to_delete = datasets_codes_to_convert if not args.full: datasets_codes_to_delete = datasets_codes_to_delete.union(deleted_datasets_codes) for dataset_code in datasets_codes_to_delete: dataset_dir = args.target_dir / dataset_code if dataset_dir.is_dir(): shutil.rmtree(str(dataset_dir)) # Convert SDMX files. Side-effect: write files for each dataset. converted_datasets_codes = set() for index, dataset_code in enumerate(sorted(datasets_codes_to_convert), start=1): if dataset_code in converted_datasets_codes: log.debug("Skipping dataset %r because it was already converted", dataset_code) continue source_dataset_dir = args.source_dir / datasets_dir_name / dataset_code sdmx_file = source_dataset_dir / "{}.sdmx.xml".format(dataset_code) log.info("Converting SDMX source file %d/%d %s (%s)", index, len(datasets_codes_to_convert), sdmx_file, humanize.naturalsize(sdmx_file.stat().st_size, gnu=True)) dataset_dir = args.target_dir / dataset_code dataset_dir.mkdir(exist_ok=True) dataset_json_stub = toc_dataset_json_stub_by_code[dataset_code] convert_sdmx_file(dataset_json_stub, sdmx_file, dataset_dir) converted_datasets_codes.add(dataset_code) write_json_file(args.target_dir / "provider.json", provider_json) if category_tree_json: write_json_file(args.target_dir / "category_tree.json", category_tree_json) return 0 def without_falsy_values(mapping): return { k: v for k, v in mapping.items() if v } def write_json_file(path, data): with path.open("w") as f: json.dump(data, f, ensure_ascii=False, indent=2, sort_keys=True) if __name__ == '__main__': sys.exit(main())