Merge pull request #56 from polarising-oss/AKHQ-18-Topics-Data-Logs
AKHQ-18-Topics-Data-Logs
angelaalves authored Mar 16, 2020
2 parents aecc10b + 643f268 commit c54ec7f
Showing 7 changed files with 151 additions and 25 deletions.
2 changes: 1 addition & 1 deletion client/src/containers/TopicList/Topic/Topic.jsx
@@ -57,7 +57,7 @@ class Topic extends Component {
       case 'acls':
         return <TopicAcls history={history} />;
       case 'logs':
-        return <TopicLogs history={history} />;
+        return <TopicLogs clusterId={clusterId} topic={topicId} history={history} />;
       default:
         return <TopicData history={history} />;
     }
110 changes: 102 additions & 8 deletions client/src/containers/TopicList/Topic/TopicLogs/TopicLogs.jsx
@@ -1,13 +1,107 @@
-import React, {Component} from 'react';
+import React, { Component } from 'react';
+import { uriTopicsLogs } from '../../../../utils/endpoints';
+import Table from '../../../../components/Table';
+import { get } from '../../../../utils/api';
+import converters from '../../../../utils/converters';
 
+
 class TopicLogs extends Component {
-  render() {
-    return (
-      <h2>
-        Logs
-      </h2>
-    );
+  state = {
+    data: [],
+    selectedCluster: this.props.clusterId,
+    selectedTopic: this.props.topic
+  };
+
+  componentDidMount() {
+    this.getTopicLogs();
+  }
+
+  async getTopicLogs() {
+    let logs = [];
+    const { selectedCluster, selectedTopic } = this.state;
+    const { history } = this.props;
+    history.push({
+      loading: true
+    });
+    try {
+      logs = await get(uriTopicsLogs(selectedCluster, selectedTopic));
+      this.handleData(logs.data);
+    } catch (err) {
+      console.error('Error:', err);
+    } finally {
+      history.push({
+        loading: false
+      });
+    }
+  }
+
+  handleData(logs) {
+    let tableLogs = logs.map(log => {
+      return {
+        broker: log.broker,
+        topic: log.topic,
+        partition: log.partition,
+        size: log.size,
+        offsetLag: log.offsetLag
+      };
+    });
+    this.setState({ data: tableLogs });
+  }
+
+  handleSize(size) {
+
+    return (
+      <label>
+        {converters.showBytes(size, 0)}
+      </label>
+    );
+  }
+  render() {
+    const { data } = this.state;
+    return (
+      <div>
+        <Table
+          columns={[
+            {
+              id: 'broker',
+              accessor: 'broker',
+              colName: 'Broker',
+              type: 'text'
+            },
+            {
+              id: 'topic',
+              accessor: 'topic',
+              colName: 'Topic',
+              type: 'text'
+            },
+            {
+              id: 'partition',
+              accessor: 'partition',
+              colName: 'Partition',
+              type: 'text'
+            },
+
+            {
+              id: 'size',
+              accessor: 'size',
+              colName: 'Size',
+              type: 'text',
+              cell: (obj, col) => {
+                return this.handleSize(obj[col.accessor]);
+              }
+            },
+            {
+              id: 'offsetLag',
+              accessor: 'offsetLag',
+              colName: 'OffsetLag',
+              type: 'text'
+            }
+          ]}
+          data={data}
+        />
+      </div>
+    );
   }
 }
 
-export default TopicLogs;
+export default TopicLogs;
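
Note: the component reads `logs.data`, so the `get` helper evidently resolves to an axios-style response whose `data` property carries the raw array. A minimal sketch of the payload shape `handleData` expects — the field names come from the `LogDTO` added later in this commit, while the values are purely illustrative:

// Hypothetical response body for the topic-logs endpoint (illustrative values only).
// Field names mirror LogDTO: broker, topic, partition, size, offsetLag.
const sampleLogs = [
  { broker: 0, topic: 'my-topic', partition: 0, size: 1073741824, offsetLag: 0 },
  { broker: 1, topic: 'my-topic', partition: 1, size: 536870912, offsetLag: 42 }
];
// handleData copies these fields one-to-one into the Table rows;
// `size` is rendered through converters.showBytes in handleSize.
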
8 changes: 8 additions & 0 deletions client/src/utils/endpoints.js
@@ -27,6 +27,13 @@ export const uriTopicsPartitions = (clusterId, topicId) => {
   );
 };
 
+export const uriTopicsLogs = (clusterId, topicId) => {
+  return (
+    `${apiUrl}/topic/logs${clusterId ? '?clusterId=' + clusterId : ''}` +
+    `${topicId ? '&topicId=' + topicId : ''}`
+  );
+};
+
 export const uriNodesConfigs = (clusterId, nodeId) => {
   return (
     `${apiUrl}/cluster/nodes/configs${clusterId ? '?clusterId=' + clusterId : ''}` +
@@ -60,6 +67,7 @@ export default {
   uriNodes,
   uriNodesConfigs,
   uriTopics,
+  uriTopicsLogs,
   uriDeleteTopics,
   uriTopicsProduce,
   uriConsumerGroups
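
One caveat worth noting: like the existing `uriNodesConfigs` above it, `uriTopicsLogs` only yields a well-formed URL when `clusterId` is present — with it absent, the result would begin `.../topic/logs&topicId=...`. The callers in this PR always pass a cluster id, so this is harmless here; a hardened variant using the standard `URLSearchParams` API might look like the sketch below (not part of this commit, and `uriTopicsLogsSafe` is a hypothetical name):

// Sketch only: produces the same URL as uriTopicsLogs when both arguments
// are set, but stays well-formed when either is missing. Assumes the same
// apiUrl constant used by the helpers above.
export const uriTopicsLogsSafe = (clusterId, topicId) => {
  const params = new URLSearchParams();
  if (clusterId) params.append('clusterId', clusterId);
  if (topicId) params.append('topicId', topicId);
  const query = params.toString();
  return `${apiUrl}/topic/logs${query ? '?' + query : ''}`;
};
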
13 changes: 7 additions & 6 deletions src/main/java/org/kafkahq/rest/TopicResource.java
@@ -8,12 +8,7 @@
 import lombok.extern.slf4j.Slf4j;
 import org.kafkahq.repositories.RecordRepository;
 import org.kafkahq.service.TopicService;
-import org.kafkahq.service.dto.topic.CreateTopicDTO;
-import org.kafkahq.service.dto.topic.DeleteTopicDTO;
-import org.kafkahq.service.dto.topic.PartitionDTO;
-import org.kafkahq.service.dto.topic.ProduceTopicDTO;
-import org.kafkahq.service.dto.topic.RecordDTO;
-import org.kafkahq.service.dto.topic.TopicListDTO;
+import org.kafkahq.service.dto.topic.*;
 
 import javax.annotation.Nullable;
 import javax.inject.Inject;
@@ -66,6 +61,12 @@ public List<PartitionDTO> fetchTopicPartitions(String clusterId, String topicId)
         return topicService.getTopicPartitions(clusterId, topicId);
     }
 
+    @Get("/topic/logs")
+    public List<LogDTO> fetchTopicLogs(String clusterId, String topicId) throws ExecutionException, InterruptedException {
+        log.debug("Fetch logs from topic: {}", topicId);
+        return topicService.getTopicLogs(clusterId, topicId);
+    }
+
     @Delete("/topic/delete")
     public TopicListDTO deleteTopic(@Body DeleteTopicDTO deleteTopicDTO) throws ExecutionException, InterruptedException {
         log.debug("Delete topic: {}", deleteTopicDTO.getTopicId());
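
Micronaut binds the bare `clusterId` and `topicId` method parameters from same-named query parameters, so this route lines up with the `uriTopicsLogs` helper above. A hypothetical smoke test from a browser console — the host, port, and `/api` prefix are assumptions, not shown anywhere in this diff:

// Illustrative only: host, port, and the /api prefix are assumed.
fetch('http://localhost:8080/api/topic/logs?clusterId=my-cluster&topicId=my-topic')
  .then(res => res.json())
  .then(logs => console.table(logs)); // expected: an array of LogDTO-shaped objects
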
15 changes: 9 additions & 6 deletions src/main/java/org/kafkahq/service/TopicService.java
@@ -4,18 +4,14 @@
 import io.micronaut.context.env.Environment;
 import org.apache.kafka.clients.admin.TopicListing;
 import org.kafkahq.models.Config;
+import org.kafkahq.models.LogDir;
 import org.kafkahq.models.Record;
 import org.kafkahq.models.Topic;
 import org.kafkahq.modules.AbstractKafkaWrapper;
 import org.kafkahq.modules.KafkaModule;
 import org.kafkahq.repositories.RecordRepository;
 import org.kafkahq.repositories.TopicRepository;
-import org.kafkahq.service.dto.topic.CreateTopicDTO;
-import org.kafkahq.service.dto.topic.PartitionDTO;
-import org.kafkahq.service.dto.topic.ProduceTopicDTO;
-import org.kafkahq.service.dto.topic.RecordDTO;
-import org.kafkahq.service.dto.topic.TopicDTO;
-import org.kafkahq.service.dto.topic.TopicListDTO;
+import org.kafkahq.service.dto.topic.*;
 import org.kafkahq.service.mapper.TopicMapper;
 import org.kafkahq.utils.PagedList;
 import org.kafkahq.utils.Pagination;
@@ -107,6 +103,13 @@ public List<PartitionDTO> getTopicPartitions(String clusterId, String topicId) t
             .collect(Collectors.toList());
     }
 
+    public List<LogDTO> getTopicLogs(String clusterId, String topicId) throws ExecutionException, InterruptedException {
+        Topic topic = this.topicRepository.findByName(clusterId, topicId);
+
+        return topic.getLogDir().stream().map(log -> topicMapper.fromLogToLogDTO(log))
+                .collect(Collectors.toList());
+    }
+
     public void createTopic(CreateTopicDTO createTopicDTO) throws ExecutionException, InterruptedException {
         List<Config> options = new ArrayList<>();
         options.add(new Config("retention.ms", createTopicDTO.getRetention()));
15 changes: 15 additions & 0 deletions src/main/java/org/kafkahq/service/dto/topic/LogDTO.java
@@ -0,0 +1,15 @@
+package org.kafkahq.service.dto.topic;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class LogDTO {
+    private int broker;
+    private String topic;
+    private int partition;
+    private long size;
+    private long offsetLag;
+}
13 changes: 9 additions & 4 deletions src/main/java/org/kafkahq/service/mapper/TopicMapper.java
@@ -1,10 +1,8 @@
 package org.kafkahq.service.mapper;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.kafkahq.models.ConsumerGroup;
-import org.kafkahq.models.Partition;
-import org.kafkahq.models.Record;
-import org.kafkahq.models.Topic;
+import org.kafkahq.models.*;
+import org.kafkahq.service.dto.topic.LogDTO;
 import org.kafkahq.service.dto.topic.PartitionDTO;
 import org.kafkahq.service.dto.topic.PartitionDTO.OffsetsDTO;
 import org.kafkahq.service.dto.topic.PartitionDTO.ReplicaDTO;
@@ -47,4 +45,11 @@ public PartitionDTO fromPartitionToPartitionDTO(Partition partition) {
 
         return new PartitionDTO(partition.getId(), partition.getLeader().getId(), replicas, offsetsDTO, sizesDTO);
     }
+
+    public LogDTO fromLogToLogDTO(LogDir log) {
+
+        return new LogDTO(log.getBrokerId(),log.getTopic(),log.getPartition(),log.getSize(),log.getOffsetLag());
+    }
+
+
 }
