Commit c542be2d authored by Pierre Dittgen

Modernizing code

parent aae7887f
Pipeline #19875 passed with stage in 17 seconds
@@ -25,20 +25,14 @@
import argparse
import csv
import logging
import os
import shutil
import convert_util as cu
LOG = logging.Logger('meti fetcher')
import sys
from pathlib import Path
import convert_util as cu
DATA_MODEL_VERSION = '0.7.8'
log = logging.getLogger(__name__)
DATAPACKAGE_JSON = {
"dbnomics": {
"data_model_version": DATA_MODEL_VERSION
}
}
PROVIDER_JSON = dict(
code='METI',
@@ -76,11 +70,6 @@ CONCEPT_TO_CODE_MAP = {
}
def csv_filename_to_id(filename):
""" Return 'gom2e' from b2010_gom2e.csv """
return filename[:-4]
def generate_tsv_from_csv_data_line(csv_code, csv_name, cols, ds_dir):
""" Parse data, generates time series dans return series info list """
# TODO
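The removed csv_filename_to_id helper is superseded by pathlib further down in this commit: Path.stem already yields the filename without its extension. A minimal illustration:

```python
from pathlib import Path

# Path.stem replaces the old filename[:-4] slicing helper.
print(Path('b2010_gom2e.csv').stem)   # b2010_gom2e
```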
@@ -111,6 +100,9 @@ def norm_period_values(period_values, freq):
2007FY, ... 2017FY -> 2007, ... 2017
2007Q1, ... 2017Q4 -> 2007-Q1, ... 2017-Q4
"""
# sanitize period_values (trim 'p ' at the beginning of the period value)
period_values = [p.lstrip('p ') for p in period_values]
if freq == 'A':
return [p[:4] for p in period_values]
elif freq in ('Q', 'M'):
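The Q/M branch of norm_period_values is collapsed in this diff; below is a self-contained sketch of the whole function, assuming quarterly raw values like '2007Q1' and monthly raw values in a 'YYYYMM'-style layout (the monthly format is a guess, not taken from the commit):

```python
def norm_period_values_sketch(period_values, freq):
    """Illustrative stand-in for norm_period_values, not the commit's exact code."""
    # Trim the provisional marker, e.g. 'p 2017Q4' -> '2017Q4'
    period_values = [p.lstrip('p ') for p in period_values]
    if freq == 'A':
        return [p[:4] for p in period_values]                          # '2007FY' -> '2007'
    if freq in ('Q', 'M'):
        return ['{}-{}'.format(p[:4], p[4:]) for p in period_values]   # '2007Q1' -> '2007-Q1'
    raise ValueError('unsupported frequency: {}'.format(freq))

print(norm_period_values_sketch(['2007FY', 'p 2008FY'], 'A'))   # ['2007', '2008']
print(norm_period_values_sketch(['2017Q3', 'p 2017Q4'], 'Q'))   # ['2017-Q3', '2017-Q4']
```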
@@ -122,7 +114,7 @@ def norm_period_values(period_values, freq):
class CSVLineHelper:
""" Handy class that helps parsing CSV time series """
def __init__(self, csv_code, csv_name, ds_dir, header_cols):
def __init__(self, csv_code, csv_name, ds_dir: Path, header_cols):
""" Initializes instance computing period_infos """
self.csv_code = csv_code
@@ -157,6 +149,7 @@ class CSVLineHelper:
assert not code in code_set, "Code [{}] already found".format(code)
p_info = {
'col_interval': (p[0]+3, p[1]+3),
'obs_status': ['p' if p.startswith('p ') else '' for p in p[2]],
'norm_period_values': norm_period_values(p[2], freq),
'freq': freq,
'code': code,
@@ -186,12 +179,19 @@ class CSVLineHelper:
obs_values = CSVLineHelper.norm_obs_values(cols[ci[0]:ci[1]])
assert len(period_values) == len(obs_values)
# Prepare data to be written
header = ['PERIOD', 'VALUE']
value_list = [period_values, obs_values]
if any([status != '' for status in period_info['obs_status']]):
header.append('OBS_STATUS')
value_list.append(period_info['obs_status'])
# Write TSV file
tsv_filepath = os.path.join(self.ds_dir, '{}.tsv'.format(ts_code))
with open(tsv_filepath, mode='w', encoding='utf-8') as tsv_fd:
tsv_fd.write('PERIOD\tVALUE\n')
for p_val, obs_val in zip(period_values, obs_values):
tsv_fd.write('{}\t{}\n'.format(p_val, obs_val))
tsv_filepath = self.ds_dir / '{}.tsv'.format(ts_code)
with tsv_filepath.open('w', encoding='utf-8') as tsv_fd:
tsv_fd.write('\t'.join(header) + '\n')
for t in zip(*value_list):
tsv_fd.write('\t'.join(t) + '\n')
# Append to ts_infos
ts_infos.append((ts_code, ts_name, {'frequency': period_info['freq']}))
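For illustration (hypothetical values, not taken from METI data), the optional OBS_STATUS column added here produces rows like the following, using the same zip-the-columns logic as above:

```python
period_values = ['2017-Q3', '2017-Q4']
obs_values = ['101.2', '99.8']
obs_status = ['', 'p']                    # 'p' flags a provisional observation

header = ['PERIOD', 'VALUE']
value_list = [period_values, obs_values]
if any(status != '' for status in obs_status):
    header.append('OBS_STATUS')
    value_list.append(obs_status)

print('\n'.join('\t'.join(row) for row in [header] + list(zip(*value_list))))
# PERIOD   VALUE   OBS_STATUS
# 2017-Q3  101.2
# 2017-Q4  99.8    p
```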
@@ -215,14 +215,14 @@ def extract_concept(csv_name):
return CONCEPT_TO_CODE_MAP[chunk]
def extract_timeseries_from_csv(csv_code, csv_filepath, ds_dir, with_concept=False):
def extract_timeseries_from_csv(csv_code, csv_filepath: Path, ds_dir: Path, with_concept=False):
"""
Extracts time series from csv_filepath
Generates tsv files
Returns time series info to write dataset.json
"""
series_info = []
with open(csv_filepath, mode='r', encoding='ascii') as csv_fd:
with csv_filepath.open('rt', encoding='ascii') as csv_fd:
csv_name = None
in_data = False
csv_lh = None
@@ -252,7 +252,7 @@ def extract_timeseries_from_csv(csv_code, csv_filepath, ds_dir, with_concept=False):
# Adds dimensions values for all ts in the same CSV
_series_info = []
so_val = csv_code[7].upper()
#print('csv_code = [{}], seasonal_adjustment = [{}]'.format(csv_code, so_val))
# print('csv_code = [{}], seasonal_adjustment = [{}]'.format(csv_code, so_val))
concept_val = extract_concept(csv_name) if with_concept else None
for si in series_info:
dim_dict = si[2]
@@ -263,31 +263,30 @@ def extract_timeseries_from_csv(csv_code, csv_filepath, ds_dir, with_concept=False):
return _series_info
def generate_dataset(ds_code, source_dir, ds_name, target_dir, with_concept=False):
def generate_dataset(ds_code, source_dir: Path, ds_name, target_dir: Path, with_concept=False):
"""
creates dataset_dir
generates time series tsv files
generates dataset.json
"""
ds_dir = os.path.join(target_dir, ds_code)
if not os.path.exists(ds_dir):
os.mkdir(ds_dir)
log.info('Generating dataset [%s]', ds_code)
ds_dir = target_dir / ds_code
if not ds_dir.exists():
ds_dir.mkdir()
LOG.info('Working on %s dataset', ds_code)
log.info('Working on %s dataset', ds_code)
series_info = []
for filename in sorted(os.listdir(source_dir)):
if not cu.ends_with(filename, '.csv'):
continue
csv_filepath = os.path.join(source_dir, filename)
csv_code = csv_filename_to_id(filename)
for csv_filepath in sorted(source_dir.glob('*.csv')):
csv_code = csv_filepath.stem
series_info.extend(extract_timeseries_from_csv(csv_code, csv_filepath, ds_dir, with_concept))
datasetjson_filepath = os.path.join(ds_dir, 'dataset.json')
datasetjson_filepath = ds_dir / 'dataset.json'
write_dataset_json(datasetjson_filepath, ds_code, ds_name, series_info, with_concept)
def write_dataset_json(json_filepath, ds_code, ds_name, series_info, with_concept=False):
def write_dataset_json(json_filepath: Path, ds_code, ds_name, series_info, with_concept=False):
""" Writes dataset.json """
dataset_data = {
'code': ds_code,
@@ -335,21 +334,18 @@ def write_dataset_json(json_filepath, ds_code, ds_name, series_info, with_concept=False):
cu.write_json_file(json_filepath, dataset_data)
def clean_csv_files(source_dir):
def clean_csv_files(source_dir: Path):
""" Fix CSV files found in given source_dir and store fixed versions in temp directory
return temp directory path """
import tempfile
temp_dir = tempfile.mkdtemp(prefix='sanzi')
for filename in os.listdir(source_dir):
if filename[-4:] != '.csv':
continue
orig_csv_filepath = os.path.join(source_dir, filename)
with open(orig_csv_filepath, mode='rb') as bin_fd:
temp_dir = Path(tempfile.mkdtemp(prefix='sanzi'))
for filepath in source_dir.glob('*.csv'):
with filepath.open('rb') as bin_fd:
bcontent = bin_fd.read()
fixed_bcontent = bcontent.replace(b'\x81\x6A', b')')
fixed_csv_filepath = os.path.join(temp_dir, filename)
with open(fixed_csv_filepath, mode='wb') as bin_fd:
fixed_csv_filepath = temp_dir / filepath.name
with fixed_csv_filepath.open('wb') as bin_fd:
bin_fd.write(fixed_bcontent)
return temp_dir
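For context, the byte pair replaced in clean_csv_files is the Shift_JIS encoding of the full-width closing parenthesis '）', which the encoding='ascii' read in extract_timeseries_from_csv would otherwise reject. A quick check:

```python
# The replaced byte pair is Shift_JIS for the full-width right parenthesis.
assert b'\x81\x6A'.decode('shift_jis') == '）'
```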
@@ -365,40 +361,44 @@ def write_category_tree_json(json_filepath):
def main():
""" Converts downloaded CSV files into datasets and time series """
parser = argparse.ArgumentParser(description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument(
'source_dir',
help='path of source directory containing downloaded DeStatis data',
)
parser.add_argument(
'target_dir',
help='path of target directory containing JSON data',
)
parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('source_dir', type=Path, help='path of source directory')
parser.add_argument('target_dir', type=Path, help='path of target directory')
parser.add_argument('--log', default='WARNING', help='level of logging messages')
args = parser.parse_args()
numeric_level = getattr(logging, args.log.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError('Invalid log level: {}'.format(args.log))
logging.basicConfig(
format="%(levelname)s:%(name)s:%(asctime)s:%(message)s",
level=numeric_level,
stream=sys.stdout, # Use stderr if script outputs data to stdout.
)
source_dir = args.source_dir
assert os.path.exists(source_dir)
assert os.access(source_dir, os.R_OK)
if not source_dir.exists():
parser.error("Source dir {!r} not found".format(str(source_dir)))
target_dir = args.target_dir
assert os.path.exists(target_dir)
assert os.access(target_dir, os.W_OK)
if not target_dir.exists():
parser.error("Target dir {!r} not found".format(str(target_dir)))
# Standard metadata
cu.write_json_file(os.path.join(target_dir, 'provider.json'), PROVIDER_JSON)
cu.write_json_file(os.path.join(target_dir, 'datapackage.json'), DATAPACKAGE_JSON)
write_category_tree_json(os.path.join(target_dir, 'category_tree.json'))
cu.write_json_file(target_dir / 'provider.json', PROVIDER_JSON)
write_category_tree_json(target_dir / 'category_tree.json')
# iip dataset
generate_dataset('iip', os.path.join(source_dir, 'b2010_g1e'), DATASETS_INFO['iip'], target_dir, with_concept=True)
generate_dataset('iip', source_dir / 'b2010_g1e', DATASETS_INFO['iip'], target_dir, with_concept=True)
# sanzi dataset
sanzi_source_dir = os.path.join(source_dir, 'b2010_ke')
sanzi_source_dir = source_dir / 'b2010_ke'
# sanzi CSV files include a bad character; clean them first and store the fixed CSVs in a temporary directory
temp_dir = clean_csv_files(sanzi_source_dir)
generate_dataset('sanzi', temp_dir, DATASETS_INFO['sanzi'], target_dir)
shutil.rmtree(temp_dir)
shutil.rmtree(str(temp_dir))
if __name__ == '__main__':
......
@@ -3,20 +3,13 @@
Handy functions to ease convert work in dbnomics
"""
import json
from pathlib import Path
def ends_with(text, subtext):
""" true if subtext found at the end of text """
stlen = len(subtext)
if stlen > len(text):
return False
return text[-stlen:] == subtext
def write_json_file(file_path, data):
def write_json_file(file_path: Path, data):
""" Writes data the json way to file_path """
with open(file_path, 'w', encoding='utf-8') as json_fd:
with file_path.open('wt', encoding='utf-8') as json_fd:
json.dump(data, json_fd, ensure_ascii=False, indent=2, sort_keys=True)
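A hypothetical usage sketch of write_json_file: with ensure_ascii=False, non-ASCII labels are written verbatim instead of as \uXXXX escapes (the target path and payload below are made up for illustration):

```python
import tempfile
from pathlib import Path

import convert_util as cu

out_path = Path(tempfile.mkdtemp()) / 'example.json'    # hypothetical target
cu.write_json_file(out_path, {'code': 'iip', 'name': '鉱工業指数'})
print(out_path.read_text(encoding='utf-8'))             # label appears verbatim, keys sorted
```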
......
@@ -25,73 +25,89 @@
import argparse
import logging
import os
import shutil
import sys
import zipfile
from pathlib import Path
import http.client
import requests
LOG = logging.Logger('meti download')
log = logging.getLogger(__name__)
def download_binary_file(url, file_path, cache=False):
def download_binary_file(url, file_path: Path, cache=False):
""" Download url into binary file """
LOG.debug("Downloading %s... ", os.path.basename(file_path))
if cache and os.path.exists(file_path):
LOG.debug('-> cached.')
log.debug("Downloading %s... ", file_path.name)
if cache and file_path.exists():
log.debug('-> cached.')
return
req = requests.get(url, stream=True)
with open(file_path, mode='wb') as fout:
with file_path.open('wb') as fout:
req.raw.decode_content = True
shutil.copyfileobj(req.raw, fout)
LOG.debug('-> done.')
log.debug('-> done.')
def die(errmsg):
""" Writes error msg and stops """
LOG.error(errmsg)
log.error(errmsg)
import sys
sys.stderr.write('Error: {}\n'.format(errmsg))
sys.exit(1)
def download_and_extract(url, target_dir, dir_name, cache=False):
def download_and_extract(url, target_dir: Path, dir_name, cache=False):
""" Downloads zip archive and extracts it in a folder """
# Downloads zip
zip_filepath = os.path.join(target_dir, '{}.zip'.format(dir_name))
zip_filepath = target_dir / '{}.zip'.format(dir_name)
download_binary_file(url, zip_filepath, cache=cache)
assert os.path.exists(zip_filepath)
assert zip_filepath.exists()
# Bad zip :-(
zip_archive = zipfile.ZipFile(zip_filepath)
zip_archive = zipfile.ZipFile(str(zip_filepath))
if zip_archive.testzip() is not None:
die('Bad zip file: [{}]'.format(zip_filepath))
die('Bad zip file: [{}]'.format(str(zip_filepath)))
# Extracts all CSV
csv_dir = os.path.join(target_dir, dir_name)
os.mkdir(csv_dir)
zip_archive.extractall(csv_dir)
csv_dir = target_dir / dir_name
csv_dir.mkdir()
zip_archive.extractall(str(csv_dir))
# And removes zip archive
if not cache:
os.remove(zip_filepath)
zip_filepath.unlink()
def main():
""" Downloads and extracts zip files in folders """
parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('target_dir',
help='path of target directory for downloaded data',
)
parser.add_argument('target_dir', type=Path, help='path of target directory')
parser.add_argument('--debug-http', action='store_true', help='display http.client debug messages')
parser.add_argument('--log', default='WARNING', help='level of logging messages')
args = parser.parse_args()
numeric_level = getattr(logging, args.log.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError('Invalid log level: {}'.format(args.log))
logging.basicConfig(
format="%(levelname)s:%(name)s:%(asctime)s:%(message)s",
level=numeric_level,
stream=sys.stdout, # Use stderr if script outputs data to stdout.
)
logging.getLogger("urllib3").setLevel(logging.DEBUG if args.debug_http else logging.WARNING)
if args.debug_http:
http.client.HTTPConnection.debuglevel = 1
target_dir = args.target_dir
assert os.path.exists(target_dir)
assert os.access(target_dir, os.W_OK)
if not target_dir.exists():
parser.error("Target dir {!r} not found".format(str(target_dir)))
LOG.info('Downloading meti data')
log.info('Downloading meti data')
download_and_extract('http://www.meti.go.jp/english/statistics/tyo/iip/csv/b2010_g1e.zip',
target_dir, 'b2010_g1e')
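A side note on the remaining str() conversions (a sketch, assuming Python 3.6+): zipfile.ZipFile, ZipFile.extractall and shutil.rmtree all accept path-like objects directly on 3.6 and later, so the wrappers only matter for older interpreters.

```python
import shutil
import tempfile
import zipfile
from pathlib import Path

# Build a throwaway archive so the sketch is self-contained.
work_dir = Path(tempfile.mkdtemp(prefix='pathlike_demo'))
archive = work_dir / 'example.zip'
with zipfile.ZipFile(archive, 'w') as zf:      # a Path is accepted directly on 3.6+
    zf.writestr('b2010_example.csv', 'PERIOD,VALUE\n')

out_dir = work_dir / 'csv'
out_dir.mkdir()
with zipfile.ZipFile(archive) as zf:
    zf.extractall(out_dir)                     # also accepts a Path
shutil.rmtree(work_dir)                        # likewise accepts a Path
```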
......