Skip to content
Snippets Groups Projects
Commit 550fab1f authored by Matthias's avatar Matthias
Browse files

fixed broken buffering, can now handle multiple packets in a buffer

parent 92877d65
No related merge requests found
......@@ -7,30 +7,56 @@ const socket = require('socket.io')(http);
const fs = require('fs');
const fifoPath = '/tmp/nodeFIFO';
// TOOLS
const sleep = time => new Promise(res => setTimeout(() => res(), time));
// try {
// } catch (e) {
// throw new Error("fifo not readable")
// }
/** returns first and last chars of a string */
function getSummary(s) {
if (s.length <= 100) return s;
return s.substring(0, 50) + "" + s.substring(s.length - 50, s.length);
}
// unecessary webserver
app.get('/', (req, res) => {
res.send('<h1>okcool but you gotta go for port 3000 though</h1>');
});
http.listen(3000, () => {
console.log('listening on *:3000');
});
const fifo = fs.createReadStream(fifoPath);
fifo.setEncoding('utf8');
// named pipe anzapfen
let fifo;
try {
fifo = fs.createReadStream(fifoPath);
} catch (e) {
console.error("fifo not readable")
throw e;
}
fifo.setEncoding("utf8");
console.log("started listener...");
// stream providing complete data packets
const dataStream = new Stream.Readable({
// stream providing complete data packets, not the bits and pieces returned from
// the fifo
const packetStream = new Stream.Readable({
read() {}
});
function getSummary(s) {
if (s.length <= 100) return s;
return s.substring(0, 50) + "" + s.substring(s.length - 50, s.length);
}
let buffer = "";
let i = 0;
let buffer = ""; // buffers incoming data
let frameNo = 0; // debug
const packetDelimiter = "EOT"; // string that separates packets
fifo.on("data", msg => {
// console.log("---")
......@@ -39,69 +65,54 @@ fifo.on("data", msg => {
// console.log("summary msg `" + getSummary(msg) + "`")
buffer += msg;
// console.log("summary buffer `" + getSummary(buffer) + "`")
const EOTindex = buffer.indexOf("EOT"); // end of packet
if (EOTindex === -1) {
// console.log("buffer length " + buffer.length + ", still waiting for EOT.");
return;
} else { // need to get more msg
// console.log("filled buffer " + buffer.length)
}
buffer = buffer.substring(0, EOTindex);
let data;
try {
data = buffer.replace(/\0/g, '');
// console.log(
// "got data", "\n\t`" + getSummary(buffer) +
// "`\n\t" + data.frame.length + " datapoints" +
// "\n\ttook "+ (+new Date() - data.birth) + "ms"
// );
i++;
i %= 1000;
// console.log("pushing", getSummary(JSON.stringify(data)))
// give it to the clients
dataStream.push(data);
} catch (e) {
console.error("couldn't parse JSON: ", buffer.length,
"\t`"+getSummary(buffer)+"`");
console.error(e)
}
buffer = "";
// console.log("\n\n")
for (let delIndex = buffer.indexOf(packetDelimiter);
delIndex !== -1;
delIndex = buffer.indexOf(packetDelimiter)) {
});
const packet = buffer
.substring(0, delIndex)
.replace(/\0/g, ''); // dis shit null terminated boi
buffer = buffer.substring(delIndex + packetDelimiter.length, buffer.length);
app.get('/', (req, res) => {
res.send('<h1>Hello world</h1>');
});
packetStream.push(packet);
http.listen(3000, () => {
console.log('listening on *:3000');
frameNo++; // debug
}
});
let peers = 0;
socket.on("connection", async socket => {
peers++;
const ip = socket.request.connection.remoteAddress;
const userAgent = socket.request.headers['user-agent'];
console.log("\nconnected", ip,"("+peers+" connections)","\n\t User Agent:",userAgent,"\n\t Socket ID:", socket.id)
console.log(
"\nconnected", ip, "(" + peers + " connections)",
"\n\t User Agent:", userAgent, "\n\t Socket ID:", socket.id
);
const myStream = new Stream.PassThrough();
myStream.setEncoding('utf8');
dataStream.pipe(myStream);
packetStream.pipe(myStream); // packet stream anzapfen
const listener = () => {
myStream.on('readable', () => {
// console.log("\n",socket.id + " listened")
let rawData, data;
let rawData = myStream.read();
if (!rawData) throw new Error("rawData was " + rawData);
let data;
try {
rawData = myStream.read();
data = JSON.parse(rawData);
} catch (e) {
if (rawData) {
console.error("couldn't parse JSON of length ", rawData.length,
"\t`"+getSummary(rawData)+"`");
console.error(
"couldn't parse JSON of length ", rawData.length,
"\t`"+getSummary(rawData)+"`"
);
} else {
console.error("rawData was", rawData);
}
......@@ -109,26 +120,27 @@ socket.on("connection", async socket => {
return;
}
if (!data) throw new Error("data was null", data);
if (!data) throw new Error("data was " + data);
// console.debug("sending data to", socket.id, "("+i+")");
// console.debug("sending data to", socket.id, "("+frameNo+")");
// console.log("data type:", Object.keys(data))
// const frame = new Uint8Array(12976);
// data.frame.forEach((el, i) => frame[i] = el);
// data.frame.forEach((el, frameNo) => frame[frameNo] = el);
// console.log("sent")
// send the packet to the visualization through socket.io
socket.emit("data", {
frame: data.frame,
id: i, // debug
id: frameNo, // debug
timestamps: {
sent: +new Date(),
birth: data.birth,
},
});
}
});
myStream.on('readable', listener);
socket.on('disconnect', function(){
socket.on('disconnect', () => {
peers--;
const ip = socket.request.connection.remoteAddress;
const userAgent = socket.request.headers['user-agent'];
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment