#! /usr/bin/env python3
# eurostat-fetcher -- Fetch series from Eurostat database
# https://git.nomics.world/dbnomics-fetchers/eurostat-fetcher
#
# eurostat-fetcher is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# eurostat-fetcher is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Convert Eurostat provider, categories, datasets and time series to DBnomics JSON and TSV files."""
from toolz import get_in, valmap
"terms_of_use": "http://ec.europa.eu/eurostat/about/policies/copyright",
"website": "http://ec.europa.eu/eurostat/home",
}
# Parsed command-line arguments, shared by the conversion functions of this module.
args = None # Will be defined by main().
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Prefix map passed as "namespaces" to findtext() so that "@xml:lang" predicates can be resolved.
namespace_url_by_name = {"xml": "http://www.w3.org/XML/1998/namespace"}
# Undelimited daily period: 4-digit year, 2-digit month, 2-digit day (used by normalize_period).
DAILY_PERIOD_RE = re.compile(r"(?P<year>\d{4})(?P<month>\d{2})(?P<day>\d{2})")
# Name of the environment variable holding comma-separated dataset codes (default of --datasets).
DATASETS_ENV_VAR = 'DATASETS'
# Name of the environment variable whose value is the default of the --full option.
FULL_ENV_VAR = 'FULL'
def convert_sdmx_element(element, dataset_json, dataset_context, dsd_infos, series_jsonl_file):
"""Process one XML element produced while streaming an SDMX file.

The element is dispatched on the suffix of its tag:

- "Obs": record labels of the observation attributes in dataset_json, then append one row
  (period, value, then one cell per code of dsd_infos["attributes"]) to
  dataset_context["current_series_observations"].
- "Series": treat the element attributes (except TIME_FORMAT) as dimensions, record their
  labels in dataset_json, dump the series with the observations accumulated so far as one
  JSON line to series_jsonl_file, then reset the accumulator.
- "Extracted": store the element text, suffixed with "Z", as dataset_json["updated_at"].

Side-effects: mutate dataset_json and dataset_context, write to series_jsonl_file and add
elapsed times to the module-level "timings" dict.
"""
# Due to event=end, given to iterparse, we receive <Obs> then <Series> elements, in this order.
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
if element.tag.endswith("Series"):
# Ignore some specific XML element attributes corresponding to series SDMX attributes,
# because series SDMX attributes do not exist in DBnomics.
series_element_attributes = OrderedDict([
(attribute_key, attribute_value)
for attribute_key, attribute_value in element.attrib.items()
if attribute_key not in {"TIME_FORMAT"} # Redundant with FREQ.
])
# The order of the XML attributes defines the order of the dimensions.
dimensions_codes_order = list(series_element_attributes.keys())
if dataset_json["dimensions_codes_order"] is None:
dataset_json["dimensions_codes_order"] = dimensions_codes_order
else:
# dimensions_codes_order must not change between series.
assert dataset_json["dimensions_codes_order"] == dimensions_codes_order, \
(dataset_json["dimensions_codes_order"], dimensions_codes_order)
# Fill series dimensions labels in dataset.json.
t0 = time.time()
for dimension_code, dimension_value_code in series_element_attributes.items():
if dimension_code not in dataset_json["dimensions_labels"]:
dimension_label = dsd_infos["concepts"].get(dimension_code)
if dimension_label and dimension_code not in dataset_json["dimensions_labels"]:
# Some dimensions labels are an empty string: e.g. bs_bs12_04.sdmx.xml
dataset_json["dimensions_labels"][dimension_code] = dimension_label
# Skip the codelist lookup when a label is already stored for this dimension value.
if dimension_code in dataset_json["dimensions_values_labels"] and \
dimension_value_code in dataset_json["dimensions_values_labels"][dimension_code]:
continue
# NOTE(review): plain subscript, raises KeyError if the dimension has no codelist entry.
codelist_code = dsd_infos["codelist_by_concept"][dimension_code]
dimension_value_label = get_in([codelist_code, dimension_value_code], dsd_infos["codelists"])
if dimension_value_label:
dataset_json["dimensions_values_labels"].setdefault(
dimension_code, {})[dimension_value_code] = dimension_value_label
timings["series_labels"] += time.time() - t0
# Series code is not defined by provider: create it from dimensions values codes.
series_code = ".".join(
series_element_attributes[dimension_code]
for dimension_code in dimensions_codes_order
)
# Header row: period and value columns followed by the observation-level attribute codes.
observations_header = [["PERIOD", "VALUE"] + dsd_infos["attributes"]]
frequency = series_element_attributes["FREQ"]
# Only daily series have their periods rewritten; other frequencies keep the rows as collected.
observations_body = list(iter_normalized_observations(dataset_context["current_series_observations"], frequency)) \
if frequency == "D" \
else dataset_context["current_series_observations"]
series_json = {
"code": series_code,
"dimensions": [
series_element_attributes[dimension_code] # Every dimension MUST be defined for each series.
for dimension_code in dimensions_codes_order
],
"observations": observations_header + observations_body,
# One JSON document per line (JSON Lines).
json.dump(series_json, series_jsonl_file, ensure_ascii=False, sort_keys=True)
series_jsonl_file.write("\n")
timings["series_file"] += time.time() - t0
# Reset context for next series.
dataset_context["current_series_observations"] = []
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
elif element.tag.endswith("Obs"):
# Fill observations attributes labels in dataset.json.
t0 = time.time()
for attribute_code, attribute_value_code in element.attrib.items():
# Ignore period and value observations XML attributes, because they don't need labels.
if attribute_code in ["TIME_PERIOD", "OBS_VALUE"]:
continue
attribute_label = dsd_infos["concepts"].get(attribute_code)
if attribute_label and attribute_code not in dataset_json["attributes_labels"]:
dataset_json["attributes_labels"][attribute_code] = attribute_label
# Some attributes values codes are multi-valued and concatenated into the same string.
# For OBS_STATUS, list() splits the string into single-character codes.
attribute_value_codes = list(attribute_value_code) \
if attribute_code == "OBS_STATUS" \
else [attribute_value_code]
for attribute_value_code in attribute_value_codes:
# Skip the codelist lookup when a label is already stored for this attribute value.
if attribute_code in dataset_json["attributes_values_labels"] and \
attribute_value_code in dataset_json["attributes_values_labels"][attribute_code]:
continue
codelist_code = dsd_infos["codelist_by_concept"][attribute_code]
attribute_value_label = get_in([codelist_code, attribute_value_code], dsd_infos["codelists"])
if attribute_value_label:
dataset_json["attributes_values_labels"].setdefault(
attribute_code, {})[attribute_value_code] = attribute_value_label
timings["observations_labels"] += time.time() - t0
# NOTE(review): observations.value_to_float is defined outside this view; presumably it
# converts the raw OBS_VALUE string (or None when absent) to a number -- confirm.
obs_value = observations.value_to_float(element.attrib.get("OBS_VALUE"))
# Row layout matches the header built for the series: period, value, then attributes
# (an empty string stands for an attribute missing on this observation).
dataset_context["current_series_observations"].append([
element.attrib["TIME_PERIOD"], # Will be normalized later, if needed.
obs_value,
] + [
element.attrib.get(attribute_name, "")
for attribute_name in dsd_infos["attributes"]
])
elif element.tag.endswith("Extracted"):
dataset_json["updated_at"] = element.text + "Z" # Assume the value is UTC time.
def convert_sdmx_file(dataset_json_stub, sdmx_file: Path, dataset_dir: Path):
"""Convert one SDMX dataset file to "series.jsonl" and "dataset.json" written in dataset_dir.

dataset_json_stub must have a "code" and a non-empty "name"; its items are merged into the
dataset.json data. dataset_dir must be an existing directory.

The DSD file of the dataset is read from
args.source_dir / datasets_dir_name / <code> / "<code>.dsd.xml" and parsed entirely, whereas
sdmx_file is streamed with iterparse, each element being passed to convert_sdmx_element.

Side-effects: rebind the module-level "timings" dict, write files, log timings at debug level.
"""
global timings
# Elapsed seconds accumulated per processing step, reset for each converted file.
timings = {
k: 0
for k in {"series_labels", "series_file", "observations_labels", "dsd_infos"}
assert dataset_json_stub.get("name"), dataset_json_stub
assert dataset_dir.is_dir(), dataset_dir
dataset_code = dataset_json_stub["code"]
dsd_file_path = args.source_dir / datasets_dir_name / dataset_code / "{}.dsd.xml".format(dataset_code)
dsd_element = etree.parse(str(dsd_file_path)).getroot()
# Initialize dataset.json data
dataset_json = {
"attributes_labels": {}, # Will be defined by each series.
"attributes_values_labels": {}, # Will be defined by each series.
"dimensions_codes_order": None, # Will be defined by first series.
"dimensions_labels": {}, # Will be defined by each series.
"dimensions_values_labels": {}, # Will be defined by each series.
dataset_json.update(dataset_json_stub)
# Lookup tables extracted once from the DSD, then read by convert_sdmx_element.
dsd_infos = {
# Concept codes of the attributes attached at observation level, in document order.
"attributes": [
element.attrib["conceptRef"]
for element in dsd_element.iterfind(".//{*}Attribute[@attachmentLevel='Observation']")
],
# Codelist id -> {code value -> English description (None if there is none)}.
"codelists": {
element.attrib["id"]: {
code_element.attrib["value"]: code_element.findtext(
"./{*}Description[@xml:lang='en']", namespaces=namespace_url_by_name)
for code_element in element.iterfind("./{*}Code")
}
for element in dsd_element.iterfind('.//{*}CodeList')
},
# Concept id -> English name (None if there is none).
"concepts": {
element.attrib["id"]: element.findtext("./{*}Name[@xml:lang='en']", namespaces=namespace_url_by_name)
for element in dsd_element.iterfind('.//{*}Concept')
},
# Concept code -> codelist id, for the children of <Components> declaring both attributes.
"codelist_by_concept": {
element.attrib["conceptRef"]: element.attrib["codelist"]
for element in dsd_element.find(".//{*}Components")
if "conceptRef" in element.attrib and "codelist" in element.attrib
},
}
timings["dsd_infos"] += time.time() - t0
with (dataset_dir / "series.jsonl").open("w") as series_jsonl_file:
# Holds the observation rows collected until their enclosing <Series> element ends.
dataset_context = {
"current_series_observations": [],
}
# Side-effects: mutate dataset_context, write files.
context = etree.iterparse(str(sdmx_file), events=["end"])
for event, element in context:
convert_sdmx_element(element, dataset_json, dataset_context, dsd_infos, series_jsonl_file)
if event == "end":
# Inspired from fast_iter, cf https://www.ibm.com/developerworks/xml/library/x-hiperfparse/
# Free the processed element and its already-processed preceding siblings.
element.clear()
while element.getprevious() is not None:
del element.getparent()[0]
continue
del context
# NOTE(review): without_falsy_values is defined outside this block; presumably it drops the
# keys having a falsy value -- confirm.
write_json_file(dataset_dir / "dataset.json", without_falsy_values(dataset_json))
log.debug("timings: {} total: {:.3f}".format(valmap("{:.3f}".format, timings), sum(timings.values())))
def iter_normalized_observations(observations, frequency):
    """Yield a new row for each observation, with its period normalized.

    Each observation is a list whose first item is the period string. When
    ``frequency`` is ``"D"`` the period goes through ``normalize_period``;
    for any other frequency it is kept as is. The remaining items of the
    row are copied unchanged after the period.
    """
    is_daily = frequency == "D"
    for row in observations:
        head = row[0]
        if is_daily:
            head = normalize_period(head, frequency)
        yield [head] + row[1:]
def normalize_period(s, frequency):
    """Return period *s* rewritten as ``YYYY-MM-DD`` when it is a daily period.

    Only the daily frequency (``"D"``) is handled: a period made of exactly
    eight digits (``YYYYMMDD``, as described by ``DAILY_PERIOD_RE``) is
    rewritten with dashes. Every other input, including any period of a
    non-daily frequency, is returned unchanged.
    """
    if frequency != "D":
        return s
    # fullmatch() rather than match(): match() is anchored at the start only,
    # so a value with trailing characters would be silently truncated to its
    # first eight digits instead of being left untouched.
    m = DAILY_PERIOD_RE.fullmatch(s)
    if m is None:
        return s
    return '{}-{}-{}'.format(m.group('year'), m.group('month'), m.group('day'))
def toc_to_category_tree(xml_element, toc_dataset_json_stub_by_code):
"""Walk recursively xml_element (table_of_contents.xml) and return category_tree_json.

Side-effects: fill toc_dataset_json_stub_by_code with one stub (code, name, description,
doc_href) per dataset code, keeping the first occurrence of each code.

Return None for the "DS-1062396" branch, for a branch whose children are falsy and for
unexpected nodes; return a dict with "code" and "name" for "dataset" and "table" leaves.
"""
# NOTE(review): presumably strips the "{namespace}" prefix of the tag (namespace length
# plus the two braces) to keep the local name only -- confirm.
xml_element_tag = xml_element.tag[len("urn:eu.europa.ec.eurostat.navtree") + 2:]
if xml_element_tag == "tree":
toc_to_category_tree(child_element, toc_dataset_json_stub_by_code)
for child_element in xml_element
)))
elif xml_element_tag == "branch":

Bruno Duyé
committed
code = xml_element.findtext("{*}code")
if code == 'DS-1062396':
# Remove "International trade in goods - detailed data" branch from category tree as it doesn't contains any data here.
# https://git.nomics.world/dbnomics-fetchers/management/issues/435
return None
toc_to_category_tree(child_element, toc_dataset_json_stub_by_code)
for child_element in xml_element.iterfind("{*}children/*")
)))

Bruno Duyé
committed
"code": code,
"children": children,
}) if children else None
elif (xml_element_tag == "leaf" and xml_element.attrib["type"] in ("dataset", "table")):
dataset_code = xml_element.findtext("{*}code")
dataset_name = xml_element.findtext("{*}title[@language='en']")
# A dataset may be met several times while walking: only its first occurrence fills the stub.
if dataset_code not in toc_dataset_json_stub_by_code:
toc_dataset_json_stub_by_code[dataset_code] = {
"code": dataset_code,
"name": dataset_name,
# "or None" turns an empty text into None.
"description": xml_element.findtext("{*}shortDescription[@language='en']") or None,
"doc_href": xml_element.findtext("{*}metadata[@format='html']") or None,
}
return {
"code": dataset_code,
"name": dataset_name,
}
else:
# Any other node is reported and left out of the tree.
log.warning("Unexpected node type: {!r}, type {!r} (code {!r})".format(xml_element_tag, xml_element.attrib["type"],
xml_element.findtext("{*}code")))
return None
datasets_from_env = os.getenv(DATASETS_ENV_VAR)
if datasets_from_env:
datasets_from_env = datasets_from_env.split(",")
parser.add_argument('source_dir', type=Path,
help='path of source directory containing Eurostat series in source format')
parser.add_argument('target_dir', type=Path, help='path of target directory containing datasets & '
'series in DBnomics JSON and TSV formats')
parser.add_argument('--datasets', nargs='+', metavar='DATASET_CODE',
default=datasets_from_env,
help='convert only the given datasets (datasets codes, space separated)')
parser.add_argument('--full', action='store_true', default=os.getenv(FULL_ENV_VAR),
help='convert all datasets; default behavior is to convert what changed since last commit')
parser.add_argument('--log', default='INFO', help='level of logging messages')
parser.add_argument('--resume', action='store_true', help='do not process already written datasets')
parser.add_argument('--start-from', metavar='DATASET_CODE', help='start indexing from dataset code')
if not args.source_dir.is_dir():
parser.error("Could not find directory {!r}".format(str(args.source_dir)))
if not args.target_dir.is_dir():
parser.error("Could not find directory {!r}".format(str(args.target_dir)))
numeric_level = getattr(logging, args.log.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError('Invalid log level: {}'.format(args.log))
logging.basicConfig(format="%(levelname)s:%(message)s", level=numeric_level)
if args.datasets:
args.full = True
# Ask Git which datasets directories were modified in latest commit in source-data repository.
if not args.full:
try:
output = subprocess.check_output(["git", "diff", "--name-status", "HEAD^", datasets_dir_name],
cwd=str(args.source_dir), universal_newlines=True)
except subprocess.CalledProcessError:
args.full = True
else:
modified_datasets_codes = set()
deleted_datasets_codes = set()
for line in StringIO(output):
action, file_path = line.strip().split()
try:
dataset_code = Path(file_path).parent.relative_to(datasets_dir_name).name
except ValueError:
continue
if action in {"A", "M"}:
modified_datasets_codes.add(dataset_code)
else:
assert action == "D", action
deleted_datasets_codes.add(dataset_code)
log.info("%d datasets were modified and %d were deleted by last download",
len(modified_datasets_codes), len(deleted_datasets_codes))
log.info("Mode: %s", "full" if args.full else "incremental")
# Parse "table_of_contents", abbreviated "toc".
toc_element = etree.parse(str(args.source_dir / "table_of_contents.xml")).getroot()
# Walk recursively table_of_contents.xml and return category_tree_json.
# Side-effects: fill toc_dataset_json_stub_by_code.
toc_dataset_json_stub_by_code = {}
category_tree_json = toc_to_category_tree(toc_element, toc_dataset_json_stub_by_code)
# Build list of datasets codes to convert
datasets_codes_to_convert = set()
for dataset_code in sorted(toc_dataset_json_stub_by_code):
if args.datasets and dataset_code not in args.datasets:
log.debug("Skipping dataset %r because it is not mentioned by --datasets option", dataset_code)
if not args.full and dataset_code not in modified_datasets_codes:
log.debug("Skipping dataset %r because it was not modified by last download (due to incremental mode)",
dataset_code)
if args.start_from is not None and dataset_code < args.start_from:
log.debug("Skipping dataset %r because of --start-from option", dataset_code)
continue
source_dataset_dir = args.source_dir / datasets_dir_name / dataset_code
if not source_dataset_dir.is_dir():
log.error("Skipping dataset %s because source directory %s is missing",
dataset_code, str(source_dataset_dir))
sdmx_file = source_dataset_dir / "{}.sdmx.xml".format(dataset_code)
if not sdmx_file.is_file():
log.error("Skipping dataset %s because SDMX file %s is missing", dataset_code, str(sdmx_file))
dataset_dir = args.target_dir / dataset_code
if args.resume and (dataset_dir / "dataset.json").is_file():
log.debug("Skipping dataset %r because it already exists (due to --resume option)", dataset_code)
continue
datasets_codes_to_convert.add(dataset_code)
log.info("Converting %d datasets...", len(datasets_codes_to_convert))
# Remove directories of datasets to be converted before converting.
if not args.resume:
datasets_codes_to_delete = datasets_codes_to_convert
if not args.full:
datasets_codes_to_delete = datasets_codes_to_delete.union(deleted_datasets_codes)
log.info("Removing directories of deleted datasets and datasets to be converted: %r", datasets_codes_to_delete)
for dataset_code in datasets_codes_to_delete:
dataset_dir = args.target_dir / dataset_code
if dataset_dir.is_dir():
shutil.rmtree(str(dataset_dir))
# Convert SDMX files. Side-effect: write files for each dataset.
converted_datasets_codes = set()
for index, dataset_code in enumerate(sorted(datasets_codes_to_convert), start=1):
if dataset_code in converted_datasets_codes:
log.debug("Skipping dataset %r because it was already converted", dataset_code)
continue
source_dataset_dir = args.source_dir / datasets_dir_name / dataset_code
sdmx_file = source_dataset_dir / "{}.sdmx.xml".format(dataset_code)
log.info("Converting SDMX source file %d/%d %s (%s)", index, len(datasets_codes_to_convert), sdmx_file,
humanize.naturalsize(sdmx_file.stat().st_size, gnu=True))
dataset_dir = args.target_dir / dataset_code
dataset_dir.mkdir(exist_ok=True)
dataset_json_stub = toc_dataset_json_stub_by_code[dataset_code]
convert_sdmx_file(dataset_json_stub, sdmx_file, dataset_dir)
converted_datasets_codes.add(dataset_code)
write_json_file(args.target_dir / "provider.json", provider_json)
if category_tree_json:
write_json_file(args.target_dir / "category_tree.json", category_tree_json)
return {
k: v
for k, v in mapping.items()
if v
}
def write_json_file(path, data):
    """Write *data* as pretty-printed JSON to the file at *path*.

    *path* is a ``pathlib.Path``-like object (its ``open`` method is used);
    an existing file is overwritten. Keys are sorted and the output is
    indented by two spaces. Non-ASCII characters are written as is, not as
    ``\\uXXXX`` escapes.
    """
    # ensure_ascii=False emits raw non-ASCII characters, so the encoding is
    # pinned to UTF-8 instead of depending on the platform locale, which
    # could raise UnicodeEncodeError or produce a non-UTF-8 JSON file.
    with path.open("w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2, sort_keys=True)