...
 
Commits (2)
......@@ -104,9 +104,10 @@ async def process_resources(
* removing the excluded ones if the ``--exclude`` option is used
* keeping only some of them if the ``--only`` option is used
* processing a limited number of resources if the ``--limit`` option is used
By default, any resource already processed is skipped,
whatever its status was (SUCCESS or FAILURE).
By default do not process resources that were already processed with
``SUCCESS`` or ``FAILURE`` status.
If the option ``--retry-failed`` is used, retry resources with FAILURE status.
If the option ``--force`` is used, process all resources.
......@@ -114,27 +115,20 @@ async def process_resources(
to track the processing progress.
If an exception is raised during the execution of ``process_resource``:
* log the error and process the next resource or re-raise
if ``--fail-fast`` option is used
* log the error and process the next resource,
or re-raise if ``--fail-fast`` option is used
* call ``resource.delete()`` if ``--delete-on-error`` option is used
"""
if events is None:
events = []
resource_event_by_id = {
event.id: event for event in events or [] if event.type == EventType.RESOURCE
event.id: event for event in events if event.type == EventType.RESOURCE
}
resources_to_process = get_resources_to_process(resources, args)
# Emit SKIPPED event for each non-processed resource
skipped_res_ids = {resource.id for resource in resources} - {
resource.id for resource in resources_to_process
}
for res_id in skipped_res_ids:
resource_event_by_id[res_id] = ResourceEvent(
id=res_id,
status=ResourceStatus.SKIPPED,
duration=0,
message="Resource was skipped",
)
resources_to_process = _filter_resources_with_args(
resources, args, resource_event_by_id
)
if not resources_to_process:
logger.debug(
......@@ -189,15 +183,11 @@ async def _process_resource_wrapper(
resource_number: int,
on_event: OnEventCallback = None,
):
# Skip previous executed steps if "events" sequence was given.
event = resource_event_by_id.get(resource.id)
if (
not args.force
and event is not None
and (not args.retry_failed or event.status != ResourceStatus.FAILURE)
):
if not _should_process_resource(args, event=event):
assert event is not None, event
logger.debug(
"Skipping resource %d/%d because it has already been processed (%s)",
"Do not process resource %d/%d because it was already processed (%s)",
resource_number,
len(resources_to_process),
event.status.value,
......@@ -254,45 +244,62 @@ async def _process_resource_wrapper(
)
def get_resources_to_process(
resources: Sequence[Resource], args: argparse.Namespace
def _should_process_resource(args: argparse.Namespace, event: ResourceEvent = None):
return (
args.force
or event is None
or (
event.status != ResourceStatus.SUCCESS
and (event.status != ResourceStatus.FAILURE or args.retry_failed)
)
)
def _filter_resources_with_args(
resources: Sequence[Resource],
args: argparse.Namespace,
resource_event_by_id: Dict[ResourceId, ResourceEvent],
) -> Sequence[Resource]:
"""Apply ``--only``, ``--exclude`` and ``--limit`` script options."""
def check_invalid(option_ids: Set[ResourceId], option_name: str) -> Set[ResourceId]:
"""Check for invalid resources and return valid ones."""
invalid_resources = option_ids - ids
if invalid_resources:
logger.error(
"%s were given to %s: %s",
pluralize(len(invalid_resources), "invalid resource"),
option_name,
", ".join(sorted(invalid_resources)),
)
valid_resources = option_ids - invalid_resources
return valid_resources
"""Filter ``resources`` by applying ``args``, and update ``resource_event_by_id``.
Script optiona taken into account: ``--only``, ``--exclude`` and ``--limit``.
For every resource filtered-out by script options, add a
:class:`dbnomics_fetcher_toolbox.status.ResourceEvent` to ``resource_event_by_id``.
"""
all_resource_ids = {r.id for r in resources}
# Initialize `resources_to_process` with all `resources`.
# The script options will be applied below, restricting this list.
resources_to_process = resources
ids = {resource.id for resource in resources}
if args.only:
valid_only = check_invalid(set(args.only), "--only")
option_name = "--only"
valid_only = _validate_resources(all_resource_ids, set(args.only), option_name)
logger.debug(
"Process only %d of %s because of --only: %s",
"Process only %d of %s because of %s: %s",
len(valid_only),
pluralize(len(resources), "resource"),
option_name,
", ".join(valid_only),
)
resources_to_process = [
resource for resource in resources_to_process if resource.id in valid_only
]
_add_skipped_events(
resources, resources_to_process, resource_event_by_id, option_name
)
if args.exclude:
valid_exclude = check_invalid(set(args.exclude), "--exclude")
option_name = "--exclude"
valid_exclude = _validate_resources(
all_resource_ids, set(args.exclude), option_name
)
logger.debug(
"Exclude %d of %s because of --exclude: %s",
"Exclude %d of %s because of %s: %s",
len(valid_exclude),
pluralize(len(resources), "resource"),
option_name,
", ".join(valid_exclude),
)
resources_to_process = [
......@@ -300,16 +307,67 @@ def get_resources_to_process(
for resource in resources_to_process
if resource.id not in valid_exclude
]
_add_skipped_events(
resources, resources_to_process, resource_event_by_id, option_name
)
if args.limit is not None:
option_name = "--limit"
logger.debug(
"%s because of --limit",
"%s because of %s",
"Process only the first resource"
if args.limit == 1
else "Don't process any resource"
if args.limit == 0
else f"Process only the {format_number(args.limit)} first resources",
option_name,
)
resources_to_process = list(take(args.limit, resources_to_process))
_add_skipped_events(
resources, resources_to_process, resource_event_by_id, option_name
)
return resources_to_process
def _add_skipped_events(
resources: Sequence[Resource],
resources_to_process: Sequence[Resource],
resource_event_by_id: Dict[ResourceId, ResourceEvent],
option_name: str,
):
"""Add SKIPPED events for each resource not processed because of script options.
Mutate ``resource_event_by_id``.
"""
for resource in resources:
if resource.id not in {r.id for r in resources_to_process}:
resource_event_by_id[resource.id] = ResourceEvent(
id=resource.id,
status=ResourceStatus.SKIPPED,
duration=0,
message=f"Resource is skipped because of {option_name} option",
)
def _validate_resources(
all_resource_ids: Set[ResourceId], option_ids: Set[ResourceId], option_name: str
) -> Set[ResourceId]:
"""Check for invalid resources in script option.
Log errors and return valid resources.
"""
invalid_resources = option_ids - all_resource_ids
if invalid_resources:
logger.error(
"%s passed to %s: %s",
pluralize(
len(invalid_resources),
singular="invalid resource was",
plural="invalid resources were",
),
option_name,
", ".join(sorted(invalid_resources)),
)
valid_resources = option_ids - invalid_resources
return valid_resources
......@@ -25,9 +25,18 @@ from pathlib import Path
import pytest
from dbnomics_fetcher_toolbox import status
from dbnomics_fetcher_toolbox.arguments import add_arguments_for_download
from dbnomics_fetcher_toolbox.resources import Resource, process_resources
from dbnomics_fetcher_toolbox.resources import (
Resource,
_should_process_resource,
process_resources,
)
from dbnomics_fetcher_toolbox.status import (
ResourceEvent,
ResourceStatus,
load_events,
open_status_writer,
)
def default_process_resource(resource: Resource):
......@@ -37,24 +46,25 @@ def default_process_resource(resource: Resource):
def prepare_args(target_dir: Path, args_str: str = None):
"""Prepare command-line args for test cases."""
if args_str is None:
args_str = ""
parser = argparse.ArgumentParser()
add_arguments_for_download(parser)
additional_arg_list = [] if args_str is None else args_str.split(" ")
return parser.parse_args(
args=list(filter(None, [str(target_dir), *additional_arg_list]))
)
additional_arg_list = filter(None, args_str.split(" "))
return parser.parse_args(args=[str(target_dir), *additional_arg_list])
@pytest.mark.asyncio
async def test_process_all_resources(tmp_path):
"""Process all resources."""
events = status.load_events(tmp_path)
events = load_events(tmp_path)
assert events is None
resources = [Resource(id=letter) for letter in list("ABCDEF")]
args = prepare_args(tmp_path)
with status.open_status_writer(args) as append_event:
with open_status_writer(args) as append_event:
resource_event_by_id = await process_resources(
resources=resources,
args=args,
......@@ -66,22 +76,21 @@ async def test_process_all_resources(tmp_path):
assert resource_event_by_id is not None
assert len(resource_event_by_id) == len(resources)
assert all(
evt.status == status.ResourceStatus.SUCCESS
for evt in resource_event_by_id.values()
evt.status == ResourceStatus.SUCCESS for evt in resource_event_by_id.values()
)
@pytest.mark.asyncio
async def test_process_one_resource(tmp_path):
"""Process ony one resource."""
events = status.load_events(tmp_path)
events = load_events(tmp_path)
assert events is None
resources = [Resource(id=letter) for letter in list("ABCDEF")]
selected_resource_id = "C"
args = prepare_args(tmp_path, f"--only {selected_resource_id}")
with status.open_status_writer(args) as append_event:
with open_status_writer(args) as append_event:
resource_event_by_id = await process_resources(
resources=resources,
args=args,
......@@ -93,20 +102,17 @@ async def test_process_one_resource(tmp_path):
assert resource_event_by_id is not None
assert len(resource_event_by_id) == len(resources)
assert all(
evt.status == status.ResourceStatus.SKIPPED
evt.status == ResourceStatus.SKIPPED
for resource_id, evt in resource_event_by_id.items()
if resource_id != selected_resource_id
)
assert (
resource_event_by_id[selected_resource_id].status
== status.ResourceStatus.SUCCESS
)
assert resource_event_by_id[selected_resource_id].status == ResourceStatus.SUCCESS
@pytest.mark.asyncio
async def test_exclude_some_resources(tmp_path):
"""Process all resources except some ones."""
events = status.load_events(tmp_path)
events = load_events(tmp_path)
assert events is None
resources = [Resource(id=letter) for letter in list("ABCDEF")]
......@@ -114,7 +120,7 @@ async def test_exclude_some_resources(tmp_path):
excluded_resource_id_list = excluded_resources_str.split(" ")
args = prepare_args(tmp_path, f"--exclude {excluded_resources_str}")
with status.open_status_writer(args) as append_event:
with open_status_writer(args) as append_event:
resource_event_by_id = await process_resources(
resources=resources,
args=args,
......@@ -126,12 +132,12 @@ async def test_exclude_some_resources(tmp_path):
assert resource_event_by_id is not None
assert len(resource_event_by_id) == len(resources)
assert all(
evt.status == status.ResourceStatus.SKIPPED
evt.status == ResourceStatus.SKIPPED
for resource_id, evt in resource_event_by_id.items()
if resource_id in excluded_resource_id_list
)
assert all(
evt.status == status.ResourceStatus.SUCCESS
evt.status == ResourceStatus.SUCCESS
for resource_id, evt in resource_event_by_id.items()
if resource_id not in excluded_resource_id_list
)
......@@ -140,7 +146,7 @@ async def test_exclude_some_resources(tmp_path):
@pytest.mark.asyncio
async def test_limit_processed_resources(tmp_path):
"""Process a limited number of resources."""
events = status.load_events(tmp_path)
events = load_events(tmp_path)
assert events is None
resources = [Resource(id=letter) for letter in list("ABCDEF")]
......@@ -148,7 +154,7 @@ async def test_limit_processed_resources(tmp_path):
processed_resource_id_list = [resource.id for resource in resources[:limit_count]]
args = prepare_args(tmp_path, f"--limit {limit_count}")
with status.open_status_writer(args) as append_event:
with open_status_writer(args) as append_event:
resource_event_by_id = await process_resources(
resources=resources,
args=args,
......@@ -160,12 +166,26 @@ async def test_limit_processed_resources(tmp_path):
assert resource_event_by_id is not None
assert len(resource_event_by_id) == len(resources)
assert all(
evt.status == status.ResourceStatus.SUCCESS
evt.status == ResourceStatus.SUCCESS
for resource_id, evt in resource_event_by_id.items()
if resource_id in processed_resource_id_list
)
assert all(
evt.status == status.ResourceStatus.SKIPPED
evt.status == ResourceStatus.SKIPPED
for resource_id, evt in resource_event_by_id.items()
if resource_id not in processed_resource_id_list
)
# def test_should_process_resource_no_event():
def test_should_process_resource_force(tmp_path):
args = prepare_args(tmp_path, "--force")
event = ResourceEvent(id="A", duration=1, status=ResourceStatus.SUCCESS)
assert _should_process_resource(args, event=event)
# def test_should_process_resource_success():
# def test_should_process_resource_failure():
# def test_should_process_resource_failure_retry_failed():