Kafka bindings for Haskell backed by the librdkafka C module. It has been tested and fully supports Kafka 0.9.0.1 using librdkafka 0.9.0.99 and higher on Linux and OS X. Haskakafka supports both producers and consumers with optional batch operations.
Hackage: http://hackage.haskell.org/package/haskakafka
A quick walkthrough of producers and consumers:
import Haskakafka
import qualified Data.ByteString.Char8 as C8
example :: IO ()
example = do
let
-- Optionally, we can configure certain parameters for Kafka
kafkaConfig = [("socket.timeout.ms", "50000")]
topicConfig = [("request.timeout.ms", "50000")]
-- Payloads are just ByteStrings
samplePayload = C8.pack "Hello world"
-- withKafkaProducer opens a producer connection and gives us
-- two objects for subsequent use.
withKafkaProducer kafkaConfig topicConfig
"localhost:9092" "test_topic"
$ \kafka topic -> do
-- Produce a single unkeyed message to partition 0
let message = KafkaProduceMessage samplePayload
_ <- produceMessage topic (KafkaSpecifiedPartition 0) message
-- Produce a single keyed message
let keyMessage = KafkaProduceKeyedMessage (C8.pack "Key") samplePayload
_ <- produceKeyedMessage topic keyMessage
-- We can also use the batch API for better performance
_ <- produceMessageBatch topic KafkaUnassignedPartition [message, keyMessage]
putStrLn "Done producing messages, here was our config: "
dumpConfFromKafka kafka >>= \d -> putStrLn $ "Kafka config: " ++ (show d)
dumpConfFromKafkaTopic topic >>= \d -> putStrLn $ "Topic config: " ++ (show d)
-- withKafkaConsumer opens a consumer connection and starts consuming
let partition = 0
withKafkaConsumer kafkaConfig topicConfig
"localhost:9092" "test_topic"
partition -- locked to a specific partition for each consumer
KafkaOffsetBeginning -- start reading from beginning (alternatively, use
-- KafkaOffsetEnd, KafkaOffset or KafkaOffsetStored)
$ \kafka topic -> do
-- Consume a single message at a time
let timeoutMs = 1000
me <- consumeMessage topic partition timeoutMs
case me of
(Left err) -> putStrLn $ "Uh oh, an error! " ++ (show err)
(Right m) -> putStrLn $ "Woo, payload was " ++ (C8.unpack $ messagePayload m)
-- For better performance, consume in batches
let maxMessages = 10
mes <- consumeMessageBatch topic partition timeoutMs maxMessages
case mes of
(Left err) -> putStrLn $ "Something went wrong in batch consume! " ++ (show err)
(Right ms) -> putStrLn $ "Woohoo, we got " ++ (show $ length ms) ++ " messages"
-- Be a little less noisy
setLogLevel kafka KafkaLogCrit
-- we can also fetch metadata about our Kafka infrastructure
let timeoutMs = 1000
emd <- fetchBrokerMetadata [] "localhost:9092" timeoutMs
case emd of
(Left err) -> putStrLn $ "Uh oh, error time: " ++ (show err)
(Right md) -> putStrLn $ "Kafka metadata: " ++ (show md)
Configuration options are set in the call to withKafkaConsumer
and withKafkaProducer
. For
the full list of supported options, see
librdkafka's list.
High level consumers are supported by librdkafka starting from version 0.9. High-level consumers have the ability to handle more than one partition and even more than one topic. Scalability and rebalancing are taken care of by librdkafka: once a new consumer in the same consumer group is started the rebalance happens and all consumer share the load.
This version of Haskakafka adds (experimental) support for high-level consumers, here is how such a consumer can be used in code:
import Haskakafka
import Haskakafka.Consumer
runConsumerExample :: IO ()
runConsumerExample = do
res <- runConsumer
(ConsumerGroupId "test_group") -- group id is required
[] -- extra kafka conf properties
(BrokersString "localhost:9092") -- kafka brokers to connect to
[TopicName "^hl-test*"] -- list of topics to consume, supporting regex
processMessages -- handler to consume messages
print $ show res
-- this function is used inside consumer
-- and it is responsible for polling and handling messages
-- In this case I will do 10 polls and then return a success
processMessages :: Kafka -> IO (Either KafkaError ())
processMessages kafka = do
mapM_ (\_ -> do
msg1 <- pollMessage kafka 1000
print $ show msg1) [1..10]
return $ Right ()
Although librdkafka is available on many platforms, most of the distribution packages are too old to support haskakafka. As such, we suggest you install from the source:
git clone https://github.com/edenhill/librdkafka
cd librdkafka
./configure
make && sudo make install
If the C++ bindings fail for you, just install the C bindings alone.
cd librdkafka/src
make && sudo make install
On Debian and OS X, this will install the shared and static libraries to /usr/local/lib
.
The full Kafka guide is at http://kafka.apache.org/documentation.html#quickstart
If you want to use cabal—since haskakafka uses c2hs
to generate C bindings—you may need to
explicitly install c2hs
somewhere on your path (i.e. outside of a sandbox).
To do so, run:
cabal install c2hs
Afterwards installation should work, so go for
cabal install haskakafka
This uses the latest version of Haskakafka from Hackage.
Haskakafka ships with a suite of integration tests to verify the library against
a live Kafka instance. To get these setup you must have a broker running
on localhost:9092
(or overwrite the HASKAKAFKA_TEST_BROKER
environment variable)
with a haskakafka_tests
topic created (or overwrite the HASKAKAFKA_TEST_TOPIC
environment variable).
To get a broker running, download a Kafka distribution and untar it into a directory. From there, run zookeeper using
bin/zookeeper-server-start.sh config/zookeeper.properties
and run kafka in a separate window using
bin/kafka-server-start.sh config/server.properties
With both Kafka and Zookeeper running, you can run tests through stack:
stack test
You can also run tests through cabal:
cabal install --only-dependencies --enable-tests
cabal test --log=/dev/stdout
stack build
stack exec -- basic --help
basic example [OPTIONS]
Fetch metadata, produce, and consume a message
Common flags:
-b --brokers=<brokers> Comma separated list in format
<hostname>:<port>,<hostname>:<port>
-t --topic=<topic> Topic to fetch / produce
-C --consumer Consumer mode
-P --producer Producer mode
-L --list Metadata list mode
-A --all Run producer, consumer, and metadata list
-p=<num> Partition (-1 for random partitioner when
using producer)
--pretty Pretty print output
-? --help Display help message
-V --version Print version information
The following will produce 11 messages on partition 5 for topic test_topic
:
stack exec -- basic -b "broker1.example.com:9092,broker2.example.com:9092,broker3.example.com:9092" -t test_topic -p 5 -P
The following will consume 11 messages on partition 5 for topic test_topic
:
stack exec -- basic -b "broker1.example.com:9092,broker2.example.com:9092,broker3.example.com:9092" -t test_topic -p 5 -C
The following will pretty print a list of all brokers and topics:
stack exec -- basic -b "broker1.example.com:9092,broker2.example.com:9092,broker3.example.com:9092" -L --pretty