Source code for cassiopy.mixture
import numpy as np
import os
import scipy
from cassiopy.stats import SkewT
import pandas as pd
from scipy.special import digamma, loggamma
from scipy.optimize import minimize_scalar
from scipy.stats import skewnorm, norm, gamma, t
[docs]
class SkewTMixture:
"""
SkewTMixture
=============
A mixture model for clustering using the skewed Student's t-distribution.
Parameters
==========
n_cluster : int
Number of clusters.
init_method : str, default='gmm'
Initialization method. Options: 'gmm', 'kmeans', 'params'.
parametre : dict, optional
Dictionary of initial parameters if init_method='params'.
n_init_gmm : int, default=6
Number of initializations for GMM.
Attributes
==========
mean : ndarray
Cluster means.
sigma : ndarray
Cluster standard deviations.
dl : ndarray
Degrees of freedom for each cluster.
lamb : ndarray
Skewness parameters for each cluster.
alpha : ndarray
Cluster weights.
tik : ndarray
Posterior probabilities.
logli : list
Log-likelihood values during training.
Notes
=====
For more information, refer to the documentation :ref:`doc.mixture.SkewTMixture`
Examples
========
>>> import numpy as np
>>> from cassiopy.mixture import SkewTMixture
>>> X = np.array([[5, 3], [5, 7], [5, 1], [20, 3], [20, 7], [20, 1]])
>>> model = SkewTMixture(n_cluster=2, init_method='kmeans')
>>> model.fit(X, max_iter=100, tol=1e-4)
>>> model.mean
array([[ 5. , 1.40735413],
[20.00000058, 0.66644041]])
>>> model.predict_proba(np.array([[0, 0], [22, 5]]))
array([[0.5, 0.5],
[0.5, 0.5]])
>>> model.save('model.h5')
>>> model.load('Models_folder/model.h5')
>>> model.predict_cluster(np.array([[0, 0], [22, 5]]))
array([0., 0.])
"""
def __init__(self, n_cluster:int, init_method='gmm', parametre=None, n_init_gmm=6):
self.n_cluster = n_cluster
self.init_method = init_method
if self.init_method == "params":
self.params = parametre
if self.init_method == "gmm":
self.n_init_gmm = n_init_gmm
[docs]
def initialisation_random(self, X):
"""
Initialize the parameters randomly for the SkewMM algorithm.
Parameters
==========
X : array-like of shape (n_samples, n_features)
Input data matrix.
Returns
=======
w : ndarray of shape (n_clusters,)
Cluster weights.
mu : ndarray of shape (n_clusters, n_features)
Cluster means.
s : ndarray of shape (n_clusters, n_features)
Cluster standard deviations.
nu : ndarray of shape (n_clusters, n_features)
Degrees of freedom for each cluster.
la : ndarray of shape (n_clusters, n_features)
Skewness parameters for each cluster.
"""
w = np.random.uniform(0.1, 1.0, size=self.n_cluster)
w /= np.sum(w)
mu = np.random.uniform(np.min(X, axis=0), np.max(X, axis=0), size=(self.n_cluster, X.shape[1]))
s = np.random.uniform(0.1, 1.0, size=(self.n_cluster, X.shape[1]))
nu = np.random.uniform(4., 10., size=(self.n_cluster, X.shape[1]))
la = np.random.uniform(-2., 2., size=(self.n_cluster, X.shape[1]))
return w, mu, s, nu, la
[docs]
def initialisation_gmm(self, X):
"""
Initialize the parameters for the Gaussian Mixture Model (GMM).
Parameters
==========
X : array-like of shape (n_samples, n_features)
Input data matrix.
Returns
=======
w : ndarray of shape (n_clusters,)
Cluster weights.
mu : ndarray of shape (n_clusters, n_features)
Cluster means.
s : ndarray of shape (n_clusters, n_features)
Cluster standard deviations.
nu : ndarray of shape (n_clusters, n_features)
Degrees of freedom for each cluster.
la : ndarray of shape (n_clusters, n_features)
Skewness parameters for each cluster.
"""
from sklearn.mixture import GaussianMixture
gmm = GaussianMixture(n_components=self.n_cluster, covariance_type='diag').fit(X)
mu = gmm.means_
s = np.sqrt(gmm.covariances_)
w = gmm.weights_
la = np.random.uniform(low=1e-6, high=1e-5, size=(self.n_cluster, X.shape[1]))
nu = np.random.uniform(0, 10, size=(self.n_cluster, X.shape[1]))
return w, mu, s, nu, la
[docs]
def initialisation_kmeans(self, X):
"""
Initializes the parameters for the SkewMM algorithm using the K-means initialization method.
Parameters
==========
X : array-like of shape (n_samples, n_features)
The input data matrix.
default_n_init : int, default='auto'
The number of times the K-means algorithm will be run with different centroid seeds. Default is 'auto'.
Returns
=======
dict : dict
A dictionary containing the initialized parameters:
w : ndarray of shape (n_clusters,)
Cluster weights.
mu : ndarray of shape (n_clusters, n_features)
Cluster means.
s : ndarray of shape (n_clusters, n_features)
Cluster standard deviations.
nu : ndarray of shape (n_clusters, n_features)
Degrees of freedom for each cluster.
la : ndarray of shape (n_clusters, n_features)
Skewness parameters for each cluster.
"""
from sklearn.cluster import KMeans
kmeans = KMeans(n_clusters=self.n_cluster, n_init=20).fit(X)
mu = kmeans.cluster_centers_
s = np.ones((self.n_cluster, X.shape[1]))
w = np.bincount(kmeans.labels_) / len(kmeans.labels_)
la = np.random.uniform(low=1e-6, high=1e-5, size=(self.n_cluster, X.shape[1]))
nu = np.random.uniform(0, 10, size=(self.n_cluster, X.shape[1]))
return w, mu, s, nu, la
[docs]
def logt_expr(self, eta2, nu):
"""
Compute the logarithm of the t-distribution expression.
Parameters
==========
eta2 : ndarray
Squared standardized residuals.
nu : ndarray
Degrees of freedom.
Returns
=======
logt_val : ndarray
Logarithm of the t-distribution expression.
"""
return loggamma((nu + 1)/2) - loggamma(nu/2) - 0.5 * np.log(nu * np.pi) - ((nu + 1)/2) * np.log(1 + eta2 / nu)
[docs]
def ST(self, X, w, mu, s, nu, la):
"""
Compute the posterior probabilities for the SkewT mixture model.
Parameters
==========
X : ndarray
Input data.
w : ndarray
Cluster weights.
mu : ndarray
Cluster means.
s : ndarray
Cluster standard deviations.
nu : ndarray
Degrees of freedom for each cluster.
la : ndarray
Skewness parameters for each cluster.
Returns
=======
Z : ndarray
Posterior probabilities for each cluster.
"""
Skew = np.ones((len(w), X.shape[0]))
if X.ndim == 1:
eta = (X[None, :] - mu[:, None]) / s[:, None]
logt_val = self.logt_expr(eta**2, nu[:, None])
lau = la[:, None] * eta
t0 = (nu[:, None] + 1) / (nu[:, None]+ eta**2)
c0 = lau * np.sqrt(t0)
Tc0 = t.cdf(c0, df=nu[:, None] + 1)
Skew = w[:, None] * 2 * np.exp(logt_val) * Tc0 / s[:, None]
else:
Skew *= w[:, None]
for j in range(X.shape[1]):
eta = (X[:, j][None, :] - mu[:, j][:, None]) / s[:, j][:, None]
logt_val = self.logt_expr(eta**2, nu[:, j][:, None])
lau = la[:, j][:, None] * eta
t0 = (nu[:, j][:, None] + 1) / (nu[:, j][:, None]+ eta**2)
c0 = lau * np.sqrt(t0)
Tc0 = t.cdf(c0, df=nu[:, j][:, None] + 1)
Skew *= 2 * np.exp(logt_val) * Tc0 / s[:, j][:, None]
return Skew
def LL(self, Z):
"""
Compute the log-likelihood of the model.
Parameters
==========
Z : ndarray
Posterior probabilities for each cluster.
Returns
=======
logli_new : float
Log-likelihood of the model.
"""
ind_den = np.sum(Z, axis=0)
logli_new = np.sum(np.log(ind_den))
return logli_new
[docs]
def fit(self, X, tol=1e-6, max_iter=200, verbose=0):
"""
Fit the SkewT mixture model to the data.
Parameters
==========
X : array-like, shape (n_samples, n_features)
The input data.
tol : float, default=1e-6
Tolerance for convergence.
max_iter : int, default=200
Maximum number of iterations for the EM algorithm.
"""
if self.init_method == 'gmm':
best_params = None
best_LL = -np.inf
params_to_test = [
self.initialisation_gmm(X) for _ in range(self.n_init_gmm)
]
for params in params_to_test:
w, mu, s, nu, la = params
Z = self.ST(X, w, mu, s, nu, la)
LL = self.LL(Z)
if LL > best_LL:
best_LL = LL
best_params = params
w, mu, s, nu, la = best_params
elif self.init_method == 'random':
w, mu, s, nu, la = self.initialisation_random(X)
elif self.init_method == 'kmeans':
w, mu, s, nu, la = self.initialisation_kmeans(X)
elif self.init_method == 'parametre':
w = self.params['alpha']
mu = self.params['mu']
s = self.params['sig']
la = self.params['lamb']
nu = self.params['nu']
else :
raise ValueError(f"Error: The initialization method {self.init_method} is not recognized, please choose from 'parametre', 'kmeans', or 'gmm' ")
if X.ndim == 1:
n, dim = len(X), 1
else:
n, dim = X.shape[0], X.shape[1]
g = len(w) if isinstance(w, (list, np.ndarray)) else 1
if g == 1:
w = np.array([w])
mu = np.array([mu])
s = np.array([s])
la = np.array([la])
nu = np.array([nu])
iter = 0
eta = (X[None, :] - mu[:, None]) / s[:, None]
eta2 = eta**2
lau = la[:, None] * eta
t0 = (nu[:, None] + 1) / (nu[:, None] + eta2)
c0 = lau * np.sqrt(t0)
Tc0 = t.cdf(c0, df=nu[:, None] + 1)
Z = self.ST(X, w, mu, s, nu, la) # ST
ind_den = np.sum(Z, axis=0)
logli_old = self.LL(Z)
self.logli = [self.LL(Z)]
while True:
# E-step
if verbose==1:
print(f'iteration: {iter}/{max_iter}')
iter += 1
ind_den = np.sum(Z, axis=0)
Z /= ind_den
c2 = la[:, None] * eta * np.sqrt((nu[:, None] + 3) / (nu[:, None] + eta2))
Tc2 = t.cdf(c2, df=nu[:, None] + 3)
tau = t0 * Tc2 / Tc0
de = la / np.sqrt(1 + la**2)
al = s * de
hs1 = (lau * tau + t.pdf(c0, df=nu[:, None] + 1) / Tc0 * np.sqrt(t0)) * np.sqrt(1 - de[:, None]**2)
hs2 = 1 - de[:, None]**2 + de[:, None] * eta * hs1
# M-step
ni= np.sum(Z, axis=1)
w = ni / n
mu = np.sum(Z[:, :, None] * (tau * X - (al[:, None] * hs1)), axis=1) / np.sum(Z[:, :, None] * tau, axis=1)
cent = (X[None, :] - mu[:, None])
Szs2 = np.sum(Z[:, :, None] * hs2, axis=1)
al = np.sum(Z[:, :, None] * hs1 * cent, axis=1) / Szs2
ka2 = np.sum(Z[:, :, None] * (tau * cent**2 - 2 * al[:, None] * hs1 * cent + (al**2)[:, None] * hs2), axis=1) / ni[:, None]
s = np.sqrt(ka2 + al**2)
de = al / s
la = de / np.sqrt(1 - de**2)
for i in range(g):
if dim == 1:
def CML_nu(v):
cdf_vals = t.cdf(la[i] * np.sqrt((v + 1) / (v + eta2[i, :])), df=v + 1)
log_t = self.logt_expr(eta2[i, :], v)
return -np.sum(Z[i, :] * (log_t + np.log(cdf_vals)))
res = minimize_scalar(CML_nu, bounds=(1e-6, 100), method='bounded')
nu[i] = res.x
else:
for j in range(dim):
def CML_nu(v):
cdf_vals = t.cdf(la[i, j] * np.sqrt((v + 1) / (v + eta2[i, :, j])), df=v + 1)
log_t = self.logt_expr(eta2[i, :, j], v)
return -np.sum(Z[i, :] * (log_t + np.log(cdf_vals)))
res = minimize_scalar(CML_nu, bounds=(1e-6, 100), method='bounded')
nu[i, j] = res.x
eta = (X[None, :] - mu[:, None]) / s[:, None]
eta2 = eta**2
lau = la[:, None] * eta
t0 = (nu[:, None] + 1) / (nu[:, None] + eta2)
c0 = lau * np.sqrt(t0)
Tc0 = t.cdf(c0, df=nu[:, None] + 1)
Z = self.ST(X, w, mu, s, nu, la) # ST
logli_new = self.LL(Z)
diff = logli_new - logli_old
if diff < tol or iter >= max_iter:
print('tolérance atteinte')
break
if logli_new < logli_old:
print('logli_new', logli_new)
print('logli_old', logli_old)
print('diminution du logli')
break
if iter == max_iter:
print('nombre d iterations atteint')
break
logli_old = logli_new
self.logli.append(logli_new)
self.tik = Z/ ind_den
self.mean = mu
self.sigma = s
self.dl = nu
self.lamb = la
self.alpha = w
return self
[docs]
def predict_proba(self, X):
"""
Predict the posterior probabilities for the data.
Parameters
==========
X : array-like, shape (n_samples, n_features)
The input data.
Returns
=======
proba : ndarray, shape (n_samples, n_clusters)
The posterior probabilities for each cluster.
"""
# Implementation of the predict_proba method
Z = self.ST(X, self.alpha, self.mean, self.sigma, self.dl, self.lamb)
ind_den = np.sum(Z, axis=0)
proba = Z / ind_den
return proba
[docs]
def predict(self, X):
"""
Predict the cluster labels for the data.
Parameters
==========
- X (array-like): The input data.
Returns
=======
- labels (array-like): The predicted cluster labels.
"""
# Implementation of the predict method
proba = self.predict_proba(X)
labels=np.array([])
for i in range(proba.shape[1]):
labels = np.append(labels, np.argmax(proba[:, i]))
return labels
[docs]
def ARI(self, y_true, y_pred):
"""
Compute the ARI .
Parameters
==========
- y (array-like): The true labels.
Returns
=======
- ari (float): The Adjusted Rand Index (ARI) score.
"""
from sklearn.metrics import adjusted_rand_score
ari = adjusted_rand_score(y_true, y_pred)
return ari
[docs]
def confusion_matrix(self, y_true, y_pred):
"""
Calculate the confusion matrix.
Parameters
==========
y_true : array-like
The true labels.
y_pred : array-like, default=None
The predicted labels.
Returns
=======
matrix : array-like
The confusion matrix. The last cluster correspond to the uniform cluster.
"""
if y_true is None:
raise ValueError("Error: The true labels are missing, please provide them.")
y_true = y_true.astype(int)
y_pred = np.array(y_pred).astype(int)
# Obtenez les classes uniques
classes = np.unique(np.concatenate((y_true, y_pred)))
# Créez une matrice de confusion initialisée à zéro
matrix = np.zeros((self.n_cluster, self.n_cluster), dtype=int)
# Utilisez np.searchsorted pour obtenir les indices des classes
true_indices = np.searchsorted(classes, y_true)
pred_indices = np.searchsorted(classes, y_pred)
# Mettez à jour la matrice de confusion en comptant les occurrences
for true_idx, pred_idx in zip(true_indices, pred_indices):
matrix[true_idx, pred_idx] += 1
return matrix
[docs]
def BIC(self, X):
"""
Calculate the Bayesian Information Criterion (BIC) for the model.
Parameters
==========
- X (array-like): The input data.
Returns
=======
- bic (float): The BIC value.
"""
# Implementation of the BIC method
n = X.shape[0]
LL = self.logli[-1]
m = 4 * self.n_cluster
bic = -2 * LL + np.log(n) * m
return bic
[docs]
def AIC(self, X, penalty_weight=0.1):
from collections import Counter
# Implementation of the BIC method
y_pred = self.predict(X)
n = X.shape[0]
LL = self.logli[-1]
m = 4 * self.n_cluster
aic = -2 * LL + 2* m
cluster_sizes = Counter(y_pred)
penalty = sum(1 for size in cluster_sizes.values() if size < 5)
return aic * (1 - penalty_weight * penalty)
[docs]
def IUS(self, X, bins=3):
"""
Calculate the Indice d'Uniformité de Shannon (IUS) for the model.
This index measures the uniformity of the distribution of data points across clusters.
Parameters
==========
data : ndarray of shape (n_samples, n_features)
bins : int, default=3
Number of bins for the histogram.
Returns
=======
ius : float
The Indice d'Uniformité de Shannon (IUS) value.
"""
probabilites= self.tik[:,:]
y_pred = self.predict(X)
tirage_bernoulli = np.random.binomial(1, probabilites)
label_bernouilli = np.zeros(X.shape[0])
for i in range(X.shape[0]):
if tirage_bernoulli[i]==1:
label_bernouilli[i]= self.n_cluster
else:
label_bernouilli[i]=y_pred[i]
data = X[:, :][label_bernouilli == self.n_cluster]
# Histogramme multidimensionnel
hist, edges = np.histogramdd(data, bins=bins)
# Probabilités (normalisation)
prob = hist / np.sum(hist)
# Entropie (en bits)
prob_nonzero = prob[prob > 0]
H = -np.sum(prob_nonzero * np.log2(prob_nonzero))
# Entropie maximale = log2(N) où N est le nombre total de cases non nulles
N = np.sum(hist)
H_max = np.log2(N) if N > 0 else 1e-12 # pour éviter la division par zéro
# Indice d’uniformité
iuS = H / H_max if H_max > 0 else 0
return iuS
[docs]
def KL(self, X, bins=3):
"""
Calculate the Kullback-Leibler divergence for the model.
This index measures the divergence between the empirical distribution of data points in the uniform cluster and a uniform distribution.
Parameters
==========
data : ndarray of shape (n_samples, n_features)
bins : int, default=3
Number of bins for the histogram.
Returns
=======
kl_div : float
The Kullback-Leibler divergence value.
"""
from scipy.special import rel_entr
probabilites= self.tik[: ,:]
y_pred = self.predict(X)
tirage_bernoulli = np.random.binomial(1, probabilites)
label_bernouilli = np.zeros(X.shape[0])
for i in range(X.shape[0]):
if tirage_bernoulli[i]==1:
label_bernouilli[i]= self.n_cluster
else:
label_bernouilli[i]=y_pred[i]
data = X[:, :][label_bernouilli == self.n_cluster]
hist, _ = np.histogramdd(data, bins=bins)
prob = hist / np.sum(hist)
uniform_prob = 1 / np.prod(hist.shape)
kl_div = np.sum(rel_entr(prob, uniform_prob))
return kl_div
[docs]
def L2(self, X, bins=10, penalty_weight=0.1):
"""
Calculate the L2 distance between the empirical distribution of data points in the uniform cluster and a uniform distribution.
This index measures the distance between the empirical distribution and a uniform distribution.
Parameters
==========
data : ndarray of shape (n_samples, n_features)
bins : int, default=10
Number of bins for the histogram.
penalty_weight : float, default=0.1
Weight for the penalty term based on cluster sizes.
Returns
=======
l2_distance : float
The L2 distance value.
"""
from scipy.spatial.distance import cdist
from collections import Counter
probabilites= self.tik[:,:]
y_pred = self.predict(X)
tirage_bernoulli = np.random.binomial(1, probabilites)
label_bernouilli = np.zeros(X.shape[0])
for i in range(X.shape[0]):
if tirage_bernoulli[i]==1:
label_bernouilli[i]= self.n_cluster
else:
label_bernouilli[i]=y_pred[i]
data = X[:, :][label_bernouilli == self.n_cluster]
hist, _ = np.histogramdd(data, bins=bins)
prob = hist / np.sum(hist)
uniform_prob = 1 / np.prod(hist.shape)
l2_distance = np.sqrt(np.sum((prob - uniform_prob) ** 2))
cluster_sizes = Counter(y_pred)
penalty = sum(1 for size in cluster_sizes.values() if size < 5)
return l2_distance * (1 - penalty_weight * penalty)
[docs]
def chi2(self, X, bins=3):
"""
Calculate the Chi-squared statistic for the model.
This index measures the goodness of fit between the empirical distribution of data points in the uniform cluster and a uniform distribution.
Parameters
==========
data : ndarray of shape (n_samples, n_features)
bins : int, default=3
Number of bins for the histogram.
Returns
=======
chi2_stat : float
The Chi-squared statistic value.
p_value : float
The p-value associated with the Chi-squared statistic.
"""
from scipy.stats import chisquare
probabilites= self.tik[:,:]
y_pred = self.predict(X)
tirage_bernoulli = np.random.binomial(1, probabilites)
label_bernouilli = np.zeros(X.shape[0])
for i in range(X.shape[0]):
if tirage_bernoulli[i]==1:
label_bernouilli[i]= self.n_cluster
else:
label_bernouilli[i]=y_pred[i]
data = X[:, :][label_bernouilli == self.n_cluster]
hist, _ = np.histogramdd(data, bins=bins)
observed = hist.flatten()
expected = np.full_like(observed, np.mean(observed))
chi2_stat, p_value = chisquare(f_obs=observed, f_exp=expected)
return chi2_stat, p_value
[docs]
def HARTIGAN(self, X):
"""
Calculate the Hartigan's index for the model.
This index measures the compactness of clusters by summing the squared distances of points from their cluster centers.
Parameters
==========
X : array-like, shape (n_samples, n_features)
The input data.
Returns
=======
W : float
The Hartigan's index value.
"""
W = 0
y_pred = self.predict(X)
for cluster_id in np.unique(y_pred):
cluster_points = X[y_pred == cluster_id]
if len(cluster_points) == 0:
continue # éviter les clusters vides
center = cluster_points.mean(axis=0)
distances = np.linalg.norm(cluster_points - center, axis=1)
W += np.sum(distances ** 2)
return W
[docs]
def save(self, filename: str):
"""
Save the model to a file.
Parameters
==========
filename : str
The name of the file.
"""
import h5py
if not filename.endswith(".h5"):
filename = f"{filename}.h5"
if not filename:
raise ValueError(
"Error: The filename is empty, please provide a valid filename."
)
if not os.path.exists("Models_folder"):
os.makedirs("Models_folder")
with h5py.File(f"Models_folder/{filename}", "w") as f:
f.create_dataset("mean", data=self.mean)
f.create_dataset("sigma", data=self.sigma)
f.create_dataset("dl", data=self.dl)
f.create_dataset("lamb", data=self.lamb)
f.create_dataset("alpha", data=self.alpha)
f.create_dataset("tik", data=self.tik)
f.create_dataset("E_log_likelihood", data=self.logli)
[docs]
def load(self, filename: str):
"""
Load matrices from a given file.
Parameters
==========
filename : str
The path to the file containing the matrices.
"""
import h5py
with h5py.File(filename, "r") as f:
self.mean = f["mean"][:]
self.sigma = f["sigma"][:]
self.dl = f["dl"][:]
self.lamb = f["lamb"][:]
self.alpha = f["alpha"][:]
self.logli = f["E_log_likelihood"][:]
[docs]
class SkewTUniformMixture:
"""
SkewTUniformMixture
====================
A mixture model for clustering using the skewed Student's t-distribution.
Parameters
==========
n_cluster : int
Number of clusters.
init_method : str, default='gmm'
Initialization method. Options: 'gmm', 'kmeans', 'params'.
parametre : dict, optional
Dictionary of initial parameters if init_method='params'.
n_init_gmm : int, default=6
Number of initializations for GMM.
Attributes
==========
mean : ndarray
Cluster means.
sigma : ndarray
Cluster standard deviations.
dl : ndarray
Degrees of freedom for each cluster.
lamb : ndarray
Skewness parameters for each cluster.
alpha : ndarray
Cluster weights.
tik : ndarray
Posterior probabilities.
logli : list
Log-likelihood values during training.
Notes
=====
For more information, refer to the documentation :ref:`doc.mixture.SkewTUniformMixture`
Examples
========
>>> import numpy as np
>>> from cassiopy.mixture import SkewTUniformMixture
>>> X = np.array([[5, 3], [5, 7], [5, 1], [20, 3], [20, 7], [20, 1]])
>>> model = SkewTUniformMixture(n_cluster=2, init_method='kmeans')
>>> model.fit(X, max_iter=100, tol=1e-4)
>>> model.mean
array([[ 5. , 1.40735413],
[20.00000058, 0.66644041]])
>>> model.predict_proba(np.array([[0, 0], [22, 5]]))
array([[0.5, 0.5],
[0.5, 0.5]])
>>> model.save('model.h5')
>>> model.load('Models_folder/model.h5')
>>> model.predict_cluster(np.array([[0, 0], [22, 5]]))
array([0., 0.])
"""
def __init__(self, n_cluster:int, init_method='gmm', parametre=None, n_init_gmm=6):
self.n_cluster = n_cluster
self.init_method = init_method
if self.init_method == "params":
self.params = parametre
if self.init_method == "gmm":
self.n_init_gmm = n_init_gmm
[docs]
def initialisation_gmm(self, X):
"""
Initialize the parameters for the Gaussian Mixture Model (GMM).
Parameters
==========
X : array-like of shape (n_samples, n_features)
Input data matrix.
Returns
=======
w : ndarray of shape (n_clusters,)
Cluster weights.
mu : ndarray of shape (n_clusters, n_features)
Cluster means.
s : ndarray of shape (n_clusters, n_features)
Cluster standard deviations.
nu : ndarray of shape (n_clusters, n_features)
Degrees of freedom for each cluster.
la : ndarray of shape (n_clusters, n_features)
Skewness parameters for each cluster.
"""
from sklearn.mixture import GaussianMixture
gmm = GaussianMixture(n_components=self.n_cluster, covariance_type='diag').fit(X)
mu = gmm.means_
s = np.sqrt(gmm.covariances_)
w = gmm.weights_
w = np.append(w, 0.2) #add uniform cluster
w = w/ np.sum(w)
la = np.random.uniform(low=0, high=5, size=(self.n_cluster, X.shape[1]))
nu = np.random.uniform(0, 10, size=(self.n_cluster, X.shape[1]))
return w, mu, s, nu, la
[docs]
def initialisation_kmeans(self, X):
"""
Initializes the parameters for the SkewMM algorithm using the K-means initialization method.
Parameters
==========
X : array-like of shape (n_samples, n_features)
The input data matrix.
default_n_init : int, default='auto'
The number of times the K-means algorithm will be run with different centroid seeds. Default is 'auto'.
Returns
=======
dict : dict
A dictionary containing the initialized parameters:
w : ndarray of shape (n_clusters,)
Cluster weights.
mu : ndarray of shape (n_clusters, n_features)
Cluster means.
s : ndarray of shape (n_clusters, n_features)
Cluster standard deviations.
nu : ndarray of shape (n_clusters, n_features)
Degrees of freedom for each cluster.
la : ndarray of shape (n_clusters, n_features)
Skewness parameters for each cluster.
"""
from sklearn.cluster import KMeans
kmeans = KMeans(n_clusters=self.n_cluster, n_init=20).fit(X)
mu = kmeans.cluster_centers_
s = np.ones((self.n_cluster, X.shape[1]))
w = np.bincount(kmeans.labels_) / len(kmeans.labels_)
w = np.append(w, 0.2) #add uniform cluster
w = w/ np.sum(w)
la = np.random.uniform(low=1e-6, high=1e-5, size=(self.n_cluster, X.shape[1]))
nu = np.random.uniform(0, 10, size=(self.n_cluster, X.shape[1]))
return w, mu, s, nu, la
[docs]
def initialisation_random(self, X):
"""
Initialize the parameters randomly for the SkewMM algorithm.
Parameters
==========
X : array-like of shape (n_samples, n_features)
Input data matrix.
Returns
=======
w : ndarray of shape (n_clusters,)
Cluster weights.
mu : ndarray of shape (n_clusters, n_features)
Cluster means.
s : ndarray of shape (n_clusters, n_features)
Cluster standard deviations.
nu : ndarray of shape (n_clusters, n_features)
Degrees of freedom for each cluster.
la : ndarray of shape (n_clusters, n_features)
Skewness parameters for each cluster.
"""
w = np.random.uniform(0.1, 1.0, size=self.n_cluster)
w /= np.sum(w)
mu = np.random.uniform(np.min(X, axis=0), np.max(X, axis=0), size=(self.n_cluster, X.shape[1]))
s = np.random.uniform(0.1, 1.0, size=(self.n_cluster, X.shape[1]))
nu = np.random.uniform(4., 10., size=(self.n_cluster, X.shape[1]))
la = np.random.uniform(-2., 2., size=(self.n_cluster, X.shape[1]))
return w, mu, s, nu, la
[docs]
def logt_expr(self, eta2, nu):
return loggamma((nu + 1)/2) - loggamma(nu/2) - 0.5 * np.log(nu * np.pi) - ((nu + 1)/2) * np.log(1 + eta2 / nu)
[docs]
def ST(self, X, mu, s, nu, la):
"""
Compute the posterior probabilities for the SkewT mixture model.
Parameters
==========
X : ndarray
Input data.
w : ndarray
Cluster weights.
mu : ndarray
Cluster means.
s : ndarray
Cluster standard deviations.
nu : ndarray
Degrees of freedom for each cluster.
la : ndarray
Skewness parameters for each cluster.
Returns
=======
Z : ndarray
Posterior probabilities for each cluster.
"""
Skew = np.ones((self.n_cluster, X.shape[0]))
if X.ndim == 1:
eta = (X[None, :] - mu[:, None]) / s[:, None]
logt_val = self.logt_expr(eta**2, nu[:, None])
lau = la[:, None] * eta
t0 = (nu[:, None] + 1) / (nu[:, None]+ eta**2)
c0 = lau * np.sqrt(t0)
Tc0 = t.cdf(c0, df=nu[:, None] + 1)
Skew = 2 * np.exp(logt_val) * Tc0 / s[:, None]
else:
for j in range(X.shape[1]):
eta = (X[:, j][None, :] - mu[:, j][:, None]) / s[:, j][:, None]
logt_val = self.logt_expr(eta**2, nu[:, j][:, None])
lau = la[:, j][:, None] * eta
t0 = (nu[:, j][:, None] + 1) / (nu[:, j][:, None]+ eta**2)
c0 = lau * np.sqrt(t0)
Tc0 = t.cdf(c0, df=nu[:, j][:, None] + 1)
Skew *= 2 * np.exp(logt_val) * Tc0 / s[:, j][:, None]
return Skew
def LL(self, Z):
"""
Compute the log-likelihood of the model.
Parameters
==========
Z : ndarray
Posterior probabilities for each cluster.
Returns
=======
logli_new : float
Log-likelihood of the model.
"""
ind_den = np.sum(Z, axis=0)
logli_new = np.sum(np.log(ind_den), axis=0)
return logli_new
def phi(self, x, alpha, mean, sig, dl, lamb):
"""
Compute the posterior probabilities for the SkewT mixture model.
Parameters
==========
x : array-like, shape (n_samples, n_features)
The input data.
alpha : array-like, shape (n_clusters,)
The weights of the clusters.
mean : array-like, shape (n_clusters, n_features)
The means of the clusters.
sig : array-like, shape (n_clusters, n_features)
The standard deviations of the clusters.
dl : array-like, shape (n_clusters, n_features)
The degrees of freedom for each cluster.
lamb : array-like, shape (n_clusters, n_features)
The skewness parameters for each cluster.
Returns
=======
p : array-like, shape (n_clusters, n_samples)
The posterior probabilities for each cluster.
"""
p=np.zeros((self.n_cluster+1, x.shape[0]))
p[:-1, :]=alpha[:-1][:, None] * self.ST( x[:, :], mean[:, :], sig[:, :], dl, lamb)
volume = np.prod(x.max(axis=0) - x.min(axis=0))
p[-1, :] = alpha[-1] / volume
p = np.maximum(p, 1e-30)
return p
[docs]
def fit(self, X, tol=1e-6, max_iter=200, init_method='gmm', parametre=None, verbose=0):
"""
Fit the SkewT mixture model to the data.
Parameters
==========
X : array-like, shape (n_samples, n_features)
The input data.
tol : float, default=1e-6
Tolerance for convergence.
max_iter : int, default=200
Maximum number of iterations for the EM algorithm.
"""
if self.init_method == 'gmm':
best_params = None
best_LL = -np.inf
params_to_test = [
self.initialisation_gmm(X) for _ in range(self.n_init_gmm)
]
for params in params_to_test:
w, mu, s, nu, la = params
Z = self.phi(X, w, mu, s, nu, la)
LL = self.LL(Z)
if LL > best_LL:
best_LL = LL
best_params = params
w, mu, s, nu, la = best_params
elif self.init_method == 'kmeans':
w, mu, s, nu, la = self.initialisation_kmeans(X)
elif self.init_method == 'random':
w, mu, s, nu, la = self.initialisation_random(X)
elif self.init_method == 'parametre':
w = parametre['alpha']
mu = parametre['mu']
s = parametre['sig']
la = parametre['lamb']
nu = parametre['nu']
else :
raise ValueError(f"Error: The initialization method {self.init_method} is not recognized, please choose from 'params', 'kmeans', 'random' or 'gmm' ")
if X.ndim == 1:
n, dim = len(X), 1
else:
n, dim = X.shape[0], X.shape[1]
iter = 0
eta = (X[:, :][None, :] - mu[:, :][:, None]) / s[:, :][:, None]
eta2 = eta**2
lau = la[:, None] * eta
t0 = (nu[:, None] + 1) / (nu[:, None] + eta2)
c0 = lau * np.sqrt(t0)
Tc0 = t.cdf(c0, df=nu[:, None] + 1)
Z = self.phi(X, w, mu, s, nu, la)
logli_old = self.LL(Z)
self.logli = [self.LL(Z)]
while True:
# E-step
if verbose==1:
print(f'iteration: {iter}/{max_iter}')
iter += 1
ind_den = np.sum(Z, axis=0)
Z /= ind_den
c2 = la[:, None] * eta * np.sqrt((nu[:, None] + 3) / (nu[:, None] + eta2))
Tc2 = t.cdf(c2, df=nu[:, None] + 3)
tau = t0 * Tc2 / Tc0
de = la / np.sqrt(1 + la**2)
al = s[:, :] * de
hs1 = (lau * tau + t.pdf(c0, df=nu[:, None] + 1) / Tc0 * np.sqrt(t0)) * np.sqrt(1 - de[:, None]**2)
hs2 = 1 - de[:, None]**2 + de[:, None] * eta * hs1
# M-step
ni= np.sum(Z, axis=1)
w = ni / n
# Mise a jour de la skew
mu[:, :] = np.sum(Z[:-1, :, None] * (tau * X[:, :] - (al[:, None] * hs1)), axis=1) / np.sum(Z[:-1, :, None] * tau, axis=1)
cent = (X[:, :][None, :] - mu[:, :][:, None])
Szs2 = np.sum(Z[:-1, :, None] * hs2, axis=1)
al = np.sum(Z[:-1, :, None] * hs1 * cent, axis=1) / Szs2
ka2 = np.sum(Z[:-1, :, None] * (tau * cent**2 - 2 * al[:, None] * hs1 * cent + (al**2)[:, None] * hs2), axis=1) / ni[:-1, None]
s[:, :] = np.sqrt(ka2 + al**2)
de = al / s[:, :]
la = de / np.sqrt(1 - de**2)
for i in range(self.n_cluster):
if dim == 1:
def CML_nu(v):
cdf_vals = t.cdf(la[i] * np.sqrt((v + 1) / (v + eta2[i, :])), df=v + 1)
log_t = self.logt_expr(eta2[i, :], v)
return -np.sum(Z[i, :] * (log_t + np.log(cdf_vals)))
res = minimize_scalar(CML_nu, bounds=(1e-6, 100), method='bounded')
nu[i] = res.x
else:
for j in range(dim):
def CML_nu(v):
cdf_vals = t.cdf(la[i, j] * np.sqrt((v + 1) / (v + eta2[i, :, j])), df=v + 1)
log_t = self.logt_expr(eta2[i, :, j], v)
return -np.sum(Z[i, :] * (log_t + np.log(cdf_vals)))
res = minimize_scalar(CML_nu, bounds=(1e-6, 100), method='bounded')
nu[i, j] = res.x
eta = (X[:, :][None, :] - mu[:, :][:, None]) / s[:, :][:, None]
eta2 = eta**2
lau = la[:, None] * eta
t0 = (nu[:, None] + 1) / (nu[:, None] + eta2)
c0 = lau * np.sqrt(t0)
Tc0 = t.cdf(c0, df=nu[:, None] + 1)
Z = self.phi(X, w, mu, s, nu, la)
logli_new = self.LL(Z)
diff = logli_new - logli_old
if diff < tol or iter >= max_iter:
print('tolérance atteinte')
break
if logli_new < logli_old:
print('logli_new', logli_new)
print('logli_old', logli_old)
print('diminution du logli')
break
if iter == max_iter:
print('nombre d iterations atteint')
break
logli_old = logli_new
self.logli.append(logli_new)
ind_den = np.sum(Z, axis=0)
Z /= ind_den
self.tik = Z/ ind_den
self.mean = mu
self.sigma = s
self.dl = nu
self.lamb = la
self.alpha = w
return self
[docs]
def predict_proba(self, X):
"""
Predict the posterior probabilities for the data.
Parameters
==========
X : array-like, shape (n_samples, n_features)
The input data.
Returns
=======
proba : ndarray, shape (n_samples, n_clusters)
The posterior probabilities for each cluster.
"""
# Implementation of the predict_proba method
Z = self.phi(X, self.alpha, self.mean, self.sigma, self.dl, self.lamb)
ind_den = np.sum(Z, axis=0)
proba = Z / ind_den
return proba
[docs]
def predict(self, X):
"""
Predict the cluster labels for the data.
Parameters
==========
- X (array-like): The input data.
Returns
=======
- labels (array-like): The predicted cluster labels.
"""
# Implementation of the predict method
proba = self.predict_proba(X)
labels=np.array([])
for i in range(proba.shape[1]):
labels = np.append(labels, np.argmax(proba[:, i]))
return labels
[docs]
def ARI(self, y_true, y_pred):
"""
Compute the ARI .
Parameters
==========
- y (array-like): The true labels.
Returns
=======
- ari (float): The Adjusted Rand Index (ARI) score.
"""
from sklearn.metrics import adjusted_rand_score
ari = adjusted_rand_score(y_true, y_pred)
return ari
[docs]
def confusion_matrix(self, y_true, y_pred):
"""
Calculate the confusion matrix.
Parameters
==========
y_true : array-like
The true labels.
y_pred : array-like, default=None
The predicted labels.
Returns
=======
matrix : array-like
The confusion matrix. The last cluster correspond to the uniform cluster.
"""
if y_true is None:
raise ValueError("Error: The true labels are missing, please provide them.")
y_true = y_true.astype(int)
y_pred = np.array(y_pred).astype(int)
# Obtenez les classes uniques
classes = np.unique(np.concatenate((y_true, y_pred)))
# Créez une matrice de confusion initialisée à zéro
matrix = np.zeros((len(np.unique(y_true)), self.n_cluster+1), dtype=int)
# Utilisez np.searchsorted pour obtenir les indices des classes
true_indices = np.searchsorted(classes, y_true)
pred_indices = np.searchsorted(classes, y_pred)
# Mettez à jour la matrice de confusion en comptant les occurrences
for true_idx, pred_idx in zip(true_indices, pred_indices):
matrix[true_idx, pred_idx] += 1
return matrix
[docs]
def BIC(self, X):
"""
Calculate the Bayesian Information Criterion (BIC) for the model.
Parameters
==========
- X (array-like): The input data.
Returns
=======
- bic (float): The BIC value.
"""
# Implementation of the BIC method
n = X.shape[0]
LL = self.logli[-1]
m = 4 * self.n_cluster
bic = -2 * LL + np.log(n) * m
return bic
[docs]
def AIC(self, X, penalty_weight=0.1):
from collections import Counter
# Implementation of the BIC method
y_pred = self.predict(X)
n = X.shape[0]
LL = self.logli[-1]
m = 4 * self.n_cluster
aic = -2 * LL + 2* m
cluster_sizes = Counter(y_pred)
penalty = sum(1 for size in cluster_sizes.values() if size < 5)
return aic * (1 - penalty_weight * penalty)
[docs]
def IUS(self, X, y_pred, bins=3):
"""
Calculate the Indice d'Uniformité de Shannon (IUS) for the model.
This index measures the uniformity of the distribution of data points across clusters.
Parameters
==========
data : ndarray of shape (n_samples, n_features)
bins : int, default=3
Number of bins for the histogram.
Returns
=======
ius : float
The Indice d'Uniformité de Shannon (IUS) value.
"""
probabilites= self.tik[-1,:]
tirage_bernoulli = np.random.binomial(1, probabilites)
label_bernouilli = np.zeros(X.shape[0])
for i in range(X.shape[0]):
if tirage_bernoulli[i]==1:
label_bernouilli[i]= self.n_cluster
else:
label_bernouilli[i]=y_pred[i]
data = X[:, :][label_bernouilli == self.n_cluster]
# Histogramme multidimensionnel
hist, edges = np.histogramdd(data, bins=bins)
# Probabilités (normalisation)
prob = hist / np.sum(hist)
# Entropie (en bits)
prob_nonzero = prob[prob > 0]
H = -np.sum(prob_nonzero * np.log2(prob_nonzero))
# Entropie maximale = log2(N) où N est le nombre total de cases non nulles
N = np.sum(hist)
H_max = np.log2(N) if N > 0 else 1e-12 # pour éviter la division par zéro
# Indice d’uniformité
iuS = H / H_max if H_max > 0 else 0
return iuS
[docs]
def KL(self, X, bins=3):
"""
Calculate the Kullback-Leibler divergence for the model.
This index measures the divergence between the empirical distribution of data points in the uniform cluster and a uniform distribution.
Parameters
==========
data : ndarray of shape (n_samples, n_features)
bins : int, default=3
Number of bins for the histogram.
Returns
=======
kl_div : float
The Kullback-Leibler divergence value.
"""
from scipy.special import rel_entr
y_pred = self.predict(X)
probabilites= self.tik[-1,:]
tirage_bernoulli = np.random.binomial(1, probabilites)
label_bernouilli = np.zeros(X.shape[0])
for i in range(X.shape[0]):
if tirage_bernoulli[i]==1:
label_bernouilli[i]= self.n_cluster
else:
label_bernouilli[i]=y_pred[i]
data = X[:, :][label_bernouilli == self.n_cluster]
hist, _ = np.histogramdd(data, bins=bins)
prob = hist / np.sum(hist)
uniform_prob = 1 / np.prod(hist.shape)
kl_div = np.sum(rel_entr(prob, uniform_prob))
return kl_div
[docs]
def L2(self, X, bins=10, penalty_weight=0.1):
"""
Calculate the L2 distance between the empirical distribution of data points in the uniform cluster and a uniform distribution.
This index measures the distance between the empirical distribution and a uniform distribution.
Parameters
==========
data : ndarray of shape (n_samples, n_features)
bins : int, default=10
Number of bins for the histogram.
penalty_weight : float, default=0.1
Weight for the penalty term based on cluster sizes.
Returns
=======
l2_distance : float
The L2 distance value.
"""
from scipy.spatial.distance import cdist
from collections import Counter
y_pred = self.predict(X)
probabilites= self.tik[-1,:]
tirage_bernoulli = np.random.binomial(1, probabilites)
label_bernouilli = np.zeros(X.shape[0])
for i in range(X.shape[0]):
if tirage_bernoulli[i]==1:
label_bernouilli[i]= self.n_cluster
else:
label_bernouilli[i]=y_pred[i]
data = X[:, :][label_bernouilli == self.n_cluster]
hist, _ = np.histogramdd(data, bins=bins)
prob = hist / np.sum(hist)
uniform_prob = 1 / np.prod(hist.shape)
l2_distance = np.sqrt(np.sum((prob - uniform_prob) ** 2))
cluster_sizes = Counter(y_pred)
penalty = sum(1 for size in cluster_sizes.values() if size < 5)
return l2_distance * (1 - penalty_weight * penalty)
[docs]
def chi2(self, X, bins=3):
"""
Calculate the Chi-squared statistic for the model.
This index measures the goodness of fit between the empirical distribution of data points in the uniform cluster and a uniform distribution.
Parameters
==========
data : ndarray of shape (n_samples, n_features)
bins : int, default=3
Number of bins for the histogram.
Returns
=======
chi2_stat : float
The Chi-squared statistic value.
p_value : float
The p-value associated with the Chi-squared statistic.
"""
from scipy.stats import chisquare
y_pred = self.predict(X)
probabilites= self.tik[-1,:]
tirage_bernoulli = np.random.binomial(1, probabilites)
label_bernouilli = np.zeros(X.shape[0])
for i in range(X.shape[0]):
if tirage_bernoulli[i]==1:
label_bernouilli[i]= self.n_cluster
else:
label_bernouilli[i]=y_pred[i]
data = X[:, [0,1,3]][label_bernouilli == self.n_cluster]
hist, _ = np.histogramdd(data, bins=bins)
observed = hist.flatten()
expected = np.full_like(observed, np.mean(observed))
chi2_stat, p_value = chisquare(f_obs=observed, f_exp=expected)
return chi2_stat, p_value
[docs]
def HARTIGAN(self, X):
"""
Calculate the Hartigan's index for the model.
This index measures the compactness of clusters by summing the squared distances of points from their cluster centers.
Parameters
==========
X : array-like, shape (n_samples, n_features)
The input data.
Returns
=======
W : float
The Hartigan's index value.
"""
W = 0
y_pred = self.predict(X)
for cluster_id in np.unique(y_pred):
cluster_points = X[y_pred == cluster_id]
if len(cluster_points) == 0:
continue # éviter les clusters vides
center = cluster_points.mean(axis=0)
distances = np.linalg.norm(cluster_points - center, axis=1)
W += np.sum(distances ** 2)
return W
[docs]
def save(self, filename: str):
"""
Save the model to a file.
Parameters
==========
filename : str
The name of the file.
"""
import h5py
if not filename.endswith(".h5"):
filename = f"{filename}.h5"
if not filename:
raise ValueError(
"Error: The filename is empty, please provide a valid filename."
)
if not os.path.exists("Models_folder"):
os.makedirs("Models_folder")
with h5py.File(f"Models_folder/{filename}", "w") as f:
f.create_dataset("mean", data=self.mean)
f.create_dataset("sigma", data=self.sigma)
f.create_dataset("dl", data=self.dl)
f.create_dataset("lamb", data=self.lamb)
f.create_dataset("alpha", data=self.alpha)
f.create_dataset("tik", data=self.tik)
f.create_dataset("E_log_likelihood", data=self.logli)
[docs]
def load(self, filename: str):
"""
Load matrices from a given file.
Parameters
==========
filename : str
The path to the file containing the matrices.
"""
import h5py
with h5py.File(filename, "r") as f:
self.mean = f["mean"][:]
self.sigma = f["sigma"][:]
self.dl = f["dl"][:]
self.lamb = f["lamb"][:]
self.alpha = f["alpha"][:]
self.logli = f["E_log_likelihood"][:]