Implement K-Means Clustering
Introduction
K-means clustering is a fundamental unsupervised learning algorithm used to partition a dataset into K distinct, non-overlapping subsets (clusters), grouping data points by their similarity. A key part of the algorithm is calculating the Euclidean distance between points to measure that similarity.
Your task is to implement the K-means clustering algorithm from scratch in Python. This includes creating classes for both the KMeans algorithm and the centroids involved in the clustering process.
Use NumPy for numerical operations for efficiency.
KMeans Class
The KMeans class should encapsulate the entire clustering process.
- Attributes:
  - n_features: The number of features in the dataset (dimensionality).
  - k: The number of clusters to form.
  - centroids: A list of Centroid objects representing the centers of the clusters.
- Initialization: The __init__ method should accept n_features and k as parameters, initializing k centroids with random locations in the feature space. This has been implemented already.
- Methods:
  - distance(self, x, y): Calculate and return the Euclidean distance between two points x and y (float).
  - fit(self, X, n_iterations): Implement the fitting process that assigns data points in X to the relevant cluster.
  - predict(self, x): Given a new data point x, predict and return the index of the cluster it belongs to (integer).
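For concreteness, the finished class is expected to be used along these lines (a hypothetical usage sketch; the toy data and variable names are illustrative, not part of the specification):

import numpy as np

X = np.random.randn(100, 2)         # 100 examples with 2 features each
model = KMeans(n_features=2, k=3)   # partition into 3 clusters
model.fit(X, n_iterations=10)       # assign points and update centroids
cluster_idx = model.predict(X[0])   # index of the nearest centroid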
Evaluation Criteria
- Correctness: The implementation should correctly perform clustering on a given dataset.
- Efficiency: Code efficiency, especially in distance calculations and centroid updates, will be considered.
- Code Quality: Clarity, readability, and organization of the code, including proper use of classes and methods.
import numpy as np

class Centroid:
    def __init__(self, location, vectors):
        self.location = location  # (D,)
        self.vectors = vectors    # (N_i, D)

class KMeans:
    def __init__(self, n_features, k):
        self.n_features = n_features
        self.k = k  # number of clusters
        self.centroids = [
            Centroid(
                location=np.random.randn(n_features),
                vectors=np.empty((0, n_features))
            )
            for _ in range(k)
        ]

    def distance(self, x, y):
        # Euclidean distance between points x and y
        return np.sqrt(np.dot(x - y, x - y))

    def fit(self, X, n_iterations):
        for _ in range(n_iterations):
            # Start each iteration with empty clusters
            for centroid in self.centroids:
                centroid.vectors = np.empty((0, self.n_features))
            # Assign each point to its nearest centroid
            for x_i in X:
                distances = [
                    self.distance(x_i, centroid.location) for centroid in self.centroids
                ]
                min_idx = distances.index(min(distances))
                cur_vectors = self.centroids[min_idx].vectors
                self.centroids[min_idx].vectors = np.vstack((cur_vectors, x_i))
            # Move each centroid to the mean of its assigned points
            for centroid in self.centroids:
                if centroid.vectors.size > 0:
                    centroid.location = np.mean(centroid.vectors, axis=0)

    def predict(self, x):
        distances = [self.distance(x, centroid.location) for centroid in self.centroids]
        return distances.index(min(distances))

In this mock interview, Raj (Snap) answers the prompt, "Implement K-Means clustering in K clusters based on which cluster's centroid each data point is closest to."
Below is a supplemental written solution that utilizes our interview framework.
Step 1: Understand the problem
This algorithm uses distance to determine which centroid a data point belongs to. We’ll assume that we can use the Euclidean distance.
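Concretely, for two points x and y with D features each, the Euclidean distance is d(x, y) = sqrt((x_1 - y_1)^2 + (x_2 - y_2)^2 + ... + (x_D - y_D)^2).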
Often, K-means runs for a number of iterations; some implementations keep iterating until the centroids converge. For this implementation, we’ll assume the number of iterations is a variable provided by the user.
Lastly, we’ll assume that the input features, X, will be a NumPy array of shape (N, D): N examples, each with D features.
The K-means algorithm involves maintaining K centroids that represent clusters of the data points closest to them. The location of the centroid is the mean of all the data points in the cluster. Each centroid will be initialized randomly.
Step 2: Discuss the approach
The “fitting” stage will involve multiple iterations, where an iteration involves:
For each data point in the training set:
- Calculate the distance between that data point and the location of each centroid.
- Find the centroid with the minimum distance.
- Assign the data point to that centroid.
Then, for each centroid:
- Update the centroid’s location to the mean of all data points assigned to it in this iteration.
We will also have a “predict” function that takes in a new data point and returns which centroid this data point belongs to.
We will also have a “Euclidean distance” helper function to calculate the distance between points.
We can use a class to represent the K-means algorithm, which will define these three functions.
We can also use a data class to represent the “Centroid.” This class will have attributes for the “location,” which holds the cluster center (the mean of the assigned vectors after each update), and the “vectors,” a collection of all the vectors currently assigned to the cluster.
Step 3: Implement the algorithm
We will start off the implementation by defining the functionalities we discussed:
class KMeans:
    def __init__(self, n_features, k):
        ...

    def distance(self, x, y):
        ...

    def fit(self, X, n_iterations):
        ...

    def predict(self, x):
        ...
Then we want to define the data class that describes the Centroid. Data classes in Python are classes that primarily hold attributes; they typically don’t implement behavior of their own.
class Centroid:
    def __init__(self, location, vectors):
        self.location = location  # (D,)
        self.vectors = vectors    # (N_i, D)
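As an aside, Python’s standard-library dataclasses module can express the same idea more concisely; this equivalent sketch is optional, not required by the solution:

from dataclasses import dataclass
import numpy as np

@dataclass
class Centroid:
    location: np.ndarray  # (D,) current center of the cluster
    vectors: np.ndarray   # (N_i, D) points currently assigned to the cluster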
Now, let’s define the initialization function. We want to initialize all the centroids to random locations. We will have the variable “k” define the number of clusters, and we can maintain each Centroid in a list of length k. We will also store the number of features as an attribute, which we will need later when resetting each centroid’s vectors during fitting.
def __init__(self, n_features, k):
    self.n_features = n_features
    self.k = k  # number of clusters
    self.centroids = [
        Centroid(
            location=np.random.randn(n_features),
            vectors=np.empty((0, n_features))
        )
        for _ in range(k)
    ]
Now, let’s implement our fit section, which contains most of the main logic for the algorithm.
Since we clarified that we can specify the number of iterations with a variable, we will call this n_iterations and pass this into the fit() function.
We will start off by resetting each centroid’s vectors to an empty array, because each centroid holds a different set of vectors on each iteration:
def fit(self, X, n_iterations):
    for _ in range(n_iterations):
        # Start each iteration with empty clusters
        for centroid in self.centroids:
            centroid.vectors = np.empty((0, self.n_features))
Then, we will compute the centroid that is closest to each data point in the training set. This is done by taking the distance (which we assume is implemented for now) to each centroid and taking the index of the centroid with the minimum distance to that data point. Then we assign the point to that cluster by stacking it onto the centroid’s array of vectors.
        for x_i in X:
            distances = [
                self.distance(x_i, centroid.location) for centroid in self.centroids
            ]
            min_idx = distances.index(min(distances))
            cur_vectors = self.centroids[min_idx].vectors
            self.centroids[min_idx].vectors = np.vstack((cur_vectors, x_i))
Finally, we can update the centroid to be the mean of all of the vectors in that cluster.
        for centroid in self.centroids:
            if centroid.vectors.size > 0:
                centroid.location = np.mean(centroid.vectors, axis=0)
Now let’s implement our helper function for the Euclidean distance.
The Euclidean distance is calculated by taking the squared difference between each corresponding feature of the two data points, summing them up, and then taking the square root of the entire result. This can actually be written more cleanly in NumPy to vectorize that logic:
def distance(self, x, y):
    return np.sqrt(np.dot(x - y, x - y))
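Equivalently, np.linalg.norm(x - y) computes the same Euclidean (L2) distance in a single call; either form is acceptable here.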
Finally, we can do our predict function that assigns a cluster based on an incoming data point:
def predict(self, x):
    distances = [self.distance(x, centroid.location) for centroid in self.centroids]
    return distances.index(min(distances))
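To sanity-check the finished class, a quick run on two well-separated blobs should usually place one centroid near each blob. The toy data and names here are illustrative assumptions:

import numpy as np

np.random.seed(0)
blob_a = np.random.randn(50, 2) + np.array([5.0, 5.0])   # blob centered at (5, 5)
blob_b = np.random.randn(50, 2) - np.array([5.0, 5.0])   # blob centered at (-5, -5)
X = np.vstack((blob_a, blob_b))

model = KMeans(n_features=2, k=2)
model.fit(X, n_iterations=10)
# Points near opposite blobs should usually map to different cluster indices
print(model.predict(np.array([5.0, 5.0])))
print(model.predict(np.array([-5.0, -5.0])))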
Step 4: Discuss the results
We could evaluate the clusters by:
- Evaluating performance on downstream tasks that follow the clustering.
- For example, clustering can be used to surface the data points that give a model the highest error. You could then train a supervised ML model and check how well the clusters isolate the examples that the model struggles with most.
- Looking at how quickly (in number of iterations) and how decisively the clusters converge, by measuring the distance between each cluster’s mean on two successive iterations (see the sketch after this list).
- Looking at the variance of the clusters: lower within-cluster variance indicates tighter clustering.
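As an illustration of the convergence check, a small helper can measure the largest distance any centroid moved between two iterations; the helper name and its usage inside fit() are assumptions for this sketch, not part of the original solution:

import numpy as np

def max_centroid_shift(old_locations, new_locations):
    # Largest distance any centroid moved between two snapshots
    return max(
        float(np.linalg.norm(new - old))
        for old, new in zip(old_locations, new_locations)
    )

# Hypothetical usage inside fit(): snapshot locations before the update step,
# then stop early once the centroids have effectively stopped moving:
# if max_centroid_shift(old_locs, [c.location for c in self.centroids]) < 1e-6:
#     break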
With more time, we could make additional improvements, such as:
- Renaming the “Centroid” class to “Cluster” to more clearly convey what it holds. Generally, the centroid refers to the mean of a cluster, whereas the cluster holds both the mean and the vectors assigned to it.
- Initializing the centroids based on the distribution of values within the training set (for example, sampling k training points), rather than drawing them from a standard normal distribution that ignores the data’s scale; a sketch of this idea follows.
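One simple version of data-driven initialization samples k distinct training points as the starting centroid locations. The class name below is hypothetical, and the sketch assumes the Centroid class from above is in scope:

import numpy as np

class KMeansDataInit:
    # Variant whose centroids start at randomly chosen training points
    def __init__(self, X, k):
        n, d = X.shape
        self.n_features = d
        self.k = k
        idx = np.random.choice(n, size=k, replace=False)  # k distinct rows of X
        self.centroids = [
            Centroid(location=X[i].copy(), vectors=np.empty((0, d)))
            for i in idx
        ]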
I used Python lists to collect the assigned points instead of np.vstack: NumPy arrays have no in-place append, and every np.vstack call re-allocates and copies the whole array.
import numpy as np

class Centroid:
    def __init__(self, location, vectors):
        self.location = location  # (D,)
        self.vectors = vectors    # (N_i, D)

class KMeans:
    def __init__(self, n_features, k):
        self.n_features = n_features
        self.centroids = [
            Centroid(
                location=np.random.randn(n_features),
                vectors=[]  # Initialize empty list for vectors
            )
            for _ in range(k)
        ]

    def distance(self, x, y):
        # Calculate Euclidean distance between x and y
        return np.linalg.norm(x - y)

    def fit(self, X, n_iterations):
        for _ in range(n_iterations):
            # Reset vectors for each centroid
            for c in self.centroids:
                c.vectors = []  # Clear the centroid's vector list
            # Assign points to closest centroid
            for p in X:
                distances = [self.distance(p, c.location) for c in self.centroids]
                c_idx = np.argmin(distances)  # Find the closest centroid index
                self.centroids[c_idx].vectors.append(p)  # Append point to that centroid
            # Recalculate centroid location
            for c in self.centroids:
                if len(c.vectors) > 0:  # Only update centroids with assigned points
                    c.location = np.mean(np.array(c.vectors), axis=0)

    def predict(self, x):
        distances = [self.distance(x, c.location) for c in self.centroids]
        return np.argmin(distances)  # Return the index of the closest centroid
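Going one step further on efficiency, the entire assignment step can be vectorized with broadcasting so there is no Python-level loop over points at all. This is a sketch; the helper name and the (K, D) array of stacked centroid locations are assumptions, not part of either solution above:

import numpy as np

def assign_labels(X, locations):
    # X: (N, D) data points; locations: (K, D) stacked centroid locations.
    # Returns the index of the nearest centroid for each row of X.
    diffs = X[:, None, :] - locations[None, :, :]      # (N, K, D) via broadcasting
    sq_dists = np.einsum('nkd,nkd->nk', diffs, diffs)  # squared distances, (N, K)
    return np.argmin(sq_dists, axis=1)                 # (N,) nearest-centroid index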