Random notes on stream-first architecture, Flink and friends

Use-cases and Technical requirements

The main use-cases for stream processing systems are applications that need to process a large amount of data arriving at high throughput (say, high velocity) with very low latency. Low latency is the deciding factor: when computations don’t need to be real-time, e.g. computing the monthly cost on AWS, preparing quarterly business reports, re-indexing websites for a search engine, etc., a traditional batch processing system like MapReduce or Spark would be a better fit. Time-critical applications like processing events in online games or financial applications, monitoring and predicting device failures, processing critical performance metrics, etc., however, require real-time or near real-time insights. In those cases, a stream processing system would be more suitable.

That being said, when we say stream processing systems, we mean tools that allow users to perform computations on-the-fly. Related technologies for simple message queues (without computations), especially in the context of the microservice madness, can sometimes overlap with stream processing technologies, but they should be considered separately in various facets and contexts.

The pioneer in stream processing systems was Apache Storm. Storm can provide low latency, but word on the street is that it is hard to get high throughput out of it. Moreover, Storm doesn’t guarantee exactly-once semantics (more on this later).

A clever compromise to get exactly-once semantics at a relatively low latency was to cut the data stream into small batches (streaming as a special case of batch). Since computations are done on batches, they can be redone when a failure happens. With small batches, this can approximate streaming pretty well and is suitable for applications that aren’t too picky about latency. However, cutting the stream into batches has inherent problems with the semantics of time, and can lead to incorrect results when events do not arrive in order (which happens quite often in distributed systems). Nonetheless, this is the approach taken by Spark Streaming. Storm Trident, an improvement over Storm, also seems to take this micro-batch approach.

Message transport systems

In order to feed data into stream processing engines, we need a message transport system, which does what it should: transport messages. Although it might sound simple, doing this at scale is quite challenging. An ideal message transport system for streaming applications must:

  • provide persistence, i.e. the ability to rewind and replay the stream (time travel). It was once thought that a message transport system can’t have both performance and persistence. Lately, it turns out that this tradeoff is not fundamental, and systems like Apache Kafka can provide high throughput delivery with persistence. This is one of the important revolutions for the streaming community.
  • decouple producers and consumers. A good transport system doesn’t send messages point-to-point, because doing so in a scalable and fault-tolerant way is difficult. One of the solutions that Kafka employs is to let users create topics, to which producers send messages and to which consumers subscribe.
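
As a rough illustration of the two requirements above, here is a toy in-memory sketch in Python. It is not Kafka and does not use any Kafka API; the class `TinyLog` and its methods are made up for this example. Messages are appended to a per-topic log, and consumers read from whatever offset they like.

```python
from collections import defaultdict


class TinyLog:
    """Toy in-memory stand-in for a persistent, topic-based transport (hypothetical)."""

    def __init__(self):
        # topic name -> append-only list of messages
        self._topics = defaultdict(list)

    def send(self, topic, message):
        """Producers only know the topic, never the consumers."""
        self._topics[topic].append(message)
        return len(self._topics[topic]) - 1  # offset of the new message

    def read(self, topic, offset=0):
        """Consumers pull from any offset, so old messages can be replayed."""
        return list(self._topics[topic][offset:])


log = TinyLog()
for reading in (20.5, 21.0, 22.3):
    log.send("sensors", reading)

latest = log.read("sensors", offset=2)    # a consumer that is already caught up
replayed = log.read("sensors", offset=0)  # a new consumer rewinding the stream
```

Because the log keeps every message, a consumer that joins later (or a new version of an application) can start from offset 0 and see exactly what the first consumer saw.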

Having a good message transport system is critical for the stream processing engine down the road.

The notion of Time

In streaming applications, there are 2 notions of time: event time, which is the time that the message was generated in the real world, often stored as a timestamp along with the message, and processing time, which is the time that the message is observed by the machine processing it, normally measured by the system clock of that machine.

Now, mixing up these two times is dangerous. To perform computations, stream processing engines divide the streams into (sliding) windows, based on the time of each message. It is therefore important to specify which time they are using, otherwise it can lead to incorrect results, as demonstrated in an interesting experiment here.

Micro-batching systems, like Spark Streaming, only accept time windows, and it is therefore tricky to get things right.

The bible in handling times in streaming applications is this VLDB paper from Google Dataflow (now Apache Beam), which was one of the inspirations of Flink. The following section was written based on this paper.

Windows, Triggers, and Watermarks

There are different kinds of windows supported by modern stream processing engines:

  • Time windows
  • Count windows
  • Session windows (or timeout windows): based on the gaps between messages, suitable for streams of user interactions on a website, for instance.
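
To make the difference between time windows and session windows concrete, here is a small Python sketch. The function names and the click timestamps are made up for illustration; real engines assign windows incrementally rather than over a complete list.

```python
def tumbling_windows(timestamps, size):
    """Assign each timestamp to the fixed-size time window that contains it."""
    windows = {}
    for t in timestamps:
        start = (t // size) * size
        windows.setdefault(start, []).append(t)
    return windows


def session_windows(timestamps, gap):
    """Group timestamps into sessions separated by silences longer than `gap`."""
    sessions = []
    for t in sorted(timestamps):
        if sessions and t - sessions[-1][-1] <= gap:
            sessions[-1].append(t)   # still within the current session
        else:
            sessions.append([t])     # silence longer than `gap`: new session
    return sessions


clicks = [1, 2, 4, 30, 31, 90]  # hypothetical click times, in seconds
by_minute = tumbling_windows(clicks, size=60)
by_session = session_windows(clicks, gap=10)
```

With these numbers the one-minute time windows lump the first five clicks together, while the session windows split them into two bursts of activity.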

All kinds of windows are special cases of triggers – which define when the content of a window is aggregated and returned to the output stream. Good stream processing engines should allow users to specify custom trigger logic.

Another aspect of time is time travel: the ability to rewind the clock and repeat the computation at a certain time in the past. This is important in real-world applications where users need to replace their applications with a new version, or do A/B testing between different implementations of an algorithm, etc… Obviously, if we only use processing times, time travel would be impossible, because the new version of the application would simply process the latest messages coming in right now. Therefore, serious stream processing applications must support event times (coupled with a message transport that provides persistence).

Event times are tricky to work with though. Since events can arrive at the system out of order, when it sees an event that happened at 10:01 and then one at 10:10, how does it know that there won’t be another event that happened at 10:05 still on its way? In other words, there needs to be a clock defined by the data, so that event time windows can be correctly defined.

Stream processing engines deal with this problem through the concept of Watermarks, which are messages embedded in the stream, bearing a time marker t and announcing that no further event that happened before t will arrive in the stream. With watermarks, event time progresses independently from processing time. If a watermark is late (in processing time), the results will still be computed correctly, although they will appear late in the result stream.

Stream processing engines let developers define the watermarks, because that requires some understanding of the domain, e.g. if they know that their messages can’t be more than 5 seconds late, then the watermark will be the current time minus 5 seconds. This can get as elaborate as having a Machine Learning model predict how late the messages come to the stream.
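
Here is a minimal Python sketch of that idea, not how any particular engine implements it. As an assumption for the example, the watermark is taken to be the largest event time seen so far minus a fixed `max_delay`; the function name and the numbers are made up.

```python
def windowed_counts(events, window_size, max_delay):
    """Count events per event-time window, closing windows on watermarks.

    `events` holds event timestamps in arrival (processing) order.
    Assumption: watermark = largest event time seen so far - max_delay.
    """
    open_windows = {}            # window start -> count so far
    results = []                 # (window start, count), emitted once complete
    dropped = []                 # events that arrived behind the watermark
    watermark = float("-inf")
    for t in events:
        start = (t // window_size) * window_size
        if start + window_size <= watermark:
            dropped.append(t)    # its window has already been emitted
            continue
        open_windows[start] = open_windows.get(start, 0) + 1
        watermark = max(watermark, t - max_delay)
        # Emit every window that ends at or before the watermark.
        for s in sorted(w for w in open_windows if w + window_size <= watermark):
            results.append((s, open_windows.pop(s)))
    return results, open_windows, dropped


arrivals = [1, 3, 2, 11, 4, 16, 5]  # out-of-order event times
results, still_open, dropped = windowed_counts(arrivals, window_size=10, max_delay=5)
```

The event at time 4 arrives after the one at time 11 but is still counted, because the watermark has only reached 6 by then. The event at time 5 arrives after the watermark has passed 10, so in this sketch it is simply dropped; how to treat such late events is a design decision.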

Stateful computation and exactly-once semantics

Most interesting stream-based applications are stateful. If the engine doesn’t support state, application developers need to take care of it themselves (by recruiting a database, for instance). More useful engines, however, can manage the state and recompute it in case of failures. Doing this properly makes sure the engine provides exactly-once semantics (the output stream will be correctly computed even in case of failures), but normally it is difficult to do, both at the semantic and implementation levels.

Folks behind Flink came up with a solution described in this paper, called Asynchronous Distributed Snapshots. This algorithm allows states to be restored correctly in case of failure, even when there are multiple workers.

The core idea is simple to understand. They inject checkpoints periodically into the incoming stream. When a checkpoint event is processed, each worker dumps its state into persistent storage. When a failure happens on one of the workers, it notifies all the other workers, everyone resets their state to the same previous checkpoint, and the computation starts again from there (keep in mind that they can do time travel thanks to event times and watermarks). In order for this to work, the checkpoint ID needs to be synchronized across workers. Some tricks also need to be implemented at the output stage to make sure the output stream doesn’t contain duplicated entries. In the end, this checkpointing mechanism ensures exactly-once semantics with a relatively small effect on overall latency.
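
The sketch below shows the rollback-and-replay part of this idea for a single worker in plain Python. It is a simplification under stated assumptions: the stream is a replayable list, a deep copy stands in for persistent storage, and the synchronization of checkpoints across several workers is not modelled at all. The function name and parameters are made up.

```python
import copy


def run(stream, checkpoint_every, fail_at=None):
    """Sum a replayable stream, checkpointing state and surviving one failure."""
    state = {"offset": 0, "total": 0}
    snapshot = copy.deepcopy(state)            # stands in for persistent storage
    failed = False
    while state["offset"] < len(stream):
        if fail_at is not None and not failed and state["offset"] == fail_at:
            failed = True
            state = copy.deepcopy(snapshot)    # roll back to the last checkpoint
            continue                           # and replay from its offset
        state["total"] += stream[state["offset"]]
        state["offset"] += 1
        if state["offset"] % checkpoint_every == 0:
            snapshot = copy.deepcopy(state)    # checkpoint: state + stream position
    return state["total"]


stream = list(range(1, 11))
without_failure = run(stream, checkpoint_every=3)
with_failure = run(stream, checkpoint_every=3, fail_at=5)
```

Because the snapshot stores the stream position together with the running total, the messages between the checkpoint and the failure are replayed exactly once into the restored state, and both runs end with the same result.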

The same checkpointing mechanism allows Flink to create savepoints, at which the state of the whole system is saved and can be loaded later. This is extremely useful in practice since it allows a new version of the streaming application to pick up (reasonably) arbitrary savepoints of previous versions.

Now that we have stateful exactly-once semantics, we can even allow users to query the states directly, which makes stream processors behave more or less like a traditional database. This is still on-going research.

Stream-first architecture, Lambda architecture, Kappa architecture

Fancy words for fancy tech. They are all about employing a message transport, like Kafka, as a courier of messages, together with some form of stream processor. Sometimes the message transport can dump data periodically into a persistence layer (like HDFS), and batch jobs will be scheduled to pick up the latest data and provide exact insights. However, again, this approach is only suitable for applications that don’t require real-time insights.

An interesting use-case of streaming is to predict failures on a stream of sensor data. A feasible, scalable approach would be to write a Flink application that monitors the sensor data stream and outputs a stream of statistics, stored in a Kafka topic. This stream will then be picked up by another Flink application that does the Machine Learning job, and raises an alert if an anomaly is detected.
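
The shape of such a two-stage pipeline can be sketched in plain Python, with no Flink or Kafka involved. Everything here is hypothetical: the sensor values are made up, a list plays the role of the Kafka topic, and a fixed threshold stands in for the Machine Learning model.

```python
def statistics_stage(readings, window):
    """First job: turn raw readings into a stream of per-window means."""
    for i in range(0, len(readings) - window + 1, window):
        chunk = readings[i:i + window]
        yield sum(chunk) / window


def alert_stage(stats, threshold):
    """Second job: flag a window whose mean jumps away from the previous one.

    A fixed threshold stands in for the Machine Learning model.
    """
    previous = None
    for index, mean in enumerate(stats):
        if previous is not None and abs(mean - previous) > threshold:
            yield index
        previous = mean


readings = [20, 21, 20, 21, 20, 22, 35, 36, 34]            # made-up sensor values
stats_topic = list(statistics_stage(readings, window=3))   # plays the role of the Kafka topic
alerts = list(alert_stage(stats_topic, threshold=5))       # indices of suspicious windows
```

Keeping the intermediate statistics in their own topic means the second stage can be replaced or re-run without touching the first one.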


Stream processors might have difficulties in batch computation (although batches can be obviously seen as special cases of streams – they are streams that have a definite start and end), e.g. running SQL queries on Flink is still an open effort. Doing SQL properly on streams has always been tricky.

Other computations traditionally suited for batch processors, like Machine Learning, are also tricky to do on systems designed for streaming.

Finally, like any other emerging technology, streaming might be hyped. Picking the right solution should be based on the nature of the problem, not on the trends.


[RL4a] Policy Optimization

I thought I would write about some theory behind Reinforcement Learning, with eligibility traces, contraction mapping, POMDP and so on, but then I realized that if I went down that rabbit hole, I would probably never finish this. So here is some practical stuff that people are actually using these days.

Two main approaches in model-free RL are policy-based, where we build a model of the policy function, and value-based, where we learn the value function. For many practical problems, the structure of the (unknown) policy function might be easier to learn than the (unknown) value function, in which case it makes more sense to use policy-based methods. Moreover, with policy-based methods, we will have an explicit policy, which is the ultimate goal of learning-to-control in Reinforcement Learning (the other type being learning-to-predict, more on that some other time). With value-based methods, like DQN, we will need to do an additional inference step to get the optimal policy.

The hybrid of policy-based and value-based is called Actor-Critic methods, which hopefully will be mentioned in another post.

One of the most straightforward approaches in policy-based RL is, unsurprisingly, evolutionary algorithms. In this approach, a population of policies is maintained and evolved over time. People have shown that this works pretty well, e.g. for the game of Tetris. However, due to the randomness, this is apparently only applied to problems where the number of parameters of the policy function is small.

A big part of policy-based methods, however, is based on Policy Gradient, where an estimate of the gradient of the expected future reward can be computed. Roughly speaking, there is an exact formulation for the gradient of the expected reward with respect to the policy parameters, which we can then use to optimize the policy. Since Deep Learning people basically worship gradients (pun intended), this method suits them very well and became trendy about 2 years ago.

So, what is it all about? It is based on a very simple trick called Likelihood Ratio.
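
In its simplest form the trick says that the gradient of an expected reward can itself be written as an expectation: ∇θ E[r(a)] = E[r(a) ∇θ log π(a)], with a drawn from the policy π parameterized by θ. The Python sketch below checks this identity numerically on a made-up three-armed bandit with a softmax policy. The rewards and parameters are arbitrary, and the expectation is enumerated exactly here, whereas in practice it would be estimated from sampled actions.

```python
import math


def softmax(theta):
    exps = [math.exp(t - max(theta)) for t in theta]
    total = sum(exps)
    return [e / total for e in exps]


def expected_reward(theta, rewards):
    return sum(p * r for p, r in zip(softmax(theta), rewards))


def likelihood_ratio_gradient(theta, rewards):
    """Exact E_a[ r(a) * d/dtheta log pi(a) ] for a softmax policy."""
    probs = softmax(theta)
    grad = [0.0] * len(theta)
    for a, (p_a, r_a) in enumerate(zip(probs, rewards)):
        for k in range(len(theta)):
            # d/dtheta_k log pi(a) = 1[a == k] - pi(k) for a softmax policy
            score = (1.0 if a == k else 0.0) - probs[k]
            grad[k] += p_a * r_a * score
    return grad


def finite_difference_gradient(theta, rewards, eps=1e-6):
    """Numerical gradient of the expected reward, used only as a cross-check."""
    grad = []
    for k in range(len(theta)):
        up, down = list(theta), list(theta)
        up[k] += eps
        down[k] -= eps
        diff = expected_reward(up, rewards) - expected_reward(down, rewards)
        grad.append(diff / (2 * eps))
    return grad


theta = [0.1, -0.3, 0.5]      # arbitrary policy parameters
rewards = [1.0, 0.0, 2.0]     # arbitrary reward per action
lr_grad = likelihood_ratio_gradient(theta, rewards)
fd_grad = finite_difference_gradient(theta, rewards)
```

The two gradients agree up to numerical error, and the likelihood-ratio form only needs the reward values and the gradient of log π, never the gradient of the reward itself.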


On GPU architecture and why it matters

I had a nice conversation recently around the architecture of CPUs versus that of GPUs. It was so good that I still remember the day after, so it is probably worth writing down.

Note that a lot of the following is still several levels of abstraction away from the hardware, and this is in no way a rigorous discussion of modern hardware design. Still, from the software development point of view, it is adequate for everything we need to know.

It started with the difference in how transistors are allocated to the different components on CPU and GPU chips. Roughly speaking, on CPUs, a lot of transistors are reserved for the cache (several levels of it), while on GPUs, most of the transistors are used for the ALUs, and the cache is not very well-developed. Moreover, a modern CPU merely has a few dozen cores, while GPUs might have thousands.

Why is that? The simple answer is because CPUs are MIMD, while GPUs are SIMD (although modern nVidia GPUs are closer to MIMD).

The long answer is that CPUs are designed for the von Neumann architecture, where data and instructions are stored in RAM and then fetched to the chip on demand. The bandwidth between RAM and CPU is limited (the so-called data bus and instruction bus, whose width is typically ~100 bits on modern computers). For each clock cycle, only ~100 bits of data can be transferred from RAM to the chip. If an instruction or data element needed by the CPU is not on the chip, the CPU might need to wait for a few cycles before the data is fetched from RAM. Therefore, a cache is highly needed, and the bigger the cache, the better. Modern CPUs have around 3 levels of cache, unsurprisingly named L1, L2, L3…, with the lower-numbered levels sitting closer to the processor. Data and instructions will first be fetched to the caches, and the CPU can read from the cache with much lower latency (cache is expensive though, but that is another story). In short, in order to keep the CPU cores busy, cache is used to reduce the latency of reading from RAM.
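
A back-of-the-envelope way to see why the cache matters is the usual average-access-time formula: hit time plus miss rate times miss penalty. The Python sketch below uses made-up cycle counts purely for illustration; they are not measurements of any real CPU.

```python
def average_access_cycles(hit_cycles, miss_rate, miss_penalty_cycles):
    """Average memory access time = hit time + miss rate * miss penalty."""
    return hit_cycles + miss_rate * miss_penalty_cycles


# Hypothetical numbers: 4 cycles for a cache hit, 200 extra cycles to go to RAM.
with_cache = average_access_cycles(hit_cycles=4, miss_rate=0.05, miss_penalty_cycles=200)
always_miss = average_access_cycles(hit_cycles=4, miss_rate=1.0, miss_penalty_cycles=200)
```

With these assumed numbers, a cache that hits 95% of the time brings the average access down to roughly 14 cycles, compared with about 204 cycles when every access has to go to RAM.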

GPUs are different. Designed for graphics processing, GPUs need to compute the same, often simple, arithmetic operations on a large number of data points, because this is what happens in 3D rendering, where thousands of vertices need to be processed in the shader (for those who are not familiar with computer graphics, that is to compute the color values of each vertex in the scene). Each vertex can be computed independently, therefore it makes sense to have thousands of cores running in parallel. For this to be scalable, all the cores should run the same computation, hence SIMD (otherwise it is a mess to schedule thousands of cores).

For CPUs, even with caches, there are still chances that the chip requires some data or instructions that are not in the cache yet, and it would need to wait for a few cycles for them to be read from RAM. This is obviously wasteful. Modern CPUs have pretty smart and complicated prediction logic for prefetching data from RAM to minimize latency. For example, when it enters a FOR loop, it could fetch the data around the arrays being accessed and the instructions around the loop. Nonetheless, even with all those tricks, there are still chances for cache misses!

One simple way to keep the CPU cores busy is context switching. While the CPU is waiting for data from RAM, it can work on something else, which eventually keeps the cores busy while also providing the multi-tasking feature. We are not going to dive into context switching, but basically it is about saving the current stack, restoring the stack of the other task, reloading the registers, resetting the instruction counter, etc…

Let’s talk about GPUs. A typical fragment of data that GPUs have to work with is in the order of megabytes in size, so it could easily take hundreds of cycles for the data to be fetched to the cores. The question then is how to keep the cores busy.

CPUs deal with this problem by context switching. GPUs don’t do that. The threads on GPUs are not switched, because it would be problematic to switch context at the scale of thousands of cores. For the sake of efficiency, there is little in the way of locking mechanisms between GPU cores, so context switching is difficult to implement efficiently. In fact, GPUs don’t try to be too smart in this regard. They simply leave the problem to be solved at a higher level, i.e. the application level.

Talking of applications, GPUs are designed for a very specific set of applications anyway, so can we do something smarter to keep the cores busy? In graphical rendering, the usual workflow is that the cores read a big chunk of data from RAM, do computation on each element of the data and write the results back to RAM (sounds like MapReduce? Actually it is not too far from that, we can talk about GPGPU algorithms in another post). For this to be efficient, both the reading and writing phases should be efficient. Writing is tricky, but reading can be made way faster with, unsurprisingly, a cache. However, the biggest cache system on GPUs is read-only, because a writable cache is messy, especially when you have thousands of cores. Historically it is called texture cache, because it is where the graphical application would write the texture (typically a bitmap) for the cores to use to shade the vertices. The cores can’t write to this cache because they would not need to, but it is writable from the CPU. When people moved to GPGPU, the texture cache was normally used to store constants, which can be read by multiple cores simultaneously with low latency.

To summarize, the whole point of the discussion was about keeping the cores from being idle because of memory latency. Cache is the answer for both CPUs and GPUs, but the cache on GPUs is read-only to the cores due to their massive number. While cache is certainly helpful, CPUs also do context switching to further increase core utilization. GPUs, to the best of my knowledge, don’t do that much. It is left to the developers to design their algorithms so that the cores are fed with enough computation to hide the memory latency (which, by the way, also includes the transfer from RAM to GPU memory via PCI Express, which is way slower and hasn’t been discussed so far).

The proper way to optimize GPGPU algorithms is, therefore, to use the data transfer latency as the guide to optimize.
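
As a rough sketch of what that guide can look like in practice, the Python snippet below compares the time to move data with the time to compute on it. The bandwidth and throughput figures are round hypothetical numbers, not the specification of any real device, and the function names are made up.

```python
def step_times(num_bytes, flops, bandwidth_bytes_per_s, flops_per_s):
    """Return (transfer_seconds, compute_seconds) for one batch of work."""
    return num_bytes / bandwidth_bytes_per_s, flops / flops_per_s


def is_transfer_bound(num_bytes, flops, bandwidth_bytes_per_s, flops_per_s):
    """True when moving the data takes longer than computing on it."""
    transfer, compute = step_times(num_bytes, flops, bandwidth_bytes_per_s, flops_per_s)
    return transfer > compute


BANDWIDTH = 1e10   # hypothetical: 10 GB/s between host RAM and GPU memory
THROUGHPUT = 1e12  # hypothetical: 1 TFLOP/s of arithmetic

# Element-wise operation on 1e8 floats: 4 bytes and 1 operation per element.
elementwise = is_transfer_bound(4e8, 1e8, BANDWIDTH, THROUGHPUT)

# 4096 x 4096 matrix multiply: three float matrices moved, 2 * n^3 operations.
n = 4096
matmul = is_transfer_bound(3 * 4 * n**2, 2 * n**3, BANDWIDTH, THROUGHPUT)
```

Under these assumptions the element-wise operation is dominated by the transfer, so the cores would mostly sit idle, while the matrix multiply does enough arithmetic per byte to hide it.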

Nowadays, frameworks like TensorFlow or Torch hide all of these details, but at the price of being a bit inefficient. The TensorFlow community is aware of this and is trying its best, but there is still much left to be done.

Self-driving cars, again

This is my second take on Self-driving cars, a bit more serious than last time. You might be surprised to know that it is a combination of a lot of old-school stuff in Computer Vision and Machine Learning like perspective transform, thresholding, image warping, sliding windows, HoG, linear SVM, etc…

Three months ago I kept wondering how Self-driving cars would work in Vietnam.

Now I am certain that it will never work, at least for the next 20 years (in Vietnam or in India, for that matter).

Secured private Docker registry in Kubernetes

If you run a Docker-based Kubernetes cluster yourself, sooner or later you will find out that you need a Docker registry to store the docker images. You might start out with a public registry out there, but often you might want to keep your images away from the public. Now if your cluster is on the cloud, you can just use the Container Registry provided by AWS EC2 or Google Cloud Platform. If your cluster is on-prem however, then you might want to keep the registry close to your cluster, hence deploying your own registry might be a good idea.

For starters, you can always use the registry addon shipped with Kubernetes. The default setup will give you an unsecured registry, so you will need to set up a DaemonSet to route a local port to the registry, so that to the workers, your registry runs on localhost:PORT, which will not trigger the secured logic of the docker daemon. Check the link for more information.

This setup is rather bad though. If a user, from his machine, wants to push his image to the registry, then he has to use kubectl to set up a proxy to the registry service, so that the service is available on his machine at localhost:PORT. This is rather inconvenient and tedious. We need a registry available at a separate host name, so that it can receive images from any machine in the network, and serve images to any worker in the Kubernetes cluster.


Yes, you should tune your pesky learning rate

For a ConvNN I trained recently, these are the learning curves when using the Adam optimizer with initial learning rate = 0.01:


When using the traditional SGD with initial learning rate = 0.01, momentum = 0.9 and decaying learning rate every 3 epochs with decay rate of 0.96, the learning curves become:
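
For concreteness, the schedule described above can be written down in a few lines of Python. This is only a sketch under two assumptions: the decay is applied as a staircase (one multiplication every 3 epochs), and the momentum update is the classical one. Neither the framework nor the exact update rule used for the experiment is stated, and the function names are made up.

```python
def decayed_learning_rate(epoch, initial=0.01, decay_rate=0.96, decay_every=3):
    """Step decay: multiply the rate by `decay_rate` once every `decay_every` epochs."""
    return initial * decay_rate ** (epoch // decay_every)


def momentum_step(weight, velocity, gradient, learning_rate, momentum=0.9):
    """One classical-momentum SGD update for a single scalar weight."""
    velocity = momentum * velocity - learning_rate * gradient
    return weight + velocity, velocity


# Learning rate for the first 7 epochs: it changes at epochs 3 and 6.
rates = [decayed_learning_rate(epoch) for epoch in range(7)]

# Two updates on a toy weight with a constant gradient of 2.0.
w, v = momentum_step(1.0, 0.0, 2.0, learning_rate=rates[0])
w, v = momentum_step(w, v, 2.0, learning_rate=rates[1])
```

With a decay rate of 0.96 every 3 epochs the learning rate shrinks slowly, so most of the early progress comes from the velocity term accumulating consistent gradients.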


I hope you see the drastic difference. With momentum, we got a 10% error rate after 5 epochs, while with Adam, we got a ~30% error rate after 20 epochs.

Now it might happen that Adam will work better if we add fancy stuff like Batch Norm and the like to the network, which I didn’t try. However, with everything else being equal, it feels to me that Adam was a bit aggressive in decreasing the learning rate, which makes learning progress slow after a while.

Since the learning rate is more strongly regulated in Adam, perhaps we can be more lax in setting the initial learning rate? These are the learning curves for Adam with initial learning rate = 0.1:


It went wild eventually.

But with initial learning rate = 0.001, Adam gives this:


It is much better now.

Over the years, momentum and a decaying learning rate have been my first choice for tuning the learning rate. I sometimes use Adagrad/RMSProp/Adam for cross-checking, but the best results are usually found with momentum, often with fewer training epochs.

The take-away message is that you should really tune your learning rate hard. It is still one of the most important hyper-parameters. Although methods like Adam/Adagrad might give the impression that tuning the learning rate is easy, in fact it is very problem-dependent. Momentum has many more knobs to tune, but once used wisely, it is very flexible and powerful. Often you will end up in the same ballpark with any of those optimizers.

Jenkins server, or why you should never trust an engineer’s estimate

Recently, like any other serious engineer, I realized I needed a CI server. I didn’t want to spend money on travis-ci or similar 3rd-party services, so I decided to roll my own Jenkins server. I thought it would just be a matter of starting a new instance, installing Jenkins, configuring some plugins and up it goes! C’mon, how hard could that possibly be? I could probably get it running in a couple of hours!

I’ve never been so wrong in my entire life.


Installing Jenkins is indeed a piece of cake, but configuring the thing is a real pain in the butt.

The first weekend passed by and I hadn’t got it running. I thought: okay, this is annoying, but I can probably get it done in the next couple of hours.

But when the second weekend passed by and Jenkins still didn’t work, I was not annoyed anymore. I was frustrated, so frustrated that I decided to rant about it (after I got it done, of course).

Seriously, can’t you make this a bit more verbose and tell me what could possibly be wrong? All I want is just a simple Jenkins server that builds every Pull Request, and gets triggered when something happens on my git server. Is it too much to ask? Also, can someone please write some tutorials that are up-to-date?

The cool thing about Jenkins is that it is very flexible, but so flexible that any single plugin can break your system. I ran into a NullPointerException, and when that was fixed, my build never got triggered. At some point I got it running, but naively I updated the plugin to the latest version and it stopped working. I never thought Jenkins would be this troublesome.

So here you go, this is how I get it done (at last, yea, I managed to get it done without committing suicide).

  1. Number 1 rule: use the GitHub Integration plugin, instead of the probably outdated GitHub pull request builder.
  2. In fact, if you happened to install GitHub pull request builder, you need to disable Auto-manage webhooks in the Jenkins configuration, otherwise all the hooks will be handled by that plugin, instead of the GitHub Integration plugin.
  3. Configuring the GitHub Integration plugin is pretty easy, although the guy was pretty terse in words.
  4. But then don’t upgrade the plugin! I initially used GitHub Integration plugin v0.1.0-rc9, and it worked. But v0.1.0-rc10 was released a couple of days ago, and it stopped working when I upgraded. So this is the lesson: Jenkins plugins might stop working when you upgrade them, so think twice before doing that.
  5. As a bonus, use the Environment Script plugin to specify environment variables needed for your build. Don’t use a Shell script step, because in Jenkins, shell scripts are executed in their own sh -ex process, so anything you do with export ABC=… will go away once the script finishes executing.
    But the Environment Script plugin is a bit quirky. You set the environment variables with echo "ABC=…", instead of export ABC=….
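
The reason behind item 5 can be reproduced outside Jenkins with a few lines of Python. The second half only mimics the echo-based behaviour described above by parsing KEY=VALUE lines from a script's output; it is an illustration, not how the plugin is actually implemented, and the variable name `ABC_DEMO` is made up.

```python
import os
import subprocess

# A shell step runs in a child process, so its exports die with that process.
subprocess.run(["sh", "-e", "-c", "export ABC_DEMO=123"], check=True)
leaked = "ABC_DEMO" in os.environ

# Mimicking the echo-based approach: the script prints KEY=VALUE lines
# and the caller turns them into variables for the later build steps.
output = subprocess.run(
    ["sh", "-e", "-c", 'echo "ABC_DEMO=123"'],
    check=True,
    capture_output=True,
    text=True,
).stdout
build_env = dict(line.split("=", 1) for line in output.splitlines() if "=" in line)
```

The export never reaches the parent process, while the printed line can be picked up and carried over, which is presumably why the plugin asks for echo rather than export.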

Since it took me way more time than I estimated, I decided to go the extra mile. Here are a few things that will make your build a bit cooler:

  1. Use Scoverage plugin to publish test coverage results (produced by sbt-scoverage, for instance)
  2. Use Embedded Build Status plugin to display those tiny logos in your README file on GitHub. Honestly, those logos are quite satisfying to have on your repo.
  3. Use Slack plugin for better integration with Slack.

When something you thought of as “trivial”, like setting up Jenkins, takes you this much time, you will start worrying about all the other estimations you have made in your entire life…