From 3dc1a7f9c0310bb7011c9f232472bf74740d7523 Mon Sep 17 00:00:00 2001 From: Kusal Ekanayake Date: Sun, 23 Apr 2017 20:14:41 +1200 Subject: [PATCH] StreamPacket class created so that we can store all packets generically. The timestamp has also been extracted and stored with the packet so that in the future we may turn the current ArrayList into a priority que. #story[817] --- .../seng302/models/parsers/StreamPacket.java | 21 +++++++++ .../seng302/models/parsers/StreamParser.java | 44 +++++++++++++++++++ .../models/parsers/StreamReceiver.java | 41 +++++++++++------ 3 files changed, 93 insertions(+), 13 deletions(-) create mode 100644 src/main/java/seng302/models/parsers/StreamPacket.java create mode 100644 src/main/java/seng302/models/parsers/StreamParser.java diff --git a/src/main/java/seng302/models/parsers/StreamPacket.java b/src/main/java/seng302/models/parsers/StreamPacket.java new file mode 100644 index 00000000..e71e5897 --- /dev/null +++ b/src/main/java/seng302/models/parsers/StreamPacket.java @@ -0,0 +1,21 @@ +package seng302.models.parsers; + +/** + * Created by kre39 on 23/04/17. + */ +public class StreamPacket { + + //Change int to an ENUM for the type + private int type; + + private long messageLength; + private long timeStamp; + private byte[] payload; + + public StreamPacket(int type, long messageLength, long timeStamp, byte[] payload) { + this.type = type; + this.messageLength = messageLength; + this.timeStamp = timeStamp; + this.payload = payload; + } +} diff --git a/src/main/java/seng302/models/parsers/StreamParser.java b/src/main/java/seng302/models/parsers/StreamParser.java new file mode 100644 index 00000000..aeba5b1b --- /dev/null +++ b/src/main/java/seng302/models/parsers/StreamParser.java @@ -0,0 +1,44 @@ +package seng302.models.parsers; + + +import org.w3c.dom.Element; +import org.xml.sax.SAXException; + +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import java.io.ByteArrayInputStream; +import java.io.IOException; + +/** + * Created by kre39 on 23/04/17. + */ +public class StreamParser { + + private static boolean isWithinTag; + + + static void parseLine(byte[] bytes) { + //TODO overhaul all of this to treat packets as appropriate + String line = new String(bytes); + if (line.startsWith("<")){ + isWithinTag = true; + } +// System.out.println("line = ---------------------------------------------\n" + line); + if (isWithinTag) { +// try { +// Element node = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(new ByteArrayInputStream(line.getBytes())).getDocumentElement(); +// if (node.getAttributes().getNamedItem("Type") != null) { +// System.out.println(node.getAttributes().getNamedItem("Type") ); +// System.out.println(line); +// } +// } catch (Throwable e){ +//// e.printStackTrace(); +// } + } + if (line.startsWith(" priorityQue = new ArrayList<>(); private static void skipBytes(long n){ for (int i=0; i < n; i++){ @@ -37,14 +42,13 @@ public class InputStreamParser { private static void runTest() { Socket host = null; - String hostAddress = "livedata.americascup.com"; +// String hostAddress = "livedata.americascup.com"; + String hostAddress = "csse-s302staff.canterbury.ac.nz"; int hostPort = 4941; try { host = new Socket(hostAddress, hostPort); - if (host != null) { - stream = host.getInputStream(); - } + stream = host.getInputStream(); } catch (IOException e) { e.printStackTrace(); } @@ -55,12 +59,15 @@ public class InputStreamParser { while(reading) { buffer = new ByteArrayOutputStream(); sync1 = readByte(); - System.out.println("sync1 = " + Integer.toBinaryString(sync1)); +// System.out.println("sync1 = " + Integer.toBinaryString(sync1)); sync2 = readByte(); //checking if it is the start of the packet if(sync1 == 0x47 && sync2 == 0x83) { - System.out.println("message type: " + readByte()); - skipBytes(10); + int type = readByte(); +// System.out.println("message type: " + type); + byte[] timeStampBytes = getBytes(6); + skipBytes(4); + // byte[] b = new byte[2]; // try { // stream.read(b); @@ -68,15 +75,23 @@ public class InputStreamParser { // e.printStackTrace(); // } // System.out.println("b = " + Integer.toBinaryString(b[0])); +// System.out.println(timeStamp); + long timeStamp = 0; + long multiplier=1; + for(int i = 0;i < 6;i++) { + timeStamp += timeStampBytes[i]*multiplier; + multiplier *= 256; + } long payloadLength = bytesToLong(getBytes(2)); - System.out.println("payload length: " + payloadLength); - skipBytes(payloadLength); - + //No. of milliseconds since Jan 1st 1970 + System.out.println("timeStamp = " + timeStamp); +// System.out.println("payload length: " + payloadLength); + priorityQue.add(new StreamPacket(type, payloadLength, timeStamp, getBytes((int)payloadLength))); Checksum checksum = new CRC32(); checksum.update(buffer.toByteArray(), 0, buffer.size()); - System.out.println(checksum.getValue()); +// System.out.println(checksum.getValue()); long crc = bytesToLong(getBytes(4)); - System.out.println(crc); +// System.out.println(crc); } }