Created separate streams for each different data type to be parsed into so the exact needed data would be able to be extracted.

#story[820]
This commit is contained in:
Kusal Ekanayake
2017-04-25 15:32:04 +12:00
parent f078c34bf9
commit d51825ffb7
3 changed files with 105 additions and 41 deletions
@@ -18,11 +18,24 @@ public class StreamPacket {
this.timeStamp = timeStamp; this.timeStamp = timeStamp;
this.payload = payload; this.payload = payload;
// System.out.println("type = " + type); // System.out.println("type = " + type);
if (this.type == PacketType.BOAT_LOCATION){ if (this.type == PacketType.OTHER){
StreamParser.extractBoatLocation(payload); System.out.println("type = " + type);
// StreamParser.extractBoatLocation(payload);
} }
} }
public PacketType getType() {
return type;
}
public long getMessageLength() {
return messageLength;
}
public byte[] getPayload() {
return payload;
}
public long getTimeStamp() { public long getTimeStamp() {
return timeStamp; return timeStamp;
} }
@@ -19,62 +19,112 @@ import java.util.Map;
*/ */
public class StreamParser { public class StreamParser {
private static boolean isWithinTag; static void parseLine(StreamPacket packet) {
public static ArrayList<Long> ids = new ArrayList<>(); switch (packet.getType()){
case HEARTBEAT:
extractHeartBeat(packet);
case RACE_STATUS:
extractRaceStatus(packet);
case DISPLAY_TEXT_MESSAGE:
extractDisplayMessage(packet);
case XML_MESSAGE:
extractXmlMessage(packet);
case RACE_START_STATUS:
extractRaceStartStatus(packet);
case YACHT_EVENT_CODE:
extractYachtEventCode(packet);
case YACHT_ACTION_CODE:
extractYachtActionCode(packet);
case CHATTER_TEXT:
extractChatterText(packet);
case BOAT_LOCATION:
extractBoatLocation(packet);
case MARK_ROUNDING:
extractMarkRounding(packet);
case COURSE_WIND:
extractCourseWind(packet);
case AVG_WIND:
extractAvgWind(packet);
}
static void parseLine(byte[] bytes) {
//TODO overhaul all of this to treat packets as appropriate
String line = new String(bytes);
if (line.startsWith("<")){
isWithinTag = true;
}
// System.out.println("line = ---------------------------------------------\n" + line);
if (isWithinTag) {
// try {
// Element node = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(new ByteArrayInputStream(line.getBytes())).getDocumentElement();
// if (node.getAttributes().getNamedItem("Type") != null) {
// System.out.println(node.getAttributes().getNamedItem("Type") );
// System.out.println(line);
// }
// } catch (Throwable e){
//// e.printStackTrace();
// }
}
if (line.startsWith("</")) {
isWithinTag = false;
}
} }
static void extractBoatLocation(byte[] payload){ private static void extractHeartBeat(StreamPacket packet){
}
private static void extractRaceStatus(StreamPacket packet){
}
private static void extractDisplayMessage(StreamPacket packet){
}
private static void extractXmlMessage(StreamPacket packet){
}
private static void extractRaceStartStatus(StreamPacket packet){
}
private static void extractYachtEventCode(StreamPacket packet){
}
private static void extractYachtActionCode(StreamPacket packet){
}
private static void extractChatterText(StreamPacket packet){
}
static void extractBoatLocation(StreamPacket packet){
byte[] payload = packet.getPayload();
byte deviceType = payload[15]; byte deviceType = payload[15];
byte[] seqBytes = Arrays.copyOfRange(payload,11,15); byte[] seqBytes = Arrays.copyOfRange(payload,11,15);
byte[] latBytes = Arrays.copyOfRange(payload,16,20); byte[] latBytes = Arrays.copyOfRange(payload,16,20);
byte[] lonBytes = Arrays.copyOfRange(payload,20,24); byte[] lonBytes = Arrays.copyOfRange(payload,20,24);
byte[] boatIdBytes = Arrays.copyOfRange(payload,8,12); byte[] boatIdBytes = Arrays.copyOfRange(payload,8,12);
extractTimeStamp(Arrays.copyOfRange(payload,1,7)); extractTimeStamp(Arrays.copyOfRange(payload,1,7), 6);
// int boatSeq = ByteBuffer.wrap(seqBytes).getInt(); // int boatSeq = ByteBuffer.wrap(seqBytes).getInt();
long seq = bytesToLong(seqBytes); long seq = bytesToLong(seqBytes);
long boatId = bytesToLong(boatIdBytes); long boatId = bytesToLong(boatIdBytes);
long lat = bytesToLong(latBytes); long lat = bytesToLong(latBytes);
long lon = bytesToLong(lonBytes); long lon = bytesToLong(lonBytes);
if (!ids.contains(boatId)) {
ids.add(boatId);
}
if (boatId != 0){ if (boatId != 0){
System.out.println("boatId = " + boatId); // System.out.println("boatId = " + boatId);
System.out.println("deviceType = " + (long)deviceType); // System.out.println("deviceType = " + (long)deviceType);
// System.out.println("seq = " + seq); // System.out.println("seq = " + seq);
//needs to be validated //needs to be validated
System.out.println("lon = " + ((180d * (double)lon)/Math.pow(2,31))); // System.out.println("lon = " + ((180d * (double)lon)/Math.pow(2,31)));
System.out.println("lat = " + ((180d *(double)lat)/Math.pow(2,31))); // System.out.println("lat = " + ((180d *(double)lat)/Math.pow(2,31)));
} }
} }
private static void extractTimeStamp(byte[] timeStampBytes){
private static void extractMarkRounding(StreamPacket packet){
}
private static void extractCourseWind(StreamPacket packet){
}
private static void extractAvgWind(StreamPacket packet){
}
private static void extractTimeStamp(byte[] timeStampBytes, int noOfBytes){
long timeStamp = 0; long timeStamp = 0;
long multiplier=1; long multiplier=1;
for(int i = 0;i < 6;i++) { for(int i = 0;i < noOfBytes;i++) {
timeStamp += timeStampBytes[i]*multiplier; timeStamp += timeStampBytes[i]*multiplier;
multiplier *= 256; multiplier *= 256;
} }
@@ -47,19 +47,19 @@ public class StreamReceiver {
//checking if it is the start of the packet //checking if it is the start of the packet
if(sync1 == 0x47 && sync2 == 0x83) { if(sync1 == 0x47 && sync2 == 0x83) {
int type = readByte(); int type = readByte();
//No. of milliseconds since Jan 1st 1970
long timeStamp = bytesToLong(getBytes(6)); long timeStamp = bytesToLong(getBytes(6));
skipBytes(4); skipBytes(4);
long payloadLength = bytesToLong(getBytes(2)); long payloadLength = bytesToLong(getBytes(2));
//No. of milliseconds since Jan 1st 1970
byte[] payload = getBytes((int) payloadLength); byte[] payload = getBytes((int) payloadLength);
Checksum checksum = new CRC32(); Checksum checksum = new CRC32();
checksum.update(crcBuffer.toByteArray(), 0, crcBuffer.size()); checksum.update(crcBuffer.toByteArray(), 0, crcBuffer.size());
long computedCrc = checksum.getValue(); long computedCrc = checksum.getValue();
long packetCrc = bytesToLong(getBytes(4)); long packetCrc = bytesToLong(getBytes(4));
if (computedCrc == packetCrc) { if (computedCrc == packetCrc) {
System.out.println("message type: " + type); // System.out.println("message type: " + type);
System.out.println("timeStamp = " + timeStamp); // System.out.println("timeStamp = " + timeStamp);
System.out.println("payload length: " + payloadLength); // System.out.println("payload length: " + payloadLength);
packetBuffer.add(new StreamPacket(type, payloadLength, timeStamp, payload)); packetBuffer.add(new StreamPacket(type, payloadLength, timeStamp, payload));
} else { } else {
System.err.println("Packet has been dropped"); System.err.println("Packet has been dropped");
@@ -128,7 +128,8 @@ public class StreamReceiver {
return (int) (s1.getTimeStamp() - s2.getTimeStamp()); return (int) (s1.getTimeStamp() - s2.getTimeStamp());
} }
}); });
StreamReceiver sr = new StreamReceiver("livedata.americascup.com", 4941, pq); StreamReceiver sr = new StreamReceiver("csse-s302staff.canterbury.ac.nz", 4941, pq);
// StreamReceiver sr = new StreamReceiver("livedata.americascup.com", 4941, pq);
sr.connect(); sr.connect();
} }
} }