Reputation: 4563
I created a Vagrant/Ansible playbook to build a single-node Kafka VM.
The idea is to provide some agility when prototyping: if we want a quick & dirty Kafka message queue, we can simply git clone [my 'kafka in a box' repo], cd into it, and vagrant up.
Here's what I've done so far:
Vagrantfile:
VAGRANTFILE_API_VERSION = "2"

Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
  config.vm.box = "hashicorp/precise64"
  config.vm.network "forwarded_port", guest: 9092, host: 9092

  config.vm.provider "virtualbox" do |vb|
    vb.customize ["modifyvm", :id, "--memory", "2048"]
  end

  config.vm.provision "ansible" do |ansible|
    ansible.playbook = "kafkaPlaybook.yml"
  end
end
... and the Ansible kafkaPlaybook.yml file:
---
- hosts: all
  user: vagrant
  sudo: True
  tasks:
    - name: install linux packages
      apt: update_cache=yes pkg={{ item }} state=installed
      with_items:
        - vim
        - openjdk-7-jdk
    - name: make /usr/local/kafka directory
      shell: "mkdir /usr/local/kafka"
    - name: download kafka (the link is from an apache mirror)
      get_url: url=http://apache.spinellicreations.com/kafka/0.8.1.1/kafka-0.8.1.1-src.tgz dest=/usr/local/kafka/kafka-0.8.1.1-src.tgz mode=0440
    - name: untar file
      shell: "tar -xvf /usr/local/kafka/kafka-0.8.1.1-src.tgz -C /usr/local/kafka"
    - name: build kafka with gradle
      shell: "cd /usr/local/kafka/kafka-0.8.1.1-src && ./gradlew jar"
When I vagrant up, the box gets provisioned. I'm able to vagrant ssh and perform the basic producer/consumer tests locally, e.g.
cd /usr/local/kafka/kafka-0.8.1.1-src
bin/zookeeper-server-start.sh config/zookeeper.properties #start zookeeper
bin/kafka-server-start.sh config/server.properties #start kafka
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test #start a producer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning #start a consumer
When I type messages in the producer window they appear in the consumer window. Great.
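Before involving the Python client, it's worth confirming that the forwarded port is reachable from the host at all. A minimal check using only the Python standard library (127.0.0.1 and 9092 are simply the values from the Vagrantfile above):

import socket

# if the Vagrant port forward is working, this connects without raising
sock = socket.create_connection(("127.0.0.1", 9092), timeout=5)
print("port 9092 is reachable from the host")
sock.close()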
I tried connecting to Kafka from the host using the kafka-python package:
>>> from kafka import KafkaClient, SimpleProducer
>>> kafka = KafkaClient("127.0.0.1:9092", timeout=120)
>>> kafka.ensure_topic_exists('turkey')
No handlers could be found for logger "kafka"
>>> kafka.ensure_topic_exists('turkey')
>>> producer = SimpleProducer(kafka)
>>> producer.send_messages("turkey", "gobble gobble")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/producer.py", line 261, in send_messages
return super(SimpleProducer, self).send_messages(topic, partition, *msg)
File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/producer.py", line 188, in send_messages
timeout=self.ack_timeout)
File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/client.py", line 312, in send_produce_request
resps = self._send_broker_aware_request(payloads, encoder, decoder)
File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/client.py", line 148, in _send_broker_aware_request
conn = self._get_conn(broker.host, broker.port)
File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/client.py", line 55, in _get_conn
timeout=self.timeout
File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/conn.py", line 60, in __init__
self.reinit()
File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/conn.py", line 195, in reinit
self._raise_connection_error()
File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/conn.py", line 75, in _raise_connection_error
raise ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port))
kafka.common.ConnectionError: Kafka @ precise64:9092 went away
The kafka.ensure_topic_exists call was made twice. The first time it's run it prints a logging warning and then creates the topic, so I can see that Python is talking to Kafka on port 9092. However, I'm unable to send messages to the queue.
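The hostname in the traceback ("precise64") is whatever the broker advertises in its metadata: the client fetches that metadata over the bootstrap connection to 127.0.0.1:9092 and then uses the advertised host for the actual produce request. A quick way to see what the broker is advertising, using the same kafka-python client object as above (load_metadata_for_topics and the brokers attribute are my reading of that 0.9.x-era API):

>>> kafka.load_metadata_for_topics()
>>> kafka.brokers   # expected to show host='precise64', which the host machine can't resolve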
Can you see what I'm doing wrong?
Upvotes: 4
Views: 2724
Reputation: 4563
The advertised.host.name and advertised.port needed to be set in config/server.properties. I added the following two tasks to the playbook:
    - name: uncomment and set advertised.host.name
      lineinfile: dest=/usr/local/kafka/kafka-0.8.1.1-src/config/server.properties
                  regexp='^#advertised.host.name=<hostname routable by clients>'
                  insertafter='^#advertised.host.name=<hostname routable by clients>'
                  line='advertised.host.name=localhost'
                  state=present

    - name: uncomment and set advertised.port line
      lineinfile: dest=/usr/local/kafka/kafka-0.8.1.1-src/config/server.properties
                  regexp='^#advertised.port=<port accessible by clients>'
                  insertafter='^#advertised.port=<port accessible by clients>'
                  line='advertised.port=9092'
                  state=present
... and now it's possible to provision a single-node Kafka cluster:
git clone https://github.com/alexwoolford/vagrantKafkaBox
cd vagrantKafkaBox
vagrant up
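Once the box is provisioned, the same kafka-python calls from the question should go through from the host. A minimal re-test, using the same client API and the same 'turkey' topic as above:

from kafka import KafkaClient, SimpleProducer

# connect through the forwarded port on the host and send a test message
kafka = KafkaClient("127.0.0.1:9092", timeout=120)
kafka.ensure_topic_exists('turkey')
producer = SimpleProducer(kafka)
producer.send_messages("turkey", "gobble gobble")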
If I were to start this again, I would probably provision a lab Kafka using Wirbelsturm.
Upvotes: 6