Using the State Pattern to Reduce Cyclomatic Complexity

A while ago I wrote some code at work that computed some signal processing stuff on an incoming stream of data. It looked like this (in Python):

class StreamComputation:
    def __init__(self):
        # Initialize value to 0.
        self.x = 0

    def nextValue(self, incoming):
        # Add incoming and return.
        self.x += incoming
        return self.x

After some time had passed, I got a requirement to add a stage to the computation that would run before the code I had already written, and to change the existing code a little. With a bit of time pressure, I quickly added a function for the new first stage, a variable to track the stage, and an if to branch between the stages. It looked something like this:

class TwoStagedStreamComputation:
    def __init__(self):
        # Initialize stage to 1.
        self.stage = 1

        # Initialize value to 0.
        self.x = 0

    def stage1(self, incoming):
        # Add incoming; once the sum reaches 10, move to stage 2.
        self.x += incoming
        if self.x >= 10:
            self.stage = 2
        return self.x

    def stage2(self, incoming):
        # Add (some computation on) incoming and return.
        self.x += someComputation(incoming)
        return self.x

    def nextValue(self, incoming):
        if self.stage == 1:
            return self.stage1(incoming)
        else:
            return self.stage2(incoming)

Tests were updated, QA had a check, maybe there was a code review (maybe not), and the code shipped. There was *no* bug in the code. But I still feel that I didn’t do it right.

Beyond this one example, all over our code base at work, and in some of my private projects, I have similar-looking code. Sometimes I have a member or two expressing the stage of computation a class is in, and sometimes it’s the stage of data entry in a GUI. Actually, quite a bit of the old GUI code I have is written like this (not all of it; see my post on using generators for GUIs).

After some more time had passed, and after watching all kinds of YouTube videos on programming, I stumbled upon state machines. Sure, I am familiar with the concept. I even have quite a bit of code that names itself “State Machine”. But it’s not really a state machine, and it’s not the best code.

One of the videos was about a particular generic state machine implementation for C++. At this point I realized that probably all languages have many implementations of state machines. And this is great. I’m still researching some libraries, but I’m definitely going to pick one for each language I program in (mostly Python and C++). But during this research I also found Peter Norvig’s presentation Design Patterns in Dynamic Programming, in which he basically says that the State Pattern isn’t needed in dynamic languages!

The state pattern is not exactly the same as a state machine. I didn’t know what the state pattern was until I saw Peter Norvig’s presentation, but once I learned about it I saw how to apply the pattern in a dynamic language and make my code slightly better:

class FPtrTwoStagedStreamComputation:
    def __init__(self):
        # Initialize nextValue to first function.
        self.nextValue = self.nextValueStage1

        # Initialize value to 0.
        self.x = 0

    def nextValueStage1(self, incoming):
        # Add incoming; once the sum reaches 10, point nextValue at stage 2.
        self.x += incoming
        if self.x >= 10:
            self.nextValue = self.nextValueStage2
        return self.x

    def nextValueStage2(self, incoming):
        # Add (some computation on) incoming and return.
        self.x += someComputation(incoming)
        return self.x

The change is that I keep a “function pointer” to the current computation stage, and change it when I transition. This removes the if condition on stages. I think this is basically the state pattern, modified for a dynamic language where functions are first class, in the way Norvig meant. I like it!
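
Here’s a quick usage sketch. someComputation is whatever the second stage actually does; I stub it out as the identity here just so the example runs:

def someComputation(incoming):
    # Stand-in for the real second-stage computation.
    return incoming

c = FPtrTwoStagedStreamComputation()
print(c.nextValue(4))   # 4  -- still in stage 1
print(c.nextValue(7))   # 11 -- crossed 10, so nextValue now points at stage 2
print(c.nextValue(1))   # 12 -- handled by nextValueStage2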

But how do I know that this reduces the complexity of my code? I actually asked myself this. Then I asked Google. And Google came through. It found a StackOverflow answer on reducing branch conditions that mentioned the term Cyclomatic Complexity. Alright! This is a quantitative measure of how complicated a program is: it counts the number of linearly independent paths through the program. In practice it’s about the number of branch points (if/switch/for/while/try…) the program has.
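
Just to make the counting concrete, here’s a minimal sketch (mine, not any particular tool’s definition) that approximates cyclomatic complexity as 1 plus the number of branch points, using Python’s ast module. A real tool such as radon is more thorough:

import ast

def rough_cc(source: str) -> int:
    # 1 + number of branch points: a crude cyclomatic complexity count.
    branches = (ast.If, ast.IfExp, ast.For, ast.While, ast.ExceptHandler)
    return 1 + sum(isinstance(node, branches)
                   for node in ast.walk(ast.parse(source)))

print(rough_cc("def stage2(self, v):\n    return v"))          # 1
print(rough_cc("def nextValue(self, v):\n"
               "    if self.stage == 1:\n"
               "        return self.stage1(v)\n"
               "    return self.stage2(v)"))                    # 2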

This is a completely new term for me, and I can already see myself overusing it all the time. But let’s stick to the code at hand. So in the first `TwoStagedStreamComputation` class, with the if statement, the cyclomatic complexity of each function is as follows:

1. stage1 – 2
2. stage2 – 1
3. nextValue – 2

In the `FPtrTwoStagedStreamComputation` class, with the function pointer and no if statement, the cyclomatic complexity of each function is:

1. nextValueStage1 – 2
2. nextValueStage2 – 1

And that’s it! So that’s 2 whole cyclomatic complexity points fewer. The code is definitely better! Ahahahahahahahhahaha

Anyway… At this point I am pretty sure that the state pattern (or state machines, for different purposes) is useful and reduces the complexity of code. I’ll finish the post with thoughts on what to do when there’s more than one function to each stage of computation.

Say that there’s another function that we must expose, called `reset`, which behaves differently between the stages. In the first stage, calling `reset` returns `self.x` to 0, and in the second stage to `10`. One way to add this to the code is simply with another function pointer:

class FPtrTwoStagedStreamComputationAndReset:
    def __init__(self):
        # Initialize nextValue to first function.
        self.nextValue = self.nextValueStage1
        self.reset = self.reset1

        # Initialize value to 0.
        self.x = 0

    def nextValueStage1(self, incoming):
        # Add incoming; once the sum reaches 10, repoint both function pointers.
        self.x += incoming
        if self.x >= 10:
            self.nextValue = self.nextValueStage2
            self.reset = self.reset2
        return self.x

    def reset1(self):
        self.x = 0

    def nextValueStage2(self, incoming):
        # Add (some computation on) incoming and return.
        self.x += someComputation(incoming)
        return self.x

    def reset2(self):
        self.x = 10

This may start to look less maintainable. What if we need to add yet another function later? Will we remember to set all the pointers? We can bunch up the function pointers corresponding to each computation stage into one value. The usual way to bunch function pointers together is with a class, and in a dynamic language that’s basically what a class is: a bunch of values, such as function pointers. So maybe something like this:

class ClassesTwoStagedStreamComputationAndReset:
    def __init__(self):
        # Initialize stage to first class.
        self.stage = ClassesTwoStagedStreamComputationAndReset.stage1

        # Initialize value to 0.
        self.x = 0

    def nextValue(self, incoming):
        return self.stage.nextValue(self, incoming)

    def reset(self):
        self.stage.reset(self)

    class stage1:
        @staticmethod
        def nextValue(self, incoming):
            self.x += incoming
            if self.x >= 10:
                self.stage = ClassesTwoStagedStreamComputationAndReset.stage2
            return self.x

        @staticmethod
        def reset(self):
            self.x = 0

    class stage2:
        @staticmethod
        def nextValue(self, incoming):
            # Add (some computation on) incoming and return.
            self.x += someComputation(incoming)
            return self.x

        @staticmethod
        def reset(self):
            self.x = 10

As to which code is more readable, maintainable, or just better, I currently have no personal opinion. The cyclomatic complexity is the same, apart from the 2 extra dispatching functions in the second alternative (`nextValue` and `reset`), which don’t really count. The second alternative now really looks like the state pattern, as defined in the Wikipedia article, so maybe there’s another, cleaner way to do it in a dynamic language. I’m going to ask around to see what other people think. What about you, what do you think?
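
Either way, here is a small sanity sketch (with someComputation again stubbed out as the identity, my placeholder for the real computation) checking that the two variants behave the same:

def someComputation(incoming):
    # Stand-in for the real second-stage computation.
    return incoming

a = FPtrTwoStagedStreamComputationAndReset()
b = ClassesTwoStagedStreamComputationAndReset()
for v in [3, 9, 5]:
    assert a.nextValue(v) == b.nextValue(v)
a.reset()
b.reset()
assert a.x == b.x == 10   # both are in stage 2 by now, so reset goes to 10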

An important note is that everything here can be done in any language with function pointers, which includes C++. In fact, in C++ we can use `std::variant`, or type erasure such as `std::function` (storing a `std::bind` if needed), to point to the current stage structure (be it a state instance or a function).

Other than the state-machine-like code I have, which will undergo significant modification once I choose state machine libraries, I also have a lot of code that looks like a pipeline:

def algorithm(input1, input2):
    do_stuff()
    if condition:
        do_more_stuff()
        for something in some_iterator:
            if another_condition:
                for element in another_iterator:
                    if last_condition:
                        return "I found something!"

There are no else’s, and no early breaks. The code only flows forward. So this isn’t a state machine or state pattern. It’s a pipeline. And it has high cyclomatic complexity. I hope to write a post on how to reduce the complexity of this kind of code.

On computing the power of a polynomial in a given frequency band

In signal processing we often need to compare two finite impulse responses (FIRs). Often we also don’t care about the difference between two FIRs outside a specific frequency band, only about the difference within that band. Given the difference FIR p, given as n+1 coefficients at a sample rate N, and a frequency band of interest [f_1, f_2], it is common to simply pretend the sample rate is n+1 and define:

power_{[\frac{f_1}{N}, \frac{f_2}{N}]}^{approx}(p):=\frac{1}{n+1}\sum_{\substack{f\in\mathbb{N} \\ \frac{f_1}{N}(n+1)\le f \le \frac{f_2}{N}(n+1)}} w(\frac{f}{n+1})|\hat{p}(\frac{f}{n+1})|^2

where for p=(p_0, ..., p_n) we put \hat{p}(z)=p_0+p_1e^{-2\pi\iota z}+...+p_n e^{-2\pi\iota nz}, and we have some coefficients w(\frac{f}{n+1}).

We want to preserve Parseval’s Theorem, namely we would like the following to hold:
power_{[\frac{0}{N}, \frac{N/2}{N}]}^{approx}(p)=p_0^2+...+p_n^2.

A bit of algebra, that I’m not interested in showing here, gets us:
w(x) = \begin{cases} 1 & x=0 \text{ or } x=\frac{1}{2} \\ 2 & \text{otherwise} \end{cases}.

In order to compute power_{[\frac{f_1}{N}, \frac{f_2}{N}]}^{approx}(p) we recognise that the numbers \hat{p}(\frac{f}{n+1}) are a subsequence of the discrete Fourier transform (DFT) of p=(p_0, ..., p_n). We compute the DFT of p, probably using an implementation of FFT, and sum as needed.
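
A minimal numpy sketch of this approximate computation (the function name and interface are mine, not from any library):

import numpy as np

def approx_band_power(p, f1, f2, N):
    # Length-(n+1) DFT of p; sum w * |p_hat|^2 over bins inside [f1/N, f2/N].
    p = np.asarray(p, dtype=float)
    n1 = len(p)                       # n + 1 coefficients
    p_hat = np.fft.fft(p)             # p_hat[f] = sum_j p_j e^{-2 pi i j f/(n+1)}
    total = 0.0
    for f in range(n1 // 2 + 1):      # bins with normalized frequency in [0, 1/2]
        x = f / n1
        if f1 / N <= x <= f2 / N:
            w = 1.0 if x in (0.0, 0.5) else 2.0
            total += w * abs(p_hat[f]) ** 2
    return total / n1

# Parseval sanity check: the full band [0, N/2] gives the sum of squares.
p = [1.0, -0.5, 0.25, 0.1]
print(approx_band_power(p, 0.0, 22050.0, 44100.0), sum(c * c for c in p))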

Let’s instead try to compute the true power of p in the band [\frac{f_1}{N}, \frac{f_2}{N}]. Instead of just declaring the definition, I will first motivate it a bit. In the approximation above, we can double the sample rate we’ve assumed:

power_{[\frac{f_1}{N}, \frac{f_2}{N}]}^{approx}(p;2):=\frac{1}{2n+2}\sum_{\substack{f\in\mathbb{N} \\ \frac{f_1}{N}(2n+2)\le f \le \frac{f_2}{N}(2n+2)}} w(\frac{f}{2n+2})|\hat{p}(\frac{f}{2n+2})|^2.

The sum is twice as large so we get “more” frequencies involved. On the other hand, if we use the same computation method as above then we pay twice as much for the larger FFT. Now, instead of doubling, we can multiply the sample rate by a number M to get the sum:

power_{[\frac{f_1}{N}, \frac{f_2}{N}]}^{approx}(p;M):=\frac{1}{M(n+1)}\sum_{\substack{f\in\mathbb{N} \\ \frac{f_1M}{N}(n+1)\le f \le \frac{f_2M}{N}(n+1)}} w(\frac{f}{M(n+1)})|\hat{p}(\frac{f}{M(n+1)})|^2.

The larger M is, the more our sum should become independent of M: we’re taking more and more frequencies. But now the FFT needed to compute this is pretty large. So instead of simply taking a large M, let’s take it even larger, and define:

power_{[\frac{f_1}{N}, \frac{f_2}{N}]}(p):=\lim_{M\rightarrow\infty} \frac{1}{M(n+1)}\sum_{\substack{f\in\mathbb{N} \\ \frac{f_1M}{N}(n+1)\le f \le \frac{f_2M}{N}(n+1)}} w(\frac{f}{M(n+1)})|\hat{p}(\frac{f}{M(n+1)})|^2.

Wait what? The FFT is big, so make it bigger??? Hold on: we can now recognise the limit on the right to be a Riemann integral. In fact, we have:

power_{[\frac{f_1}{N}, \frac{f_2}{N}]}(p)=2\int_{\frac{f_1}{N}}^{\frac{f_2}{N}} |\hat{p}(f)|^2df.
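
To spell out the Riemann-sum step: the sample points x_f=\frac{f}{M(n+1)} are equally spaced with spacing \Delta x=\frac{1}{M(n+1)}, they cover [\frac{f_1}{N}, \frac{f_2}{N}], and w(x_f)=2 for all but at most two of them, so

\frac{1}{M(n+1)}\sum_{f} w(x_f)|\hat{p}(x_f)|^2=\sum_{f} w(x_f)|\hat{p}(x_f)|^2\,\Delta x\xrightarrow{M\rightarrow\infty}2\int_{\frac{f_1}{N}}^{\frac{f_2}{N}}|\hat{p}(x)|^2dx.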

This is the true power of p in the band [\frac{f_1}{N}, \frac{f_2}{N}]. It is independent of the sample rate, so from now on we’ll simply write, for two real numbers f_1, f_2\in[0, \frac{1}{2}]:

power_{f_1, f_2}(p):=2\int_{f_1}^{f_2} |\hat{p}(f)|^2df.

How do we compute this? Let’s apply some algebra.

We’ll perform some abuse of notation and use p to also denote the polynomial p(x):=p_0+p_1x+...+p_nx^n. Putting the definition of \hat{p}(f) into the definition of power we have:

\begin{aligned} power_{f_1, f_2}(p)&=2\int_{f_1}^{f_2}|p(e^{-2\pi\iota f})|^2df \\ &=2\int_{f_1}^{f_2}p(e^{-2\pi\iota f})\overline{p(e^{-2\pi\iota f})}df \\ &=2\int_{f_1}^{f_2}p(e^{-2\pi\iota f})p(e^{2\pi\iota f})df \end{aligned}

where we have used that, since p_i are real, \overline{p(x)}=p(\overline{x}). We continue by setting:

\begin{aligned} q(x):&=p(x^{-1})p(x)=\sum_{i=0}^{n} p_ix^{-i} \sum_{j=0}^{n} p_jx^j \\ &=\sum_{k=-n}^{n}\big(\sum_{\substack{0\le i,j\le n\\ j-i=k}}p_ip_j \big)x^k=\sum_{k=-n}^{n}q_kx^k \end{aligned}

so that

\begin{aligned} power_{f_1, f_2}(p) &=2\int_{f_1}^{f_2}q(e^{2\pi\iota f})df \\ &= 2\int_{f_1}^{f_2}\sum_{k=-n}^{n}q_ke^{2\pi\iota k f}df \\ &= 2\sum_{k=-n}^{n}q_k\int_{f_1}^{f_2}e^{2\pi\iota k f}df \\ &= 2\sum_{k=-n}^{n}q_k \frac{e^{2\pi\iota k f}}{2\pi\iota k}\biggr|_{f_1}^{f_2} \\ &= 2\sum_{k=-n}^{n}q_k \frac{e^{2\pi\iota k f_2} - e^{2\pi\iota k f_1}}{2\pi\iota k}. \end{aligned}

(For k=0 the integrand is the constant q_0, so the k=0 term of the last sum is just 2q_0(f_2-f_1); correspondingly, the k=0 entry of the vector defined below should be read as 2(f_2-f_1).) With more abuse of notation, set q=(q_{-n}, ..., q_n). Also, set

v_{f_1, f_2}^n := (2\frac{e^{2\pi\iota k f_2} - e^{2\pi\iota k f_1}}{2\pi\iota k})_{k=-n}^n.

So we have:

power_{f_1, f_2}(p)=(q, v_{f_1, f_2}^n)

where (\cdot, \cdot) is the usual dot product. Note that the coefficients of q(x) are also the coefficients of p(x)\cdot x^np(x^{-1}), which is a multiplication of two degree n polynomials and hence can be computed using two FFTs. This shows that the power of a polynomial in a given band can be computed using two FFTs of size 2n+1 (number of coefficients of q), some exponentiating, and a dot product. Even though we took our sample rate to infinity, the computation turned out pretty simple.
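
A minimal numpy sketch of this closed-form computation (the function name is mine; it follows the dot-product formula above, handles the k=0 term as noted, and uses a plain convolution for q instead of the FFTs):

import numpy as np

def exact_band_power(p, f1, f2):
    # power_{f1,f2}(p) = (q, v), with f1, f2 in [0, 1/2].
    p = np.asarray(p, dtype=float)
    n = len(p) - 1
    q = np.convolve(p, p[::-1])               # q[n + k] = q_k for k = -n..n
    k = np.arange(-n, n + 1)
    v = np.full(2 * n + 1, 2.0 * (f2 - f1), dtype=complex)   # the k = 0 entry
    nz = k != 0
    v[nz] = 2 * (np.exp(2j * np.pi * k[nz] * f2)
                 - np.exp(2j * np.pi * k[nz] * f1)) / (2j * np.pi * k[nz])
    return float(np.real(q @ v))

# Sanity check against a brute-force approximation of 2 * integral |p_hat(f)|^2 df.
p = [1.0, -0.5, 0.25, 0.1]
f = np.linspace(0.1, 0.3, 200001)
p_hat = np.polyval(p[::-1], np.exp(-2j * np.pi * f))
print(exact_band_power(p, 0.1, 0.3))
print(2 * np.mean(np.abs(p_hat) ** 2) * (0.3 - 0.1))   # should be very close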

One more improvement before you go: instead of computing the coefficients of q(x), we can use the identity

(q, v_{f_1, f_2}^n)=(Uq, Uv_{f_1, f_2}^n)

where U is the unitary discrete Fourier transform operator. Since q comes from a product of polynomials, i.e. a convolution, Uq can be expressed in terms of Up:

Uq=U(p(x)*x^np(x^{-1}))=Up\times\overline{Up}

where the \times means component wise. If we already have Uv_{f_1, f_2}^n computed, this means that computing power_{f_1, f_2}(p) takes only a single FFT of size 2n+1. Amazing!

Ok, that’s it.