diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index e51d833630b3..bd7bc0442355 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -78,7 +78,6 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.EmittingLogger; -import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.segment.incremental.ParseExceptionHandler; @@ -253,7 +252,6 @@ public enum Status private volatile DateTime minMessageTime; private volatile DateTime maxMessageTime; private final ScheduledExecutorService rejectionPeriodUpdaterExec; - private final ServiceEmitter emitter; public SeekableStreamIndexTaskRunner( final SeekableStreamIndexTask task, @@ -275,7 +273,6 @@ public SeekableStreamIndexTaskRunner( this.sequences = new CopyOnWriteArrayList<>(); this.ingestionState = IngestionState.NOT_STARTED; this.lockGranularityToUse = lockGranularityToUse; - this.emitter = toolbox.getEmitter(); minMessageTime = ioConfig.getMinimumMessageTime().or(DateTimes.MIN); maxMessageTime = ioConfig.getMaximumMessageTime().or(DateTimes.MAX); @@ -668,7 +665,7 @@ public void run() // Emit the processed bytes metric try { - emitter.emit( + toolbox.getEmitter().emit( ServiceMetricEvent.builder() .setDimension("taskId", task.getId()) .setDimension("dataSource", task.getDataSource())