#! /usr/bin/env python3
# eurostat-fetcher -- Fetch series from Eurostat database
# https://git.nomics.world/dbnomics-fetchers/eurostat-fetcher
#
# eurostat-fetcher is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# eurostat-fetcher is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Convert Eurostat provider, categories, datasets and time series to DBnomics JSON and TSV files."""
import argparse
import json
import logging
import os
import re
from collections import defaultdict
from collections.abc import Iterator
from datetime import datetime
from pathlib import Path
from typing import Any, cast

import sdmx
from contexttimer import Timer
from humanfriendly import format_timespan
from lxml import etree
from more_itertools import one
from sdmx.message import StructureMessage
from sdmx.model.common import Dimension
from sdmx.model.v21 import DataflowDefinition, DataStructureDefinition

from eurostat_fetcher.convert_utils import iter_series_json
"terms_of_use": "https://ec.europa.eu/eurostat/about/policies/copyright",
"website": "https://ec.europa.eu/eurostat",
log = logging.getLogger(__name__)
namespace_url_by_name = {"xml": "http://www.w3.org/XML/1998/namespace"}
DAILY_PERIOD_RE = re.compile(r"(?P<year>\d{4})(?P<month>\d{2})(?P<day>\d{2})")
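
# Assumed names/values: the environment variable listing dataset codes and the name of the
# source sub-directory holding the per-dataset files are not defined elsewhere in this file,
# so plausible values are provided here.
DATASETS_ENV_VAR = "DATASETS"
datasets_dir_name = "data"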
def convert_datasets(
datasets_to_convert: list[tuple[str, Path]], dataset_json_stubs: dict[str, dict], target_dir: Path
) -> None:
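    """Convert each dataset listed in datasets_to_convert to the DBnomics format under target_dir."""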
log.info("Converting %d datasets...", len(datasets_to_convert))
converted_datasets_codes = set()
for index, (dataset_code, source_dataset_dir) in enumerate(sorted(datasets_to_convert), start=1):
if dataset_code in converted_datasets_codes:
log.debug("Skipping dataset %r because it was already converted", dataset_code)
continue
log.info("Converting dataset %d/%d %r", index, len(datasets_to_convert), dataset_code)
dataset_dir = target_dir / dataset_code
dataset_dir.mkdir(exist_ok=True)
dataset_json_stub = dataset_json_stubs[dataset_code]
convert_dataset(dataset_code, dataset_json_stub, source_dataset_dir, dataset_dir)
converted_datasets_codes.add(dataset_code)
def convert_dataset(
dataset_code: str, dataset_json_stub: dict[str, Any], source_dataset_dir: Path, dataset_dir: Path
) -> None:
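    """Convert one dataset: read its DSD and SDMX-ML files, then write series.jsonl and dataset.json."""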
dsd_file_path = source_dataset_dir / f"{dataset_code}.dsd.xml"
with Timer() as timer:
dataflow_message = cast(StructureMessage, sdmx.read_sdmx(dsd_file_path))
log.debug("%s file was read in %s", str(dsd_file_path), format_timespan(timer.elapsed))
dataflow_definition = cast(DataflowDefinition, one(dataflow_message.dataflow.values()))
data_structure_definition = cast(DataStructureDefinition, one(dataflow_message.structure.values()))
sdmx_file_path = source_dataset_dir / f"{dataset_code}.sdmx.xml"
dimensions_codes_order = [
dimension.id for dimension in data_structure_definition.dimensions if isinstance(dimension, Dimension)
]
attribute_codes = sorted(attribute.id for attribute in data_structure_definition.attributes)
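    # Filled as a side effect by iter_series_json, then used below to keep only the labels of the
    # attribute and dimension values that actually occur in the data.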
used_attribute_value_codes: defaultdict[str, set[str]] = defaultdict(set)
used_dimension_value_codes: defaultdict[str, set[str]] = defaultdict(set)
    # Assumption: series are written to "series.jsonl" in the dataset directory (JSON Lines format).
    series_jsonl_file = dataset_dir / "series.jsonl"
    with Timer() as timer, series_jsonl_file.open("w") as series_jsonl_fp:
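        # Parse the SDMX-ML file incrementally with iterparse, handling one <Series> element at a time.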
iterparse_context = etree.iterparse(sdmx_file_path, huge_tree=True, tag="{*}Series", events=["end"])
for series_json in sorted(
iter_series_json(
iterparse_context,
attribute_codes=attribute_codes,
dimensions_codes_order=dimensions_codes_order,
used_attribute_value_codes=used_attribute_value_codes,
used_dimension_value_codes=used_dimension_value_codes,
),
key=lambda series_json: series_json["code"],
):
json.dump(series_json, series_jsonl_fp, ensure_ascii=False, sort_keys=True)
series_jsonl_fp.write("\n")
log.debug("Series were written to %s in %s", str(series_jsonl_file), format_timespan(timer.elapsed))
with Timer() as timer:
updated_at = extract_updated_at(dataflow_definition)
dataset_json = {
**dataset_json_stub,
"attributes_labels": {
attribute.id: attribute.concept_identity.name["en"]
for attribute in data_structure_definition.attributes
},
"attributes_values_labels": {
attribute.id: {
k: v.name["en"]
for k, v in attribute.local_representation.enumerated.items.items()
if k in used_attribute_value_codes[attribute.id]
}
for attribute in data_structure_definition.attributes
},
"dimensions_codes_order": dimensions_codes_order,
"dimensions_labels": {
dimension.id: dimension.concept_identity.name["en"]
for dimension in data_structure_definition.dimensions
if isinstance(dimension, Dimension)
},
"dimensions_values_labels": {
dimension.id: {
k: v.name["en"]
for k, v in dimension.local_representation.enumerated.items.items()
if k in used_dimension_value_codes[dimension.id]
}
for dimension in data_structure_definition.dimensions
if isinstance(dimension, Dimension)
},
"updated_at": updated_at.isoformat() if updated_at is not None else None,
}
dataset_json_file = dataset_dir / "dataset.json"
write_json_file(dataset_json_file, without_falsy_values(dataset_json))
log.debug("Dataset metadata was written to %s in %s", str(dataset_json_file), format_timespan(timer.elapsed))
def extract_updated_at(dataflow_definition: DataflowDefinition) -> datetime | None:
return max(
        (
            # Assumption: these annotations carry an ISO-8601 timestamp in their title.
            datetime.fromisoformat(annotation.title)
            for annotation in dataflow_definition.annotations
            if annotation.type in {"UPDATE_DATA", "UPDATE_STRUCTURE"}
        ),
default=None,
)
def iter_child_directories(directory: Path) -> Iterator[Path]:
"""Iterate over child directories of a directory."""
for child in directory.iterdir():
if child.is_dir():
yield child
def iter_datasets_to_convert(
    source_datasets_dir: Path, target_dir: Path, *, datasets, resume
) -> Iterator[tuple[str, Path]]:
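    """Yield (dataset_code, source_dataset_dir) pairs for the datasets that need to be converted."""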
for source_dataset_dir in sorted(iter_child_directories(source_datasets_dir)):
dataset_code = source_dataset_dir.name
if datasets and dataset_code not in datasets:
log.debug(
"Skipping dataset %r because it is not mentioned by --datasets option",
dataset_code,
)
continue
sdmx_file = source_dataset_dir / f"{dataset_code}.sdmx.xml"
if not sdmx_file.is_file():
log.error(
"Skipping dataset %s because SDMX file %s is missing",
dataset_code,
str(sdmx_file),
)
continue
dataset_dir = target_dir / dataset_code
if resume and dataset_dir.is_dir():
log.debug(
"Skipping dataset %r because it already exists (due to --resume option)",
dataset_code,
)
continue
yield dataset_code, source_dataset_dir
def toc_to_category_tree(source_dir: Path) -> tuple[list[dict[str, Any]], dict[str, dict]]:
    """Walk table_of_contents.xml recursively and return (category_tree_json, dataset_json_stubs)."""
# Parse "table_of_contents", abbreviated "toc".
toc_element = etree.parse(str(source_dir / "table_of_contents.xml")).getroot()
dataset_json_stubs = {}
category_tree_json = list(iter_category_tree_nodes(toc_element, dataset_json_stubs))
return category_tree_json, dataset_json_stubs
def iter_category_tree_nodes(xml_element, dataset_json_stubs) -> Iterator[dict[str, Any]]:
"""Walk recursively xml_element (table_of_contents.xml) and return category_tree_json.
Side-effects: fill dataset_json_stubs.
xml_element_tag = xml_element.tag[len("urn:eu.europa.ec.eurostat.navtree") + 2 :]
if xml_element_tag == "tree":
for child_element in xml_element:
yield from iter_category_tree_nodes(child_element, dataset_json_stubs)
elif xml_element_tag == "branch":
children = [
child
for child_element in xml_element.iterfind("{*}children/*")
for child in iter_category_tree_nodes(child_element, dataset_json_stubs)
]
if children:
            yield without_falsy_values(
                {
                    "code": xml_element.findtext("{*}code"),
                    "name": xml_element.findtext("{*}title[@language='en']"),
                    "children": children,
                }
            )
elif xml_element_tag == "leaf" and xml_element.attrib["type"] in {"dataset", "table"}:
dataset_code = xml_element.findtext("{*}code")
dataset_name = xml_element.findtext("{*}title[@language='en']")
        # Datasets can appear multiple times in the category tree.
if dataset_code not in dataset_json_stubs:
dataset_json_stubs[dataset_code] = {
"code": dataset_code,
"name": dataset_name,
"description": xml_element.findtext("{*}shortDescription[@language='en']") or None,
"doc_href": xml_element.findtext("{*}metadata[@format='html']") or None,
}
yield {
"code": dataset_code,
"name": dataset_name,
}
for child_element in xml_element.iterfind("{*}children/*"):
yield from iter_category_tree_nodes(child_element, dataset_json_stubs)
    else:
        log.warning(
            "Unexpected node type: %r, type %r (code %r)",
            xml_element_tag,
            xml_element.attrib.get("type"),
            xml_element.findtext("{*}code"),
        )


def main() -> None:
    """Parse command-line arguments and convert Eurostat source data to the DBnomics format."""
    datasets_from_env = os.getenv(DATASETS_ENV_VAR)
    if datasets_from_env:
        datasets_from_env = datasets_from_env.split(",")

    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
"source_dir",
type=Path,
help="path of source directory containing Eurostat series in source format",
)
parser.add_argument(
"target_dir",
type=Path,
help="path of target directory containing datasets & series in DBnomics JSON and TSV formats",
)
parser.add_argument(
"--datasets",
nargs="+",
metavar="DATASET_CODE",
default=datasets_from_env,
help="convert only the given datasets (datasets codes, space separated)",
)
parser.add_argument("--log", default="INFO", help="level of logging messages")
parser.add_argument("--resume", action="store_true", help="do not process already written datasets")
parser.error(f"Could not find directory {str(args.source_dir)!r}")
parser.error(f"Could not find directory {str(args.target_dir)!r}")
numeric_level = getattr(logging, args.log.upper(), None)
if not isinstance(numeric_level, int):
msg = f"Invalid log level: {args.log}"
raise ValueError(msg)
logging.basicConfig()
log.setLevel(numeric_level)
log.info("Command-line arguments: %r", args)
write_json_file(args.target_dir / "provider.json", provider_json)
source_datasets_dir = args.source_dir / datasets_dir_name
datasets_to_convert = list(
iter_datasets_to_convert(
            source_datasets_dir, target_dir=args.target_dir, datasets=args.datasets, resume=args.resume
        )
    )
    category_tree_json, dataset_json_stubs = toc_to_category_tree(source_dir=args.source_dir)
convert_datasets(
datasets_to_convert=datasets_to_convert, dataset_json_stubs=dataset_json_stubs, target_dir=args.target_dir
)
log.info("Writing category tree...")
write_json_file(args.target_dir / "category_tree.json", category_tree_json)
with path.open("w") as f:
json.dump(data, f, ensure_ascii=False, indent=2, sort_keys=True)
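
# `without_falsy_values` is used above but not defined in this file; this is a minimal sketch,
# assuming it simply drops items whose value is falsy (None, "", [], {}).
def without_falsy_values(mapping: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of mapping without its falsy-valued items."""
    return {key: value for key, value in mapping.items() if value}


if __name__ == "__main__":
    main()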