Commit 0a49add3 authored by Christophe Benz

Iterate over downloaded data as source of truth...

and produce a category tree containing only the converted datasets
parent 6b977e3c
Pipeline #339233 passed with stages in 3 minutes and 38 seconds
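In short, main() now treats the downloaded dataset directories as the source of truth: it first enumerates what is actually on disk, then filters the table of contents down to that set, so category_tree.json only references datasets that were really converted. A minimal sketch of the new flow, using the helpers added in this commit (paths and option values are illustrative placeholders):

from pathlib import Path

source_dir = Path("eurostat-source")  # placeholder source directory
target_dir = Path("eurostat-json")    # placeholder target directory

# 1. Enumerate datasets present on disk (each must contain <code>.sdmx.xml).
datasets_to_convert = list(
    iter_datasets_to_convert(
        source_dir / "data", target_dir=target_dir, datasets=None, resume=False
    )
)

# 2. Build the category tree from the TOC, keeping only datasets found on disk.
dataset_codes = {code for code, _ in datasets_to_convert}
category_tree_json, dataset_json_stubs = toc_to_category_tree(
    source_dir=source_dir, dataset_codes_to_convert=dataset_codes
)

# 3. Convert the SDMX files, then write the already-filtered category tree.
convert_datasets(
    datasets_to_convert=datasets_to_convert,
    dataset_json_stubs=dataset_json_stubs,
    target_dir=target_dir,
)
write_json_file(target_dir / "category_tree.json", category_tree_json)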
@@ -32,6 +32,7 @@ import sys
import time
from collections import OrderedDict
from pathlib import Path
from typing import Dict, Iterator, List, Set, Tuple
import humanize
import ujson as json
@@ -48,7 +49,6 @@ provider_json = {
"website": "http://ec.europa.eu/eurostat/home",
}
args = None # Will be defined by main().
datasets_dir_name = "data"
log = logging.getLogger(__name__)
namespace_url_by_name = {"xml": "http://www.w3.org/XML/1998/namespace"}
@@ -59,6 +59,36 @@ DATASETS_ENV_VAR = "DATASETS"
FULL_ENV_VAR = "FULL"
def convert_datasets(
datasets_to_convert: List[Tuple[str, Path]], dataset_json_stubs: Dict[str, dict], target_dir: Path
):
log.info("Converting %d datasets...", len(datasets_to_convert))
converted_datasets_codes = set()
for index, (dataset_code, source_dataset_dir) in enumerate(sorted(datasets_to_convert), start=1):
if dataset_code in converted_datasets_codes:
log.debug("Skipping dataset %r because it was already converted", dataset_code)
continue
sdmx_file = source_dataset_dir / "{}.sdmx.xml".format(dataset_code)
log.info(
"Converting SDMX source file %d/%d %s (%s)",
index,
len(datasets_to_convert),
sdmx_file,
humanize.naturalsize(sdmx_file.stat().st_size, gnu=True),
)
dataset_dir = target_dir / dataset_code
dataset_dir.mkdir(exist_ok=True)
dataset_json_stub = dataset_json_stubs[dataset_code]
convert_sdmx_file(dataset_json_stub, sdmx_file, source_dataset_dir, dataset_dir)
converted_datasets_codes.add(dataset_code)
def convert_sdmx_element(element, dataset_json, dataset_context, dsd_infos, series_jsonl_file):
global timings
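The convert_datasets helper added above consumes the (dataset_code, source_dataset_dir) pairs produced by iter_datasets_to_convert, plus the stubs keyed by dataset code. A hypothetical call, with an illustrative dataset code and placeholder paths:

from pathlib import Path

# Illustrative inputs; "nama_10_gdp" and the paths are placeholders.
datasets_to_convert = [
    ("nama_10_gdp", Path("eurostat-source/data/nama_10_gdp")),
]
dataset_json_stubs = {
    "nama_10_gdp": {"code": "nama_10_gdp", "name": "GDP and main components", "description": None},
}

convert_datasets(
    datasets_to_convert=datasets_to_convert,
    dataset_json_stubs=dataset_json_stubs,
    target_dir=Path("eurostat-json"),
)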
@@ -188,7 +218,7 @@ def convert_sdmx_element(element, dataset_json, dataset_context, dsd_infos, series_jsonl_file):
dataset_json["updated_at"] = element.text + "Z" # Assume the value is UTC time.
def convert_sdmx_file(dataset_json_stub, sdmx_file: Path, dataset_dir: Path):
def convert_sdmx_file(dataset_json_stub, sdmx_file: Path, source_dataset_dir: Path, dataset_dir: Path):
global timings
timings = {k: 0 for k in {"series_labels", "series_file", "observations_labels", "dsd_infos"}}
@@ -198,7 +228,7 @@ def convert_sdmx_file(dataset_json_stub, sdmx_file: Path, dataset_dir: Path):
dataset_code = dataset_json_stub["code"]
# Load DSD
dsd_file_path = args.source_dir / datasets_dir_name / dataset_code / "{}.dsd.xml".format(dataset_code)
dsd_file_path = source_dataset_dir / "{}.dsd.xml".format(dataset_code)
dsd_element = etree.parse(str(dsd_file_path)).getroot()
# Initialize dataset.json data
@@ -263,6 +293,48 @@ def convert_sdmx_file(dataset_json_stub, sdmx_file: Path, dataset_dir: Path):
log.debug("timings: {} total: {:.3f}".format(valmap("{:.3f}".format, timings), sum(timings.values())))
def iter_child_directories(directory: Path) -> Iterator[Path]:
"""Iterate over child directories of a directory."""
for child in directory.iterdir():
if child.is_dir():
yield child
def iter_datasets_to_convert(
source_datasets_dir: Path, target_dir: Path, *, datasets, resume
) -> Iterator[Tuple[str, Path]]:
for source_dataset_dir in sorted(iter_child_directories(source_datasets_dir)):
dataset_code = source_dataset_dir.name
if datasets and dataset_code not in datasets:
log.debug(
"Skipping dataset %r because it is not mentioned by --datasets option",
dataset_code,
)
continue
sdmx_file = source_dataset_dir / "{}.sdmx.xml".format(dataset_code)
if not sdmx_file.is_file():
log.error(
"Skipping dataset %s because SDMX file %s is missing",
dataset_code,
str(sdmx_file),
)
continue
dataset_dir = target_dir / dataset_code
if resume and dataset_dir.is_dir():
log.debug(
"Skipping dataset %r because it already exists (due to --resume option)",
dataset_code,
)
continue
yield dataset_code, source_dataset_dir
def iter_normalized_observations(observations, frequency):
for observation in observations:
period = observation[0]
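The iter_datasets_to_convert generator added above yields (dataset_code, source_dataset_dir) pairs and skips, with a log message, datasets excluded by --datasets, directories missing their SDMX file, and (under --resume) datasets already present in the target. A hedged consumption sketch (directory layout is a placeholder):

from pathlib import Path

for dataset_code, source_dataset_dir in iter_datasets_to_convert(
    Path("eurostat-source/data"),  # placeholder source layout
    target_dir=Path("eurostat-json"),
    datasets=None,   # or a set of codes from the --datasets option
    resume=False,
):
    # Every yielded directory is known to contain <dataset_code>.sdmx.xml.
    print(dataset_code, source_dataset_dir)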
@@ -279,16 +351,30 @@ def normalize_period(s, frequency):
return s
def toc_to_category_tree(xml_element, toc_dataset_json_stub_by_code):
def toc_to_category_tree(source_dir: Path, dataset_codes_to_convert: Set[str]):
"""Walk recursively table_of_contents.xml and return category_tree_json and dataset.json stubs."""
# Parse "table_of_contents", abbreviated "toc".
toc_element = etree.parse(str(source_dir / "table_of_contents.xml")).getroot()
dataset_json_stubs = {}
category_tree_json = toc_element_to_category_tree(toc_element, dataset_json_stubs, dataset_codes_to_convert)
return category_tree_json, dataset_json_stubs
def toc_element_to_category_tree(xml_element, dataset_json_stubs, dataset_codes_to_convert: Set[str]):
"""Walk recursively xml_element (table_of_contents.xml) and return category_tree_json.
Side-effects: fill toc_dataset_json_stub_by_code.
Side-effects: fill dataset_json_stubs.
"""
xml_element_tag = xml_element.tag[len("urn:eu.europa.ec.eurostat.navtree") + 2 :]
if xml_element_tag == "tree":
return list(
filter(
None,
(toc_to_category_tree(child_element, toc_dataset_json_stub_by_code) for child_element in xml_element),
(
toc_element_to_category_tree(child_element, dataset_json_stubs, dataset_codes_to_convert)
for child_element in xml_element
),
)
)
elif xml_element_tag == "branch":
@@ -302,7 +388,7 @@ def toc_to_category_tree(xml_element, toc_dataset_json_stub_by_code):
filter(
None,
(
toc_to_category_tree(child_element, toc_dataset_json_stub_by_code)
toc_element_to_category_tree(child_element, dataset_json_stubs, dataset_codes_to_convert)
for child_element in xml_element.iterfind("{*}children/*")
),
)
@@ -323,9 +409,12 @@ def toc_to_category_tree(xml_element, toc_dataset_json_stub_by_code):
"table",
):
dataset_code = xml_element.findtext("{*}code")
if dataset_code not in dataset_codes_to_convert:
return None
dataset_name = xml_element.findtext("{*}title[@language='en']")
if dataset_code not in toc_dataset_json_stub_by_code:
toc_dataset_json_stub_by_code[dataset_code] = {
if dataset_code not in dataset_json_stubs:
dataset_json_stubs[dataset_code] = {
"code": dataset_code,
"name": dataset_name,
"description": xml_element.findtext("{*}shortDescription[@language='en']") or None,
@@ -347,7 +436,6 @@ def toc_to_category_tree(xml_element, toc_dataset_json_stub_by_code):
def main():
global args
global timings
datasets_from_env = os.getenv(DATASETS_ENV_VAR)
@@ -374,7 +462,6 @@ def main():
)
parser.add_argument("--log", default="INFO", help="level of logging messages")
parser.add_argument("--resume", action="store_true", help="do not process already written datasets")
parser.add_argument("--start-from", metavar="DATASET_CODE", help="start converting from dataset code")
args = parser.parse_args()
if not args.source_dir.is_dir():
@@ -391,81 +478,25 @@ def main():
write_json_file(args.target_dir / "provider.json", provider_json)
# Parse "table_of_contents", abbreviated "toc".
toc_element = etree.parse(str(args.source_dir / "table_of_contents.xml")).getroot()
source_datasets_dir = args.source_dir / datasets_dir_name
# Walk recursively table_of_contents.xml and return category_tree_json.
# Side-effects: fill toc_dataset_json_stub_by_code.
toc_dataset_json_stub_by_code = {}
category_tree_json = toc_to_category_tree(toc_element, toc_dataset_json_stub_by_code)
if category_tree_json:
write_json_file(args.target_dir / "category_tree.json", category_tree_json)
# Build list of datasets codes to convert
datasets_codes_to_convert = set()
for dataset_code in sorted(toc_dataset_json_stub_by_code):
if args.datasets and dataset_code not in args.datasets:
log.debug(
"Skipping dataset %r because it is not mentioned by --datasets option",
dataset_code,
)
continue
if args.start_from is not None and dataset_code < args.start_from:
log.debug("Skipping dataset %r because of --start-from option", dataset_code)
continue
source_dataset_dir = args.source_dir / datasets_dir_name / dataset_code
if not source_dataset_dir.is_dir():
log.error(
"Skipping dataset %s because source directory %s is missing",
dataset_code,
str(source_dataset_dir),
)
continue
sdmx_file = source_dataset_dir / "{}.sdmx.xml".format(dataset_code)
if not sdmx_file.is_file():
log.error(
"Skipping dataset %s because SDMX file %s is missing",
dataset_code,
str(sdmx_file),
)
continue
dataset_dir = args.target_dir / dataset_code
if args.resume and dataset_dir.is_dir():
log.debug(
"Skipping dataset %r because it already exists (due to --resume option)",
dataset_code,
)
continue
datasets_codes_to_convert.add(dataset_code)
log.info("Converting %d datasets...", len(datasets_codes_to_convert))
# Convert SDMX files. Side-effect: write files for each dataset.
converted_datasets_codes = set()
for index, dataset_code in enumerate(sorted(datasets_codes_to_convert), start=1):
if dataset_code in converted_datasets_codes:
log.debug("Skipping dataset %r because it was already converted", dataset_code)
continue
source_dataset_dir = args.source_dir / datasets_dir_name / dataset_code
sdmx_file = source_dataset_dir / "{}.sdmx.xml".format(dataset_code)
log.info(
"Converting SDMX source file %d/%d %s (%s)",
index,
len(datasets_codes_to_convert),
sdmx_file,
humanize.naturalsize(sdmx_file.stat().st_size, gnu=True),
datasets_to_convert = list(
iter_datasets_to_convert(
source_datasets_dir, target_dir=args.target_dir, datasets=args.datasets, resume=args.resume
)
)
dataset_dir = args.target_dir / dataset_code
dataset_dir.mkdir(exist_ok=True)
dataset_codes_to_convert = set(dataset_code for (dataset_code, _) in datasets_to_convert)
category_tree_json, dataset_json_stubs = toc_to_category_tree(
source_dir=args.source_dir, dataset_codes_to_convert=dataset_codes_to_convert
)
dataset_json_stub = toc_dataset_json_stub_by_code[dataset_code]
convert_sdmx_file(dataset_json_stub, sdmx_file, dataset_dir)
convert_datasets(
datasets_to_convert=datasets_to_convert, dataset_json_stubs=dataset_json_stubs, target_dir=args.target_dir
)
converted_datasets_codes.add(dataset_code)
log.info("Writing category tree...")
write_json_file(args.target_dir / "category_tree.json", category_tree_json)
return 0