1 Background
2 Kmeans
K-means consists of roughly three main steps (a minimal sketch follows the list):
- Randomly pick k points as the initial centroids (seeds);
- Compute the distance from every other point to these k seeds and assign each point to the nearest one;
- Recompute the centroid of each cluster, and iterate until the centroids no longer change.
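As a reference, here is a minimal NumPy sketch of this loop (a toy illustration, not the sklearn implementation; it ignores edge cases such as empty clusters):
import numpy as np

def kmeans(X, k, n_iter=100, seed=0):
    rng = np.random.RandomState(seed)
    # step 1: pick k distinct samples as the initial centroids (seeds)
    centers = X[rng.choice(len(X), k, replace=False)]
    labels = np.zeros(len(X), dtype=int)
    for _ in range(n_iter):
        # step 2: assign every point to its nearest centroid
        dists = ((X[:, None, :] - centers[None, :, :]) ** 2).sum(axis=2)
        labels = dists.argmin(axis=1)
        # step 3: recompute each centroid; stop once the centroids no longer move
        new_centers = np.array([X[labels == j].mean(axis=0) for j in range(k)])
        if np.allclose(new_centers, centers):
            break
        centers = new_centers
    return centers, labels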
3 Kmeans++
K-means++ improves on K-means in how the initial cluster centers are chosen. The core idea of the initialization is still to make between-cluster distances large and within-cluster distances small. The steps are as follows (a simplified seeding sketch follows the list):
- Take the n input points;
- Pick one point at random as the first center c1;
- Compute the distance from every remaining point to the chosen center(s) and pick the next center c with probability proportional to that distance;
- Repeat step 3 until k centers have been obtained;
- Assign every point to its nearest center (form the clusters);
- Recompute the center of each cluster and use it as the new center;
- Repeat steps 5 and 6 until no center moves more than a threshold (or at most M times).
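A minimal NumPy sketch of this distance-weighted seeding (a simplified, single-candidate version; the sklearn implementation analyzed below additionally evaluates several candidates per step):
import numpy as np

def kmeans_pp_init(X, k, seed=0):
    rng = np.random.RandomState(seed)
    n = len(X)
    centers = [X[rng.randint(n)]]                       # first center: a uniformly random sample
    closest_sq = ((X - centers[0]) ** 2).sum(axis=1)    # squared distance to the nearest chosen center
    for _ in range(1, k):
        # roulette-wheel selection: probability proportional to the (squared) distance
        probs = closest_sq / closest_sq.sum()
        centers.append(X[rng.choice(n, p=probs)])
        # keep, for every point, the squared distance to its nearest chosen center
        closest_sq = np.minimum(closest_sq, ((X - centers[-1]) ** 2).sum(axis=1))
    return np.array(centers)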
The probability-proportional sampling used during K-means++ initialization is implemented with roulette-wheel selection. Below is a C++ example:
#include <math.h>
#include <stdio.h>
#include <stdlib.h>

// Roulette-wheel selection over the 10 probabilities in bit[] (they must sum to 1):
// draw a random value in [0, 1] and return the first index whose cumulative
// probability reaches it.
int select(double bit[]) {
    double bet = (double)rand() / RAND_MAX;
    int j = 0;
    double psum = 0.0;
    while (psum < bet && j < 10) {
        psum += bit[j];
        j++;
    }
    return (j > 0) ? j - 1 : 0;
}

int main() {
    double bit[] = {0.1, 0.2, 0.05, 0.15, 0.11, 0.07, 0.04, 0.12, 0.09, 0.07};
    int i, j, k;
    int num[10] = {0};   // how many times each index is selected
    srand(1);
    for (j = 1; j <= 1000; j++) {
        k = select(bit);
        num[k] += 1;
    }
    for (i = 0; i < 10; i++) {
        printf("index %d selected %d times\n", i, num[i]);
    }
    return 0;
}
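With 1000 draws, the counts in num[] should roughly track the probabilities in bit[] (for example, index 1 with probability 0.2 should be picked on the order of 200 times). K-means++ uses exactly this mechanism, with the per-point distances playing the role of the unnormalized probabilities, so that points far from the already chosen centers are more likely to become the next seed.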
3.1 Sklearn source code walkthrough
K-means++ optimizes how the centroids are initialized. The method that initializes the centroids can be found in the fit function of sklearn's KMeans class:
def _init_centroids(self, X, x_squared_norms, init, random_state, init_size=None):
n_samples = X.shape[0]
n_clusters = self.n_clusters
if init_size is not None and init_size < n_samples:
init_indices = random_state.randint(0, n_samples, init_size)
X = X[init_indices]
x_squared_norms = x_squared_norms[init_indices]
n_samples = X.shape[0]
if isinstance(init, str) and init == "k-means++":
centers, _ = _kmeans_plusplus(
X,
n_clusters,
random_state=random_state,
x_squared_norms=x_squared_norms,
)
return centers
The core of the work is done in _kmeans_plusplus. Note that the n_local_trials parameter is the number of candidate centroids sampled by probability when choosing each next centroid; when it is not set, the implementation defaults to 2 + log(k) candidates, where k is the preset number of centroids:
def _kmeans_plusplus(X, n_clusters, x_squared_norms, random_state, n_local_trials=None):
"""Computational component for initialization of n_clusters by
k-means++. Prior validation of data is assumed.
Parameters
----------
X : {ndarray, sparse matrix} of shape (n_samples, n_features)
The data to pick seeds for.
n_clusters : int
The number of seeds to choose.
x_squared_norms : ndarray of shape (n_samples,)
Squared Euclidean norm of each data point.
random_state : RandomState instance
The generator used to initialize the centers.
See :term:`Glossary <random_state>`.
n_local_trials : int, default=None
The number of seeding trials for each center (except the first),
of which the one reducing inertia the most is greedily chosen.
Set to None to make the number of trials depend logarithmically
on the number of seeds (2+log(k)); this is the default.
Returns
-------
centers : ndarray of shape (n_clusters, n_features)
The initial centers for k-means.
indices : ndarray of shape (n_clusters,)
The index location of the chosen centers in the data array X. For a
given index and center, X[index] = center.
"""
n_samples, n_features = X.shape
centers = np.empty((n_clusters, n_features), dtype=X.dtype)
# Set the number of local seeding trials if none is given
if n_local_trials is None:
n_local_trials = 2 + int(np.log(n_clusters))
# Pick first center randomly and track index of point
center_id = random_state.randint(n_samples) # randomly draw the index of the first centroid
indices = np.full(n_clusters, -1, dtype=int) # array recording the indices of the chosen samples
centers[0] = X[center_id] # the first centroid is the sample at the chosen index
indices[0] = center_id # record the chosen sample index as the first entry
# Initialize list of closest distances and calculate current potential
# _euclidean_distances below computes the squared Euclidean distance from every sample to the first centroid
closest_dist_sq = _euclidean_distances(
centers[0, np.newaxis], X, Y_norm_squared=x_squared_norms, squared=True
)
# Sum of all the distances; during roulette-wheel selection it maps the floats in [0, 1)
# returned by random_sample onto [0, current_pot). We refer to it below as the scaling factor.
current_pot = closest_dist_sq.sum()
# The loop below picks the remaining k-1 initial centroids; this is the core of K-means++
for c in range(1, n_clusters):
# random_state here is numpy's RandomState (https://numpy.org/doc/stable/reference/random/); rand_vals holds n_local_trials values
rand_vals = random_state.random_sample(n_local_trials) * current_pot
# searchsorted finds, in the cumulative-sum array, the index of the first entry whose running total exceeds each random value
candidate_ids = np.searchsorted(stable_cumsum(closest_dist_sq), rand_vals)
# XXX: numerical imprecision can result in a candidate_id out of range
np.clip(candidate_ids, None, closest_dist_sq.size - 1, out=candidate_ids)
# Compute distances to center candidates
# compute the squared Euclidean distance from every sample to each selected candidate centroid
distance_to_candidates = _euclidean_distances(
X[candidate_ids], X, Y_norm_squared=x_squared_norms, squared=True
)
# update closest distances squared and potential for each candidate
# update the distances: for each candidate, keep the smaller of the current closest distance and the distance to that candidate
np.minimum(closest_dist_sq, distance_to_candidates, out=distance_to_candidates)
# recompute the potential (the roulette-wheel scaling range) for each candidate, since the per-sample minimum distances were just updated
candidates_pot = distance_to_candidates.sum(axis=1)
# Decide which candidate is the best
best_candidate = np.argmin(candidates_pot) # index, within the candidate pool, of the candidate with the smallest distance sum
current_pot = candidates_pot[best_candidate] # update the scaling factor used by the roulette wheel in the next iteration
closest_dist_sq = distance_to_candidates[best_candidate] # update the list of closest distances implied by the best candidate
best_candidate = candidate_ids[best_candidate] # map the best candidate back to its index in the full sample set
centers[c] = X[best_candidate] # set the c-th centroid to the best candidate
indices[c] = best_candidate # record its index in the sample set
return centers, indices
One possible point of confusion in the implementation above: why are the candidate centroids sampled by roulette wheel according to probability, and then the final centroid chosen by taking the element-wise minimum of closest_dist_sq and distance_to_candidates and summing? These two steps embody the two halves of the clustering objective. Sampling with probability proportional to distance keeps the between-cluster distances large, while the final choice keeps the within-cluster distances small: the winning candidate is the one that minimizes the total distance obtained when every sample is assigned to its nearest center among the already chosen centers plus that candidate.
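To make this concrete, here is a toy NumPy fragment (the distance values are made up; the variable names mirror the sklearn code) showing how one iteration chooses among the sampled candidates:
import numpy as np

# squared distance of each of 5 samples to its nearest already-chosen center
closest_dist_sq = np.array([4.0, 1.0, 9.0, 0.5, 16.0])
current_pot = closest_dist_sq.sum()

rng = np.random.RandomState(0)
rand_vals = rng.random_sample(2) * current_pot                      # two local trials
candidate_ids = np.searchsorted(np.cumsum(closest_dist_sq), rand_vals)

# made-up squared distances from every sample to each of the two candidates
distance_to_candidates = np.array([[1.0, 2.0, 4.0, 9.0, 1.0],
                                   [9.0, 0.5, 1.0, 4.0, 25.0]])
# each sample keeps the smaller of (distance to old centers, distance to the candidate)
np.minimum(closest_dist_sq, distance_to_candidates, out=distance_to_candidates)
candidates_pot = distance_to_candidates.sum(axis=1)                 # within-cluster spread per candidate
best_candidate = np.argmin(candidates_pot)                          # the greedy winner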
Next, let's look at the _euclidean_distances method used above. Its implementation is:
def _euclidean_distances(X, Y, X_norm_squared=None, Y_norm_squared=None, squared=False):
"""Computational part of euclidean_distances
Assumes inputs are already checked.
If norms are passed as float32, they are unused. If arrays are passed as
float32, norms needs to be recomputed on upcast chunks.
TODO: use a float64 accumulator in row_norms to avoid the latter.
"""
if X_norm_squared is not None:
if X_norm_squared.dtype == np.float32:
XX = None
else:
XX = X_norm_squared.reshape(-1, 1)
elif X.dtype == np.float32:
XX = None
else:
XX = row_norms(X, squared=True)[:, np.newaxis]
if Y is X:
YY = None if XX is None else XX.T
else:
if Y_norm_squared is not None:
if Y_norm_squared.dtype == np.float32:
YY = None
else:
YY = Y_norm_squared.reshape(1, -1)
elif Y.dtype == np.float32:
YY = None
else:
YY = row_norms(Y, squared=True)[np.newaxis, :]
if X.dtype == np.float32:
# To minimize precision issues with float32, we compute the distance
# matrix on chunks of X and Y upcast to float64
distances = _euclidean_distances_upcast(X, XX, Y, YY)
else:
# if dtype is already float64, no need to chunk and upcast
distances = -2 * safe_sparse_dot(X, Y.T, dense_output=True)
distances += XX
distances += YY
np.maximum(distances, 0, out=distances)
# Ensure that distances between vectors and themselves are set to 0.0.
# This may not be the case due to floating point rounding errors.
if X is Y:
np.fill_diagonal(distances, 0)
return distances if squared else np.sqrt(distances, out=distances)
First, recall the Euclidean distance formula, where X and Y are two m-dimensional vectors:
D(X,Y)=\sqrt{\sum_{i=0}^{m}(x_i-y_i)^2}
The code above actually expands this formula as:
D(X,Y)=\sqrt{\sum_{i=0}^{m}x_i^2+\sum_{i=0}^{m}y_i^2-2\sum_{i=0}^{m}x_iy_i}
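As a quick sanity check of this expansion (toy NumPy code, not part of sklearn), the expanded form matches the direct pairwise computation:
import numpy as np

rng = np.random.RandomState(0)
X = rng.rand(4, 3)
Y = rng.rand(5, 3)

XX = (X ** 2).sum(axis=1)[:, np.newaxis]     # row-wise squared norms of X, shape (4, 1)
YY = (Y ** 2).sum(axis=1)[np.newaxis, :]     # row-wise squared norms of Y, shape (1, 5)
expanded = XX + YY - 2 * X @ Y.T             # squared distances via the expansion

direct = ((X[:, None, :] - Y[None, :, :]) ** 2).sum(axis=2)
print(np.allclose(expanded, direct))         # True (up to floating-point error)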
In the _euclidean_distances code, the variables XX and YY are exactly the first two terms under the square root. They are computed by row_norms, implemented as follows:
def row_norms(X, squared=False):
"""Row-wise (squared) Euclidean norm of X.
Equivalent to np.sqrt((X * X).sum(axis=1)), but also supports sparse
matrices and does not create an X.shape-sized temporary.
Performs no input validation.
Parameters
----------
X : array-like
The input array.
squared : bool, default=False
If True, return squared norms.
Returns
-------
array-like
The row-wise (squared) Euclidean norm of X.
"""
if sparse.issparse(X):
if not isinstance(X, sparse.csr_matrix):
X = sparse.csr_matrix(X)
norms = csr_row_norms(X)
else:
norms = np.einsum("ij,ij->i", X, X) # row-wise sum of squares (the diagonal of X @ X.T), i.e. the squared norm of each sample
if not squared:
np.sqrt(norms, norms)
return norms
The other helper, safe_sparse_dot, is implemented as follows:
def safe_sparse_dot(a, b, *, dense_output=False):
"""Dot product that handle the sparse matrix case correctly.
Parameters
----------
a : {ndarray, sparse matrix}
b : {ndarray, sparse matrix}
dense_output : bool, default=False
When False, ``a`` and ``b`` both being sparse will yield sparse output.
When True, output will always be a dense array.
Returns
-------
dot_product : {ndarray, sparse matrix}
Sparse if ``a`` and ``b`` are sparse and ``dense_output=False``.
"""
if a.ndim > 2 or b.ndim > 2:
if sparse.issparse(a):
# sparse is always 2D. Implies b is 3D+
# [i, j] @ [k, ..., l, m, n] -> [i, k, ..., l, n]
b_ = np.rollaxis(b, -2)
b_2d = b_.reshape((b.shape[-2], -1))
ret = a @ b_2d
ret = ret.reshape(a.shape[0], *b_.shape[1:])
elif sparse.issparse(b):
# sparse is always 2D. Implies a is 3D+
# [k, ..., l, m] @ [i, j] -> [k, ..., l, j]
a_2d = a.reshape(-1, a.shape[-1])
ret = a_2d @ b
ret = ret.reshape(*a.shape[:-1], b.shape[1])
else:
ret = np.dot(a, b)
else:
ret = a @ b # compute the dot product; in our call a = X[candidate_ids] and b = Y.T
if (
sparse.issparse(a)
and sparse.issparse(b)
and dense_output
and hasattr(ret, "toarray")
):
return ret.toarray()
return ret
4 Elkan Kmeans
In classic K-means, every iteration computes the distance from every sample to every centroid, which is expensive. Elkan K-means uses two properties of the triangle inequality (the sum of two sides is at least as large as the third, and the difference of two sides is at most the third) to avoid many of these distance computations.
The first rule: for a sample x and two centroids μj1 and μj2, if the distance D(j1,j2) between the two centroids has been precomputed and we find that 2D(x,j1) ≤ D(j1,j2), then D(x,j1) ≤ D(x,j2), so there is no need to compute D(x,j2).
The second rule: for a sample x and two centroids μj1 and μj2, we have D(x,j2) ≥ max{0, D(x,j1) − D(j1,j2)}.
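Both rules are direct consequences of the triangle inequality; a small NumPy check on random points (illustrative only):
import numpy as np

rng = np.random.RandomState(0)
x, mu1, mu2 = rng.rand(3, 2)            # one sample and two centroids in 2D
d_x1 = np.linalg.norm(x - mu1)
d_x2 = np.linalg.norm(x - mu2)
d_12 = np.linalg.norm(mu1 - mu2)

# rule 1: if 2 * D(x, j1) <= D(j1, j2) then D(x, j1) <= D(x, j2)
if 2 * d_x1 <= d_12:
    assert d_x1 <= d_x2
# rule 2: D(x, j2) >= max(0, D(x, j1) - D(j1, j2)) always holds
assert d_x2 >= max(0.0, d_x1 - d_12) - 1e-12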
4.1 Sklearn source code walkthrough
The implementation of Elkan K-means can be traced from the fit method of the KMeans class; its main parts are:
def fit(self, X, y=None, sample_weight=None):
"""Compute k-means clustering.
Parameters
----------
X : {array-like, sparse matrix} of shape (n_samples, n_features)
Training instances to cluster. It must be noted that the data
will be converted to C ordering, which will cause a memory
copy if the given data is not C-contiguous.
If a sparse matrix is passed, a copy will be made if it's not in
CSR format.
y : Ignored
Not used, present here for API consistency by convention.
sample_weight : array-like of shape (n_samples,), default=None
The weights for each observation in X. If None, all observations
are assigned equal weight.
.. versionadded:: 0.20
Returns
-------
self : object
Fitted estimator.
"""
self._check_params(X)
random_state = check_random_state(self.random_state)
sample_weight = _check_sample_weight(sample_weight, X, dtype=X.dtype) # if sample_weight is None, this returns an array of ones
self._n_threads = _openmp_effective_n_threads()
...
if self._algorithm == "full":
kmeans_single = _kmeans_single_lloyd
self._check_mkl_vcomp(X, X.shape[0])
else:
kmeans_single = _kmeans_single_elkan
for i in range(self._n_init):
# Initialize centers
centers_init = self._init_centroids(
X, x_squared_norms=x_squared_norms, init=init, random_state=random_state
)
if self.verbose:
print("Initialization complete")
# run a k-means once
labels, inertia, centers, n_iter_ = kmeans_single(
X,
sample_weight,
centers_init,
max_iter=self.max_iter,
verbose=self.verbose,
tol=self._tol,
x_squared_norms=x_squared_norms,
n_threads=self._n_threads,
)
# determine if these results are the best so far
# we chose a new run if it has a better inertia and the clustering is
# different from the best so far (it's possible that the inertia is
# slightly better even if the clustering is the same with potentially
# permuted labels, due to rounding errors)
if best_inertia is None or (
inertia < best_inertia
and not _is_same_clustering(labels, best_labels, self.n_clusters)
):
best_labels = labels
best_centers = centers
best_inertia = inertia
best_n_iter = n_iter_
if not sp.issparse(X):
if not self.copy_x:
X += X_mean
best_centers += X_mean
distinct_clusters = len(set(best_labels))
if distinct_clusters < self.n_clusters:
warnings.warn(
"Number of distinct clusters ({}) found smaller than "
"n_clusters ({}). Possibly due to duplicate points "
"in X.".format(distinct_clusters, self.n_clusters),
ConvergenceWarning,
stacklevel=2,
)
self.cluster_centers_ = best_centers
self.labels_ = best_labels
self.inertia_ = best_inertia
self.n_iter_ = best_n_iter
return self
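For completeness, a typical call that exercises this code path might look as follows (assuming a scikit-learn version that accepts algorithm="elkan"; newer releases rename "full" to "lloyd"):
import numpy as np
from sklearn.cluster import KMeans

X = np.random.RandomState(0).rand(1000, 8)
km = KMeans(n_clusters=5, init="k-means++", algorithm="elkan", n_init=10, random_state=0)
km.fit(X)
print(km.inertia_, km.n_iter_)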
The entry point of the elkan algorithm is _kmeans_single_elkan, implemented as follows:
def _kmeans_single_elkan(
X,
sample_weight,
centers_init,
max_iter=300,
verbose=False,
x_squared_norms=None,
tol=1e-4,
n_threads=1,
):
"""A single run of k-means elkan, assumes preparation completed prior.
Parameters
----------
X : {ndarray, sparse matrix} of shape (n_samples, n_features)
The observations to cluster. If sparse matrix, must be in CSR format.
sample_weight : array-like of shape (n_samples,)
The weights for each observation in X.
centers_init : ndarray of shape (n_clusters, n_features)
The initial centers.
max_iter : int, default=300
Maximum number of iterations of the k-means algorithm to run.
verbose : bool, default=False
Verbosity mode.
x_squared_norms : array-like, default=None
Precomputed x_squared_norms.
tol : float, default=1e-4
Relative tolerance with regards to Frobenius norm of the difference
in the cluster centers of two consecutive iterations to declare
convergence.
It's not advised to set `tol=0` since convergence might never be
declared due to rounding errors. Use a very small number instead.
n_threads : int, default=1
The number of OpenMP threads to use for the computation. Parallelism is
sample-wise on the main cython loop which assigns each sample to its
closest center.
Returns
-------
centroid : ndarray of shape (n_clusters, n_features)
Centroids found at the last iteration of k-means.
label : ndarray of shape (n_samples,)
label[i] is the code or index of the centroid the
i'th observation is closest to.
inertia : float
The final value of the inertia criterion (sum of squared distances to
the closest centroid for all observations in the training set).
n_iter : int
Number of iterations run.
"""
n_samples = X.shape[0] # number of samples
n_clusters = centers_init.shape[0] # number of centroids
# Buffers to avoid new allocations at each iteration.
centers = centers_init
centers_new = np.zeros_like(centers)
weight_in_clusters = np.zeros(n_clusters, dtype=X.dtype)
labels = np.full(n_samples, -1, dtype=np.int32) # cluster label of each sample
labels_old = labels.copy()
center_half_distances = euclidean_distances(centers) / 2 # half the pairwise Euclidean distance between all centroids, an n_clusters x n_clusters matrix
distance_next_center = np.partition(
np.asarray(center_half_distances), kth=1, axis=0
)[1] # for each centroid, half the distance to its nearest other centroid
upper_bounds = np.zeros(n_samples, dtype=X.dtype) # upper bound on the distance from each sample to its closest centroid
# Lower bounds on the distance from each sample to every centroid. This encodes the core of the
# elkan algorithm: D(x, u2) only needs to be computed when D(x, u1) > D(u1, u2) / 2
lower_bounds = np.zeros((n_samples, n_clusters), dtype=X.dtype)
center_shift = np.zeros(n_clusters, dtype=X.dtype) # how far each centroid moved between the previous and the current iteration
init_bounds = init_bounds_dense
elkan_iter = elkan_iter_chunked_dense
_inertia = _inertia_dense
init_bounds(X, centers, center_half_distances, labels, upper_bounds, lower_bounds)
strict_convergence = False
for i in range(max_iter):
elkan_iter(
X,
sample_weight,
centers,
centers_new,
weight_in_clusters,
center_half_distances,
distance_next_center,
upper_bounds,
lower_bounds,
labels,
center_shift,
n_threads,
) # update_centers is left at its default value of True in this call
# compute new pairwise distances between centers and closest other
# center of each center for next iterations
center_half_distances = euclidean_distances(centers_new) / 2
distance_next_center = np.partition(
np.asarray(center_half_distances), kth=1, axis=0
)[1]
if verbose:
inertia = _inertia(X, sample_weight, centers, labels, n_threads)
print(f"Iteration {i}, inertia {inertia}")
centers, centers_new = centers_new, centers
if np.array_equal(labels, labels_old):
# First check the labels for strict convergence.
if verbose:
print(f"Converged at iteration {i}: strict convergence.")
strict_convergence = True
break
else:
# No strict convergence, check for tol based convergence.
center_shift_tot = (center_shift ** 2).sum()
if center_shift_tot <= tol:
if verbose:
print(
f"Converged at iteration {i}: center shift "
f"{center_shift_tot} within tolerance {tol}."
)
break
labels_old[:] = labels
if not strict_convergence:
# rerun E-step so that predicted labels match cluster centers
elkan_iter(
X,
sample_weight,
centers,
centers,
weight_in_clusters,
center_half_distances,
distance_next_center,
upper_bounds,
lower_bounds,
labels,
center_shift,
n_threads,
update_centers=False,
)
inertia = _inertia(X, sample_weight, centers, labels, n_threads)
return labels, inertia, centers, i + 1
The three key routines in this implementation are init_bounds_dense, elkan_iter_chunked_dense and _inertia_dense; for runtime efficiency, all three are written in Cython.
4.1.1 Implementation of init_bounds_dense
from cython cimport floating
from cython.parallel import prange, parallel
...
def init_bounds_dense(
floating[:, ::1] X, # IN READ-ONLY
floating[:, ::1] centers, # IN
floating[:, ::1] center_half_distances, # IN
int[::1] labels, # OUT
floating[::1] upper_bounds, # OUT
floating[:, ::1] lower_bounds): # OUT
"""Initialize upper and lower bounds for each sample for dense input data.
Given X, centers and the pairwise distances divided by 2.0 between the
centers this calculates the upper bounds and lower bounds for each sample.
The upper bound for each sample is set to the distance between the sample
and the closest center.
The lower bound for each sample is a one-dimensional array of n_clusters.
For each sample i assume that the previously assigned cluster is c1 and the
previous closest distance is dist, for a new cluster c2, the
lower_bound[i][c2] is set to distance between the sample and this new
cluster, if and only if dist > center_half_distances[c1][c2]. This prevents
computation of unnecessary distances for each sample to the clusters that
it is unlikely to be assigned to.
Parameters
----------
X : ndarray of shape (n_samples, n_features), dtype=floating
The input data.
centers : ndarray of shape (n_clusters, n_features), dtype=floating
The cluster centers.
center_half_distances : ndarray of shape (n_clusters, n_clusters), \
dtype=floating
The half of the distance between any 2 clusters centers.
labels : ndarray of shape(n_samples), dtype=int
The label for each sample. This array is modified in place.
upper_bounds : ndarray of shape(n_samples,), dtype=floating
The upper bound on the distance between each sample and its closest
cluster center. This array is modified in place.
lower_bounds : ndarray, of shape(n_samples, n_clusters), dtype=floating
The lower bound on the distance between each sample and each cluster
center. This array is modified in place.
"""
cdef:
int n_samples = X.shape[0]
int n_clusters = centers.shape[0]
int n_features = X.shape[1]
floating min_dist, dist
int best_cluster, i, j
# prange parallelizes over samples: compute each sample's distance to the centroids and record the best centroid, its distance, and the cluster label
for i in prange(n_samples, schedule='static', nogil=True):
best_cluster = 0
min_dist = _euclidean_dense_dense(&X[i, 0], ¢ers[0, 0],
n_features, False)
lower_bounds[i, 0] = min_dist
for j in range(1, n_clusters):
# D(x, u2) only needs to be computed when D(x, u1) > D(u1, u2) / 2, which avoids unnecessary distance computations
if min_dist > center_half_distances[best_cluster, j]:
dist = _euclidean_dense_dense(&X[i, 0], ¢ers[j, 0],
n_features, False)
lower_bounds[i, j] = dist
if dist < min_dist:
min_dist = dist
best_cluster = j
labels[i] = best_cluster # label the current sample with the index of its best centroid
upper_bounds[i] = min_dist
The distance computation used above is implemented as follows:
cdef floating _euclidean_dense_dense(
floating* a, # IN
floating* b, # IN
int n_features,
bint squared) nogil:
"""Euclidean distance between a dense and b dense"""
cdef:
int i
int n = n_features // 4
int rem = n_features % 4
floating result = 0
# We manually unroll the loop for better cache optimization.
for i in range(n):
result += ((a[0] - b[0]) * (a[0] - b[0])
+(a[1] - b[1]) * (a[1] - b[1])
+(a[2] - b[2]) * (a[2] - b[2])
+(a[3] - b[3]) * (a[3] - b[3]))
a += 4; b += 4
for i in range(rem):
result += (a[i] - b[i]) * (a[i] - b[i])
return result if squared else sqrt(result)
4.1.2 elkan_iter_chunked_dense
def elkan_iter_chunked_dense(
floating[:, ::1] X, # IN READ-ONLY
floating[::1] sample_weight, # IN READ-ONLY
floating[:, ::1] centers_old, # IN
floating[:, ::1] centers_new, # OUT
floating[::1] weight_in_clusters, # OUT
floating[:, ::1] center_half_distances, # IN
floating[::1] distance_next_center, # IN
floating[::1] upper_bounds, # INOUT
floating[:, ::1] lower_bounds, # INOUT
int[::1] labels, # INOUT
floating[::1] center_shift, # OUT
int n_threads,
bint update_centers=True):
"""Single iteration of K-means Elkan algorithm with dense input.
Update labels and centers (inplace), for one iteration, distributed
over data chunks.
Parameters
----------
X : ndarray of shape (n_samples, n_features), dtype=floating
The observations to cluster.
sample_weight : ndarray of shape (n_samples,), dtype=floating
The weights for each observation in X.
centers_old : ndarray of shape (n_clusters, n_features), dtype=floating
Centers before previous iteration, placeholder for the centers after
previous iteration.
centers_new : ndarray of shape (n_clusters, n_features), dtype=floating
Centers after previous iteration, placeholder for the new centers
computed during this iteration.
weight_in_clusters : ndarray of shape (n_clusters,), dtype=floating
Placeholder for the sums of the weights of every observation assigned
to each center.
center_half_distances : ndarray of shape (n_clusters, n_clusters), \
dtype=floating
Half pairwise distances between centers.
distance_next_center : ndarray of shape (n_clusters,), dtype=floating
Distance between each center and its closest center.
upper_bounds : ndarray of shape (n_samples,), dtype=floating
Upper bound for the distance between each sample and its center,
updated inplace.
lower_bounds : ndarray of shape (n_samples, n_clusters), dtype=floating
Lower bound for the distance between each sample and each center,
updated inplace.
labels : ndarray of shape (n_samples,), dtype=int
labels assignment.
center_shift : ndarray of shape (n_clusters,), dtype=floating
Distance between old and new centers.
n_threads : int
The number of threads to be used by openmp.
update_centers : bool
- If True, the labels and the new centers will be computed, i.e. runs
the E-step and the M-step of the algorithm.
- If False, only the labels will be computed, runs the E-step of
the algorithm. This is useful especially when calling predict on a
fitted model.
"""
cdef:
int n_samples = X.shape[0]
int n_features = X.shape[1]
int n_clusters = centers_new.shape[0]
# hard-coded number of samples per chunk. Splitting in chunks is
# necessary to get parallelism. Chunk size chosen to be same as lloyd's
int n_samples_chunk = CHUNK_SIZE if n_samples > CHUNK_SIZE else n_samples
int n_chunks = n_samples // n_samples_chunk
int n_samples_rem = n_samples % n_samples_chunk
int chunk_idx, n_samples_chunk_eff
int start, end
int i, j, k
floating *centers_new_chunk
floating *weight_in_clusters_chunk
# count remainder chunk in total number of chunks
n_chunks += n_samples != n_chunks * n_samples_chunk
# number of threads should not be bigger than number of chunks
n_threads = min(n_threads, n_chunks)
if update_centers:
memset(¢ers_new[0, 0], 0, n_clusters * n_features * sizeof(floating))
memset(&weight_in_clusters[0], 0, n_clusters * sizeof(floating))
# process the samples of each chunk in parallel
with nogil, parallel(num_threads=n_threads):
# thread local buffers
centers_new_chunk = <floating*> calloc(n_clusters * n_features, sizeof(floating))
weight_in_clusters_chunk = <floating*> calloc(n_clusters, sizeof(floating))
for chunk_idx in prange(n_chunks, schedule='static'):
start = chunk_idx * n_samples_chunk
if chunk_idx == n_chunks - 1 and n_samples_rem > 0:
end = start + n_samples_rem
else:
end = start + n_samples_chunk
_update_chunk_dense(
X[start: end],
sample_weight[start: end],
centers_old,
center_half_distances,
distance_next_center,
labels[start: end],
upper_bounds[start: end],
lower_bounds[start: end],
centers_new_chunk,
weight_in_clusters_chunk,
update_centers)
# reduction from local buffers. The gil is necessary for that to avoid
# race conditions.
if update_centers:
with gil:
for j in range(n_clusters):
weight_in_clusters[j] += weight_in_clusters_chunk[j]
for k in range(n_features):
centers_new[j, k] += centers_new_chunk[j * n_features + k]
free(centers_new_chunk)
free(weight_in_clusters_chunk)
if update_centers:
_relocate_empty_clusters_dense(X, sample_weight, centers_old,
centers_new, weight_in_clusters, labels)
_average_centers(centers_new, weight_in_clusters)
_center_shift(centers_old, centers_new, center_shift)
# update lower and upper bounds
for i in range(n_samples):
upper_bounds[i] += center_shift[labels[i]]
for j in range(n_clusters):
lower_bounds[i, j] -= center_shift[j]
if lower_bounds[i, j] < 0:
lower_bounds[i, j] = 0
The most important part is the helper _update_chunk_dense:
cdef void _update_chunk_dense(
floating[:, ::1] X, # IN READ-ONLY
floating[::1] sample_weight, # IN READ-ONLY
floating[:, ::1] centers_old, # IN
floating[:, ::1] center_half_distances, # IN
floating[::1] distance_next_center, # IN
int[::1] labels, # INOUT
floating[::1] upper_bounds, # INOUT
floating[:, ::1] lower_bounds, # INOUT
floating *centers_new, # OUT
floating *weight_in_clusters, # OUT
bint update_centers) nogil:
"""K-means combined EM step for one dense data chunk.
Compute the partial contribution of a single data chunk to the labels and
centers.
"""
cdef:
int n_samples = labels.shape[0]
int n_clusters = centers_old.shape[0]
int n_features = centers_old.shape[1]
floating upper_bound, distance
int i, j, k, label
for i in range(n_samples):
upper_bound = upper_bounds[i] # upper bound on the distance from the current sample to its assigned centroid
bounds_tight = 0
label = labels[i] # cluster label currently assigned to the sample
# Next center is not far away from the currently assigned center.
# Sample might need to be assigned to another center.
# If the sample's upper bound exceeds half the distance from its current centroid to that centroid's nearest other centroid, the sample might need to be reassigned.
if not distance_next_center[label] >= upper_bound:
for j in range(n_clusters):
# If this holds, then center_index is a good candidate for the
# sample to be relabelled, and we need to confirm this by
# recomputing the upper and lower bounds.
if (j != label
and (upper_bound > lower_bounds[i, j]) # the upper bound on the distance to its own centroid exceeds the lower bound on the distance to centroid j
and (upper_bound > center_half_distances[label, j])): # and exceeds half the distance between its own centroid and centroid j
# Recompute upper bound by calculating the actual distance
# between the sample and its current assigned center.
if not bounds_tight:
upper_bound = _euclidean_dense_dense(
&X[i, 0], ¢ers_old[label, 0], n_features, False)
lower_bounds[i, label] = upper_bound
bounds_tight = 1
# If the condition still holds, then compute the actual
# distance between the sample and center. If this is less
# than the previous distance, reassign label.
# If the two conditions above still hold after upper_bound and lower_bounds[i, label] have been recomputed, compute the actual distance to centroid j and reassign the sample if that distance is smaller.
if (upper_bound > lower_bounds[i, j]
or (upper_bound > center_half_distances[label, j])):
distance = _euclidean_dense_dense(
&X[i, 0], ¢ers_old[j, 0], n_features, False)
lower_bounds[i, j] = distance
if distance < upper_bound:
label = j
upper_bound = distance
labels[i] = label
upper_bounds[i] = upper_bound
if update_centers:
weight_in_clusters[label] += sample_weight[i]
for k in range(n_features):
centers_new[label * n_features + k] += X[i, k] * sample_weight[i]
4.1.3 _inertia_dense
_inertia_dense computes the following quantity, where C is the number of clusters, i ranges over the samples of cluster j, and \mu_j is the centroid of cluster j (in the code each term is additionally multiplied by its sample weight):
\sum_{j=0}^{C}\sum_{i \in j}\left\|x_i-\mu_j\right\|^2
cpdef floating _inertia_dense(
floating[:, ::1] X, # IN READ-ONLY
floating[::1] sample_weight, # IN READ-ONLY
floating[:, ::1] centers, # IN
int[::1] labels, # IN
int n_threads):
"""Compute inertia for dense input data
Sum of squared distance between each sample and its assigned center.
"""
cdef:
int n_samples = X.shape[0]
int n_features = X.shape[1]
int i, j
floating sq_dist = 0.0
floating inertia = 0.0
for i in prange(n_samples, nogil=True, num_threads=n_threads,
schedule='static'):
j = labels[i]
sq_dist = _euclidean_dense_dense(&X[i, 0], ¢ers[j, 0],
n_features, True)
inertia += sq_dist * sample_weight[i]
return inertia
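For reference, the same quantity can be written in plain NumPy (a hypothetical check, not part of sklearn):
import numpy as np

def inertia_dense_np(X, sample_weight, centers, labels):
    # squared Euclidean distance from each sample to its assigned center,
    # weighted by the sample weight, then summed
    diffs = X - centers[labels]
    return float(np.sum(sample_weight * np.einsum("ij,ij->i", diffs, diffs)))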