Source code for mmengine.evaluator.metric

# Copyright (c) OpenMMLab. All rights reserved.
import logging
from abc import ABCMeta, abstractmethod
from typing import Any, List, Optional, Sequence, Union

from torch import Tensor

from mmengine.dist import (broadcast_object_list, collect_results,
                           is_main_process)
from mmengine.fileio import dump
from mmengine.logging import print_log
from mmengine.registry import METRICS
from mmengine.structures import BaseDataElement


class BaseMetric(metaclass=ABCMeta):
    """Base class for a metric.

    The metric first processes each batch of data_samples and predictions,
    and appends the processed results to the results list. Then it
    collects all results together from all ranks if distributed training
    is used. Finally, it computes the metrics of the entire dataset.

    A subclass of :class:`BaseMetric` should assign a meaningful value to the
    class attribute `default_prefix`. See the argument `prefix` for details.

    Args:
        collect_device (str): Device name used for collecting results from
            different ranks during distributed training. Must be 'cpu' or
            'gpu'. Defaults to 'cpu'.
        prefix (str, optional): The prefix that will be added in the metric
            names to disambiguate homonymous metrics of different evaluators.
            If prefix is not provided in the argument, self.default_prefix
            will be used instead. Defaults to None.
        collect_dir (str, optional): Synchronize directory for collecting data
            from different ranks. This argument should only be configured
            when ``collect_device`` is 'cpu'. Defaults to None.
            `New in version 0.7.3.`
    """

    default_prefix: Optional[str] = None

    def __init__(self,
                 collect_device: str = 'cpu',
                 prefix: Optional[str] = None,
                 collect_dir: Optional[str] = None) -> None:

        if collect_dir is not None and collect_device != 'cpu':
            raise ValueError('`collect_dir` could only be configured when '
                             "`collect_device='cpu'`")

        self._dataset_meta: Union[None, dict] = None
        self.collect_device = collect_device
        self.results: List[Any] = []
        self.prefix = prefix or self.default_prefix
        self.collect_dir = collect_dir

        if self.prefix is None:
            print_log(
                'The prefix is not set in metric class '
                f'{self.__class__.__name__}.',
                logger='current',
                level=logging.WARNING)

    @property
    def dataset_meta(self) -> Optional[dict]:
        """Optional[dict]: Meta info of the dataset."""
        return self._dataset_meta

    @dataset_meta.setter
    def dataset_meta(self, dataset_meta: dict) -> None:
        """Set the dataset meta info to the metric."""
        self._dataset_meta = dataset_meta
    @abstractmethod
    def process(self, data_batch: Any, data_samples: Sequence[dict]) -> None:
        """Process one batch of data samples and predictions.

        The processed results should be stored in ``self.results``, which will
        be used to compute the metrics when all batches have been processed.

        Args:
            data_batch (Any): A batch of data from the dataloader.
            data_samples (Sequence[dict]): A batch of outputs from the model.
        """
    @abstractmethod
    def compute_metrics(self, results: list) -> dict:
        """Compute the metrics from processed results.

        Args:
            results (list): The processed results of each batch.

        Returns:
            dict: The computed metrics. The keys are the names of the metrics,
            and the values are corresponding results.
        """
    def evaluate(self, size: int) -> dict:
        """Evaluate the model performance of the whole dataset after
        processing all batches.

        Args:
            size (int): Length of the entire validation dataset. When batch
                size > 1, the dataloader may pad some data samples to make
                sure all ranks have the same length of dataset slice. The
                ``collect_results`` function will drop the padded data based
                on this size.

        Returns:
            dict: Evaluation metrics dict on the val dataset. The keys are the
            names of the metrics, and the values are corresponding results.
        """
        if len(self.results) == 0:
            print_log(
                f'{self.__class__.__name__} got empty `self.results`. Please '
                'ensure that the processed results are properly added into '
                '`self.results` in `process` method.',
                logger='current',
                level=logging.WARNING)

        if self.collect_device == 'cpu':
            results = collect_results(
                self.results,
                size,
                self.collect_device,
                tmpdir=self.collect_dir)
        else:
            results = collect_results(self.results, size, self.collect_device)

        if is_main_process():
            # cast all tensors in results list to cpu
            results = _to_cpu(results)
            _metrics = self.compute_metrics(results)  # type: ignore
            # Add prefix to metric names
            if self.prefix:
                _metrics = {
                    '/'.join((self.prefix, k)): v
                    for k, v in _metrics.items()
                }
            metrics = [_metrics]
        else:
            metrics = [None]  # type: ignore

        broadcast_object_list(metrics)

        # reset the results list
        self.results.clear()
        return metrics[0]
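To make the workflow above concrete, here is a minimal sketch of a custom metric built on ``BaseMetric``. The ``SimpleAccuracy`` class and the ``pred_label``/``gt_label`` keys are hypothetical and only illustrate how ``process`` feeds ``self.results`` and how ``compute_metrics`` consumes the collected list; they are not part of mmengine.

# Hypothetical example metric; not part of mmengine. Assumes each data
# sample dict contains 'pred_label' and 'gt_label' entries.
@METRICS.register_module()
class SimpleAccuracy(BaseMetric):
    default_prefix = 'acc'

    def process(self, data_batch: Any, data_samples: Sequence[dict]) -> None:
        # Keep only what compute_metrics needs; evaluate() gathers
        # self.results from all ranks before computing.
        for sample in data_samples:
            self.results.append(
                (sample['pred_label'], sample['gt_label']))

    def compute_metrics(self, results: list) -> dict:
        correct = sum(pred == gt for pred, gt in results)
        return {'accuracy': correct / len(results)}

During validation a runner calls ``process`` on every batch and then ``evaluate(size)``, which collects the results from all ranks, applies the prefix, and would return something like ``{'acc/accuracy': 0.93}`` (value illustrative).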
@METRICS.register_module()
class DumpResults(BaseMetric):
    """Dump model predictions to a pickle file for offline evaluation.

    Args:
        out_file_path (str): Path of the dumped file. Must end with '.pkl'
            or '.pickle'.
        collect_device (str): Device name used for collecting results from
            different ranks during distributed training. Must be 'cpu' or
            'gpu'. Defaults to 'cpu'.
        collect_dir (str, optional): Synchronize directory for collecting data
            from different ranks. This argument should only be configured
            when ``collect_device`` is 'cpu'. Defaults to None.
            `New in version 0.7.3.`
    """

    def __init__(self,
                 out_file_path: str,
                 collect_device: str = 'cpu',
                 collect_dir: Optional[str] = None) -> None:
        super().__init__(
            collect_device=collect_device, collect_dir=collect_dir)
        if not out_file_path.endswith(('.pkl', '.pickle')):
            raise ValueError('The output file must be a pkl file.')
        self.out_file_path = out_file_path
    def process(self, data_batch: Any, predictions: Sequence[dict]) -> None:
        """Transfer tensors in predictions to CPU."""
        self.results.extend(_to_cpu(predictions))
    def compute_metrics(self, results: list) -> dict:
        """Dump the prediction results to a pickle file."""
        dump(results, self.out_file_path)
        print_log(
            f'Results have been saved to {self.out_file_path}.',
            logger='current')
        return {}
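As a rough standalone usage sketch (the file name and sample predictions below are made up; in a real setup ``DumpResults`` is typically configured as a test evaluator through its registered name):

# Hypothetical direct usage of DumpResults outside a runner.
metric = DumpResults(out_file_path='predictions.pkl')
metric.process(data_batch=None,
               predictions=[{'pred_label': 1}, {'pred_label': 0}])
metric.evaluate(size=2)  # dumps predictions.pkl and returns {}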
def _to_cpu(data: Any) -> Any:
    """Transfer all tensors and BaseDataElement to cpu."""
    if isinstance(data, (Tensor, BaseDataElement)):
        return data.to('cpu')
    elif isinstance(data, list):
        return [_to_cpu(d) for d in data]
    elif isinstance(data, tuple):
        return tuple(_to_cpu(d) for d in data)
    elif isinstance(data, dict):
        return {k: _to_cpu(v) for k, v in data.items()}
    else:
        return data
