Commit b8b2fc67 authored by Michel Juillard
Browse files

changes from *.tsv to series.jsonl

parent 653f1d01
Pipeline #309138 passed with stages
in 2 minutes and 27 seconds
......@@ -162,22 +162,31 @@ class CSVLineHelper:
@staticmethod
def norm_obs_values(obs_values):
return ["NA" if elt in ("", "-", "***") else elt for elt in obs_values]
return ['"NA"' if elt in ("", "-", "***", "X") else elt for elt in obs_values]
def generate_tsv_from_csv_row(self, cols):
"""Generates all TSV from a CSV line"""
def generate_jsonl_from_csv_row(self, cols, so_val, concept_val, jsonl_fd):
"""Generates series.jsonl from a CSV line"""
ts_infos = []
row_code = cols[0]
row_name = cols[1].strip()
for period_info in self.period_infos:
for i, period_info in enumerate(self.period_infos):
if i > 0:
jsonl_fd.write('\n')
code = period_info["code"]
ts_code = "{}.{}.{}".format(self.csv_code, row_code, code)
jsonl_fd.write('{{"code":"{}"'.format(ts_code))
ts_name = "{} - {} ({})".format(
self.csv_name, row_name, PERIOD_CODE_TO_LABEL[code]
)
jsonl_fd.write(',"name":"{}"'.format(ts_name))
jsonl_fd.write(',"dimensions":{{"frequency":"{}"'.format(period_info["freq"]))
jsonl_fd.write(',"seasonal_adjustment":"{}"'.format(so_val))
if concept_val:
jsonl_fd.write(',"concept":"{}"'.format(concept_val))
jsonl_fd.write('}')
period_values = period_info["norm_period_values"]
ci = period_info["col_interval"]
......@@ -185,23 +194,23 @@ class CSVLineHelper:
assert len(period_values) == len(obs_values)
# Prepare data to be written
header = ["PERIOD", "VALUE"]
value_list = [period_values, obs_values]
status = False
if any([status != "" for status in period_info["obs_status"]]):
header.append("OBS_STATUS")
value_list.append(period_info["obs_status"])
# Write TSV file
tsv_filepath = self.ds_dir / "{}.tsv".format(ts_code)
with tsv_filepath.open("w", encoding="utf-8") as tsv_fd:
tsv_fd.write("\t".join(header) + "\n")
for t in zip(*value_list):
tsv_fd.write("\t".join(t) + "\n")
# Append to ts_infos
ts_infos.append((ts_code, ts_name, {"frequency": period_info["freq"]}))
return ts_infos
jsonl_fd.write(',"observations":[["PERIOD","VALUE","OBS_STATUS"]')
status = True
else:
jsonl_fd.write(',"observations":[["PERIOD","VALUE"]')
status = False
# Write observations
for obs in zip(*value_list):
if status:
jsonl_fd.write(',["{}",{},"{}"]'.format(*obs))
else:
jsonl_fd.write(',["{}",{}]'.format(*obs))
jsonl_fd.write(']}')
def extract_concept(csv_name):
"""extract 'IP' (Industrial production) from
......@@ -221,33 +230,39 @@ def extract_concept(csv_name):
def extract_timeseries_from_csv(
csv_code, csv_filepath: Path, ds_dir: Path, with_concept=False
csv_code, csv_filepath: Path, ds_dir: Path, with_concept=False
):
"""
Extracts time series from csv_filepath
Generates tsv files
Returns time series info to write dataset.json
"""
series_info: List = []
with csv_filepath.open("rt", encoding="ascii") as csv_fd:
jsonl_filepath = ds_dir / "series.jsonl"
with csv_filepath.open("rt", encoding="ascii") as csv_fd, \
jsonl_filepath.open("w", encoding="utf-8") as jsonl_fd:
csv_name = None
in_data = False
csv_lh = None
reader = csv.reader(csv_fd, delimiter=",", quotechar='"')
# First line: get csv name
first_row = next(reader)
csv_name = first_row[0]
# Other dimensions
so_val = csv_code[7].upper()
concept_val = extract_concept(csv_name) if with_concept else None
# Reads input CSV line by line
for cols in reader:
# Most frequently use case
if in_data:
if csv_lh:
series_info.extend(csv_lh.generate_tsv_from_csv_row(cols))
continue
# First line: get csv name
if csv_name is None:
csv_name = cols[0]
csv_lh.generate_jsonl_from_csv_row(cols, so_val,
concept_val, jsonl_fd)
jsonl_fd.write('\n')
continue
# Header
......@@ -257,20 +272,6 @@ def extract_timeseries_from_csv(
in_data = True
continue
# Adds dimensions values for all ts in the same CSV
_series_info = []
so_val = csv_code[7].upper()
# print('csv_code = [{}], seasonal_adjustment = [{}]'.format(csv_code, so_val))
concept_val = extract_concept(csv_name) if with_concept else None
for si in series_info:
dim_dict = si[2]
dim_dict["seasonal_adjustment"] = so_val
if with_concept:
dim_dict["concept"] = concept_val
_series_info.append((si[0], si[1], dim_dict))
return _series_info
def generate_dataset(
ds_code, source_dir: Path, ds_name, target_dir: Path, with_concept=False
):
......@@ -290,13 +291,11 @@ def generate_dataset(
for csv_filepath in sorted(source_dir.glob("*.csv")):
csv_code = csv_filepath.stem
series_info.extend(
extract_timeseries_from_csv(csv_code, csv_filepath, ds_dir, with_concept)
)
extract_timeseries_from_csv(csv_code, csv_filepath, ds_dir, with_concept)
datasetjson_filepath = ds_dir / "dataset.json"
write_dataset_json(
datasetjson_filepath, ds_code, ds_name, series_info, with_concept
datasetjson_filepath, ds_code, ds_name, with_concept
)
......@@ -326,7 +325,6 @@ def write_dataset_json(
"frequency",
"seasonal_adjustment",
],
"series": [],
}
if with_concept:
......@@ -340,10 +338,6 @@ def write_dataset_json(
}
dataset_data["dimensions_codes_order"].append("concept")
for si in series_info:
series_dict = dict(code=si[0], name=si[1], dimensions=si[2])
dataset_data["series"].append(series_dict)
cu.write_json_file(json_filepath, dataset_data)
......
Supports Markdown
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment