Commit 33f36e20 authored by Christophe Benz

Import provider.json/category_tree.json only if changed

parent 22ab6c65
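
Note: the point of this commit is to re-import provider.json and category_tree.json only when their blobs actually changed between two Git commits. A minimal sketch of that detection idea with GitPython; root_file_changed is a hypothetical helper, not part of this repository:

    from typing import Optional
    from git import Commit

    def root_file_changed(name: str, commit: Commit, previous: Optional[Commit]) -> bool:
        def blob_hexsha(c: Optional[Commit]) -> Optional[str]:
            if c is None:
                return None
            try:
                return (c.tree / name).hexsha  # Tree / path raises KeyError when absent
            except KeyError:
                return None
        # An added, deleted, or modified file has a different blob hash
        return blob_hexsha(commit) != blob_hexsha(previous)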
@@ -69,7 +69,7 @@ class GitToDoltImporter:
"series",
[
sa.Column("code", sa.Text, primary_key=True),
sa.Column("dataset_code", sa.Text, nullable=False), # TODO add foreign key constraint
sa.Column("dataset_code", sa.Text, primary_key=True), # TODO add foreign key constraint
sa.Column("name", sa.Text),
sa.Column("dimensions", sa.Text), # denormalized
sa.Column("observations", LONGTEXT), # denormalized
@@ -91,7 +91,7 @@ class GitToDoltImporter:
return self._dolt_client.sql(query, params)
def commit(self, git_commit: Commit):
message = git_commit.message
message = git_commit.message + "\n\n" + f"from Git commit ID: {git_commit.hexsha}"
author = git_commit.committer.name
email = git_commit.committer.email
date = git_commit.committed_datetime
......
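
Note: the Dolt commit message now embeds the originating Git commit ID. All the attributes used here come from GitPython's Commit object; a minimal sketch, assuming a local repository path:

    from git import Repo

    repo = Repo(".")  # hypothetical path
    head = repo.head.commit
    message = head.message + "\n\n" + f"from Git commit ID: {head.hexsha}"
    print(message, head.committer.name, head.committer.email, head.committed_datetime)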
@@ -45,7 +45,7 @@ def cli(
from_rev: Optional[str] = None,
max_commits: Optional[int] = None,
verbose: bool = typer.Option(os.getenv("VERBOSE"), "--verbose", "-v"),
doltpy_log_level: str = typer.Option(os.getenv("DOLTPY_LOG_LEVEL", "INFO")),
doltpy_log_level: str = typer.Option(os.getenv("DOLTPY_LOG_LEVEL")),
):
import_json_data(
git_repo_dir,
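
Note: dropping the "INFO" fallback means the option is None whenever DOLTPY_LOG_LEVEL is unset, which the import code checks before touching the doltpy logger. A minimal sketch of the pattern (hypothetical standalone command):

    import os
    from typing import Optional

    import typer

    app = typer.Typer()

    @app.command()
    def main(doltpy_log_level: Optional[str] = typer.Option(os.getenv("DOLTPY_LOG_LEVEL"))):
        # None when the environment variable is unset and no flag is passed
        if doltpy_log_level is not None:
            print("doltpy log level:", doltpy_log_level)

    if __name__ == "__main__":
        app()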
@@ -107,7 +107,10 @@ def import_json_data(
logger.info("Deleting %r content", str(dolt_repo_dir))
delete_dir_content(dolt_repo_dir, exclude={".sqlhistory"})
logging.getLogger("doltpy").setLevel(doltpy_log_level)
if verbose:
doltpy_log_level = "DEBUG"
if doltpy_log_level is not None:
logging.getLogger("doltpy").setLevel(doltpy_log_level)
doltpy_client = init_dolt(dolt_repo_dir)
dolt_server_config = ServerConfig(user="root", loglevel="debug")
......
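
Note: logging.Logger.setLevel accepts standard level names as strings, so the environment-provided value (or the "DEBUG" forced by --verbose) can be passed through unchanged; a one-line illustration:

    import logging

    logging.getLogger("doltpy").setLevel("DEBUG")  # equivalent to logging.DEBUG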
import csv
import logging
from collections import defaultdict
from io import BytesIO, StringIO
from itertools import chain
from operator import itemgetter
@@ -9,12 +8,12 @@ from typing import Iterator, Optional, Union
import git
import simdjson
from git import Blob, Commit, Repo
from git import Blob, Commit, Diff, Repo
from more_itertools import windowed
from toolz import keyfilter, take, update_in
from .json_utils import parse_json
from .typez import DatasetCode, DatasetTreeChanges, GitImporter, SeriesCode
from .typez import DatasetCode, DatasetTreeChanges, GitCommitChanges, GitImporter, SeriesCode
CATEGORY_TREE_JSON = "category_tree.json"
DATASET_JSON = "dataset.json"
@@ -33,13 +32,14 @@ class GitWalker:
self._git_repo_dir = git_repo_dir
self._git_repo = Repo(self._git_repo_dir)
self._git_importer = git_importer
self._last_known_provider_json: Optional[dict] = None
def import_provider_repo(
self,
rev: str,
from_rev: Optional[str],
max_commits: Optional[int],
datasets: list[str],
datasets: list[DatasetCode],
):
def iter_git_commit_with_previous(repo):
"""Yield Git commit pairs (previous, current) to process according to the script options.
@@ -96,64 +96,57 @@ class GitWalker:
self,
git_commit: Commit,
previous_git_commit: Optional[Commit],
datasets: list[str],
datasets: list[DatasetCode],
):
"""Walk Git objects (blobs and trees) of a commit and import them via git_importer."""
# Walk and import blobs (files)
provider_json = None
category_tree_json = None
git_commit_changes = self._get_git_commit_changes(git_commit, previous_git_commit, datasets)
for blob in git_commit.tree.blobs:
if blob.name == PROVIDER_JSON:
provider_json = load_blob_json(blob)
elif blob.name == CATEGORY_TREE_JSON:
category_tree_json = load_blob_json(blob)
if git_commit_changes.provider_json_blob is not None or git_commit_changes.category_tree_json_blob is not None:
provider_json = None
if git_commit_changes.provider_json_blob is None:
provider_json = self._last_known_provider_json
else:
logger.error(
"Unexpected blob %r in root tree %r of Git commit %r",
blob,
git_commit.tree,
git_commit,
)
if provider_json is not None:
self._git_importer.import_provider(provider_json, category_tree_json)
else:
logger.error("provider.json was not found, skipping importing it")
# Walk and import trees (sub-directories)
provider_json = load_blob_json(git_commit_changes.provider_json_blob)
self._last_known_provider_json = provider_json
category_tree_json = None
if git_commit_changes.category_tree_json_blob is not None:
if provider_json is None:
logger.error(
"No %r found since the beginning of Git history, skipping importing %r",
PROVIDER_JSON,
CATEGORY_TREE_JSON,
)
else:
category_tree_json = load_blob_json(git_commit_changes.category_tree_json_blob)
dataset_trees_changes = self._get_dataset_trees_changes(git_commit, previous_git_commit)
logger.info("%d datasets did change since previous commit", len(dataset_trees_changes))
if provider_json is not None:
self._git_importer.import_provider(provider_json, category_tree_json)
else:
logger.debug(
"Neither %r or %r changed since last commit, skipping importing provider",
PROVIDER_JSON,
CATEGORY_TREE_JSON,
)
if datasets:
dataset_trees_changes = keyfilter(lambda dataset_code: dataset_code in datasets, dataset_trees_changes)
logger.info("%d datasets did change since previous commit", len(git_commit_changes.dataset_trees))
for dataset_tree_index, (dataset_code, dataset_tree_changes) in enumerate(
sorted(dataset_trees_changes.items()), start=1
sorted(git_commit_changes.dataset_trees.items()), start=1
):
dataset_tree = git_commit.tree / dataset_code
logger.info(
"----- [%d/%d %r] Importing dataset tree %r",
dataset_tree_index,
len(dataset_trees_changes),
len(git_commit_changes.dataset_trees),
dataset_code,
dataset_tree,
)
self._import_dataset_tree_changes(dataset_code, dataset_tree_changes)
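
Note: git_commit.tree / dataset_code uses GitPython's tree navigation; dividing a Tree by a path returns the sub-tree or blob at that path and raises KeyError when it does not exist. A small illustration, assuming a local repository and a made-up dataset directory:

    from git import Repo

    repo = Repo(".")  # hypothetical path
    tree = repo.head.commit.tree
    dataset_tree = tree / "AMECO"            # hypothetical dataset directory
    dataset_json = dataset_tree / "dataset.json"
    print(dataset_json.hexsha, dataset_json.path)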
def _get_dataset_trees_changes(
self, git_commit: Commit, previous_git_commit: Optional[Commit]
) -> dict[DatasetCode, DatasetTreeChanges]:
"""Yield dataset trees with their associated changes."""
def normalize_dataset_json(dataset_json: dict):
# Ignore series attribute order
return (
update_in(dataset_json, ["series"], lambda series: sorted(series, key=itemgetter("code")))
if dataset_json.get("series") is not None
else dataset_json
)
def _get_git_commit_changes(
self, git_commit: Commit, previous_git_commit: Optional[Commit], datasets: list[DatasetCode]
) -> GitCommitChanges:
git_commit_changes = GitCommitChanges()
if previous_git_commit is None:
# It's the first commit
@@ -164,46 +157,76 @@ class GitWalker:
diff_source = previous_git_commit
diff_target = git_commit
dataset_tree_changes: dict[DatasetCode, DatasetTreeChanges] = defaultdict(DatasetTreeChanges)
for diff in diff_source.diff(diff_target):
if diff.change_type not in {"A", "D", "M"}:
logger.debug("Ignoring change type %r for diff %r", diff.change_type, diff)
continue
if diff.b_blob in git_commit.tree.blobs:
# It's a file at the root like provider.json
continue
path = Path(diff.b_path)
if path.parent.parent.as_posix() == ".":
# It's a dataset dir
dataset_code = str(path.parent)
if path.name == DATASET_JSON:
b_dataset_json = normalize_dataset_json(load_blob_json(diff.b_blob))
assert dataset_code == b_dataset_json["code"]
if diff.a_blob is not None:
a_dataset_json = normalize_dataset_json(load_blob_json(diff.a_blob))
assert dataset_code == a_dataset_json["code"]
if b_dataset_json == a_dataset_json:
# Ignore false Git commits
continue
dataset_tree_changes[dataset_code].dataset_json = (diff.b_blob, b_dataset_json)
elif path.name.endswith(".tsv"):
if diff.change_type in {"A", "M"}:
dataset_tree_changes[dataset_code].changed_series_blobs.add(diff.b_blob)
elif diff.change_type == "D":
series_code = path.stem
dataset_tree_changes[dataset_code].deleted_series_codes.add(series_code)
elif path.name == SERIES_JSONL:
if diff.change_type in {"A", "M"}:
dataset_tree_changes[dataset_code].changed_series_blobs.add(diff.b_blob)
if diff.a_blob is not None:
deleted_series_code = self._find_jsonl_deleted_series_codes(diff.a_blob, diff.b_blob)
dataset_tree_changes[dataset_code].deleted_series_codes |= deleted_series_code
else:
logger.error("Unexpected diff item: %r", diff.b_path)
blob = diff.b_blob
# It's a file at the root like provider.json
if blob.name == PROVIDER_JSON:
git_commit_changes.provider_json_blob = blob
elif blob.name == CATEGORY_TREE_JSON:
git_commit_changes.category_tree_json_blob = blob
else:
logger.error("Unexpected diff item: %r", diff.b_path)
path = Path(diff.b_path)
if path.parent.parent.as_posix() == ".":
# It's a dataset dir
dataset_code = str(path.parent)
if datasets and dataset_code not in datasets:
# Skip this dataset when the --datasets script option is used and it was not asked for
continue
dataset_tree_changes = git_commit_changes.dataset_trees.get(dataset_code)
if dataset_tree_changes is None:
dataset_tree_changes = DatasetTreeChanges()
git_commit_changes.dataset_trees[dataset_code] = dataset_tree_changes
self._update_dataset_tree_changes(dataset_tree_changes, diff, dataset_code, path)
else:
logger.error(
"Unexpected blob %r in root tree %r of Git commit %r",
blob,
git_commit.tree,
git_commit,
)
return git_commit_changes
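
Note: this method relies on GitPython's commit diffing; previous.diff(current) yields Diff objects whose change_type distinguishes added, deleted, and modified files. A minimal sketch, assuming a local repository:

    from git import Repo

    repo = Repo(".")  # hypothetical path
    previous, current = repo.commit("HEAD~1"), repo.commit("HEAD")
    for diff in previous.diff(current):
        if diff.change_type in {"A", "M"}:
            print("added/modified:", diff.b_path)
        elif diff.change_type == "D":
            print("deleted:", diff.a_path)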
def _update_dataset_tree_changes(
self, dataset_tree_changes: DatasetTreeChanges, diff: Diff, dataset_code: DatasetCode, path: Path
):
"""Yield dataset trees with their associated changes."""
return dataset_tree_changes
def normalize_dataset_json(dataset_json: dict):
# Ignore series attribute order
return (
update_in(dataset_json, ["series"], lambda series: sorted(series, key=itemgetter("code")))
if dataset_json.get("series") is not None
else dataset_json
)
if path.name == DATASET_JSON:
b_dataset_json = normalize_dataset_json(load_blob_json(diff.b_blob))
assert dataset_code == b_dataset_json["code"]
if diff.a_blob is not None:
a_dataset_json = normalize_dataset_json(load_blob_json(diff.a_blob))
assert dataset_code == a_dataset_json["code"]
if b_dataset_json == a_dataset_json:
# Ignore false Git commits
return None
dataset_tree_changes.dataset_json = (diff.b_blob, b_dataset_json)
elif path.name.endswith(".tsv"):
if diff.change_type in {"A", "M"}:
dataset_tree_changes.changed_series_blobs.add(diff.b_blob)
elif diff.change_type == "D":
series_code = path.stem
dataset_tree_changes.deleted_series_codes.add(series_code)
elif path.name == SERIES_JSONL:
if diff.change_type in {"A", "M"}:
dataset_tree_changes.changed_series_blobs.add(diff.b_blob)
if diff.a_blob is not None:
deleted_series_code = self._find_jsonl_deleted_series_codes(diff.a_blob, diff.b_blob)
dataset_tree_changes.deleted_series_codes |= deleted_series_code
else:
logger.error("Unexpected diff item: %r", diff.b_path)
def _find_jsonl_deleted_series_codes(self, a_blob: Blob, b_blob: Optional[Blob]) -> set[SeriesCode]:
def iter_series_codes(blob: Blob):
......
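
Note: the body of _find_jsonl_deleted_series_codes is collapsed above; a plausible sketch of the idea (an assumption, not the actual implementation): the deleted codes are those present in the old series.jsonl blob but missing from the new one.

    import json
    from typing import Optional
    from git import Blob

    def find_jsonl_deleted_series_codes(a_blob: Blob, b_blob: Optional[Blob]) -> set[str]:
        def iter_series_codes(blob: Blob):
            # One JSON document per line; each carries a "code" attribute
            for line in blob.data_stream.read().decode("utf-8").splitlines():
                if line.strip():
                    yield json.loads(line)["code"]
        a_codes = set(iter_series_codes(a_blob))
        b_codes = set(iter_series_codes(b_blob)) if b_blob is not None else set()
        return a_codes - b_codes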
@@ -18,9 +18,18 @@ class TableImportMode(Enum):
UPDATE = "UPDATE"
@dataclass
class GitCommitChanges:
"""Describes what changed between 2 Git commits."""
dataset_trees: dict[DatasetCode, "DatasetTreeChanges"] = field(default_factory=dict)
provider_json_blob: Optional[Blob] = None
category_tree_json_blob: Optional[Blob] = None
@dataclass
class DatasetTreeChanges:
"""Describes what changed between 2 commits in a dataset tree."""
"""Describes what changed between 2 Git commits in a dataset tree."""
dataset_json: Optional[tuple[Blob, dict]] = None
changed_series_blobs: set[Blob] = field(default_factory=set)
......
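
Note: a short usage illustration of the two dataclasses (the codes are made up): GitCommitChanges groups everything detected between two commits, with one DatasetTreeChanges per touched dataset.

    changes = GitCommitChanges()
    tree_changes = changes.dataset_trees.setdefault("AMECO", DatasetTreeChanges())
    tree_changes.deleted_series_codes.add("ZUTN.DEU")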