Source code for cacp.dataset

import dataclasses
import typing
from abc import ABC, abstractmethod
from pathlib import Path
from urllib.request import urlretrieve
from zipfile import ZipFile

import numpy as np
import pandas as pd
import typing_extensions
from sklearn import preprocessing
from sklearn.model_selection import KFold
from tqdm import tqdm

# Prefix prepended to a file name to build the URL a dataset file is downloaded from.
BASE_KEEL_DATASETS_URL = 'https://github.com/sylwekczmil/cacp_files/raw/main/'

# Dataset names usable as the ``name`` argument of ``ClassificationDataset``;
# ``all_datasets`` builds one dataset object per entry.
AVAILABLE_CLASSIFICATION_DATASET_NAMES = typing_extensions.Literal[
    'abalone', 'appendicitis', 'australian', 'automobile', 'balance', 'banana', 'bands',
    'breast', 'bupa', 'car', 'chess', 'cleveland', 'coil2000', 'contraceptive', 'crx',
    'dermatology', 'ecoli', 'flare', 'german', 'glass', 'haberman', 'hayes-roth', 'heart',
    'hepatitis', 'housevotes', 'ionosphere', 'iris', 'kr-vs-k', 'led7digit', 'letter',
    'lymphography', 'magic', 'mammographic', 'marketing', 'monk-2', 'movement_libras',
    'mushroom', 'newthyroid', 'nursery', 'optdigits', 'page-blocks', 'penbased', 'phoneme',
    'pima', 'post-operative', 'ring', 'saheart', 'satimage', 'segment', 'shuttle', 'sonar',
    'spambase', 'spectfheart', 'splice', 'tae', 'texture', 'thyroid', 'tic-tac-toe',
    'titanic', 'twonorm', 'vehicle', 'vowel', 'wdbc', 'wine', 'winequality-red',
    'winequality-white', 'wisconsin', 'yeast', 'zoo'
]

# Values used to annotate the ``n_folds`` argument of the ``folds`` methods.
AVAILABLE_N_FOLDS = typing_extensions.Literal[5, 10]


@dataclasses.dataclass
class ClassificationFoldData:
    """
    One train/test split (fold) of a classification dataset.

    Only ``index`` and ``labels`` appear in the generated ``repr``; the four
    data arrays are excluded from it.
    """
    # Number identifying the fold.
    index: int
    # Class labels associated with the fold.
    labels: np.ndarray
    # Training samples and their targets.
    x_train: np.ndarray = dataclasses.field(repr=False)
    y_train: np.ndarray = dataclasses.field(repr=False)
    # Test samples and their targets.
    x_test: np.ndarray = dataclasses.field(repr=False)
    y_test: np.ndarray = dataclasses.field(repr=False)
class ClassificationFoldDataModifierBase(ABC):
    """
    Abstract interface for objects that turn one fold into another.
    """

    @abstractmethod
    def modify(self, fold: ClassificationFoldData) -> ClassificationFoldData:
        """
        Produce a fold derived from the given one.

        :param fold: fold to process
        :return: resulting fold
        """
class ClassificationFoldDataNormalizer(ClassificationFoldDataModifierBase):
    """
    Fold modifier that rescales the feature arrays with a min-max scaler.
    """

    def modify(self, fold: ClassificationFoldData) -> ClassificationFoldData:
        """
        Return a new fold whose features went through ``MinMaxScaler``.

        The scaler is fitted on the training and test samples stacked together;
        targets, labels and index are carried over unchanged.

        :param fold: fold to normalize
        :return: new fold with scaled ``x_train`` and ``x_test``
        """
        stacked = np.concatenate([fold.x_train.astype(float), fold.x_test.astype(float)])
        scaled = preprocessing.MinMaxScaler().fit_transform(stacked)
        # The first ``split_at`` rows came from the training part.
        split_at = len(fold.x_train)
        return ClassificationFoldData(
            index=fold.index,
            labels=fold.labels,
            x_train=scaled[:split_at],
            y_train=fold.y_train,
            x_test=scaled[split_at:],
            y_test=fold.y_test,
        )
class ClassificationDatasetMinimalBase(ABC):
    """
    Smallest contract a single classification dataset has to fulfil.

    Subclasses provide :meth:`folds`; iterating over the dataset then streams
    the test samples of every fold one by one.
    """

    def __init__(self, seed=1):
        """
        :param seed: seed of the random state used to shuffle samples during iteration
        """
        self.seed = seed

    @abstractmethod
    def folds(
        self,
        n_folds: AVAILABLE_N_FOLDS = 10,
        dob_scv: bool = True,
        categorical_to_numerical=True
    ) -> typing.Iterable[ClassificationFoldData]:
        """
        Provide the folds of the dataset.

        :param n_folds: number of folds
        :param dob_scv: flag selecting the dob-scv variant of the folds; its exact
            interpretation is left to the implementation
        :param categorical_to_numerical: whether categorical values should be turned into numbers
        :return: iterable of folds
        """

    def __iter__(self):
        """
        Yield ``(x, y)`` pairs taken from the test part of every fold.

        Folds are requested with the default arguments of :meth:`folds`. Inside
        each fold the test samples are visited in an order drawn from a
        ``RandomState`` seeded with ``self.seed``. ``x`` is a dict that maps the
        feature position to its value.
        """
        rng = np.random.RandomState(seed=self.seed)
        for fold in self.folds():
            order = rng.permutation(np.arange(len(fold.x_test)))
            shuffled_pairs = zip(fold.x_test[order], fold.y_test[order])
            for row, target in shuffled_pairs:
                yield dict(enumerate(row)), target
class ClassificationDatasetBase(ClassificationDatasetMinimalBase):
    """
    Base class of a single classification dataset that also exposes its
    descriptive metadata (name and instance/feature/class counts).
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """Dataset name."""

    @property
    @abstractmethod
    def instances(self) -> int:
        """Number of instances."""

    @property
    @abstractmethod
    def features(self) -> int:
        """Number of features."""

    @property
    @abstractmethod
    def classes(self) -> int:
        """Number of classes."""

    def __str__(self):
        """Return a one-line, comma separated summary of the metadata."""
        parts = (
            f'Dataset name: {self.name}',
            f'instances: {self.instances}',
            f'features: {self.features}',
            f'classes: {self.classes}',
        )
        return ', '.join(parts)
class ClassificationDatasetDownloadProgressBar(tqdm):
    """
    Progress bar whose :meth:`update_to` can be passed as ``reporthook`` to
    ``urlretrieve``.
    """

    def update_to(self, b=1, bsize=1, t_size=None):
        """
        Move the bar to the absolute position ``b * bsize``.

        :param b: number of blocks transferred so far
        :param bsize: size of a single block
        :param t_size: total size; when given it replaces the bar total
        """
        if t_size is not None:
            self.total = t_size
        transferred = b * bsize
        # ``update`` expects an increment, so subtract what is already counted.
        self.update(transferred - self.n)
class ClassificationDataset(ClassificationDatasetBase):
    """
    Class that represents KEEL single dataset.

    Dataset files are downloaded on demand into a local cache directory and
    reused from there on later runs.
    """

    def __init__(
        self,
        name: AVAILABLE_CLASSIFICATION_DATASET_NAMES,
        files_cache_path=Path.home().joinpath('cacp_files'),
        seed=1,
    ):
        """
        Initializes class instance that represents KEEL single dataset.

        Creates the cache directory when it is missing and loads the dataset
        description file (downloading it when it is not cached yet).

        :param name: KEEL dataset name
        :param files_cache_path: optional cache directory where dataset files will be downloaded
        :param seed: seed forwarded to the base class
        """
        super().__init__(seed)
        self._name = name
        self._instances = 0
        self._features = 0
        self._classes = 0
        # Not read anywhere in this class; kept so existing code touching it keeps working.
        self._output = 'Class'
        # Default used until the description file provides an '@output' entry.
        self._output_name = 'Class'
        self._origin = ''
        self._attributes: typing.Dict[str, str] = {}
        # Accept plain strings as well as Path objects.
        self._files_cache_path = Path(files_cache_path)
        self._files_cache_path.mkdir(exist_ok=True, parents=True)
        self._load_description()

    @property
    def name(self) -> str:
        return self._name

    @property
    def instances(self) -> int:
        return self._instances

    @property
    def features(self) -> int:
        return self._features

    @property
    def classes(self) -> int:
        return self._classes

    @property
    def origin(self) -> str:
        return self._origin

    @property
    def output_name(self) -> str:
        return self._output_name

    def folds(
        self,
        n_folds: AVAILABLE_N_FOLDS = 10,
        dob_scv: bool = True,
        categorical_to_numerical=True
    ) -> typing.Iterator[ClassificationFoldData]:
        """
        Yield the folds of the dataset, fetching the fold archive when needed.

        :param n_folds: number of folds
        :param dob_scv: selects the ``dobscv`` archive instead of the ``fold`` one
        :param categorical_to_numerical: replace values of categorical attributes with integer codes
        :return: iterator over folds, indexed from 1 to ``n_folds``
        """
        zip_data_name = f'{self.name}-{n_folds}-{"dobscv" if dob_scv else "fold"}'
        data_path = self._fetch_data(zip_data_name, dob_scv)
        # File names inside the archive use a slightly different prefix than the archive itself.
        if dob_scv:
            data_name = f'{self.name}-{n_folds}dobscv'
        else:
            data_name = f'{self.name}-{n_folds}'

        for fold_index in range(1, n_folds + 1):
            train_data_path = data_path.joinpath(f'{data_name}-{fold_index}tra.dat')
            x_tra, y_tra = self._load_data(train_data_path, categorical_to_numerical)
            test_data_path = data_path.joinpath(f'{data_name}-{fold_index}tst.dat')
            x_tst, y_tst = self._load_data(test_data_path, categorical_to_numerical)
            labels = np.unique(np.hstack([y_tra, y_tst]))
            yield ClassificationFoldData(
                index=fold_index,
                x_train=x_tra,
                y_train=y_tra,
                x_test=x_tst,
                y_test=y_tst,
                labels=labels
            )

    def _load_description(self):
        """
        Parse ``<name>-names.txt`` and fill in the dataset metadata.

        Reads attribute names/types, the output attribute name, origin and the
        feature, class and instance counts.
        """
        file_name = f'{self.name}-names.txt'
        file_path = self._fetch_file(file_name)
        attributes_names = []
        attributes_types_names = []
        output_name = 'Class'
        # KEEL descriptions files contain latin1 chars
        with file_path.open('r', encoding='latin1') as file:
            for line in file:
                if '@attribute' in line or '@Attribute' in line:
                    if '{' in line:
                        # Attributes declared with an explicit value set are categorical.
                        attr_name = line.split('{')[0].split()[1]
                        attr_type = 'category'
                    else:
                        s = line.split()[1:]
                        attr_name = s[0].strip()
                        # Drop a range suffix such as '[0.0, 1.0]' glued to the type name.
                        attr_type = s[1].split('[')[0].strip()
                    attributes_names.append(attr_name)
                    attributes_types_names.append(attr_type)

                if '@input' in line:
                    # The input list is not needed; skip the remaining checks for this line.
                    continue
                if '@output' in line:
                    output_name = line.split()[1]
                elif 'Origin.' in line:
                    self._origin = line.split('Origin.')[1].strip()
                elif 'Features.' in line:
                    self._features = int(line.split('Features.')[1].strip())
                elif 'Classes.' in line:
                    self._classes = int(line.split('Classes.')[1].strip())
                elif 'Instances.' in line:
                    self._instances = int(line.split('Instances.')[1].split()[0].strip())

        self._attributes = dict(zip(attributes_names, attributes_types_names))
        self._output_name = output_name

    def _load_data(self, path: Path, categorical_to_numerical: bool) -> typing.Tuple[np.ndarray, np.ndarray]:
        """
        Load one fold file into feature and target arrays.

        :param path: path of the ``.dat`` file
        :param categorical_to_numerical: replace values of categorical attributes with integer codes
        :return: tuple ``(x, y)``
        """
        # Skip the file header: one line per attribute plus four more lines
        # (presumably the relation/inputs/outputs/data markers of the KEEL format).
        skip_rows = 4 + len(self._attributes)
        df = pd.read_csv(path, skiprows=skip_rows, names=list(self._attributes), na_values='?')
        if categorical_to_numerical:
            for attr_name, attr_type_name in self._attributes.items():
                if attr_type_name == 'category':
                    df[attr_name] = df[attr_name].astype('category').cat.codes.values
        y = df[self._output_name].values
        del df[self._output_name]
        x = df.values
        return x, y

    def _fetch_data(self, data_name: str, dob_scv: bool) -> Path:
        """
        Make sure the fold archive is extracted in the cache and return its directory.

        :param data_name: archive name without the ``.zip`` suffix
        :param dob_scv: whether the dobscv archive layout is expected
        :return: directory that contains the fold files
        """
        data_path = self._files_cache_path.joinpath(data_name)
        data_unzip_path = data_path
        if dob_scv:
            # Fold files of dobscv archives are looked up in a sub-directory named
            # after the dataset, while the archive is unpacked one level above it.
            data_path = data_path.joinpath(self.name)
            data_unzip_path = data_path.parent
        if not data_path.exists():
            zip_file_path = self._fetch_file(f'{data_name}.zip')
            with ZipFile(zip_file_path, mode='r') as zipfile:
                zipfile.extractall(data_unzip_path)
        return data_path

    def _fetch_file(self, file_name: str) -> Path:
        """
        Return the cached path of a file, downloading it first when missing.

        The download goes to a temporary ``.part`` file that is renamed only
        after it completed, so an interrupted transfer never leaves a truncated
        file behind that later runs would treat as a valid cache entry.

        :param file_name: name of the file relative to the base URL
        :return: path of the file inside the cache directory
        """
        out_file_path = self._files_cache_path.joinpath(file_name)
        if out_file_path.exists():
            return out_file_path

        url = f'{BASE_KEEL_DATASETS_URL}{file_name}'
        partial_path = out_file_path.with_name(f'{out_file_path.name}.part')
        try:
            with ClassificationDatasetDownloadProgressBar(
                unit='B', unit_scale=True, miniters=1, desc=f'Downloading {file_name}'
            ) as t:
                urlretrieve(url, filename=partial_path, reporthook=t.update_to)
            partial_path.replace(out_file_path)
        finally:
            # Only still present when the download or the rename failed.
            if partial_path.exists():
                partial_path.unlink()
        return out_file_path
class LocalClassificationDataset(ClassificationDataset):
    """
    Class that represents single local dataset that has similar structure to KEEL dataset.

    Nothing is downloaded: description and fold files are read straight from
    the given directory.
    """

    def __init__(self, name: str, dataset_directory: Path, seed=1):
        """
        Initializes class instance that represents KEEL single local dataset.

        :param name: dataset name
        :param dataset_directory: directory where dataset is stored
        :param seed: seed forwarded to the base class (defaults to the base default)
        """
        # Path(...) lets callers hand in a plain string as well.
        super().__init__(name, Path(dataset_directory), seed)

    def _fetch_data(self, data_name: str, dob_scv: bool) -> Path:
        """Return the dataset directory; both arguments are ignored."""
        return self._files_cache_path

    def _fetch_file(self, file_name: str) -> Path:
        """Return the path of ``file_name`` inside the dataset directory."""
        return self._files_cache_path.joinpath(file_name)
class LocalCsvClassificationDataset(ClassificationDatasetBase):
    """
    Class that represents single local dataset that is CSV with header.

    The last column of the file is used as the output (class) column.
    """

    # dtype names treated as categorical; 'string' and 'str' cover the dedicated
    # pandas string dtypes in addition to the classic 'object' columns.
    _CATEGORICAL_DTYPE_NAMES = frozenset({'category', 'object', 'string', 'str'})

    def __init__(self, name: str, dataset_path: Path, seed=1):
        """
        Initializes class instance that represents CSV local dataset.

        :param name: dataset name
        :param dataset_path: path where dataset is stored
        :param seed: seed used for shuffling when folds are built and when the
            dataset is iterated (defaults to the base default)
        """
        super().__init__(seed)
        self._name = name
        self._output_name = ""
        self._dataset_path = dataset_path
        self._instances = 0
        self._features = 0
        self._classes = 0
        self._load_metadata()

    @property
    def name(self) -> str:
        return self._name

    @property
    def instances(self) -> int:
        return self._instances

    @property
    def features(self) -> int:
        return self._features

    @property
    def classes(self) -> int:
        return self._classes

    def folds(
        self,
        n_folds: AVAILABLE_N_FOLDS = 10,
        dob_scv: bool = True,
        categorical_to_numerical=True
    ) -> typing.Iterable[ClassificationFoldData]:
        """
        Split the CSV data into folds with a shuffled ``KFold``.

        :param n_folds: number of folds
        :param dob_scv: accepted for interface compatibility; not used here
        :param categorical_to_numerical: replace values of categorical/text columns with integer codes
        :return: iterable of folds, indexed from 1
        """
        df = self._df()
        if categorical_to_numerical:
            for column, dtype in zip(df.columns, df.dtypes):
                if dtype.name in self._CATEGORICAL_DTYPE_NAMES:
                    df[column] = df[column].astype('category').cat.codes.values
        y = df[self._output_name].values
        labels = np.unique(y)
        del df[self._output_name]
        x = df.values

        kf = KFold(n_splits=n_folds, shuffle=True, random_state=self.seed)
        for i, (train_index, test_index) in enumerate(kf.split(x), start=1):
            yield ClassificationFoldData(
                index=i,
                labels=labels,
                x_test=x[test_index],
                y_test=y[test_index],
                x_train=x[train_index],
                y_train=y[train_index]
            )

    def _df(self):
        """Read the CSV file into a fresh data frame."""
        return pd.read_csv(self._dataset_path)

    def _load_metadata(self):
        """Derive instance, feature and class counts and the output column from the file."""
        df = self._df()
        self._instances = len(df)
        # Every column except the output one counts as a feature.
        self._features = len(df.columns) - 1
        self._output_name = df.columns[-1]
        self._classes = len(df[self._output_name].unique())
def all_datasets() -> typing.List[ClassificationDataset]:
    """
    Build one dataset object for every known dataset name.

    :return: list with a ``ClassificationDataset`` per entry of
        ``AVAILABLE_CLASSIFICATION_DATASET_NAMES``, in declaration order
    """
    dataset_names = typing_extensions.get_args(AVAILABLE_CLASSIFICATION_DATASET_NAMES)
    return list(map(ClassificationDataset, dataset_names))