Skip to content

Commit

Permalink
Initial version with basic pub sub working
Browse files Browse the repository at this point in the history
  • Loading branch information
daneshzaki authored Oct 15, 2020
1 parent 9afb7e8 commit 425f50c
Show file tree
Hide file tree
Showing 7 changed files with 428 additions and 0 deletions.
42 changes: 42 additions & 0 deletions consumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//Kafka Consumer
//setup
console.log("Kafka consumer");
const { Kafka } = require('kafkajs');

//subscriber

module.exports = async function consumer(broker, group, topicName, io) {
console.log("^^^^^^Kafka consumer with " + broker + " topic as " + topicName + " group as " + group);

const kafka = new Kafka({
clientId: 'kokpit-talking',
//externalize
brokers: [broker]
});

const consumer = kafka.consumer({ groupId: group });

await consumer.connect();
console.log("^^^^^^Kafka consumer connected");

await consumer.subscribe({ topic: topicName, fromBeginning: true });
console.log("^^^^^^Kafka consumer subscribed");
console.log("consuming...");
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {

/*console.log({
partition,
offset: message.offset,
value: message.value.toString(),});*/

//working code
//res.send(message.value.toString());
io.sockets.emit('message', message.value.toString());


//todo: send continous messages to browser - use socket.io
},
});

}
Binary file added images/icon.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
221 changes: 221 additions & 0 deletions index.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
<!DOCTYPE html>
<html lang="en">

<head>
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Kokpit - Kafka Pub-Sub Client</title>
<link rel="icon" href="./images/icon.png" sizes="16x16">

<!--Begin Bootstrap Material -->
<!-- Material Design for Bootstrap fonts and icons -->
<link rel="stylesheet" href="https://fonts.googleapis.com/css?family=Roboto:300,400,500,700|Material+Icons">
<link rel="stylesheet"
href="https://unpkg.com/[email protected]/dist/css/bootstrap-material-design.min.css"
integrity="sha384-wXznGJNEXNG1NFsbm0ugrLFMQPWswR3lds2VeinahP8N0zJw9VWSopbjv2x7WCvX" crossorigin="anonymous">
<!--End Bootstrap Material -->

<!-- jQuery library -->
<!-- jQuery (Should be declared before other JS imports) -->
<script>
var $ = jQuery = require("jquery");
</script>

<!-- Popper JS -->
<script src="https://cdnjs.cloudflare.com/ajax/libs/popper.js/1.12.9/umd/popper.min.js"></script>

<!-- Latest compiled JavaScript -->
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/4.0.0/js/bootstrap.min.js"></script>

<!-- SnackbarJS plugin -->
<script src="https://cdn.rawgit.com/FezVrasta/snackbarjs/1.1.0/dist/snackbar.min.js"></script>

<!--Begin Bootstrap Material -->
<script src="https://unpkg.com/[email protected]/dist/js/bootstrap-material-design.js"
integrity="sha384-CauSuKpEqAFajSpkdjv3z9t8E7RlpJ1UP0lKM/+NdtSarroVKu069AlsRPKkFBz9"
crossorigin="anonymous"></script>
<script>$(document).ready(function () { $('body').bootstrapMaterialDesign(); });</script>
<!-- End Bootstrap Material-->

<script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/2.1.1/socket.io.js"></script>

<script>

$(document).ready(function () {
//pub
$("#publish").click(function () {

var brokerHost = $("#brokerHost").val();
var brokerPort = $("#brokerPort").val();
var topicName = $("#topicName").val();
var pubMsg = $("#message").val();
// alert(brokerHost+' '+brokerPort+' '+topicName+' '+pubMsg);
$.ajax({
url: "http://localhost:8182/publish",
method: 'post',
data: { 'brokerHost': brokerHost, 'brokerPort': brokerPort, 'topicName': topicName, 'message': pubMsg },
success: function (result) {
$("#pubStatus").html(result);
}
//TODO handle errors
});
});

//sub
$("#subscribe").click(function () {

var subBrokerHost = $("#subBrokerHost").val();
var subBrokerPort = $("#subBrokerPort").val();
var subTopicName = $("#subTopicName").val();

//alert("sub - "+subBrokerHost + ":" + subBrokerPort + subTopicName );
$.ajax({
url: "http://localhost:8182/consume",
method: 'post',
data: { 'brokerHost': subBrokerHost, 'brokerPort': subBrokerPort, 'topicName': subTopicName },
success: function (result) {
console.log("sub result - " + result);
//$("#subMessages").html(result);
}
//TODO handle errors
});
});

//copy pub settings to sub
$("#sameAsPub").click(function () {
//alert("$(sameAsPub) checked =" + $(this).is(':checked'));
if ($(this).is(':checked')) {
$("#subBrokerHost").val($("#brokerHost").val());
$("#subBrokerPort").val($("#brokerPort").val());
$("#subTopicName").val($("#topicName").val());
}
else {
$("#subBrokerHost").val("");
$("#subBrokerPort").val("");
$("#subTopicName").val("");
}
});
//socket
//init socket
var socket = io('http://localhost:8182');

//on message call update
socket.on('message', function (msg) {
//alert("Raw message: " + msg);
$("#subMessages").append(msg);
$("#subMessages").append("\n\n");

});
});

</script>
<style>
body {
padding-top: 10px;
}
</style>
</head>

<body>


<div class="container">
<!-- header -->
<nav class="navbar navbar-light bg-light">
<a class="navbar-brand" href="#">
<img src="./images/icon.png" width="24" height="24" class="d-inline-block img-thumbnail" alt="">
Kokpit
</a>
</nav>
<p />
<div class="row">
<!--Publisher-->
<div class="col-sm">
<form id="publishForm" action="http://localhost:8182/publish" method="POST">
<h3 class="display-7">Publisher</h3>
<label>
Broker Settings
</label>
<div class="btn-toolbar mb-3" role="toolbar" aria-label="Toolbar with button groups">
<div class="btn-group mr-2" role="group" aria-label="First group">
<input type="text" class="form-control form-control-sm" name="brokerHost" id="brokerHost"
placeholder="Broker Host" /> &nbsp;
<input type="text" class="form-control form-control-sm" name="brokerPort" id="brokerPort"
placeholder="Broker Port" />
</div>
<div class="input-group">
<div class="switch">
<label>
<input type="checkbox" id="connectPub" value="AutoConnect" disabled checked>
Connect
</label>
</div>
</div>
</div>
<p />
<label for="topicName">
Topic&nbsp;
</label>

<input type="text" class="form-control form-control-sm" id="topicName" name="topicName" />
<label for="message">
Message&nbsp;
</label>
<textArea class="form-control form-control-sm" rows="3" id="message" name="message"></textArea>
<br />
<!--TODO: Add snackbar-->
<input type="button" class="btn btn-raised btn-secondary" id="publish" value="Publish"></input>
</form>

<div id="pubStatus"></div>
</div>
<!-- Consumer -->
<div class="col-sm">
<form id="subscribeForm" action="http://localhost:8182/consume" method="POST">
<h3 class="display-7">Consumer</h3>
<label>Broker Settings</label>
<div class="input-group">
<div class="form-inline">
<label for="sameAsPub">
<small class="text-justify">Same as publisher&nbsp;</small>
</label>
<input type="checkbox" class="form-control form-control-sm" id="sameAsPub" />
</div>

</div>
<div class="btn-toolbar mb-3" role="toolbar" aria-label="Toolbar with button groups">
<div class="btn-group mr-2" role="group" aria-label="First group">
<input type="text" class="form-control form-control-sm" id="subBrokerHost"
name="brokerHost" placeholder="Broker Host" /> &nbsp;
<input type="text" class="form-control form-control-sm" id="subBrokerPort"
name="brokerPort" placeholder="Broker Port" />
</div>
<div class="input-group">
<div class="switch">
<label>
<input type="checkbox" id="connectSub" disabled checked>
AutoConnect
</label>
</div>
</div>
</div>
<label for="subTopicName">
Topic&nbsp;
</label>

<input type="text" class="form-control form-control-sm" id="subTopicName" name="topicName" />
<label for="subMessages">
Message&nbsp;
</label>
<textArea class="form-control form-control-sm" rows="3" id="subMessages" name="messages"
disabled></textArea>
<br />
<!--TODO: Add snackbar-->
<input type="button" class="btn btn-raised btn-secondary" id="subscribe" value="Subscribe"></input>
</form>
<div id="subStatus"></div>
</div>
</div>
</div>
</body>

</html>
52 changes: 52 additions & 0 deletions kokpitserver.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//setup
var express = require('express');
var app = express();
var server = app.listen(8182);
var io = require('socket.io').listen(server);

//producer
const producer = require("./producer");

//consumer
const consumer = require("./consumer");

//to parse the payload
var bodyParser = require('body-parser')
app.use( bodyParser.json() ); // to support JSON-encoded bodies
app.use(bodyParser.urlencoded({ // to support URL-encoded bodies
extended: true
}));


//to serve images etc
app.use(express.static('public'));

//to load index.html
app.get('/', function(req, res, next){
res.sendFile(__dirname + '/public/index.html');
});

//to handle publish requests
app.post('/publish', function(req, res, next){
console.log("**** /publish called");
console.log(req.body.brokerHost+':'+req.body.brokerPort, req.body.topicName, req.body.message);
producer(req.body.brokerHost+':'+req.body.brokerPort, req.body.topicName, req.body.message);
res.writeHead(200, {'Content-Type': 'text/plain'});
res.end("Message "+ req.body.message + " published");
});

//to handle consume requests
app.post('/consume', function(req, res, next){
//TODO: add group to UI
console.log("**** /consume called \n"+req.body.brokerHost+':'+req.body.brokerPort, "kokpitgroup", req.body.topicName);
//todo remove the dummy responses
/*io.sockets.emit('message',"dummy msg1");
io.sockets.emit('message',"dummy msg2");
io.sockets.emit('message',"dummy msg3");*/

//todo uncomment this
consumer(req.body.brokerHost+':'+req.body.brokerPort, "kokpitgroup", req.body.topicName, io);
});



54 changes: 54 additions & 0 deletions main.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
const { app, BrowserWindow } = require('electron')
const nativeImage = require('electron').nativeImage

function createWindow () {
// Create the browser window.
const win = new BrowserWindow({
width: 1250,
height: 550,
autoHideMenuBar: true,
resizable: true,
fullscreenable:false,

webPreferences: {
nodeIntegration: true,
devTools : false
}
})

//no menu
win.removeMenu();
const image = nativeImage.createFromPath('/code/electronjs/kokpitel/images/icon.png');
//set icon
win.setOverlayIcon(image, '');

// and load the index.html of the app.
win.loadFile('index.html')

}

// This method will be called when Electron has finished
// initialization and is ready to create browser windows.
// Some APIs can only be used after this event occurs.
app.whenReady().then(createWindow)

// Quit when all windows are closed, except on macOS. There, it's common
// for applications and their menu bar to stay active until the user quits
// explicitly with Cmd + Q.
app.on('window-all-closed', () => {
if (process.platform !== 'darwin') {
app.quit()
}
})

app.on('activate', () => {
// On macOS it's common to re-create a window in the app when the
// dock icon is clicked and there are no other windows open.
if (BrowserWindow.getAllWindows().length === 0) {
createWindow()
}
})

// In this file you can include the rest of your app's specific main process
// code. You can also put them in separate files and require them here.
const kokpitserver = require("./kokpitserver");
Loading

0 comments on commit 425f50c

Please sign in to comment.