diff --git a/src/main/java/org/akhq/controllers/TailController.java b/src/main/java/org/akhq/controllers/TailController.java index 30968c21b..fb25a28e5 100644 --- a/src/main/java/org/akhq/controllers/TailController.java +++ b/src/main/java/org/akhq/controllers/TailController.java @@ -8,7 +8,6 @@ import io.micronaut.scheduling.TaskExecutors; import io.micronaut.scheduling.annotation.ExecuteOn; import io.micronaut.security.annotation.Secured; -import io.reactivex.schedulers.Schedulers; import io.swagger.v3.oas.annotations.Operation; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -36,8 +35,8 @@ public TailController(RecordRepository recordRepository) { } @Secured(Role.ROLE_TOPIC_DATA_READ) - @ExecuteOn(TaskExecutors.IO) @Get(value = "api/{cluster}/tail/sse", produces = MediaType.TEXT_EVENT_STREAM) + @ExecuteOn(TaskExecutors.IO) @Operation(tags = {"topic data"}, summary = "Tail for data on multiple topic") public Publisher> sse( String cluster, @@ -51,7 +50,6 @@ public Publisher> sse( return recordRepository .tail(cluster, options) - .observeOn(Schedulers.io()) .map(event -> { TailRecord tailRecord = new TailRecord(); tailRecord.offsets = getOffsets(event); diff --git a/src/main/java/org/akhq/controllers/TopicController.java b/src/main/java/org/akhq/controllers/TopicController.java index 094d191f5..d1965b80e 100644 --- a/src/main/java/org/akhq/controllers/TopicController.java +++ b/src/main/java/org/akhq/controllers/TopicController.java @@ -9,50 +9,18 @@ import io.micronaut.http.HttpRequest; import io.micronaut.http.HttpResponse; import io.micronaut.http.MediaType; -import io.micronaut.http.annotation.Body; -import io.micronaut.http.annotation.Controller; -import io.micronaut.http.annotation.Delete; -import io.micronaut.http.annotation.Get; -import io.micronaut.http.annotation.Post; -import io.micronaut.http.annotation.QueryValue; +import io.micronaut.http.annotation.*; import io.micronaut.http.sse.Event; import io.micronaut.scheduling.TaskExecutors; import io.micronaut.scheduling.annotation.ExecuteOn; import io.micronaut.security.annotation.Secured; -import io.reactivex.schedulers.Schedulers; import io.swagger.v3.oas.annotations.Operation; -import java.time.Instant; -import java.util.Base64; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; -import javax.inject.Inject; - -import lombok.AllArgsConstructor; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.ToString; +import lombok.*; import lombok.extern.slf4j.Slf4j; import org.akhq.configs.Role; -import org.akhq.models.AccessControl; -import org.akhq.models.Config; -import org.akhq.models.ConsumerGroup; -import org.akhq.models.LogDir; -import org.akhq.models.Partition; -import org.akhq.models.Record; -import org.akhq.models.Topic; -import org.akhq.models.TopicPartition; +import org.akhq.models.*; import org.akhq.modules.AbstractKafkaWrapper; -import org.akhq.repositories.AccessControlListRepository; -import org.akhq.repositories.ConfigRepository; -import org.akhq.repositories.ConsumerGroupRepository; -import org.akhq.repositories.RecordRepository; -import org.akhq.repositories.TopicRepository; +import org.akhq.repositories.*; import org.akhq.utils.Pagination; import org.akhq.utils.ResultNextList; import org.akhq.utils.ResultPagedList; @@ -61,6 +29,12 @@ import org.codehaus.httpcache4j.uri.URIBuilder; import org.reactivestreams.Publisher; +import java.time.Instant; +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import javax.inject.Inject; + @Slf4j @Secured(Role.ROLE_TOPIC_READ) @Controller @@ -332,7 +306,6 @@ public Publisher> sse( return recordRepository .search(cluster, options) - .observeOn(Schedulers.io()) .map(event -> { SearchRecord searchRecord = new SearchRecord( event.getData().getPercent(),