Kafka auto scaling and fault tolerance within AWS
Kafka has become a standard for high-load streaming systems. It is horizontally scalable, very fast, and reliable. AWS recently introduced a managed Kafka service, but there are still many cases where you would want a self-hosted solution. So in this article we'll look at building our own highly available and fault-tolerant Kafka cluster.
The problem
Kafka is a distributed system by nature, but it provides no mechanism for dynamic auto scaling out of the box. If you want to grow your cluster, you need to update the configuration of all the brokers, ZooKeeper instances, and producers. If an instance fails, you need to start and configure a new one manually.
Why not Docker clusters
The first solution that comes to mind is a Docker cluster, for example Kubernetes. It provides everything we need out of the box: service discovery, scaling, load balancing, and self-healing. But first you have to consider the added complexity. Configuring and maintaining both Kafka and Kubernetes may turn out to be too complicated even for experienced engineers. Also, running Kafka in Docker, much like running classical databases there, is not a clear-cut decision.
So in this post I'll consider one of the possible approaches to building a distributed, resilient, dynamic, and highly available Kafka cluster on AWS infrastructure.
What does AWS give us
AWS provides Auto Scaling and load balancing at the level of EC2 instances. For Kafka, we can use an Elastic Load Balancer (ELB) as a single entry point to the cluster. Unlike classical load balancing, however, we use the ELB only for the initial metadata exchange.
We use a simple cluster configuration in which all EC2 instances are identical and based on the same AMI. Each instance runs 1 broker and 1 ZooKeeper node, so we can easily scale this image to any number of instances. Keep the number of instances odd (1, 3, 5, ...), though, so that ZooKeeper can elect a leader by majority.
We also assume that our producers are located outside of our VPC.
The producer sends a request to the ELB and receives metadata describing the leader broker, which it will then talk to directly. Because the producer and the broker communicate directly right after this handshake, the brokers must be publicly reachable through an Internet Gateway.
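To see why an odd number of nodes is the sensible choice, here is a small Python sketch of the majority arithmetic. The function names are my own, for illustration only. It shows that going from 3 to 4 nodes (or from 5 to 6) raises the quorum size without letting the cluster survive any additional failures.

```python
def quorum_size(ensemble_size: int) -> int:
    """Smallest number of nodes that forms a strict majority."""
    return ensemble_size // 2 + 1


def tolerated_failures(ensemble_size: int) -> int:
    """Nodes that can fail while a majority is still reachable."""
    return ensemble_size - quorum_size(ensemble_size)


if __name__ == "__main__":
    for n in range(1, 7):
        print(
            f"{n} nodes: quorum={quorum_size(n)}, "
            f"tolerated failures={tolerated_failures(n)}"
        )
```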
But the devil, as usual, is in the details. Before launching a new instance, Auto Scaling does not know its IP address: it is dynamic and assigned randomly from the subnet's CIDR range. Yet we need to know all the brokers and ZooKeeper nodes in order to configure each broker individually, otherwise the cluster's topology will be broken. So we need to work out the configuration for each node and apply it before starting the server.
Since the IP addresses of our instances must be public for the producers, we have to use Elastic IP addresses (EIPs).
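As an illustration of what "configure each broker individually" means, here is a hedged Python sketch that renders the per-node Kafka settings once the full list of addresses is known. The property names (`broker.id`, `advertised.listeners`, `zookeeper.connect`) are standard Kafka broker settings; the `PLAINTEXT` listener, the helper names, and the example addresses are assumptions made for this sketch.

```python
from typing import List

ZK_CLIENT_PORT = 2181  # ZooKeeper client port
BROKER_PORT = 9092     # Kafka broker port


def zookeeper_connect(cluster_ips: List[str]) -> str:
    """Build the zookeeper.connect string listing every node in the cluster."""
    return ",".join(f"{ip}:{ZK_CLIENT_PORT}" for ip in cluster_ips)


def broker_properties(own_ip: str, broker_id: int, cluster_ips: List[str]) -> str:
    """Render the settings that differ from node to node.

    Assumes a plaintext listener; the broker advertises its own public
    address so that producers can reach it directly.
    """
    if own_ip not in cluster_ips:
        raise ValueError(f"{own_ip} is not part of the cluster address list")
    lines = [
        f"broker.id={broker_id}",
        f"advertised.listeners=PLAINTEXT://{own_ip}:{BROKER_PORT}",
        f"zookeeper.connect={zookeeper_connect(cluster_ips)}",
    ]
    return "\n".join(lines)


if __name__ == "__main__":
    # Documentation-range addresses, used here purely as placeholders.
    ips = ["203.0.113.10", "203.0.113.11", "203.0.113.12"]
    print(broker_properties("203.0.113.11", 2, ips))
```

Every node gets the same `zookeeper.connect` value, while `broker.id` and `advertised.listeners` are unique per node, which is why the addresses have to be known before the server starts.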
In short, here is what we do. We reserve a set of EIPs in advance and look them up by tag through the AWS API. Let's assume we have a cluster of 3 instances: we reserve 3 EIPs with the tag `kafka-brokers-demo`, retrieve them in the user-data script, iterate over them, and assign the first free one to the instance, allowing for possible failures and race conditions (line 31 of the script). As a result, all 3 IP addresses end up assigned.
One tricky part is the ZooKeeper and broker IDs. They must identify the server uniquely and unambiguously. We could take the index of the IP address in the array that the AWS EIP API returns, but the order of that array is not guaranteed, so two servers could end up with the same ID and the topology would be broken. Instead, the function at line 50 of the script converts the dotted IP address into an integer, since an IPv4 address is also 32 bits wide. This binds the server identifier firmly to the IP address.
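As a rough illustration of that logic in Python (a sketch, not the exact user-data script), the functions below pick the first unassociated address from a `describe_addresses`-style response and retry when another instance wins the race. The `ec2` argument is expected to behave like a boto3 EC2 client, the function names are hypothetical, and the filter shown in the comment assumes the tag is stored under the `Name` key, which may not match your setup.

```python
from typing import Any, Dict, List, Optional, Tuple, Type


def first_free_address(addresses: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Return the first EIP that is not associated with anything, if any."""
    for address in addresses:
        if "AssociationId" not in address:
            return address
    return None


def claim_free_eip(
    ec2: Any,
    instance_id: str,
    filters: List[Dict[str, Any]],
    attempts: int = 5,
    conflict_errors: Tuple[Type[BaseException], ...] = (Exception,),
) -> Optional[Dict[str, Any]]:
    """Try to bind one free EIP from the tagged pool to this instance.

    With boto3, conflict_errors would typically be
    (botocore.exceptions.ClientError,); it is a parameter here so the
    sketch runs without boto3 installed.
    """
    for _ in range(attempts):
        response = ec2.describe_addresses(Filters=filters)
        candidate = first_free_address(response["Addresses"])
        if candidate is None:
            return None  # every address in the pool is already taken
        try:
            # AllowReassociation=False makes the call fail instead of
            # stealing an address another instance grabbed in the meantime.
            ec2.associate_address(
                AllocationId=candidate["AllocationId"],
                InstanceId=instance_id,
                AllowReassociation=False,
            )
            return candidate
        except conflict_errors:
            continue  # lost the race, re-read the pool and try again
    return None


# Hypothetical usage with a real client (filter key is an assumption):
#   import boto3
#   ec2 = boto3.client("ec2")
#   filters = [{"Name": "tag:Name", "Values": ["kafka-brokers-demo"]}]
#   claimed = claim_free_eip(ec2, "i-0123456789abcdef0", filters)
```

Re-reading the pool after every failed attempt keeps the logic simple: whichever instance loses a race just moves on to the next free address.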
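The conversion itself is simple. Here is a minimal Python sketch of the idea using the standard `ipaddress` module; the function name `ip_to_id` is mine, and the original script may implement it differently. Note that the result is an unsigned 32-bit value, so for some addresses it exceeds the signed 32-bit range; it is worth checking it against the ID ranges your Kafka and ZooKeeper versions accept.

```python
import ipaddress


def ip_to_id(ip: str) -> int:
    """Map a dotted IPv4 address to its 32-bit integer value.

    Two different addresses always give two different integers, so the
    result can serve as a stable per-server identifier.
    """
    return int(ipaddress.IPv4Address(ip))


def id_to_ip(server_id: int) -> str:
    """Inverse mapping, handy when debugging which server an ID belongs to."""
    return str(ipaddress.IPv4Address(server_id))


if __name__ == "__main__":
    # Documentation-range addresses, used here purely as placeholders.
    for ip in ["203.0.113.10", "203.0.113.11", "203.0.113.12"]:
        print(ip, ip_to_id(ip))
```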
Results
Autoscaling
As a result you have independent, atomic EC2 instances that can easily be packed into an Autoscaling Group, which is great for fault tolerance and fast cluster scaling (the only constraint is the EIP limit). Even if you have just one instance, Autoscaling is a recommended way to keep your instance running, as you can always scale in seconds or connect your cluster to an Elastic Load Balancer.
Load Balancing
The advantage of having an ELB in front of a Kafka cluster is that your producers only need to know the address of the ELB and nothing else. The metadata is passed through it (make sure that ports 2181 and 9092 are exposed on your ELB).
Fault tolerance
When an EC2 instance fails, Autoscaling launches a new one, which assigns itself the released EIP from the pool via the same user-data script, and the cluster topology is healed.
I hope that was useful, and I would really appreciate any feedback about your experience.
Find me on Twitter