Source code for cacp.comparison

import inspect
import os
import typing
from pathlib import Path
from timeit import default_timer as timer

import pandas as pd
import river
from joblib import delayed, Parallel
from river import metrics as river_metrics, stream, utils
from river.datasets.base import Dataset
from tqdm import tqdm

from cacp.dataset import ClassificationDatasetBase, ClassificationFoldData, AVAILABLE_N_FOLDS, \
    ClassificationFoldDataModifierBase, ClassificationFoldDataNormalizer
from cacp.util import accuracy, precision, recall, auc, f1

# Default metrics for the fold-based comparison, as (result column name, metric function) pairs.
# Each function is called as ``metric_fun(y_true, y_pred, labels)``.
DEFAULT_METRICS = (
    ('AUC', auc),
    ('Accuracy', accuracy),
    ('Precision', precision),
    ('Recall', recall),
    ('F1', f1),
)

# Default metrics for the incremental comparison, as (result column name, river metric class) pairs.
# Each class is instantiated without arguments for every classifier/dataset run.
DEFAULT_INCREMENTAL_METRICS = (
    ('AUC', river_metrics.ROCAUC),
    ('Accuracy', river_metrics.Accuracy),
    ('Precision', river_metrics.Precision),
    ('Recall', river_metrics.Recall),
    ('F1', river_metrics.F1),
)


def _fit_accepts_classes(fit: typing.Callable) -> bool:
    """Tell whether ``fit`` can be called with a ``classes=...`` keyword argument."""
    try:
        parameter = inspect.signature(fit).parameters.get('classes')
    except (TypeError, ValueError):
        # Some callables expose no introspectable signature; fall back to a plain fit call.
        return False
    # Positional-only and ``*classes`` parameters cannot be supplied by keyword.
    return parameter is not None and parameter.kind in (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    )


def process_comparison_single(
    classifier_factory,
    classifier_name,
    dataset: 'ClassificationDatasetBase',
    fold: 'ClassificationFoldData',
    metrics: typing.Sequence[typing.Tuple[str, typing.Callable]],
) -> dict:
    """
    Runs comparison on single classifier and dataset.

    The classifier is built by calling ``classifier_factory(n_inputs, n_classes)``, fitted on the
    training part of the fold and used to predict the test part. If the model's ``fit`` accepts a
    ``classes`` keyword (positional-or-keyword or keyword-only), the fold labels are passed to it.

    Any exception raised while fitting or predicting is caught and printed: both timings are then
    reported as 9999 and the prediction stays ``None``. A metric that raises is reported as 0.

    :param classifier_factory: classifier factory
    :param classifier_name: classifier name
    :param dataset: single dataset
    :param fold: fold data
    :param metrics: metrics collection
    :return: dictionary of calculated metrics and metadata
    """
    model = classifier_factory(fold.x_train.shape[1], len(fold.labels))
    labels = fold.labels
    pred = None
    train_start_time = timer()
    try:
        if _fit_accepts_classes(model.fit):
            model.fit(fold.x_train, fold.y_train, classes=labels.tolist())
        else:
            model.fit(fold.x_train, fold.y_train)
        train_time = timer() - train_start_time

        pred_start_time = timer()
        pred = model.predict(fold.x_test)
        pred_time = timer() - pred_start_time
    except Exception as e:
        # Sentinel timings mark a failed run in the results table.
        train_time = 9999
        pred_time = 9999
        print(f"Error while running {classifier_name}, metrics will be set to 0", e)

    result = {
        'Dataset': dataset.name,
        'Algorithm': classifier_name,
        'Number of classes': len(set(fold.y_train)),
        'Train size': len(fold.x_train),
        'Test size': len(fold.x_test),
        'CV index': fold.index,
        'Train time [s]': train_time,
        'Prediction time [s]': pred_time
    }

    for (metric, metric_fun) in metrics:
        try:
            result[metric] = metric_fun(fold.y_test, pred, labels)
        except Exception as e:
            result[metric] = 0.
            print(f"Error while calculating {metric} for {classifier_name}, value will be set to 0", e)
    return result


def process_comparison(
    datasets: typing.List[ClassificationDatasetBase],
    classifiers: typing.List[typing.Tuple[str, typing.Callable]],
    result_dir: Path,
    metrics: typing.Sequence[typing.Tuple[str, typing.Callable]] = DEFAULT_METRICS,
    n_folds: AVAILABLE_N_FOLDS = 10,
    custom_fold_modifiers: typing.List[ClassificationFoldDataModifierBase] = None,
    dob_scv: bool = True,
    categorical_to_numerical=True,
    normalized: bool = False,
    progress=lambda progress, total: None,
):
    """
    Runs comparison for provided datasets and classifiers.

    Every fold of every dataset is passed through the fold modifiers and then evaluated by all
    classifiers in parallel (one joblib job per classifier). After each fold the accumulated
    results are written to a numbered checkpoint file ``comparison_<n>.csv`` in ``result_dir``
    and the previous checkpoint is removed; at the end the last checkpoint is replaced by
    ``comparison.csv``. Nothing is written when no fold was processed.

    :param datasets: dataset collection
    :param classifiers: classifiers collection
    :param result_dir: results directory, created if it does not exist
    :param metrics: metrics collection
    :param n_folds: number of folds {5,10}
    :param custom_fold_modifiers: custom fold modifiers that can change fold data before usage
    :param dob_scv: if folds distribution optimally balanced stratified cross-validation (DOB-SCV) should be used
    :param categorical_to_numerical: if dataset categorical values should be converted to numerical
    :param normalized: if the data should be normalized in range [0..1]
    :param progress: function that can be used to monitor progress
    """
    count = 0
    records = []
    df = None

    # Custom modifiers run first; normalization, when requested, is applied last.
    fold_modifiers = []
    if custom_fold_modifiers:
        fold_modifiers.extend(custom_fold_modifiers)
    if normalized:
        fold_modifiers.append(ClassificationFoldDataNormalizer())

    # Checkpoints are written straight into result_dir, so it has to exist up front.
    result_dir.mkdir(parents=True, exist_ok=True)

    with tqdm(total=len(datasets) * n_folds, desc='Processing comparison', unit='fold') as pbar:
        progress(pbar.n, pbar.total)
        for dataset in datasets:
            for fold in dataset.folds(n_folds=n_folds, dob_scv=dob_scv,
                                      categorical_to_numerical=categorical_to_numerical):
                modified_fold = fold
                for fold_modifier in fold_modifiers:
                    modified_fold = fold_modifier.modify(modified_fold)

                rows = Parallel(n_jobs=len(classifiers))(
                    delayed(process_comparison_single)(c, c_n, dataset, modified_fold, metrics)
                    for c_n, c in classifiers
                )
                records.extend(rows)
                pbar.update(1)
                progress(pbar.n, pbar.total)

                # Write a fresh checkpoint, then drop the one it supersedes.
                df = pd.DataFrame(records)
                df = df.sort_values(by=['Dataset', 'Algorithm', 'CV index'])
                count += 1
                df.to_csv(result_dir.joinpath(f'comparison_{count}.csv'), index=False)
                if count > 1:
                    prev_file = result_dir.joinpath(f'comparison_{count - 1}.csv')
                    if os.path.isfile(prev_file):
                        os.remove(prev_file)

    if df is not None:
        # Replace the last numbered checkpoint with the final result file.
        prev_file = result_dir.joinpath(f'comparison_{count}.csv')
        if os.path.isfile(prev_file):
            os.remove(prev_file)
        df.to_csv(result_dir.joinpath('comparison.csv'), index=False)


def process_incremental_comparison_single(
    classifier_factory,
    classifier_name,
    dataset: typing.Union[ClassificationDatasetBase, Dataset],
    number_of_classes: int,
    incremental_comparison_dir: Path,
    metrics: typing.Sequence[typing.Tuple[str, typing.Callable]] = DEFAULT_INCREMENTAL_METRICS,
) -> dict:
    """
    Runs incremental comparison on single classifier and dataset.

    The dataset is replayed with ``stream.simulate_qa``: for every sample the model first
    predicts, then the metrics are updated with the revealed label and the model learns from
    the sample. Per-sample records (index, true label, predicted label, running metric values)
    are saved to ``<incremental_comparison_dir>/<classifier_name>/<dataset_name>.csv``.

    Any exception raised during the run is caught and printed; the summary is then built from
    whatever was accumulated so far, and metrics that cannot be read are reported as 0.

    :param classifier_factory: classifier factory
    :param classifier_name: classifier name
    :param dataset: single dataset
    :param number_of_classes: number of classes
    :param incremental_comparison_dir: incremental single results directory
    :param metrics: metrics collection
    :return: dictionary of calculated metrics and metadata
    """
    incremental_comparison_classifier_dir = incremental_comparison_dir.joinpath(classifier_name)
    incremental_comparison_classifier_dir.mkdir(exist_ok=True, parents=True)
    train_time = 0
    pred_time = 0
    train_size = 0
    dataset_name = "-"
    # Empty container, so the summary loop below falls back to 0 if the setup fails.
    metric = river_metrics.base.Metrics([])
    try:
        if isinstance(dataset, ClassificationDatasetBase):
            dataset_name = dataset.name
            train_size = dataset.instances
        elif isinstance(dataset, Dataset):
            dataset_name = dataset.__class__.__name__.lower()
            train_size = dataset.n_samples

        metric = river_metrics.base.Metrics([m() for _, m in metrics])
        model = classifier_factory(train_size, number_of_classes)

        # Determine if predict_one or predict_proba_one should be used in case of a classifier
        if utils.inspect.isclassifier(model) and not metric.requires_labels:
            pred_func = model.predict_proba_one
        else:
            pred_func = model.predict_one

        records = []
        y_pred = None
        for i, x, y, *kwargs in stream.simulate_qa(dataset, None, None, copy=True):
            kwargs = kwargs[0] if kwargs else {}
            if y is None:
                # no ground truth, just make a prediction
                pred_start_time = timer()
                y_pred = pred_func(x=x, **kwargs)
                pred_time += timer() - pred_start_time
            else:
                # there's a ground truth, model and metric can be updated
                if y_pred != {} and y_pred is not None:
                    metric.update(y_true=y, y_pred=y_pred)
                    # Reduce a probability dict to its most likely label for the record.
                    # Done only for non-empty predictions: max() of an empty dict raises.
                    if isinstance(y_pred, dict):
                        y_pred = max(y_pred, key=y_pred.get)

                # NOTE(review): the original indentation was lost in SOURCE; a record is
                # appended for every labelled sample here - confirm against the repository.
                record = {
                    'index': i,
                    'y_true': y,
                    'y_pred': y_pred
                }
                for metric_idx, (metric_name, _) in enumerate(metrics):
                    record[metric_name] = metric.data[metric_idx].get()
                records.append(record)

                learn_one_start_time = timer()
                model.learn_one(x=x, y=y, **kwargs)
                train_time += timer() - learn_one_start_time

        df = pd.DataFrame(records)
        df.to_csv(incremental_comparison_classifier_dir.joinpath(f'{dataset_name}.csv'), index=False)
    except Exception as e:
        print(f"Error while running {classifier_name}, metrics will be set to 0", e)

    result = {
        'Dataset': dataset_name,
        'Algorithm': classifier_name,
        'Number of classes': number_of_classes,
        'Train size': train_size,
        'Test size': train_size,
        'Train time [s]': train_time,
        'Prediction time [s]': pred_time
    }

    for metric_idx, (metric_name, _) in enumerate(metrics):
        try:
            value = metric.data[metric_idx].get()
            result[metric_name] = float(value)
        except Exception as e:
            # Report the failing metric by its name, not the whole metrics container.
            print(f"Error while calculating {metric_name} for {classifier_name}, value will be set to 0", e)
            result[metric_name] = 0.
    return result


def process_incremental_comparison(
    datasets: typing.List[typing.Union[ClassificationDatasetBase, river.datasets.base.Dataset]],
    classifiers: typing.List[typing.Tuple[str, typing.Callable]],
    result_dir: Path,
    metrics: typing.Sequence[typing.Tuple[str, typing.Callable]] = DEFAULT_INCREMENTAL_METRICS,
    progress=lambda progress, total: None,
):
    """
    Runs comparison for provided datasets and incremental classifiers.

    Each dataset is evaluated by all classifiers in parallel (one joblib job per classifier).
    After every dataset the accumulated summary rows are written to a numbered checkpoint
    ``comparison_<n>.csv`` in ``result_dir`` and the previous checkpoint is deleted; once all
    datasets are done the last checkpoint is replaced by ``comparison.csv``.

    :param datasets: dataset collection
    :param classifiers: classifiers collection
    :param result_dir: results directory
    :param metrics: metrics collection
    :param progress: function that can be used to monitor progress
    """
    incremental_comparison_dir = result_dir / 'incremental' / 'result'
    incremental_comparison_dir.mkdir(exist_ok=True, parents=True)

    checkpoint_no = 0
    all_rows = []
    summary = None

    with tqdm(total=len(datasets), desc='Processing comparison', unit='dataset') as bar:
        progress(bar.n, bar.total)
        for dataset in datasets:
            # preload dataset to prevent race conditions on file savings and count classes
            seen_labels = {y for _, y in dataset}
            number_of_classes = len(seen_labels)

            jobs = (
                delayed(process_incremental_comparison_single)(
                    factory, name, dataset, number_of_classes, incremental_comparison_dir, metrics
                )
                for name, factory in classifiers
            )
            all_rows.extend(Parallel(n_jobs=len(classifiers))(jobs))

            bar.update(1)
            progress(bar.n, bar.total)

            # Persist the progress so far, then remove the checkpoint it supersedes.
            summary = pd.DataFrame(all_rows).sort_values(by=['Dataset', 'Algorithm'])
            checkpoint_no += 1
            summary.to_csv(result_dir.joinpath(f'comparison_{checkpoint_no}.csv'), index=False)
            if checkpoint_no > 1:
                stale = result_dir.joinpath(f'comparison_{checkpoint_no - 1}.csv')
                if os.path.isfile(stale):
                    os.remove(stale)

    if summary is None:
        # No dataset was processed, so there is nothing to finalize.
        return

    # Swap the last numbered checkpoint for the final result file.
    last_checkpoint = result_dir.joinpath(f'comparison_{checkpoint_no}.csv')
    if os.path.isfile(last_checkpoint):
        os.remove(last_checkpoint)
    summary.to_csv(result_dir.joinpath('comparison.csv'), index=False)