Skip to content
Snippets Groups Projects
Commit dcf3a2ec authored by Matthias's avatar Matthias
Browse files

fixed sending only freshest data and added WebRTC through Peer.js

parent 65f01e71
Branches
No related merge requests found
......@@ -14,6 +14,7 @@ Includes the
## Run
0. Start peerjs server in the peerjs-server directory
1. Start the dummy point cloud data sender `g++ data_generator.cpp -fpermissive -o main && ./main`
2. Start the Node process which sends Data to the visualization via WebRTC `node index.js`
3. Start the visualization
const Peer = require('peerjs-nodejs'); // WebRTC
const Stream = require('stream')
const app = require('express')();
const http = require('http').Server(app);
const socket = require('socket.io')(http);
// const socket = require('socket.io')(http);
// const p2p = require('socket.io-p2p-server').Server;
// socket.use(p2p); // use webrtc
const fs = require('fs');
const fifoPath = '/tmp/nodeFIFO';
......@@ -24,14 +27,13 @@ function getSummary(s) {
// unecessary webserver
app.get('/', (req, res) => {
res.send('<h1>okcool but you should visit the visualization site</h1>');
});
http.listen(3000, () => {
console.log('listening on *:3000');
});
// app.get('/', (req, res) => {
// res.send('<h1>okcool but you should visit the visualization site</h1>');
// });
// http.listen(3000, () => {
// console.log('listening on *:3000');
// });
// named pipe anzapfen
......@@ -53,13 +55,15 @@ let buffer = ""; // buffers incoming data
let frameNo = 0; // debug
const packetDelimiter = "EOT"; // string that separates packets
const sendAllPackets = true;
const sendAllPackets = false;
// buffer the FIFO stream, collect pieces of data, and turn into packet stream
fifo.on("data", msg => {
buffer += msg;
let packet;
if (sendAllPackets) {
// there could potentially be multiple packets in the buffer
// so loop until we pushed all of them to the packet stream
......@@ -67,81 +71,102 @@ fifo.on("data", msg => {
delIndex !== -1;
delIndex = buffer.indexOf(packetDelimiter)) {
const packet = buffer
packet = buffer
.substring(0, delIndex)
.replace(/\0/g, ''); // dis shit too null terminated boi
buffer = buffer.substring(delIndex + packetDelimiter.length, buffer.length);
packetStream.push(packet);
frameNo++; // debug
}
} else {
if (!~buffer.indexOf(packetDelimiter)) return;
const split = buffer.split(packetDelimiter);
// keep the beginning of the freshest packet (might be empty)
// throw everything else away
buffer = split[split.length - 1];
const packet = split[split.length - 2].replace(/\0/g, '');
// send the freshest packet
packet = split[split.length - 2].replace(/\0/g, '');
}
packetStream.push(packet);
frameNo++; // debug
});
let peers = 0;
function logAction(action, socket) {
const ip = socket.request.connection.remoteAddress;
const userAgent = socket.request.headers['user-agent'];
function logAction(action, buddy) {
// const ip = socket.request.connection.remoteAddress;
// const userAgent = socket.request.headers['user-agent'];
console.log(
"\n"+action, ip, "(" + peers + " connections)",
"\n\t User Agent:", userAgent, "\n\t Socket ID:", socket.id
"\n"+action, /*ip, */"(" + peers + " connections)",
/*"\n\t User Agent:", userAgent, */"\n\t buddy ID:", buddy.id
);
}
socket.on("connection", async socket => {
peers++;
logAction("connected", socket);
const myStream = new Stream.PassThrough();
myStream.setEncoding('utf8');
packetStream.pipe(myStream); // packet stream anzapfen
myStream.on('readable', () => {
// console.log("\n",socket.id + " listened")
let rawData = myStream.read();
if (!rawData) throw new Error("rawData was " + rawData);
let data;
try {
data = JSON.parse(rawData);
} catch (e) {
if (rawData) {
console.error(
"couldn't parse JSON of length ", rawData.length,
"\t`"+getSummary(rawData)+"`"
);
} else {
console.error("rawData was", rawData);
}
console.error(e)
console.log(rawData);
throw e;
return;
}
// send the packet to the visualization through socket.io
socket.emit("data", {
frame: data.frame,
clusters: data.clusters,
id: frameNo, // debug
timestamps: {
sent: +new Date(),
birth: data.birth,
},
const id = "lidar";
const peer = new Peer(id);
// someone tries to connect to you
peer.on('connection', async clientPeer => {
const buddy = clientPeer.peer;
console.log(`connected to ${clientPeer.peer}`)
clientPeer.on('open', async () => {
console.log("now able to send data to " + buddy)
clientPeer.serialization = "json"; // because we don't support anything else yet
peers++;
logAction("connected", buddy);
// packet stream anzapfen
const myStream = new Stream.PassThrough();
myStream.setEncoding('utf8');
packetStream.pipe(myStream);
myStream.on('readable', () => {
// console.log("\n",socket.id + " listened")
let rawData = myStream.read();
if (!rawData) throw new Error("rawData was " + rawData);
let data;
try {
data = JSON.parse(rawData);
} catch (e) {
if (rawData) {
console.error(
"couldn't parse JSON of length ", rawData.length,
"\t`"+getSummary(rawData)+"`"
);
} else {
console.error("rawData was", rawData);
}
console.error(e)
console.log(rawData);
throw e;
return;
}
// send the packet to the visualization through socket.io
clientPeer.send({
frame: data.frame,
clusters: data.clusters,
id: frameNo, // debug
timestamps: {
sent: +new Date(),
birth: data.birth,
},
});
});
});
socket.on('disconnect', () => {
clientPeer.on("close", () => {
peers--;
logAction("disconnected", socket);
logAction("closed", clientPeer);
});
});
});
This diff is collapsed.
......@@ -6,7 +6,7 @@
"description": "Dummy backend to send socketio data",
"dependencies": {
"express": "^4.16.4",
"socket.io": "^2.2.0",
"socket.io-p2p-server": "^1.2.0"
"peer": "^0.2.10",
"peerjs-nodejs": "^1.1.3"
}
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment