import dataclasses
import typing
from abc import ABC, abstractmethod
from pathlib import Path
from urllib.request import urlretrieve
from zipfile import ZipFile
import numpy as np
import pandas as pd
import typing_extensions
from sklearn import preprocessing
from sklearn.model_selection import KFold
from tqdm import tqdm
# Base URL that downloaded file names are appended to.
BASE_KEEL_DATASETS_URL = 'https://github.com/sylwekczmil/cacp_files/raw/main/'

# Dataset names accepted by ClassificationDataset, grouped here by first letter.
AVAILABLE_CLASSIFICATION_DATASET_NAMES = typing_extensions.Literal[
    'abalone', 'appendicitis', 'australian', 'automobile',
    'balance', 'banana', 'bands', 'breast', 'bupa',
    'car', 'chess', 'cleveland', 'coil2000', 'contraceptive', 'crx',
    'dermatology',
    'ecoli',
    'flare',
    'german', 'glass',
    'haberman', 'hayes-roth', 'heart', 'hepatitis', 'housevotes',
    'ionosphere', 'iris',
    'kr-vs-k',
    'led7digit', 'letter', 'lymphography',
    'magic', 'mammographic', 'marketing', 'monk-2', 'movement_libras', 'mushroom',
    'newthyroid', 'nursery',
    'optdigits',
    'page-blocks', 'penbased', 'phoneme', 'pima', 'post-operative',
    'ring',
    'saheart', 'satimage', 'segment', 'shuttle', 'sonar', 'spambase', 'spectfheart', 'splice',
    'tae', 'texture', 'thyroid', 'tic-tac-toe', 'titanic', 'twonorm',
    'vehicle', 'vowel',
    'wdbc', 'wine', 'winequality-red', 'winequality-white', 'wisconsin',
    'yeast',
    'zoo'
]

# Fold counts accepted by the ``folds`` methods.
AVAILABLE_N_FOLDS = typing_extensions.Literal[5, 10]
@dataclasses.dataclass
class ClassificationFoldData:
    """
    Container for one train/test split (fold) of a dataset.

    Only ``index`` and ``labels`` are shown by ``repr``; the four data arrays
    are excluded from it.
    """
    # Identifier of the fold.
    index: int
    # Label values associated with the fold.
    labels: np.ndarray
    # Training samples and their targets.
    x_train: np.ndarray = dataclasses.field(repr=False)
    y_train: np.ndarray = dataclasses.field(repr=False)
    # Test samples and their targets.
    x_test: np.ndarray = dataclasses.field(repr=False)
    y_test: np.ndarray = dataclasses.field(repr=False)
class ClassificationFoldDataModifierBase(ABC):
    """
    Interface for objects that transform a single dataset fold.
    """

    @abstractmethod
    def modify(self, fold: ClassificationFoldData) -> ClassificationFoldData:
        """
        Transforms the given fold.

        :param fold: fold to transform
        :return: transformed fold
        """
class ClassificationFoldDataNormalizer(ClassificationFoldDataModifierBase):
    """
    Fold modifier that rescales the features with sklearn's ``MinMaxScaler``.
    """

    def modify(self, fold: ClassificationFoldData) -> ClassificationFoldData:
        """
        Builds a new fold whose feature arrays are min-max scaled.

        The scaler is fitted on the train and test samples stacked together;
        index, labels and targets are passed through unchanged.

        :param fold: fold to normalize
        :return: new fold with scaled ``x_train`` and ``x_test``
        """
        train_size = len(fold.x_train)
        stacked = np.concatenate([fold.x_train.astype(float), fold.x_test.astype(float)])
        scaled = preprocessing.MinMaxScaler().fit_transform(stacked)
        # The first ``train_size`` rows are the training samples, the rest are test samples.
        return ClassificationFoldData(
            index=fold.index,
            labels=fold.labels,
            x_train=scaled[:train_size],
            y_train=fold.y_train,
            x_test=scaled[train_size:],
            y_test=fold.y_test,
        )
class ClassificationDatasetMinimalBase(ABC):
    """
    Smallest interface a classification dataset has to provide.
    """

    def __init__(self, seed=1):
        """
        :param seed: seed of the random generator used when iterating over samples
        """
        self.seed = seed

    @abstractmethod
    def folds(
            self,
            n_folds: AVAILABLE_N_FOLDS = 10,
            dob_scv: bool = True,
            categorical_to_numerical=True
    ) -> typing.Iterable[ClassificationFoldData]:
        """
        Provides the folds of the dataset.

        :param n_folds: number of folds
        :param dob_scv: fold variant selector (interpreted by the implementation)
        :param categorical_to_numerical: whether categorical values are converted to numbers
        :return: iterable of folds
        """

    def __iter__(self):
        """
        Yields ``(x, y)`` pairs built from the test part of every fold.

        ``x`` is a dict mapping feature position to value. The test samples of
        each fold are shuffled with a generator seeded by ``self.seed``.
        """
        rng = np.random.RandomState(seed=self.seed)
        for fold in self.folds():
            order = rng.permutation(np.arange(len(fold.x_test)))
            shuffled_samples = zip(fold.x_test[order], fold.y_test[order])
            for features, target in shuffled_samples:
                yield dict(enumerate(features)), target
class ClassificationDatasetBase(ClassificationDatasetMinimalBase):
    """
    Dataset interface extended with descriptive properties.
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """Dataset name."""

    @property
    @abstractmethod
    def instances(self) -> int:
        """Number of instances."""

    @property
    @abstractmethod
    def features(self) -> int:
        """Number of features."""

    @property
    @abstractmethod
    def classes(self) -> int:
        """Number of classes."""

    def __str__(self):
        summary = (
            f'Dataset name: {self.name}',
            f'instances: {self.instances}',
            f'features: {self.features}',
            f'classes: {self.classes}',
        )
        return ', '.join(summary)
class ClassificationDatasetDownloadProgressBar(tqdm):
    """
    Progress bar whose ``update_to`` is passed to ``urlretrieve`` as ``reporthook``.
    """

    def update_to(self, b=1, bsize=1, t_size=None):
        """
        Moves the bar to the position ``b * bsize``.

        :param b: number of blocks transferred so far
        :param bsize: size of a single block
        :param t_size: total size; when given it becomes the bar total
        """
        if t_size is not None:
            self.total = t_size
        transferred = b * bsize
        # ``update`` expects an increment, so subtract the current position.
        self.update(transferred - self.n)
class ClassificationDataset(ClassificationDatasetBase):
    """
    Class that represents KEEL single dataset.
    """

    def __init__(
            self, name: AVAILABLE_CLASSIFICATION_DATASET_NAMES,
            files_cache_path=Path.home().joinpath('cacp_files'),
            seed=1,
    ):
        """
        Initializes class instance that represents KEEL single dataset.

        The cache directory is created when missing and the dataset description
        file is fetched and parsed immediately.

        :param name: KEEL dataset name
        :param files_cache_path: optional cache directory path where dataset files will be downloaded
        :param seed: seed of the random generator used when iterating over samples
        """
        super().__init__(seed)
        self._name = name
        self._instances = 0
        self._features = 0
        self._classes = 0
        # Default output column name; replaced when the description declares one.
        self._output_name = 'Class'
        self._origin = ''
        self._attributes: typing.Dict[str, str] = {}
        # Accept plain strings as well as Path objects.
        self._files_cache_path = Path(files_cache_path)
        self._files_cache_path.mkdir(exist_ok=True, parents=True)
        self._load_description()

    @property
    def name(self) -> str:
        return self._name

    @property
    def instances(self) -> int:
        return self._instances

    @property
    def features(self) -> int:
        return self._features

    @property
    def classes(self) -> int:
        return self._classes

    @property
    def origin(self) -> str:
        return self._origin

    @property
    def output_name(self) -> str:
        return self._output_name

    def folds(
            self,
            n_folds: AVAILABLE_N_FOLDS = 10,
            dob_scv: bool = True,
            categorical_to_numerical=True
    ) -> typing.Iterator[ClassificationFoldData]:
        """
        Yields the folds of the dataset, fetching the fold files when needed.

        :param n_folds: number of folds
        :param dob_scv: selects the ``dobscv`` fold files instead of the ``fold`` ones
        :param categorical_to_numerical: whether categorical attributes are replaced by integer codes;
            codes are derived from the train and test parts together, so one value gets one code in both
        :return: iterator of folds, indexed from 1
        """
        zip_data_name = f'{self.name}-{n_folds}-{"dobscv" if dob_scv else "fold"}'
        data_path = self._fetch_data(zip_data_name, dob_scv)
        if dob_scv:
            data_name = f'{self.name}-{n_folds}dobscv'
        else:
            data_name = f'{self.name}-{n_folds}'

        for fold_index in range(1, n_folds + 1):
            train_df = self._read_frame(data_path.joinpath(f'{data_name}-{fold_index}tra.dat'))
            test_df = self._read_frame(data_path.joinpath(f'{data_name}-{fold_index}tst.dat'))
            if categorical_to_numerical:
                # Encode both parts with shared categories; encoding each file on its own
                # shifts the codes whenever a value is absent from one of the parts.
                self._encode_categorical(train_df, test_df)
            x_tra, y_tra = self._split_xy(train_df)
            x_tst, y_tst = self._split_xy(test_df)
            labels = np.unique(np.hstack([y_tra, y_tst]))
            yield ClassificationFoldData(
                index=fold_index,
                x_train=x_tra,
                y_train=y_tra,
                x_test=x_tst,
                y_test=y_tst,
                labels=labels
            )

    def _load_description(self):
        """
        Fetches and parses the ``<name>-names.txt`` description file.

        Fills the attribute name -> type mapping, the output column name, the
        origin and the instance / feature / class counters.
        """
        file_name = f'{self.name}-names.txt'
        file_path = self._fetch_file(file_name)
        attributes: typing.Dict[str, str] = {}
        output_name = 'Class'
        # KEEL descriptions files contain latin1 chars
        with file_path.open('r', encoding='latin1') as file:
            for line in file:
                if '@attribute' in line or '@Attribute' in line:
                    if '{' in line:
                        # An enumerated value set marks a categorical attribute.
                        attr_name = line.split('{')[0].split()[1]
                        attr_type = 'category'
                    else:
                        s = line.split()[1:]
                        attr_name = s[0].strip()
                        # Drop the optional "[min, max]" range glued to the type name.
                        attr_type = s[1].split('[')[0].strip()
                    attributes[attr_name] = attr_type
                if '@input' in line:
                    # Input names are not needed; the line must still not reach the checks below.
                    continue
                elif '@output' in line:
                    output_name = line.split()[1]
                elif 'Origin.' in line:
                    self._origin = line.split('Origin.')[1].strip()
                elif 'Features.' in line:
                    self._features = int(line.split('Features.')[1].strip())
                elif 'Classes.' in line:
                    self._classes = int(line.split('Classes.')[1].strip())
                elif 'Instances.' in line:
                    self._instances = int(line.split('Instances.')[1].split()[0].strip())
        self._attributes = attributes
        self._output_name = output_name

    def _load_data(self, path: Path, categorical_to_numerical: bool) -> typing.Tuple[np.ndarray, np.ndarray]:
        """
        Loads a single data file and splits it into features and targets.

        :param path: path of the data file
        :param categorical_to_numerical: whether categorical attributes are replaced by integer codes
            (derived from this file only)
        :return: tuple of feature matrix and target vector
        """
        df = self._read_frame(path)
        if categorical_to_numerical:
            self._encode_categorical(df)
        return self._split_xy(df)

    def _read_frame(self, path: Path) -> pd.DataFrame:
        """Reads a data file into a frame with one column per described attribute."""
        # Skip the header: one line per attribute plus four further lines.
        skip_rows = 4 + len(self._attributes)
        return pd.read_csv(path, skiprows=skip_rows, names=list(self._attributes), na_values='?')

    def _encode_categorical(self, *frames: pd.DataFrame) -> None:
        """Replaces categorical columns by integer codes shared by all given frames (in place)."""
        for attr_name, attr_type_name in self._attributes.items():
            if attr_type_name != 'category':
                continue
            merged = pd.concat([frame[attr_name] for frame in frames], ignore_index=True)
            categories = merged.astype('category').cat.categories
            for frame in frames:
                frame[attr_name] = pd.Categorical(frame[attr_name], categories=categories).codes

    def _split_xy(self, df: pd.DataFrame) -> typing.Tuple[np.ndarray, np.ndarray]:
        """Splits a frame into the feature matrix and the output column values."""
        y = df[self._output_name].values
        x = df.drop(columns=self._output_name).values
        return x, y

    def _fetch_data(self, data_name: str, dob_scv: bool) -> Path:
        """
        Returns the directory holding the fold files, downloading and unpacking the archive when missing.

        :param data_name: archive name without the ``.zip`` extension
        :param dob_scv: when set, the fold files are looked up in a sub-directory named after the dataset
        :return: directory with the fold files
        """
        data_path = self._files_cache_path.joinpath(data_name)
        data_unzip_path = data_path
        if dob_scv:
            data_path = data_path.joinpath(self.name)
            data_unzip_path = data_path.parent
        if not data_path.exists():
            zip_file_path = self._fetch_file(f'{data_name}.zip')
            with ZipFile(zip_file_path, mode='r') as archive:
                archive.extractall(data_unzip_path)
        return data_path

    def _fetch_file(self, file_name: str) -> Path:
        """
        Returns the cached path of a file, downloading it first when it is not cached yet.

        The download goes to a temporary ``.part`` file that is renamed only after
        it completed, so an interrupted transfer never leaves a truncated file
        that would later be mistaken for a valid cache entry.

        :param file_name: name of the file relative to the base URL
        :return: path of the file inside the cache directory
        """
        out_file_path = self._files_cache_path.joinpath(file_name)
        if not out_file_path.exists():
            url = f'{BASE_KEEL_DATASETS_URL}{file_name}'
            partial_path = out_file_path.with_name(f'{out_file_path.name}.part')
            try:
                with ClassificationDatasetDownloadProgressBar(unit='B', unit_scale=True, miniters=1,
                                                              desc=f'Downloading {file_name}') as t:
                    urlretrieve(url, filename=partial_path, reporthook=t.update_to)
                partial_path.replace(out_file_path)
            finally:
                # Only present when the download or the rename failed.
                if partial_path.exists():
                    partial_path.unlink()
        return out_file_path
class LocalClassificationDataset(ClassificationDataset):
    """
    Class that represents single local dataset that has similar structure to KEEL dataset.
    """

    def __init__(self, name: str, dataset_directory: Path, seed=1):
        """
        Initializes class instance that represents KEEL single local dataset.

        :param name: dataset name
        :param dataset_directory: directory where dataset is stored
        :param seed: seed of the random generator used when iterating over samples
        """
        # The dataset directory takes the role of the files cache directory.
        super().__init__(name, dataset_directory, seed)

    def _fetch_data(self, data_name: str, dob_scv: bool) -> Path:
        """Returns the dataset directory itself; nothing is downloaded or unpacked."""
        return self._files_cache_path

    def _fetch_file(self, file_name: str) -> Path:
        """Returns the path of the file inside the dataset directory without downloading it."""
        return self._files_cache_path.joinpath(file_name)
class LocalCsvClassificationDataset(ClassificationDatasetBase):
    """
    Class that represents single local dataset that is CSV with header.

    The last column of the file is used as the output (class) column.
    """

    def __init__(self, name: str, dataset_path: Path, seed=1):
        """
        Initializes class instance that represents CSV local dataset.

        :param name: dataset name
        :param dataset_path: path where dataset is stored
        :param seed: seed used for fold shuffling and for iterating over samples
        """
        super().__init__(seed)
        self._name = name
        self._output_name = ""
        self._dataset_path = dataset_path
        self._instances = 0
        self._features = 0
        self._classes = 0
        self._load_metadata()

    @property
    def name(self) -> str:
        return self._name

    @property
    def instances(self) -> int:
        return self._instances

    @property
    def features(self) -> int:
        return self._features

    @property
    def classes(self) -> int:
        return self._classes

    def folds(self, n_folds: AVAILABLE_N_FOLDS = 10, dob_scv: bool = True, categorical_to_numerical=True) -> \
            typing.Iterable[ClassificationFoldData]:
        """
        Splits the CSV file into folds with a shuffled ``KFold`` seeded by ``self.seed``.

        :param n_folds: number of folds
        :param dob_scv: accepted for interface compatibility; not used by this implementation
        :param categorical_to_numerical: whether non-numeric columns are replaced by integer codes
        :return: iterable of folds, indexed from 1
        """
        df = self._df()
        if categorical_to_numerical:
            # 'string' / 'str' cover pandas configurations that infer a dedicated string dtype
            # instead of 'object' for text columns.
            text_like = ('category', 'object', 'string', 'str')
            for attr_name, dtype in zip(df.columns, list(df.dtypes)):
                if dtype.name in text_like:
                    df[attr_name] = df[attr_name].astype('category').cat.codes.values

        y = df[self._output_name].values
        labels = np.unique(y)
        x = df.drop(columns=self._output_name).values

        kf = KFold(n_splits=n_folds, shuffle=True, random_state=self.seed)
        for i, (train_index, test_index) in enumerate(kf.split(x), start=1):
            yield ClassificationFoldData(
                index=i,
                labels=labels,
                x_train=x[train_index],
                y_train=y[train_index],
                x_test=x[test_index],
                y_test=y[test_index]
            )

    def _df(self):
        """Reads the CSV file into a fresh frame (callers are free to modify it)."""
        return pd.read_csv(self._dataset_path)

    def _load_metadata(self):
        """Derives instance / feature / class counts and the output column name from the file."""
        df = self._df()
        self._instances = len(df)
        # Every column except the last one is a feature.
        self._features = len(df.columns) - 1
        self._output_name = df.columns[-1]
        self._classes = len(df[self._output_name].unique())
def all_datasets() -> typing.List[ClassificationDataset]:
    """
    Builds a dataset object for every name in AVAILABLE_CLASSIFICATION_DATASET_NAMES.

    :return: all classification datasets, in the order the names are declared
    """
    dataset_names = typing_extensions.get_args(AVAILABLE_CLASSIFICATION_DATASET_NAMES)
    return list(map(ClassificationDataset, dataset_names))