Python makes extensive use of iterators and iterable collections. This underlying aspect appears in many places. Each for statement makes implicit use of it. When we use functional programming techniques, like generator expressions, lambdas, and the map(), filter(), and reduce() functions, we're exploiting iterators. Python has an itertools module full of additional iterator-based design patterns.
We can apply these concepts in a number of places:
Partitioning all the original samples into testing and training subsets.
Testing a particular k and distance hyperparameter set by classifying all the test cases.
The k-NN algorithm itself and how it locates the k nearest neighbors from all the training samples.
The common feature of these three processing examples is the "for all" aspect of each one.
Formally, we can summarize each of these with a logic expression. This isn't always helpful; it can be a bit math-heavy. Still, we like the formality because it can help describe the processing without the limitations of the Python language.
Here's the essential rule for partitioning, for example. This involves a clear "for all" condition.
$$ \forall s \in S \colon s \in R \vee s \in E $$
For all s in the universe of available samples, S, the value of s is either in the training set, R, or the testing set, E.
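To see how this "for all" maps onto Python's iterator tools, we can render the rule with the built-in all() function. This is only an illustration; the names S, R, and E here are small placeholder collections, not the book's sample classes.
# Placeholder collections: S is all samples, R the training subset, E the testing subset.
S = [1, 2, 3, 4, 5]
R = [1, 2, 3, 4]
E = [5]

# The "for all" rule, rendered with all() and a generator expression.
assert all(s in R or s in E for s in S)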
We can also summarize the quality metric for testing. This has "for all" implied by the \(\sum\) construct.
$$ q = \sum_{e \in E} 1 \textbf{ if } knn(e) = s(e) \textbf{ else } 0 $$
The quality score is the sum, for all e in the testing set, E, of 1 where the knn() classifier applied to e matches the species for e, s(e), and 0 otherwise.
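This also translates directly into an iterator-based expression. In the following sketch, knn and s are placeholders for the classifier and the actual-species lookup from the formula above, not functions defined in the chapter.
def quality(E, knn, s) -> int:
    # Sum of 1 for each testing sample whose knn() result matches its species, s(e).
    return sum(1 if knn(e) == s(e) else 0 for e in E)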
The k-NN algorithm involves a bit more complexity in its definition. We can think of it as a partitioning problem. We need to start with a collection of ordered pairs. Each pair is the distance from an unknown, u, to a training sample, r, along with the sample itself. This has to be done for all r in R. $$ \forall r \in R \colon \{ \langle d(u, r), r \rangle \} $$
Then we need to partition these distances into two subsets, N and F (near and far),
such that all distances in N are less than or equal to all distances in F, and
the number of elements in N, \(\lvert N \rvert\), is equal to k.
$$ \forall n \in N \wedge f \in F \colon d(u, n) \leq d(u, f) $$
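A direct, unoptimized rendering of this formalism sorts the distance pairs and slices off the first k. Everything in the sketch below is a placeholder: d is a stand-in distance function, R a stand-in training collection, and u the unknown sample.
# Stand-in values for illustration only.
u = 5.0
R = [1.0, 4.0, 9.0, 6.0, 2.0]
d = lambda u, r: abs(u - r)   # a hypothetical distance function
k = 3

pairs = sorted((d(u, r), r) for r in R)   # all <distance, sample> pairs, nearest first
N, F = pairs[:k], pairs[k:]               # the k nearest neighbors, and the rest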
This formalism exposes an interesting nuance to the computation. What if there are more than k neighbors with the same distance metric? Should all of the equidistant training samples be included in voting? Or should we arbitrarily slice exactly k of the equidistant samples? If we "arbitrarily" slice, what's the exact rule that gets used for choosing among the equidistant training samples? Does the selection rule even matter?
The example later in the chapter uses the sorted() function, which is stable and preserves the original order of equal elements. This will lead to a bias in our classifier when confronted with equidistant choices.
Should we randomize the ordering? If so, how does introducing randomization change reproducibility of results?
These can be significant issues, and they're outside the scope of this book.
We'll tackle each of these in Python, making use of more advanced iterator features. We'll start with the partitioning algorithm.
We need to create two separate kinds of partitions of our data. First, we need to break the set of data into buckets where the values are likely to be equal. Then, from those buckets, we can choose testing and training samples.
If we use the internal hash values, we can create buckets containing samples that are likely to have equal values. In Python, if two items are equal, they must also have the same integer hash value. The inverse is not true: two items may have the same hash value but not be equal.
We need to be sure that when a collection of samples are likely to be equal, they're all allocated for testing or training. We don't want to split them between testing and training because this can lead to an accidental exact match between testing and training values, which can bias the test results.
Formally, we can say this:
$$ a = b \Rightarrow h(a) = h(b) $$
If two objects in Python, a and b, are equal, they have the same hash value, \(h(x)\). The reverse is not true because equality is more than a simple hash value check; we can have \(h(a) = h(b) \wedge a \neq b\). This is a "hash collision" between two unequal values.
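CPython happens to offer a tiny, concrete example of such a collision; this relies on an implementation detail (hash(-1) is reserved internally and reported as -2), so it's illustrative rather than guaranteed by the language.
# In CPython, -1 and -2 share a hash value even though they're unequal.
assert hash(-1) == hash(-2) == -2
assert -1 != -2   # equal hashes, unequal values: a hash collision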
This follows from the definition of the modulo operation:
$$ h(a) = h(b) \Rightarrow h(a) \equiv h(b) \pmod m $$
If two hash values are equal, they are also equal modulo any m. (The inverse is not true.)
In Python, this is hash(a) % m == hash(b) % m. If the two hash values, modulo m, are the same, then the hash values could be the same, and the two objects, a and b, could also be equal. Several objects have the same hash value, and even more objects have the same hash value modulo m. This approach limits the domain of objects that require an exact equality comparison.
The idea is to create sixty buckets of samples that have equal hash values, modulo sixty. Samples in the same bucket could be equal, and as a simple expedient, we can treat them as equal. Samples in separate buckets have different hash values and cannot possibly be equal.
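Here's a small illustration of the bucket rule; the tuples stand in for sample objects and aren't the book's classes.
# Two equal tuples standing in for two equal samples.
a = ("Iris-setosa", 5.1, 3.5)
b = ("Iris-setosa", 5.1, 3.5)

assert a == b
assert hash(a) == hash(b)              # equal objects always have equal hashes
assert hash(a) % 60 == hash(b) % 60    # so they always land in the same bucket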
Here's a partition function that first creates 60 separate buckets for samples. Then, some fraction of the buckets is allocated for testing, and the balance for training. Specifically, 12, 15, or 20 buckets are about 20%, 25%, or 33% of the population. The percentage won't be exact because we don't know how many samples will fall into each bucket.
We've used a sequence of generator expressions to emit the final TestingKnownSample instances and TrainingKnownSample instances. The generator expressions are lazy: they don't compute values until the generator is consumed. We use itertools.chain to consume values from the sequence of generators. These chained iterators can be used to build the final lists of training and testing objects.
def partition_2(
    samples: Iterable[KnownSample], rule: Callable[[int], bool]
) -> Tuple[TrainingList, TestingList]:

    # Split into buckets; samples in different buckets cannot be equal.
    rule_multiple = 3 * 4 * 5
    partitions: DefaultDict[int, List[KnownSample]] = collections.defaultdict(list)
    for s in samples:
        partitions[hash(s) % rule_multiple].append(s)

    # Build a fleet of generators for items in each of the buckets.
    training_partitions: List[Iterator[TrainingKnownSample]] = list()
    testing_partitions: List[Iterator[TestingKnownSample]] = list()
    for i, p in enumerate(partitions.values()):
        if rule(i):
            training_partitions.append(TrainingKnownSample(s) for s in p)
        else:
            testing_partitions.append(TestingKnownSample(s) for s in p)

    # Merge the generators into single lists.
    training = list(itertools.chain(*training_partitions))
    testing = list(itertools.chain(*testing_partitions))
    return training, testing
This can create a large mapping as an intermediate data structure. It also creates sixty generators, but these don't require very much memory. The final two lists contain references to the same Sample objects as the partitioning dictionary. The good news is the mapping is temporary and only exists during this function.
This function also requires a small change to the distribution rule function. In the previous example, each rule had a type hint of Callable[[KnownSample, int], bool]. In this example, the distribution rule has been simplified to Callable[[int], bool]. Given the index value for a partition (a value from 0 to 59, inclusive), we can assign it to a testing or training partition.
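For example, a rule for an approximately 80/20 split could return True for the bucket indices to keep for training. The specific lambda below is an illustration of the Callable[[int], bool] shape, not a rule defined elsewhere in the chapter; samples is the iterable of KnownSample objects built earlier.
# Hypothetical rule: every fifth bucket index goes to testing, the rest to training.
training_80 = lambda i: i % 5 != 0

training, testing = partition_2(samples, training_80)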
The testing process can also be defined as a higher-order function. We can summarize this as a "map-reduce" problem:
Given a Hyperparameter with a k value and an algorithm,
A function classifies the test samples, mapping each test sample to 1 for a match or 0 for a non-match,
A function reduces the matches to a final score.
Python provides high-level functions for the map and reduce operations.
Looking forward to the next section, we'll split the classifier algorithm out of the Hyperparameter class definition. There are at least three different ways to approach this. In this example, we'll make the classifier a Strategy that we provide when we create an instance of the Hyperparameter class. Here's the definition:
class Hyperparameter(NamedTuple):
    k: int
    distance_function: DistanceFunc
    training_data: TrainingList
    classifier: Callable[[int, DistanceFunc, TrainingList, AnySample], str]

    def classify(self, unknown: AnySample) -> str:
        classifier = self.classifier
        return classifier(self.k, self.distance_function, self.training_data, unknown)

    def test(self, testing: TestingList) -> int:
        classifier = self.classifier
        test_results = map(
            lambda t: ClassifiedKnownSample(
                t.sample,
                classifier(
                    self.k, self.distance_function, self.training_data, t.sample
                ),
            ),
            testing,
        )
        pass_fail = map(
            lambda t: 1 if t.sample.species == t.classification else 0, test_results
        )
        return sum(pass_fail)
The test() method uses two mapping operations and a reduce operation. First, we define a generator which will map each testing sample to a ClassifiedKnownSample object. This object has the original sample and the results of classification. Second, we define a generator which will map each ClassifiedKnownSample object to a one (for a test that matched the expected species) or a zero (for a test that failed). This generator depends on the first generator to provide values.
The actual work is the summation: this consumes values from the second generator. The second generator consumes objects from the first generator. This technique can minimize the volume of data in memory at any one time. It also decomposes a complex algorithm into two separate steps, allowing us to make changes as necessary.
There's an optimization available here, also. The value of t.classification in the second generator is self.classify(t.sample.sample). It's possible to reduce this to a single generator and eliminate creating the intermediate ClassifiedKnownSample objects.
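A sketch of that single-generator version appears below. The name test_single_pass is ours, and the sketch assumes the same attribute structure the test() method above relies on; it would live inside the Hyperparameter class.
    def test_single_pass(self, testing: TestingList) -> int:
        # Classify and score in one pass, with no intermediate ClassifiedKnownSample objects.
        classifier = self.classifier
        return sum(
            1 if t.sample.species == classifier(
                self.k, self.distance_function, self.training_data, t.sample
            ) else 0
            for t in testing
        )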
Here's how the test operation looks. We can build a Hyperparameter instance using a function for distance, manhattan(), and a classifier function, k_nn_1():
h = Hyperparameter(1, manhattan, training_data, k_nn_1)
h.test(testing_data)
We'll look at the implementations of various classifiers in the next two sections. We'll start with the base definition, k_nn_1(), and then look at one based on the bisect module.
bisect
One summary of the k-NN algorithm has the following steps:
Create a list of all (distance, training sample) pairs.
Sort in ascending order.
Pick the first k, which will be the k nearest neighbors.
Choose the mode (the highest frequency) among the k nearest neighbors.
The implementation would look like this:
def k_nn_1(
    k: int, dist: DistanceFunc, training_data: TrainingList, unknown: AnySample
) -> str:
    distances = sorted(map(lambda t: Measured(dist(t, unknown), t), training_data))
    k_nearest = distances[:k]
    k_frequencies: Counter[str] = collections.Counter(
        s.sample.sample.species for s in k_nearest
    )
    mode, fq = k_frequencies.most_common(1)[0]
    return mode
While clear, this does accumulate a large number of values when only k are actually needed.
One of the high-cost parts of the basic k-NN algorithm is the sort of the entire set of training data after the distances have been computed. This is an \(\textbf{O}(n \log n)\) operation that's difficult to eliminate.
Steps one to three can be optimized to keep only the k smallest distance values. This involves using the bisect module to locate the position in a sorted list where a new value can be inserted.
Here's an alternative:
For each training sample:
Compute the distance from this training sample to the unknown sample.
If it's greater than the last of the k nearest neighbors so far, discard it.
Otherwise:
Find a spot among the k values.
Insert the new item.
Truncate the list back to length k.
Find the frequencies of result values among the k nearest neighbors.
Choose the mode (the highest frequency) among the k nearest neighbors.
If we seed the list of k nearest neighbors with \(\infty\) values, then the first few computed distances, d, will be kept because \(d < \infty\). After that, the distances must be small enough to be relevant.
def k_nn_b(
    k: int, dist: DistanceFunc, training_data: TrainingList, unknown: AnySample
) -> str:
    k_nearest = [
        Measured(float("inf"), cast(TrainingKnownSample, None)) for _ in range(k)
    ]
    for t in training_data:
        t_dist = dist(t, unknown)
        if t_dist > k_nearest[-1].distance:
            continue
        new = Measured(t_dist, t)
        k_nearest.insert(bisect.bisect_left(k_nearest, new), new)
        k_nearest.pop(-1)
    k_frequencies: Counter[str] = collections.Counter(
        s.sample.sample.species for s in k_nearest
    )
    mode, fq = k_frequencies.most_common(1)[0]
    return mode
We're doing \(\textbf{O}(k \log k)\) processing for each of the items in the training set. We've moved from an \(\textbf{O}(n \log n)\) sort to an \(\textbf{O}(n \times k \log k)\) sequence of inserts. Because k is often much smaller than the number of training samples, n, the \(k \log k\) factor behaves almost like a constant, and the overall cost grows nearly linearly with n.
This algorithm involves two state changes. First, a new item is inserted into the k nearest neighbors, then the most distant of the \(k+1\) neighbors is removed. While this doesn't change the overall complexity, these are relatively inexpensive operations when performed on a very small list.
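Here's a tiny illustration of that insert-then-truncate pair, using plain floats to stand in for the Measured objects:
import bisect

k_nearest = [0.5, 0.9, 1.4]     # the k == 3 smallest distances so far, in order
new = 0.7
k_nearest.insert(bisect.bisect_left(k_nearest, new), new)   # insert in sorted position
k_nearest.pop(-1)               # drop the now-farthest value
assert k_nearest == [0.5, 0.7, 0.9]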
heapq
We have yet another trick up our sleeve. We can use the heapq module to maintain a sorted list of items. This lets us implement the sorting operation as each item is placed into the overall list. This doesn't reduce the general complexity metric, but it replaces two inexpensive insert and pop operations with potentially less expensive insert operations.
The idea is to start with an empty list and insert items into the list, assuring that (a) the items are kept in order, and (b) the item at the head of the list always has the least distance. We can then pop k items from the heap to retrieve the nearest neighbors.
def k_nn_q(
    k: int, dist: DistanceFunc, training_data: TrainingList, unknown: AnySample
) -> str:
    measured_iter = (Measured(dist(t, unknown), t) for t in training_data)
    k_nearest = heapq.nsmallest(k, measured_iter)
    k_frequencies: Counter[str] = collections.Counter(
        s.sample.sample.species for s in k_nearest
    )
    mode, fq = k_frequencies.most_common(1)[0]
    return mode
This is elegantly simple. It's not, however, remarkably fast. Here are the performance results:
algorithm | test quality | time |
---|---|---|
k_nn_1 | q = 241/1000 | 6.553s |
k_nn_b | q = 241/1000 | 3.992s |
k_nn_q | q = 241/1000 | 5.294s |
The test quality is the number of correct test cases. The number is low because the data is completely random, and a correct classification rate of about 25% is exactly what's expected.
The original algorithm is the slowest.
The bisect-based processing is the fastest, even though its complexity is theoretically comparable. The efficiencies of working with a very small list outweigh the cost of performing the bisect operation many times.
The heapq processing time was better than the original algorithm's, but only by about 20%. The heapq module is a faster way to build a sorted list of items.
It's important to do both a theoretical analysis of the algorithm's complexity and a benchmark with actual data. The theoretical analysis is difficult to do in detail; rather than assuming all operations have equal cost, we also need to perform a benchmark analysis to confirm the theoretical predictions.
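One way to gather timings like those above is with the standard timeit module. This sketch is only an outline of a harness; it assumes the manhattan() distance, the three classifier functions, and the training_data and testing_data collections shown earlier in the chapter.
import timeit

# A hypothetical benchmark harness: measure one full test run per classifier.
for classifier in (k_nn_1, k_nn_b, k_nn_q):
    h = Hyperparameter(1, manhattan, training_data, classifier)
    q = h.test(testing_data)
    elapsed = timeit.timeit(lambda: h.test(testing_data), number=1)
    print(f"{classifier.__name__}: q = {q}/{len(testing_data)}, {elapsed:.3f}s")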