the stream receiver can now be passed a threadsafe priorityQueue that it will add the packets to as they are received (note the priority queue passed should be initialized with a comparitor for "StreamPacket"s) #story[817]

This commit is contained in:
Peter Galloway
2017-04-24 18:29:50 +12:00
parent c1e4a6156c
commit f078c34bf9
3 changed files with 48 additions and 22 deletions
@@ -22,4 +22,8 @@ public class StreamPacket {
StreamParser.extractBoatLocation(payload); StreamParser.extractBoatLocation(payload);
} }
} }
public long getTimeStamp() {
return timeStamp;
}
} }
@@ -53,10 +53,10 @@ public class StreamParser {
byte[] boatIdBytes = Arrays.copyOfRange(payload,8,12); byte[] boatIdBytes = Arrays.copyOfRange(payload,8,12);
extractTimeStamp(Arrays.copyOfRange(payload,1,7)); extractTimeStamp(Arrays.copyOfRange(payload,1,7));
// int boatSeq = ByteBuffer.wrap(seqBytes).getInt(); // int boatSeq = ByteBuffer.wrap(seqBytes).getInt();
long seq = StreamReceiver.bytesToLong(seqBytes); long seq = bytesToLong(seqBytes);
long boatId = StreamReceiver.bytesToLong(boatIdBytes); long boatId = bytesToLong(boatIdBytes);
long lat = StreamReceiver.bytesToLong(latBytes); long lat = bytesToLong(latBytes);
long lon = StreamReceiver.bytesToLong(lonBytes); long lon = bytesToLong(lonBytes);
if (!ids.contains(boatId)) { if (!ids.contains(boatId)) {
ids.add(boatId); ids.add(boatId);
} }
@@ -81,6 +81,23 @@ public class StreamParser {
System.out.println("timeStamp = " + timeStamp); System.out.println("timeStamp = " + timeStamp);
} }
/**
* takes an array of up to 7 bytes and returns a positive
* long constructed from the input bytes
*
* @return a positive long if there is less than 7 bytes -1 otherwise
*/
private static long bytesToLong(byte[] bytes){
long partialLong = 0;
int index = 0;
for (byte b: bytes){
if (index > 6){
return -1;
}
partialLong = partialLong | (b & 0xFFL) << (index * 8);
index++;
}
return partialLong;
}
} }
@@ -47,22 +47,20 @@ public class StreamReceiver {
//checking if it is the start of the packet //checking if it is the start of the packet
if(sync1 == 0x47 && sync2 == 0x83) { if(sync1 == 0x47 && sync2 == 0x83) {
int type = readByte(); int type = readByte();
System.out.println("message type: " + type);
long timeStamp = bytesToLong(getBytes(6)); long timeStamp = bytesToLong(getBytes(6));
skipBytes(4); skipBytes(4);
long payloadLength = bytesToLong(getBytes(2)); long payloadLength = bytesToLong(getBytes(2));
//No. of milliseconds since Jan 1st 1970 //No. of milliseconds since Jan 1st 1970
System.out.println("timeStamp = " + timeStamp); byte[] payload = getBytes((int) payloadLength);
System.out.println("payload length: " + payloadLength);
Checksum checksum = new CRC32(); Checksum checksum = new CRC32();
checksum.update(crcBuffer.toByteArray(), 0, crcBuffer.size()); checksum.update(crcBuffer.toByteArray(), 0, crcBuffer.size());
// System.out.println(checksum.getValue()); long computedCrc = checksum.getValue();
long crc = bytesToLong(getBytes(4)); long packetCrc = bytesToLong(getBytes(4));
// System.out.println(crc); if (computedCrc == packetCrc) {
if (checksum.getValue() == crc) { System.out.println("message type: " + type);
packetBuffer.add(new StreamPacket(type, payloadLength, timeStamp, getBytes((int) payloadLength))); System.out.println("timeStamp = " + timeStamp);
System.out.println("payload length: " + payloadLength);
packetBuffer.add(new StreamPacket(type, payloadLength, timeStamp, payload));
} else { } else {
System.err.println("Packet has been dropped"); System.err.println("Packet has been dropped");
} }
@@ -74,20 +72,21 @@ public class StreamReceiver {
} }
} }
private int readByte() { private int readByte() throws Exception {
int currentByte = -1; int currentByte = -1;
try { try {
currentByte = stream.read(); currentByte = stream.read();
crcBuffer.write(currentByte); crcBuffer.write(currentByte);
if (currentByte == -1){
}
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
if (currentByte == -1){
throw new Exception();
}
return currentByte; return currentByte;
} }
private byte[] getBytes(int n){ private byte[] getBytes(int n) throws Exception{
byte[] bytes = new byte[n]; byte[] bytes = new byte[n];
for (int i = 0; i < n; i++){ for (int i = 0; i < n; i++){
bytes[i] = (byte) readByte(); bytes[i] = (byte) readByte();
@@ -96,7 +95,7 @@ public class StreamReceiver {
} }
private void skipBytes(long n){ private void skipBytes(long n) throws Exception{
for (int i=0; i < n; i++){ for (int i=0; i < n; i++){
readByte(); readByte();
} }
@@ -123,7 +122,13 @@ public class StreamReceiver {
public static void main(String[] args) { public static void main(String[] args) {
// StreamReceiver sr = new StreamReceiver("livedata.americascup.com", 4941, pq); PriorityBlockingQueue<StreamPacket> pq = new PriorityBlockingQueue<>(256, new Comparator<StreamPacket>() {
// sr.connect(); @Override
public int compare(StreamPacket s1, StreamPacket s2) {
return (int) (s1.getTimeStamp() - s2.getTimeStamp());
}
});
StreamReceiver sr = new StreamReceiver("livedata.americascup.com", 4941, pq);
sr.connect();
} }
} }