diff --git a/src/main/java/seng302/models/parsers/StreamPacket.java b/src/main/java/seng302/models/parsers/StreamPacket.java index 4afd51f5..1392f4d4 100644 --- a/src/main/java/seng302/models/parsers/StreamPacket.java +++ b/src/main/java/seng302/models/parsers/StreamPacket.java @@ -22,4 +22,8 @@ public class StreamPacket { StreamParser.extractBoatLocation(payload); } } + + public long getTimeStamp() { + return timeStamp; + } } diff --git a/src/main/java/seng302/models/parsers/StreamParser.java b/src/main/java/seng302/models/parsers/StreamParser.java index 44321c9a..6dbe2d86 100644 --- a/src/main/java/seng302/models/parsers/StreamParser.java +++ b/src/main/java/seng302/models/parsers/StreamParser.java @@ -53,10 +53,10 @@ public class StreamParser { byte[] boatIdBytes = Arrays.copyOfRange(payload,8,12); extractTimeStamp(Arrays.copyOfRange(payload,1,7)); // int boatSeq = ByteBuffer.wrap(seqBytes).getInt(); - long seq = StreamReceiver.bytesToLong(seqBytes); - long boatId = StreamReceiver.bytesToLong(boatIdBytes); - long lat = StreamReceiver.bytesToLong(latBytes); - long lon = StreamReceiver.bytesToLong(lonBytes); + long seq = bytesToLong(seqBytes); + long boatId = bytesToLong(boatIdBytes); + long lat = bytesToLong(latBytes); + long lon = bytesToLong(lonBytes); if (!ids.contains(boatId)) { ids.add(boatId); } @@ -81,6 +81,23 @@ public class StreamParser { System.out.println("timeStamp = " + timeStamp); } - + /** + * takes an array of up to 7 bytes and returns a positive + * long constructed from the input bytes + * + * @return a positive long if there is less than 7 bytes -1 otherwise + */ + private static long bytesToLong(byte[] bytes){ + long partialLong = 0; + int index = 0; + for (byte b: bytes){ + if (index > 6){ + return -1; + } + partialLong = partialLong | (b & 0xFFL) << (index * 8); + index++; + } + return partialLong; + } } diff --git a/src/main/java/seng302/models/parsers/StreamReceiver.java b/src/main/java/seng302/models/parsers/StreamReceiver.java index bda89dea..1c44337f 100644 --- a/src/main/java/seng302/models/parsers/StreamReceiver.java +++ b/src/main/java/seng302/models/parsers/StreamReceiver.java @@ -47,22 +47,20 @@ public class StreamReceiver { //checking if it is the start of the packet if(sync1 == 0x47 && sync2 == 0x83) { int type = readByte(); - System.out.println("message type: " + type); long timeStamp = bytesToLong(getBytes(6)); skipBytes(4); long payloadLength = bytesToLong(getBytes(2)); //No. of milliseconds since Jan 1st 1970 - System.out.println("timeStamp = " + timeStamp); - System.out.println("payload length: " + payloadLength); - - + byte[] payload = getBytes((int) payloadLength); Checksum checksum = new CRC32(); checksum.update(crcBuffer.toByteArray(), 0, crcBuffer.size()); -// System.out.println(checksum.getValue()); - long crc = bytesToLong(getBytes(4)); -// System.out.println(crc); - if (checksum.getValue() == crc) { - packetBuffer.add(new StreamPacket(type, payloadLength, timeStamp, getBytes((int) payloadLength))); + long computedCrc = checksum.getValue(); + long packetCrc = bytesToLong(getBytes(4)); + if (computedCrc == packetCrc) { + System.out.println("message type: " + type); + System.out.println("timeStamp = " + timeStamp); + System.out.println("payload length: " + payloadLength); + packetBuffer.add(new StreamPacket(type, payloadLength, timeStamp, payload)); } else { System.err.println("Packet has been dropped"); } @@ -74,20 +72,21 @@ public class StreamReceiver { } } - private int readByte() { + private int readByte() throws Exception { int currentByte = -1; try { currentByte = stream.read(); crcBuffer.write(currentByte); - if (currentByte == -1){ - } } catch (IOException e) { e.printStackTrace(); } + if (currentByte == -1){ + throw new Exception(); + } return currentByte; } - private byte[] getBytes(int n){ + private byte[] getBytes(int n) throws Exception{ byte[] bytes = new byte[n]; for (int i = 0; i < n; i++){ bytes[i] = (byte) readByte(); @@ -96,7 +95,7 @@ public class StreamReceiver { } - private void skipBytes(long n){ + private void skipBytes(long n) throws Exception{ for (int i=0; i < n; i++){ readByte(); } @@ -123,7 +122,13 @@ public class StreamReceiver { public static void main(String[] args) { -// StreamReceiver sr = new StreamReceiver("livedata.americascup.com", 4941, pq); -// sr.connect(); + PriorityBlockingQueue pq = new PriorityBlockingQueue<>(256, new Comparator() { + @Override + public int compare(StreamPacket s1, StreamPacket s2) { + return (int) (s1.getTimeStamp() - s2.getTimeStamp()); + } + }); + StreamReceiver sr = new StreamReceiver("livedata.americascup.com", 4941, pq); + sr.connect(); } }