Skip to content

Commit

Permalink
got rid of the ConsumerConfigService - use proper Spring value inject…
Browse files Browse the repository at this point in the history
…ion; updated comments; updated docs; removed obsolete example implementation (reachkrishnaraj#30)
  • Loading branch information
ppine7 authored and dhyaneshm committed Apr 19, 2016
1 parent cce1f97 commit 063e53f
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 426 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@

### _As described in the illustration above, here is how the indexer works:_

* Kafka has a topic named, say `Topic1`

* Let's say `Topic1` has 5 partitions.
* Let's assume Kafka has a topic `Topic1` with 5 partitions

* In the configuration file, `kafka-es-indexer.properties`, set the `firstPartition=0` and `lastPartition=4` properties

* start the indexer application as described below

* there will be 5 threads started, one for each consumer from each of the partitions
* there will be 5 threads started, one for each consumer for each of the partitions

* each job will be reading events from Kafka and indexing them (in batches) into ElasticSearch - using configured index name and type

* when a new partition is added to the kafka topic - configuration has to be updated and the indexer application has to be restarted

Expand Down
38 changes: 24 additions & 14 deletions src/main/java/org/elasticsearch/kafka/indexer/jobs/IndexerJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.elasticsearch.kafka.indexer.exception.IndexerESException;
import org.elasticsearch.kafka.indexer.exception.KafkaClientNotRecoverableException;
import org.elasticsearch.kafka.indexer.exception.KafkaClientRecoverableException;
import org.elasticsearch.kafka.indexer.service.ConsumerConfigService;
import org.elasticsearch.kafka.indexer.service.IMessageHandler;
import org.elasticsearch.kafka.indexer.service.KafkaClientService;
import org.slf4j.Logger;
Expand All @@ -23,7 +22,6 @@
public class IndexerJob implements Callable<IndexerJobStatus> {

private static final Logger logger = LoggerFactory.getLogger(IndexerJob.class);
private ConsumerConfigService configService;
private IMessageHandler messageHandlerService;
public KafkaClientService kafkaClient;
private long offsetForThisRound;
Expand All @@ -33,17 +31,21 @@ public class IndexerJob implements Callable<IndexerJobStatus> {
private final String currentTopic;
private IndexerJobStatus indexerJobStatus;
private volatile boolean shutdownRequested = false;


public IndexerJob(ConsumerConfigService configService, IMessageHandler messageHandlerService,
KafkaClientService kafkaClient, int partition)
private int consumerSleepBetweenFetchsMs;
// this property can be set to TRUE to enable logging timings of the event processing
private boolean isPerfReportingEnabled = false;
// this property can be set to TRUE to skip indexing into ES
private boolean isDryRun = false;

public IndexerJob(String topic, IMessageHandler messageHandlerService,
KafkaClientService kafkaClient, int partition, int consumerSleepBetweenFetchsMs)
throws Exception {
this.configService = configService;
this.currentPartition = partition;
this.currentTopic = configService.getTopic();
this.currentTopic = topic;
this.messageHandlerService = messageHandlerService;
indexerJobStatus = new IndexerJobStatus(-1L, IndexerJobStatusEnum.Created, partition);
isStartingFirstTime = true;
this.consumerSleepBetweenFetchsMs = consumerSleepBetweenFetchsMs;
this.kafkaClient = kafkaClient;
indexerJobStatus.setJobStatus(IndexerJobStatusEnum.Initialized);
logger.info("Created IndexerJob for topic={}, partition={}; messageHandlerService={}; kafkaClient={}",
Expand All @@ -67,7 +69,7 @@ public IndexerJobStatus call() {
logger.debug("******* Starting a new batch of events from Kafka for partition {} ...", currentPartition);
processMessagesFromKafka();
indexerJobStatus.setJobStatus(IndexerJobStatusEnum.InProgress);
Thread.sleep(configService.getConsumerSleepBetweenFetchsMs() * 1000);
Thread.sleep(consumerSleepBetweenFetchsMs * 1000);
logger.debug("Completed a round of indexing into ES for partition {}", currentPartition);
} catch (IndexerESException | KafkaClientNotRecoverableException e) {
indexerJobStatus.setJobStatus(IndexerJobStatusEnum.Failed);
Expand Down Expand Up @@ -127,7 +129,7 @@ public void processMessagesFromKafka() throws Exception {
if (byteBufferMsgSet != null) {
logger.debug("Starting to prepare for post to ElasticSearch for partition {}", currentPartition);
long proposedNextOffsetToProcess = addMessagesToBatch(jobStartTime, byteBufferMsgSet);
if (configService.isDryRun()) {
if (isDryRun) {
logger.info("**** This is a dry run, NOT committing the offset in Kafka nor posting to ES for partition {}****", currentPartition);
return;
}
Expand All @@ -144,7 +146,7 @@ public void processMessagesFromKafka() throws Exception {
* @throws KafkaClientRecoverableException
*/
private void commitOffSet(long jobStartTime) throws KafkaClientRecoverableException {
if (configService.isPerfReportingEnabled()) {
if (isPerfReportingEnabled) {
long timeAfterEsPost = System.currentTimeMillis();
logger.debug("Approx time to post of ElasticSearch: {} ms for partition {}",
(timeAfterEsPost - jobStartTime), currentPartition);
Expand All @@ -159,7 +161,7 @@ private void commitOffSet(long jobStartTime) throws KafkaClientRecoverableExcept
" after processing and posting to ES; partition=" + currentPartition + "; error: " + e.getMessage(), e);
}

if (configService.isPerfReportingEnabled()) {
if (isPerfReportingEnabled) {
long timeAtEndOfJob = System.currentTimeMillis();
logger.info("*** This round of IndexerJob took about {} ms for partition {} ",
(timeAtEndOfJob - jobStartTime), currentPartition);
Expand Down Expand Up @@ -228,7 +230,7 @@ private long addMessagesToBatch(long jobStartTime, ByteBufferMessageSet byteBuff
"# of successfully transformed and added to Index: {}; # of skipped from indexing: {}; offsetOfNextBatch: {}",
numMessagesInBatch, numProcessedMessages, numSkippedIndexingMessages, offsetOfNextBatch);

if (configService.isPerfReportingEnabled()) {
if (isPerfReportingEnabled) {
long timeAtPrepareES = System.currentTimeMillis();
logger.debug("Completed preparing for post to ElasticSearch. Approx time taken: {}ms for partition {}",
(timeAtPrepareES - jobStartTime), currentPartition);
Expand Down Expand Up @@ -261,7 +263,7 @@ private ByteBufferMessageSet getMessageAndOffsets(long jobStartTime) throws Exce
}

byteBufferMsgSet = fetchResponse.messageSet(currentTopic, currentPartition);
if (configService.isPerfReportingEnabled()) {
if (isPerfReportingEnabled) {
long timeAfterKafkaFetch = System.currentTimeMillis();
logger.debug("Completed MsgSet fetch from Kafka. Approx time taken is {} ms for partition {}",
(timeAfterKafkaFetch - jobStartTime), currentPartition);
Expand Down Expand Up @@ -313,4 +315,12 @@ public IndexerJobStatus getIndexerJobStatus() {
return indexerJobStatus;
}

/**
 * Switches performance reporting for this job on or off.
 * <p>
 * While the flag is set, the job logs approximate timings of its
 * processing steps (Kafka fetch, batch preparation, post to ElasticSearch,
 * whole round) for the partition it handles.
 *
 * @param isPerfReportingEnabled {@code true} to log timings of the event processing,
 *                               {@code false} to suppress them
 */
public void setPerfReportingEnabled(final boolean isPerfReportingEnabled) {
    this.isPerfReportingEnabled = isPerfReportingEnabled;
}

/**
 * Switches dry-run mode for this job on or off.
 * <p>
 * In a dry run the job still fetches messages and adds them to a batch,
 * but it neither posts to ElasticSearch nor commits the offset in Kafka.
 *
 * @param isDryRun {@code true} to skip indexing into ES, {@code false} for normal processing
 */
public void setDryRun(final boolean isDryRun) {
    this.isDryRun = isDryRun;
}

}
Loading

0 comments on commit 063e53f

Please sign in to comment.