Go ReactiveX Programming

Why it’s worth your time learning the reactive stream in Go

pancy
4 min read · Mar 19, 2017

What is a reactive system?

Simply put, a reactive system is one in which any detectable change, that is, any difference (delta) between the current state and the last known one, is propagated throughout the system. The propagation ends by affecting either the state of the user-facing client or the shape of the data in the back end.
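To make the idea of "detect a delta, then propagate it" concrete, here is a minimal sketch in plain Go. The Store type and its methods are hypothetical names invented for this illustration; they do not come from any library.

```go
package main

import "fmt"

// Store holds one integer of state plus the listeners interested in it.
// Every name in this sketch is hypothetical and exists only for illustration.
type Store struct {
	state     int
	listeners []func(prev, next int)
}

// OnChange registers a listener that is called whenever the state changes.
func (s *Store) OnChange(f func(prev, next int)) {
	s.listeners = append(s.listeners, f)
}

// Set updates the state and notifies the listeners only when the new value
// differs from the last known one. It reports whether a change was propagated.
func (s *Store) Set(v int) bool {
	if v == s.state {
		return false // no delta, so there is nothing to propagate
	}
	prev := s.state
	s.state = v
	for _, f := range s.listeners {
		f(prev, v)
	}
	return true
}

func main() {
	s := &Store{}
	s.OnChange(func(prev, next int) {
		fmt.Printf("state changed: %d -> %d\n", prev, next)
	})
	s.Set(1) // propagated
	s.Set(1) // ignored: same as the last known state
	s.Set(2) // propagated
}
```

Only the first and third calls to Set reach the listener, because the second one produces no delta.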

Any real-time application is a reactive system. There is absolutely nothing new about it. What is new is a new set of fancy terms and the interest in structuring and studying it.

What is reactive programming?

This is where it gets interesting. Some tools and languages are not designed to be used in writing a reactive system declaratively. This means the way you write your program does not reflect or express the reactiveness of your application.

Reactive models

There are currently many models of reactive programming, implemented at the core-language, extension, or library level. Some notable ones are the actor model (led by Erlang and Elixir), callbacks and promises (led by JavaScript), and flow-based programming (as in visual programming languages such as Max/MSP or Pure Data).

JavaScript front-end libraries such as Angular and React are good examples of reactive programming tools. Their main concern is to propagate changes in data and update the app accordingly.

ReactiveX Stream

A new reactive model from ReactiveX is the observable stream model. It supports the idea of handling a stream of “emitted” changes, which can affect other things within a system.

One trend that is emerging is big data. Big data isn’t just about the amount of data streaming into a system. It is also about the velocity and the variety of that stream. An example from browser interactions might make this clearer.

Just a few years ago, developers were interested in only a handful of user interactions on the web, and clicks were almost exclusively the ones that mattered. Nowadays, analytics measure not just clicks but hovers, scrolls, scroll speed, and so on: literally every move the user makes on a web page. A casual user browsing a page for just five minutes can easily generate hundreds or thousands of data points. Multiply that by just a hundred users and you end up with tens or hundreds of thousands of data points streaming into your application.

How do we program a system that handles this scenario in a transparent way?

Go and the Observable Streams

In Go, it is natural to think of a stream as a channel. That makes perfect sense. Whatever data comes in can be chucked into one or more channels.

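items := make(chan interface{})

// Produce in a separate goroutine so the unbuffered send cannot block the consumer.
// nextPayload is a hypothetical function that returns nil once the source is exhausted.
go func() {
	defer close(items) // closing lets a range over items terminate
	for payload := nextPayload(); payload != nil; payload = nextPayload() {
		items <- payload.Data
	}
}()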

It looks fine until you start to think about what you want to do with the data.

for data := range items {
	process(data)
}

Wait, what if we need to process the data based on some criteria, create a finishing hook after the channel is exhausted, etc.?

for data := range items {
	if d, ok := data.(int); ok {
		if d < 10 {
			go func() {
				process(d)
			}()
		} else {
			go func() {
				process(d * 10)
			}()
		}
	}
}
// Note: cleanup runs once the channel is drained, not once every goroutine has finished.
cleanup()

At a glance it seems clean and simple, but it does not express your intention unless you follow it step by step. Programming this way can easily distract the programmer from the bigger picture of building a reactive system that handles streaming data.
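For comparison, a complete, runnable version of the hand-rolled loop above might look like the following sketch. The handle function, the sample input, and the mutex-guarded result slice that stands in for process are all assumptions made for illustration. Note how much of the code is plumbing: a WaitGroup is needed so that the finishing step only happens after every goroutine has returned.

```go
package main

import (
	"fmt"
	"sync"
)

// handle drains items, applies the branching rule from the loop above to
// every int it finds, and returns the processed values once all workers are
// done. The name and the result slice are assumptions made for illustration.
func handle(items <-chan interface{}) []int {
	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		results []int
	)
	// process stands in for the article's process function.
	process := func(v int) {
		mu.Lock()
		defer mu.Unlock()
		results = append(results, v)
	}
	for data := range items {
		d, ok := data.(int)
		if !ok {
			continue // skip anything that is not an int
		}
		if d >= 10 {
			d *= 10
		}
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			process(v)
		}(d)
	}
	wg.Wait() // a finishing hook may only run after every goroutine is done
	return results
}

func main() {
	items := make(chan interface{})
	go func() {
		defer close(items)
		for _, v := range []interface{}{1, "skip", 20, 3} {
			items <- v
		}
	}()
	results := handle(items)
	fmt.Println(len(results), "items processed")
}
```

The order of the results is not deterministic, because each item is processed in its own goroutine.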

Enter RxGo

RxGo is the latest reactive extension for the Go language. What does “extension” mean here? It is simply a package that helps developers write reactive programs more easily and expressively, without going into the nitty-gritty of managing channels and goroutines.

There are reactive extensions for many languages under the ReactiveX umbrella. RxGo is currently the official one for Go.

RxGo wraps channels and goroutines and provides a nice set of APIs that conform to the ReactiveX operators. For instance, the scenarios above can be programmed in RxGo this way:

// Feed the stream in the background. nextPayload is a hypothetical source
// that returns nil once there is no more data.
go func() {
	defer close(items)
	for payload := nextPayload(); payload != nil; payload = nextPayload() {
		items <- payload.Data
	}
}()

isInt := func(i interface{}) bool {
	_, ok := i.(int)
	return ok
}
ltTen := func(i interface{}) bool {
	return i.(int) < 10
}
timesTen := func(i interface{}) interface{} {
	return i.(int) * 10
}

it, err := iterable.New(items)
if err != nil {
	panic(err)
}
src := observable.From(it)
ob := observer.New(process, cleanup)

s1 := src.Filter(isInt).Filter(ltTen).Subscribe(ob)
s2 := src.Filter(isInt).Subscribe(ob)
s3 := src.Filter(isInt).Map(timesTen).Subscribe(ob)

// Do other stuff here

// Block until one of the subscriptions reports back.
select {
case <-s1:
case <-s2:
case <-s3:
}

This may look like much more code, but if you look closely, you’ll see a very simple pattern at play here:

  1. Set up a stream or Observable.
  2. Modify it and create a new stream (optional).
  3. Subscribe an Observer or a handler function to the stream and activate it.
  4. A Subscription channel is returned, which you can wait on.
  5. The stream is processed and each item is handled by the subscribed handler.
  6. Meanwhile, processing is asynchronous, so the main goroutine is free to work on something else while it waits.

The code is readable and well structured. Operators are chainable, and each one returns a new Observable that stays inactive until Subscribe is called. Side effects are grouped into an Observer, which takes sole responsibility for watching the stream it is subscribed to and acting upon its items.
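To see why this pattern composes so well, it can help to build a toy version of it with nothing but the standard library. The sketch below is not RxGo's implementation or API. Stream, FromChannel, and the method signatures are hypothetical, and Subscribe here takes two plain functions instead of an Observer. It only illustrates how each operator can return a new, inactive stream that does no work until Subscribe is called.

```go
package main

import "fmt"

// Stream is a hypothetical, stripped-down stand-in for an Observable.
// It is NOT RxGo's type; it only illustrates the chaining pattern.
type Stream struct {
	run func(emit func(interface{})) // pushes every item into emit when called
}

// FromChannel builds a Stream that reads from ch until it is closed.
func FromChannel(ch <-chan interface{}) Stream {
	return Stream{run: func(emit func(interface{})) {
		for v := range ch {
			emit(v)
		}
	}}
}

// Filter returns a new Stream that only passes items for which keep is true.
func (s Stream) Filter(keep func(interface{}) bool) Stream {
	return Stream{run: func(emit func(interface{})) {
		s.run(func(v interface{}) {
			if keep(v) {
				emit(v)
			}
		})
	}}
}

// Map returns a new Stream with f applied to every item.
func (s Stream) Map(f func(interface{}) interface{}) Stream {
	return Stream{run: func(emit func(interface{})) {
		s.run(func(v interface{}) { emit(f(v)) })
	}}
}

// Subscribe activates the stream in its own goroutine. onNext receives each
// item, onDone runs once at the end, and the returned channel is closed when
// the stream is exhausted.
func (s Stream) Subscribe(onNext func(interface{}), onDone func()) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		s.run(onNext)
		onDone()
	}()
	return done
}

func main() {
	items := make(chan interface{}, 4)
	for _, v := range []interface{}{3, "skip", 12, 7} {
		items <- v
	}
	close(items)

	isInt := func(i interface{}) bool {
		_, ok := i.(int)
		return ok
	}
	timesTen := func(i interface{}) interface{} {
		return i.(int) * 10
	}

	done := FromChannel(items).Filter(isInt).Map(timesTen).Subscribe(
		func(v interface{}) { fmt.Println(v) },
		func() { fmt.Println("done") },
	)
	<-done // wait for the subscription to finish
}
```

Running it prints 30, 120, 70 and then done, each on its own line. Nothing is read from the channel while the chain is being built; the work starts only inside Subscribe, which is the same "inactive until subscribed" behavior described above.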

Conclusion

RxGo introduces a pattern for programming with reactive, asynchronous channels that is more functional, structured, and declarative, while still offering a familiar interface to conventional Go code by means of channels.
