As a software developer who cares about making robust, debuggable systems, I’ve been interested for a while now in ideas like Functional Reactive Programming, or the Elm Architecture. Sadly, regular employment does not include a lot of opportunities to play with new ideas, so it wasn’t until I took some time off over Easter that I had a chance to sit down and explore for myself.
I had originally intended to implement something like the simple counter example from the Elm documentation, but as a console application in Python, and built on top of a Functional Reactive base. Unfortunately, it turns out that Elm does not have anything to do with FRP these days, so I wound up with two loosely-related things instead of the single compelling example I’d hoped for. (If you’d like to take a look at the end result, the complete project is up on GitLab.)
Nevertheless, I did learn things, and I want to talk about them. But first, some background information.
Functional programming means a lot of different things to different people, but the relevant part to this discussion is mathematically pure functional programming, or just “pure functions” for short. A pure function is one whose return value is determined only by its inputs.
For example, this is a perfectly respectable pure function:
>>> def add_seven(x):
...     return 7 + x
Every time you pass 5 into that function, you’re always going to get 12 back out, no matter what the rest of the program might have done in the meantime.
>>> add_seven(5)
12
>>> add_seven(5)
12
On the other hand, here’s an example of an impure function:
>>> def number_from_file(somefile):
...     return int(somefile.readline())
Even if you pass in exactly the same value for somefile twice in a row, you probably won’t get the same result out. The second time, .readline() will read the next line of the file, not the same line it read originally:
>>> import io
>>> handle = io.StringIO("5\n9\n32\n17\n")
>>> number_from_file(handle)
5
>>> number_from_file(handle)
9
Pure functional code is easy to work with, because all the context you need to understand it is right in front of you. You don’t need to keep notes about what the rest of the program might be doing. It’s also easier to unit-test, since you don’t need to mock out API calls or reset test fixtures between each test.
It’s impossible to write an entire program in a pure functional style, because eventually you need to get input from the outside world and send a result somewhere. However, the more pure functional code in your codebase, the greater the chance that any given bug will be in one of the easy-to-understand, easy-to-test, easy-to-fix parts of the code.
Functional Reactive Programming (FRP, from now on) is like pure functional programming, except that instead of a function taking values and returning a new value, a function takes streams of values and returns a new stream. You could say that the function “reacts” to changes in its input streams by updating its output stream.
Imagine a stream of numbers being fed into the FRP equivalent of add_seven(). If the input stream contains 5, 9, 32, and 17, the output stream would include 12, 16, 39 and 24.
One natural use of FRP is service monitoring: given some application’s log file, a monitoring tool could read the logs, parse each line, and apply stream functions to convert that stream of records into streams of statistics like “average request latency” or “total requests per client”. You could have even more stream functions stacked on top, like a five-minute moving average of request latency, a one-hour moving average of request latency, and a stream that contains an alert while the five-minute moving average is 20% higher than the one-hour moving average.
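To make the monitoring example concrete, here is a rough sketch of the moving-average comparison as ordinary functions over lists, not yet as streams. The numbers and names are invented for illustration, and a real monitor would use time-based windows rather than fixed-size ones:

```python
def moving_average(values, window):
    "Average of the most recent `window` values."
    recent = values[-window:]
    return sum(recent) / len(recent)

# Hypothetical request latency samples in milliseconds, oldest first.
latencies = [120, 130, 125, 310, 290, 305]

short_avg = moving_average(latencies, 3)  # recent behaviour
long_avg = moving_average(latencies, 6)   # baseline behaviour

# Alert while the short-term average is 20% higher than the long-term one.
alert = short_avg > 1.2 * long_avg
```

In a full FRP system, each of these values would instead be a stream that updates as new log records arrive.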
FRP is not just for monitoring: it can be useful in any situation where events happen over time. You might think of the state of a database as a function over a stream of insert/update events, or think of a user-interface as a function over a stream of keyboard/mouse/touch events.
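For instance, the database idea can be sketched as a pure function folded over a stream of events. The event format here is invented for illustration:

```python
def apply_event(state, event):
    "Pure function: old state plus one event produces a new state."
    action, key, value = event
    new_state = dict(state)  # never mutate the old state
    if action in ("insert", "update"):
        new_state[key] = value
    elif action == "delete":
        new_state.pop(key, None)
    return new_state

events = [("insert", "a", 1), ("insert", "b", 2), ("update", "a", 3)]

state = {}
for event in events:
    state = apply_event(state, event)

# state is now {"a": 3, "b": 2}
```

Because apply_event() is pure, the database's current state is entirely determined by the event stream that produced it.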
If you’ve written much Python, a function that takes a stream and produces another stream might sound a lot like Python’s generator functions.
You could write a generator function to add 7 to a stream like this:
>>> def add_seven_generator(xs):
...     for x in xs:
...         yield 7 + x
...
>>> xs = iter([5, 9, 32, 17])
>>> numbers = add_seven_generator(xs)
>>> next(numbers)
12
>>> next(numbers)
16
>>> next(numbers)
39
>>> next(numbers)
24
However, Python’s generators aren’t quite the same thing as FRP. Generators are designed to link together into a single long chain: once a generator has yielded a value it must move on to the next value; it can’t yield the same value to any of the other downstream generators that might want to consume it.
As an example, let’s say you have a function that adds two numbers:
>>> def add(x, y):
...     return x + y
You can use that function to double a number by adding the number to itself:
>>> x = 5
>>> add(x, x)
10
However, if you have a generator function that adds two iterables of numbers:
>>> def add_generator(xs, ys):
...     for x, y in zip(xs, ys):
...         yield x + y
…you cannot use it to double numbers:
>>> xs = iter([5, 9, 39, 24])
>>> doubles = add_generator(xs, xs)
>>> next(doubles)
14
Doubling 5 should not produce 14! Because of the way Python generators work, the call to next(doubles) gets the first and second values from xs, rather than two copies of the first value.
To implement FRP in Python, we need different behaviour. We want streams to be re-usable, so the same stream can be passed to many different stream functions, the same way a value like 5 can be passed to many different pure functions without being “used up”.
There are two basic styles of FRP we could follow: a “push” style, where each stream actively notifies its consumers whenever a new value arrives, and a “pull” style, where each consumer asks its input streams for their current values whenever it needs them.
In Python, there’s no standard way for a function to refer to where its output goes to, so a “push” system would likely be awkward to use. On the other hand, every Python function can refer to its inputs (that’s what function parameters are!) so the “pull” model should be a natural fit.
Assuming we have a Python object representing a stream, the “pull” model requires a method to poll for the current value:
>>> class IStream:
...     def poll(self):
...         "Return the current output value of this stream."
As a very basic example, we can describe a stream containing a sequence of numbers:
>>> class NumberStream(IStream):
...     "A stream of numbers"
...     def __init__(self, *numbers):
...         self._numbers = iter(numbers)
...
...     def poll(self):
...         return next(self._numbers)
It works exactly how you’d expect:
>>> numbers = NumberStream(5, 9, 32, 17)
>>> numbers.poll()
5
>>> numbers.poll()
9
The stream equivalent of add() is almost as simple:
>>> class AddStream(IStream):
...     "A stream that adds its inputs"
...     def __init__(self, xs, ys):
...         self._xs = xs
...         self._ys = ys
...
...     def poll(self):
...         # Get the latest x from the stream of xs
...         x = self._xs.poll()
...         # Get the latest y from the stream of ys
...         y = self._ys.poll()
...
...         return x + y
If we apply this stream function to a NumberStream of 5s and a NumberStream of 7s, we will get a stream of 12s, just like applying add() to the values 5 and 7 gives the single value 12:
>>> fives = NumberStream(5, 5)
>>> sevens = NumberStream(7, 7)
>>> twelves = AddStream(fives, sevens)
>>> twelves.poll()
12
>>> twelves.poll()
12
Now we can recreate the generators example from before, but in proper FRP style:
>>> numbers = NumberStream(5, 9, 32, 17)
>>> doubles = AddStream(numbers, numbers)
>>> doubles.poll()
14
Waaaaaait, that’s not right! This is exactly the same problem we had before!
If we think about a stream as a value that can change over time, it makes sense that repeated polls at the same time should return the same value. The calls to .poll() do not literally happen at exactly the same nanosecond, but they’re close enough together that we expect them to be treated the same. So we want to introduce an idea of “time” to the .poll() method, so that the caller can keep it “the same time” until they’re done examining output streams, and then “advance time” when they are ready to see what happens next.
>>> class IStream:
...     def poll(self, phase):
...         "Return the current output value of this stream."
We’ve added a new parameter named phase. When a stream’s .poll() method is called, if the value of phase is the same as it was for the previous call, the method must return the same value as it did for the previous call. If the phase argument has changed since the previous call, then the stream function may recalculate its output.
The idea is that a system decides it’s in “blue” phase, and when it polls all the streams it cares about, it can be sure all the calculations are based on the system state at the time “blue” phase began. Then it can switch to “green” phase and be sure that none of the stream outputs are based on stale “blue”-phase data.
Because every stream needs to handle phases in the same way, let’s put that functionality into a base class where it can be easily shared:
>>> class BaseStream(IStream):
...     "A base class that handles stream phases for you."
...     def __init__(self):
...         self._last_phase = None
...         self._last_output = None
...
...     def _poll(self, phase):
...         "Override this to implement the actual stream function"
...
...     def poll(self, phase):
...         if phase != self._last_phase:
...             self._last_phase = phase
...             self._last_output = self._poll(phase)
...
...         return self._last_output
Now, instead of overriding .poll() to implement the calculation we want, we must override ._poll() (with the leading underscore), which .poll() only calls once per phase. ._poll() still takes a phase argument: if it needs data from other streams, it will need to pass the current phase along when polling them.
Here’s NumberStream in this new style:
>>> class NumberStream(BaseStream):
...     "A stream of numbers"
...     def __init__(self, *numbers):
...         super().__init__()
...         self._numbers = iter(numbers)
...
...     # Because NumberStream does not poll any other streams,
...     # it does not need the 'phase' argument.
...     def _poll(self, _):
...         return next(self._numbers)
BaseStream ensures that NumberStream respects phases:
>>> numbers = NumberStream(5, 9, 32, 17)
>>> numbers.poll("blue")
5
>>> numbers.poll("blue")
5
>>> numbers.poll("green")
9
AddStream is nearly the same as it was before:
>>> class AddStream(BaseStream):
...     "A stream that adds its inputs"
...     def __init__(self, xs, ys):
...         super().__init__()
...         self._xs = xs
...         self._ys = ys
...
...     def _poll(self, phase):
...         # Get the latest x from the stream of xs
...         x = self._xs.poll(phase)
...         # Get the latest y from the stream of ys
...         y = self._ys.poll(phase)
...
...         return x + y
But this time, because we have phases, the doubling example works properly:
>>> numbers = NumberStream(5, 9, 32, 17)
>>> doubles = AddStream(numbers, numbers)
>>> doubles.poll("blue")
10
>>> doubles.poll("green")
18
So far we’ve made stream-based versions of pure functions like “addition”. Because addition is a pure function, it should not be surprising that stream addition maintains the understandable, testable nature of the pure functions it’s based on.
However, streams implicitly hide a little state (the current position in the stream), which means we can write stream functions that do more than stateless, pure functions can do. Accessing arbitrary shared state (like global variables) still breaks the rules, but a stateful stream function can keep the understandable, testable nature of pure functions as long as the stream function always produces the same output stream given a particular set of input streams.
Let’s say we have some stream that’s intermittent: sometimes it has a useful value, and sometimes it just has None. We want a stream that always has a value (for example, maybe we want to compare it to some other stream), so we need to “fill the gaps” in the input stream with some sensible value to produce an output stream that always has useful values.
Here’s a stateful stream that does just that:
>>> class GapFiller(BaseStream):
...     "Fills gaps in the input stream with the last good value."
...     def __init__(self, xs):
...         super().__init__()
...         self._xs = xs
...         self._last_good_value = None
...
...     def _poll(self, phase):
...         x = self._xs.poll(phase)
...
...         if x is not None:
...             self._last_good_value = x
...
...         return self._last_good_value
GapFiller maintains extra state in the form of ._last_good_value, but its behaviour is completely determined by the input stream, so it’s just as easy to test as a plain pure function:
>>> maybe_numbers = NumberStream(5, None, None, 17)
>>> numbers = GapFiller(maybe_numbers)
>>> numbers.poll("blue")
5
>>> numbers.poll("green")
5
>>> numbers.poll("blue")
5
>>> numbers.poll("green")
17
At this point, we’ve got basic FRP functionality working, but it’s difficult to use: creating a new subclass of BaseStream, remembering to call .poll() on our input streams… that’s a bunch of boilerplate that obscures the actual logic of a stream function.
Let’s make a decorator that turns a pure function into a pure stream function:
>>> def stream_function(func):
...     "Decorates a pure function, making a stream function."
...
...     class Inner(BaseStream):
...         # We don't know in advance how many inputs func() takes,
...         # so we'll take any number.
...         def __init__(self, *inputs):
...             super().__init__()
...             self._inputs = inputs
...
...         def _poll(self, phase):
...             # Poll all the input streams to get their current values.
...             current_input_values = [
...                 each.poll(phase)
...                 for each in self._inputs
...             ]
...
...             # Whatever func() returns, given these values,
...             # is the current output stream value.
...             return func(*current_input_values)
...
...     return Inner
The Inner class is similar to AddStream, except that it handles any number of inputs instead of hardcoding exactly two, and it calls the wrapped function instead of hard-coding x + y.
Now we can write a stream function by writing a pure function and decorating it:
>>> @stream_function
... def add_streams(x, y):
...     return x + y
…and all the stream-polling and phase-handling happens automatically:
>>> numbers = NumberStream(5, 9, 32, 17)
>>> doubles = add_streams(numbers, numbers)
>>> doubles.poll("blue")
10
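Because every stream caches its output per phase, a single stream can now feed any number of decorated stream functions without being “used up”. Here’s a sketch of that idea (the supporting definitions from earlier are repeated so the snippet stands alone, and negate_stream is a made-up second consumer):

```python
class BaseStream:
    "A base class that handles stream phases for you."
    def __init__(self):
        self._last_phase = None
        self._last_output = None

    def _poll(self, phase):
        "Override this to implement the actual stream function"

    def poll(self, phase):
        if phase != self._last_phase:
            self._last_phase = phase
            self._last_output = self._poll(phase)
        return self._last_output


class NumberStream(BaseStream):
    "A stream of numbers"
    def __init__(self, *numbers):
        super().__init__()
        self._numbers = iter(numbers)

    def _poll(self, _):
        return next(self._numbers)


def stream_function(func):
    "Decorates a pure function, making a stream function."
    class Inner(BaseStream):
        def __init__(self, *inputs):
            super().__init__()
            self._inputs = inputs

        def _poll(self, phase):
            values = [each.poll(phase) for each in self._inputs]
            return func(*values)
    return Inner


@stream_function
def add_streams(x, y):
    return x + y

@stream_function
def negate_stream(x):
    return -x

numbers = NumberStream(5, 9)
doubles = add_streams(numbers, numbers)
negatives = negate_stream(numbers)

# Both consumers see the same value of `numbers` within a phase.
doubles.poll("blue")     # 10
negatives.poll("blue")   # -5
doubles.poll("green")    # 18
negatives.poll("green")  # -9
```

This is exactly the reusability that plain generators couldn’t give us earlier.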
Although Python generator functions can’t properly iterate over an FRP stream (because they don’t pass along the current phase), they’re still a convenient way to express resumable calculations. If there were some non-iterator-based way to feed in new values from input streams, they could work nicely here too.
Luckily, Python 2.5 introduced “yield expressions”, where the generator function is paused, yields a value, and the caller can later pass in a new value which will be returned by the yield expression. We can use this to write a decorator that makes stateful stream functions out of generator functions:
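Before wiring this into streams, here is the yield-expression mechanism on its own, in a minimal standalone sketch:

```python
def running_total():
    "A resumable calculation: each send() adds to a running total."
    total = 0
    while True:
        # Pause here and hand `total` to the caller; the value the
        # caller later passes to .send() becomes `x`.
        x = yield total
        total += x

gen = running_total()
gen.send(None)  # prime the generator; runs to the first yield, giving 0
gen.send(5)     # returns 5
gen.send(9)     # returns 14
```

The first .send(None) call is required to start the generator running; after that, each .send() both supplies an input and collects the next output.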
>>> def stateful_stream_function(func):
...     "Decorates a generator function, making a stream function."
...
...     class Inner(BaseStream):
...         # We don't know in advance how many inputs func() takes,
...         # so we'll take any number.
...         def __init__(self, *inputs):
...             super().__init__()
...             self._inputs = inputs
...
...             # We need to store the generator object returned
...             # when we call func(), so we can repeatedly resume it.
...             self._generator = None
...
...         def _poll(self, phase):
...             # Poll all the input streams to get their current values.
...             current_input_values = [
...                 each.poll(phase)
...                 for each in self._inputs
...             ]
...
...             # If we have not yet created the generator object...
...             if self._generator is None:
...                 # ...create it, passing the initial input values...
...                 self._generator = func(*current_input_values)
...                 # ...then execute up to the first yield expression.
...                 # The yielded value is our first output.
...                 return self._generator.send(None)
...
...             # If we already have a generator object,
...             # pass in the next input values and execute up to
...             # the next yield expression. The yielded value is
...             # our next output.
...             return self._generator.send(current_input_values)
...
...     return Inner
This is more complex than stream_function() because it has to deal with Python’s generator object API, but the basic structure is the same.
We can re-implement the GapFiller stateful stream function as a generator function in this style:
>>> @stateful_stream_function
... def fill_gaps(x):
...     last_good_value = None
...     while True:
...         if x is not None:
...             last_good_value = x
...
...         x, = yield last_good_value
The initial value for x is passed in as a parameter, and subsequent values are returned from the yield expression. Note the comma after x in the last line: because stateful_stream_function() is designed to work for functions with any number of parameters, and the generator .send() method only takes a single value, we always send a list of input values. If this function took multiple inputs, we could say:

x, y, z = yield whatever

…but because it only takes one, we need the odd-looking x, = to unpack the one-item list.
Although this generator function is not completely idiomatic Python, it’s still straightforward code, and it produces a proper FRP stream:
>>> maybe_numbers = NumberStream(5, None, None, 17)
>>> numbers = fill_gaps(maybe_numbers)
>>> numbers.poll("blue")
5
>>> numbers.poll("green")
5
>>> numbers.poll("blue")
5
>>> numbers.poll("green")
17
FRP is a promising technique, and (as demonstrated) it can be neatly and ergonomically implemented in Python. That said, there are a number of potential problems with the implementation shown:
- Real programs often need to wait on several event sources at once, typically using threads or asyncio to manage them. If you figure out how to build an FRP system on top of one of those, I’d love to hear about it.
- The stream_function() and stateful_stream_function() decorators make no attempt to handle keyword arguments, or keyword-only arguments, or anything besides plain positional arguments.
- The stream_function() and stateful_stream_function() decorators require every argument to be a stream. If you want some parameters to be constant, you have to wrap them in a stream that returns the same value forever. That’s understandable, but clunky to use.
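Such a constant-valued stream is at least short to write. A sketch (ConstantStream is my name for it, not something from the project):

```python
class ConstantStream:
    "A stream whose output never changes."
    def __init__(self, value):
        self._value = value

    def poll(self, phase):
        # Returning the same value in every phase trivially
        # satisfies the phase contract.
        return self._value

sevens = ConstantStream(7)
sevens.poll("blue")   # 7
sevens.poll("green")  # 7
```

With this, a constant 7 could be passed to any stream function that expects a stream argument.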
For the specific use-case I had in mind (a single input stream of keyboard events from window.get_wch() in the curses module), none of these limitations affected me, but your mileage may vary.
If you have comments or questions, you can discuss this post on Lobste.rs, Hacker News, or /r/Python.