re-engineered stream receiver to make it cleaner and ready to be used with the rest of the program. #story[817]

This commit is contained in:
Peter Galloway
2017-04-24 17:38:29 +12:00
parent 71e14259f6
commit c1e4a6156c
@@ -3,35 +3,83 @@ package seng302.models.parsers;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.math.BigInteger;
import java.net.Socket; import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.zip.CRC32; import java.util.zip.CRC32;
import java.util.zip.Checksum; import java.util.zip.Checksum;
public class StreamReceiver { public class StreamReceiver {
private InputStream stream;
private Socket host;
private ByteArrayOutputStream crcBuffer;
public PriorityBlockingQueue<StreamPacket> packetBuffer;
private static ByteArrayOutputStream buffer; public StreamReceiver(String hostAddress, int hostPort, PriorityBlockingQueue packetBuffer) {
private static InputStream stream = null; try {
private static boolean reading = true; host = new Socket(hostAddress, hostPort);
private static Collection<StreamPacket> priorityQue = new ArrayList<>(); } catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
this.packetBuffer = packetBuffer;
}
public void connect(){
try {
stream = host.getInputStream();
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
int sync1;
int sync2;
boolean moreBytes = true;
while(moreBytes) {
try {
crcBuffer = new ByteArrayOutputStream();
sync1 = readByte();
sync2 = readByte();
//checking if it is the start of the packet
if(sync1 == 0x47 && sync2 == 0x83) {
int type = readByte();
System.out.println("message type: " + type);
long timeStamp = bytesToLong(getBytes(6));
skipBytes(4);
long payloadLength = bytesToLong(getBytes(2));
//No. of milliseconds since Jan 1st 1970
System.out.println("timeStamp = " + timeStamp);
System.out.println("payload length: " + payloadLength);
Checksum checksum = new CRC32();
checksum.update(crcBuffer.toByteArray(), 0, crcBuffer.size());
// System.out.println(checksum.getValue());
long crc = bytesToLong(getBytes(4));
// System.out.println(crc);
if (checksum.getValue() == crc) {
packetBuffer.add(new StreamPacket(type, payloadLength, timeStamp, getBytes((int) payloadLength)));
} else {
System.err.println("Packet has been dropped");
}
}
} catch (Exception e) {
moreBytes = false;
}
private static void skipBytes(long n){
for (int i=0; i < n; i++){
readByte();
} }
} }
private static int readByte() { private int readByte() {
int currentByte = -1; int currentByte = -1;
try { try {
currentByte = stream.read(); currentByte = stream.read();
buffer.write(currentByte); crcBuffer.write(currentByte);
if (currentByte == -1){ if (currentByte == -1){
reading = false;
} }
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
@@ -39,95 +87,32 @@ public class StreamReceiver {
return currentByte; return currentByte;
} }
private static void runTest() { private byte[] getBytes(int n){
Socket host = null;
// String hostAddress = "livedata.americascup.com";
String hostAddress = "csse-s302staff.canterbury.ac.nz";
int hostPort = 4941;
try {
host = new Socket(hostAddress, hostPort);
stream = host.getInputStream();
} catch (IOException e) {
e.printStackTrace();
}
int sync1;
int sync2;
//currently "reading" will not break the program nicely (because there are multiple readBytes within the while loop)
while(reading) {
buffer = new ByteArrayOutputStream();
sync1 = readByte();
// System.out.println("sync1 = " + Integer.toBinaryString(sync1));
sync2 = readByte();
//checking if it is the start of the packet
if(sync1 == 0x47 && sync2 == 0x83) {
int type = readByte();
// System.out.println("message type: " + type);
byte[] timeStampBytes = getBytes(6);
skipBytes(4);
// byte[] b = new byte[2];
// try {
// stream.read(b);
// } catch (IOException e){
// e.printStackTrace();
// }
// System.out.println("b = " + Integer.toBinaryString(b[0]));
// System.out.println(timeStamp);
long timeStamp = 0;
long multiplier=1;
for(int i = 0;i < 6;i++) {
timeStamp += timeStampBytes[i]*multiplier;
multiplier *= 256;
}
long payloadLength = bytesToLong(getBytes(2));
//No. of milliseconds since Jan 1st 1970
// System.out.println("timeStamp = " + timeStamp);
// System.out.println("payload length: " + payloadLength);
priorityQue.add(new StreamPacket(type, payloadLength, timeStamp, getBytes((int)payloadLength)));
Checksum checksum = new CRC32();
checksum.update(buffer.toByteArray(), 0, buffer.size());
// System.out.println(checksum.getValue());
long crc = bytesToLong(getBytes(4));
// System.out.println(crc);
}
}
try {
if (host != null) {
host.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
private static byte[] getBytes(int n){
byte[] bytes = new byte[n]; byte[] bytes = new byte[n];
for (int i = 0; i < n; i++){ for (int i = 0; i < n; i++){
bytes[i] = (byte) readByte(); bytes[i] = (byte) readByte();
// System.out.println(Integer.toBinaryString(bytes[i]));
// System.out.println(bytes[i]);
} }
return bytes; return bytes;
} }
private void skipBytes(long n){
for (int i=0; i < n; i++){
readByte();
}
}
/** /**
* takes an array of up to 4 bytes and returns a positive * takes an array of up to 7 bytes and returns a positive
* long constructed from the input bytes * long constructed from the input bytes
* *
* (note it is assumed the bytes coming in need to be reversed like those from a stream) * @return a positive long if there is less than 7 bytes -1 otherwise
*
* @return a positive long if there is less than 4 bytes -1 otherwise
*/ */
static long bytesToLong(byte[] bytes){ private long bytesToLong(byte[] bytes){
long partialLong = 0; long partialLong = 0;
int index = 0; int index = 0;
for (byte b: bytes){ for (byte b: bytes){
if (index > 3){ if (index > 6){
return -1; return -1;
} }
partialLong = partialLong | (b & 0xFFL) << (index * 8); partialLong = partialLong | (b & 0xFFL) << (index * 8);
@@ -138,9 +123,7 @@ public class StreamReceiver {
public static void main(String[] args) { public static void main(String[] args) {
// StreamReceiver sr = new StreamReceiver("livedata.americascup.com", 4941, pq);
runTest(); // sr.connect();
} }
} }