Quickstart

1
2
> tar -xzf kafka_2.11-1.1.0.tgz
> cd kafka_2.11-1.1.0

Start the server

1
bin/zookeeper-server-start.sh config/zookeeper.properties
1
bin/kafka-server-start.sh config/server.properties

Create a topic

1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic
1
bin/kafka-topics.sh --list --zookeeper localhost:2181

Send some messages

1
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic
1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning

Python client for Apache Kafka

#!/usr/bin/env python3

import logging
from kafka import KafkaProducer
from kafka.errors import KafkaError
import msgpack


log = logging.getLogger(__name__)

def send():
    """Send one msgpack-encoded message to the 'my-topic' topic.

    Blocks up to 10 seconds waiting for the broker acknowledgement,
    then prints the record's topic, partition and offset. On failure
    the error is logged and the function returns without printing.
    """
    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        value_serializer=msgpack.dumps,
    )
    try:
        # KafkaProducer.send() is asynchronous: it returns a Future
        # immediately without waiting for the broker.
        future = producer.send('my-topic', 'last')

        # Block for a 'synchronous' send by waiting on the Future.
        try:
            record_metadata = future.get(timeout=10)
        except KafkaError:
            # The original code fell through here and crashed with a
            # NameError on record_metadata; log and bail out instead.
            log.exception('failed to send message to my-topic')
            return

        print(record_metadata.topic)
        print(record_metadata.partition)
        print(record_metadata.offset)
    finally:
        # Ensure buffered records are flushed and the client is closed.
        producer.close()


# Run the producer demo only when executed as a script, not on import.
if __name__ == '__main__':
    send()
#!/usr/bin/env python3

from kafka import KafkaConsumer
import msgpack


def receive():
    """Consume msgpack-encoded messages from 'my-topic' and print each one.

    Blocks indefinitely: iterating a KafkaConsumer waits for new records
    from the broker at localhost:9092.
    """
    consumer = KafkaConsumer(
        'my-topic',
        bootstrap_servers=['localhost:9092'],
        value_deserializer=msgpack.unpackb,
    )
    for msg in consumer:
        # Raw record repr, then a compact topic:partition:offset summary.
        print(msg)
        print(f"{msg.topic}:{msg.partition}:{msg.offset}: key={msg.key} value={msg.value}")

# Run the consumer demo only when executed as a script, not on import.
if __name__ == '__main__':
    receive()