Commit 0dcc460f authored by Christophe Benz's avatar Christophe Benz

Load series from JSON-lines by scanning if seeking fails

parent c8c4ac1f
Pipeline #47736 passed with stage
in 24 seconds
# Changelog
### 0.12.10 -> 0.12.11
Non-breaking changes:
- in `FileSystemDatasetDir.iter_observations_from_jsonl`, load series from JSON-lines by scanning if seeking fails
### 0.12.9 -> 0.12.10
Non-breaking changes:
......@@ -92,7 +92,7 @@ class FileSystemDatasetDir(AbstractDatasetDir):
return (self.path / name).is_file()
def iter_observations_from_jsonl(self, series_codes, offset_by_series_code={}):
def iter_observations_by_scanning(fp, series_codes):
def iter_observations_by_scanning(fp, series_codes: set):
nb_yielded = 0
for line in fp:
if not line:
......@@ -101,44 +101,50 @@ class FileSystemDatasetDir(AbstractDatasetDir):
series_code = series_json["code"]
if series_code in series_codes:
nb_yielded += 1
log.debug("Found series %r by scanning %r", self.series_id_str(
series_code), str(series_jsonl_file_path))
yield (series_code, series_json["observations"])
if nb_yielded == len(series_codes):
def iter_observations_by_seeking(fp, offset_by_series_code):
"""Yield (exception, series_code, observations)"""
for series_code, offset in offset_by_series_code.items():
line = next(fp)
series_json = json.loads(line)
except ValueError:
log.error("Could not decode JSON line for series ID %r using offset %d, skipping series. "
"Hint: Solr index is probably not up-to-date.", self.series_id_str(series_code), offset)
except ValueError as exc:
# Hint: Solr index is probably not up-to-date.
exc1 = ValueError("Could not decode JSON line for series ID {!r} using offset {}. Reason: {}".format(
self.series_id_str(series_code), offset, str(exc)))
yield (exc1, series_code, None)
if series_json["code"] != series_code:
log.error("Decoded JSON line for series ID %r using offset %d returned unexpected series code %r, skipping series. "
"Hint: Solr index is probably not up-to-date.",
self.series_id_str(series_code), offset, series_json["code"])
exc = ValueError("Wrong series code for series ID {!r}: found {!r} using offset {}".format(
self.series_id_str(series_code), series_json["code"], offset))
yield (exc, series_code, None)
log.debug("Found series %r by seeking %r using offset %d",
self.series_id_str(series_code), str(series_jsonl_file_path), offset)
yield (series_code, series_json["observations"])
yield (None, series_code, series_json["observations"])
observations_by_series_code = {}
series_jsonl_file_path = self.path / self.get_series_jsonl_file_name()
with as fp:
if offset_by_series_code:
observations_by_series_code.update(iter_observations_by_seeking(fp, offset_by_series_code))
series_codes_without_offsets = [
for series_code in series_codes
if series_code not in offset_by_series_code
if series_codes_without_offsets:
observations_by_series_code.update(iter_observations_by_scanning(fp, series_codes_without_offsets))
failed_series_codes = set()
if offset_by_series_code:
with as fp:
for exc, series_code, observations in iter_observations_by_seeking(fp, offset_by_series_code):
if exc is None:
observations_by_series_code[series_code] = observations
# TODO Send exception to error logging application.
# Remaining series are series that were not indexed in Solr (so no offset known) or that could not be loaded by seeking in JSON-lines.
series_codes_without_offset = set(series_codes) - set(offset_by_series_code.keys())
remaining_series = series_codes_without_offset.union(failed_series_codes)
if remaining_series:
with as fp:
observations_by_series_code.update(iter_observations_by_scanning(fp, remaining_series))
for series_code in series_codes:
observations = observations_by_series_code.get(series_code)
......@@ -40,7 +40,7 @@ doc_lines = __doc__.split('\n')
author='DBnomics Team',
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment