Created AC35 Streaming server

- Sends heartbeat messages every 5 seconds
- Sends XML at beginning

Tags: #story[29]
This commit is contained in:
Michael Rausch
2017-04-19 19:05:19 +12:00
parent 34872a822b
commit edc306da22
12 changed files with 783 additions and 0 deletions
@@ -0,0 +1,71 @@
package seng302.server.messages;
import java.nio.ByteBuffer;
public class Header {
// From API spec
private final int syncByte1 = 0x47;
private final int syncByte2 = 0x83;
private MessageType messageType;
private int timeStamp;
private int sourceId;
private short messageLength;
private static final int MESSAGE_LEN = 15;
/**
* Message Header from section 3.2 of the AC35 Streaming
* Data spec
* @param messageType The type of the message following this header
* @param sourceId The message source (as defined in the spec)
* @param messageLength The length of the message following this header
*/
public Header(MessageType messageType, int sourceId, Short messageLength){
this.messageType = messageType;
this.sourceId = sourceId;
this.messageLength = messageLength;
timeStamp = (int) (System.currentTimeMillis() / 1000L);
}
/**
* @return a ByteBuffer containing the message header
*/
public ByteBuffer getByteBuffer(){
ByteBuffer buff = ByteBuffer.allocate(15);
// Sync Byte 1, 1 byte
buff.put(ByteBuffer.allocate(1).put((byte)syncByte1).array());
buff.position(1);
// Sync Byte 2, 1 byte
buff.put(ByteBuffer.allocate(1).put((byte)syncByte2).array());
buff.position(2);
// Message Type, 1 byte
buff.put(ByteBuffer.allocate(1).put((byte)messageType.getCode()).array());
buff.position(3);
// Timestamp, 6 bytes
int x = ((int) Integer.toUnsignedLong(6));
buff.put(ByteBuffer.allocate(6).putInt(timeStamp).array());
buff.position(9);
// Source ID, 4 bytes
buff.put(ByteBuffer.allocate(4).putInt(sourceId).array());
buff.position(13);
// Message Length, 2 bytes
buff.put(ByteBuffer.allocate(2).putShort(messageLength).array());
buff.position(15);
return buff;
}
/**
* Returns the size of this message
* @return the size of the message
*/
public static Integer getSize(){
return MESSAGE_LEN;
}
}
@@ -0,0 +1,51 @@
package seng302.server.messages;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.zip.CRC32;
public class Heartbeat extends Message {
private final int MESSAGE_SIZE = 4;
private int seqNo;
/**
* Heartbeat from the AC35 Streaming data spec
* @param seqNo Increment every time a message is sent
*/
public Heartbeat(int seqNo){
this.seqNo = seqNo;
}
@Override
public int getSize() {
return MESSAGE_SIZE;
}
@Override
public void send(DataOutputStream outputStream) {
setHeader(new Header(MessageType.HEARTBEAT, 0x01, (short) getSize()));
ByteBuffer buff = ByteBuffer.allocate(Header.getSize() + getSize() + getSize());
// Write header
buff.put(getHeader().getByteBuffer());
buff.position(Header.getSize());
// Write seq num
buff.put(ByteBuffer.allocate(4).putInt(seqNo).array());
buff.position(Header.getSize()+4);
// Write CRC
CRC32 crc = new CRC32();
crc.update(buff.array());
buff.put(ByteBuffer.allocate(4).putInt((short)crc.getValue()).array());
try {
outputStream.write(buff.array());
} catch (IOException e) {
e.printStackTrace();
}
}
}
@@ -0,0 +1,31 @@
package seng302.server.messages;
import java.io.DataOutputStream;
public abstract class Message {
Header header;
/**
* @param header Set the header for this message
*/
public void setHeader(Header header){
this.header = header;
}
/**
* @return the header specified for this message
*/
public Header getHeader(){
return header;
}
/**
* @return the size of the message
*/
public abstract int getSize();
/**
* Send the message as through the outputStream
*/
public abstract void send(DataOutputStream outputStream);
}
@@ -0,0 +1,34 @@
package seng302.server.messages;
/**
* Enum containing the types of messages
* sent by the server
*/
public enum MessageType {
HEARTBEAT(1),
RACE_STATUS(12),
DISPLAY_TEXT_MESSAGE(20),
XML_MESSAGE(26),
RACE_START_STATUS(27),
YACHT_EVENT_CODE(29),
YACHT_ACTION_CODE(31),
CHATTER_TEXT(36),
BOAT_LOCATION(37),
MARK_ROUNDING(38),
COURSE_WIND(44),
AVERAGE_WIND(47);
private int code;
MessageType(int code){
this.code = code;
}
/**
* Get the message code (From the API Spec)
* @return the message code
*/
int getCode(){
return this.code;
}
}
@@ -0,0 +1,96 @@
package seng302.server.messages;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.zip.CRC32;
public class XMLMessage extends Message{
private final MessageType MESSAGE_TYPE = MessageType.XML_MESSAGE;
private final int MESSAGE_VERSION = 1; //Always set to 1
private final int MESSAGE_SIZE = 14;
// Message fields
private int timeStamp;
private short ack = 0x00; //Unused
private XMLMessageSubType xmlMessageSubType;
private Short length;
private Short sequence;
private String content;
private CRC32 crc;
/**
* XML Message from the AC35 Streaming data spec
* @param content The XML content
* @param type The XML Message Sub Type
*/
public XMLMessage(String content, XMLMessageSubType type, short sequenceNum){
this.content = content;
this.xmlMessageSubType = type;
crc = new CRC32();
timeStamp = (int) (System.currentTimeMillis() / 1000L);
ack = 0;
length = (short) this.content.length();
sequence = sequenceNum;
setHeader(new Header(MESSAGE_TYPE, 0x01, (short) getSize()));
}
/**
* @return The length of this message
*/
public int getSize(){
return MESSAGE_SIZE + content.length();
}
/**
* Send this message as a stream of bytes
* @param outputStream The output stream to send the message
*/
public void send(DataOutputStream outputStream) {
ByteBuffer buff = ByteBuffer.allocate(Header.getSize() + getSize() + 4);
buff.put(getHeader().getByteBuffer());
buff.position(Header.getSize());
// Version Number, 1 byte
buff.put(ByteBuffer.allocate(1).put((byte)MESSAGE_VERSION).array());
buff.position(Header.getSize() + 1);
// Ack, 2 bytes
buff.put(ByteBuffer.allocate(2).putShort(ack).array());
buff.position(Header.getSize() + 3);
// Timestamp, 6 bytes
buff.put(ByteBuffer.allocate(6).putInt(timeStamp).array());
buff.position(Header.getSize() + 9);
// XML message sub type, 1 byte
buff.put(ByteBuffer.allocate(1).put((byte)xmlMessageSubType.getType()).array());
buff.position(Header.getSize() + 10);
// Seq num, 2 bytes
buff.put(ByteBuffer.allocate(2).putShort(sequence).array());
buff.position(Header.getSize() + 12);
// Message length, 2 bytes
buff.put(ByteBuffer.allocate(2).putShort(length).array());
buff.position(Header.getSize() + 14);
// XML Content
buff.put(this.content.getBytes());
buff.position(Header.getSize() + 14 + this.content.getBytes().length);
// calculate CRC
crc.update(buff.array());
// Add CRC to message
buff.put(ByteBuffer.allocate(4).putInt((short)crc.getValue()).array());
// Send
try {
outputStream.write(buff.array());
} catch (IOException e) {
e.printStackTrace();
}
}
}
@@ -0,0 +1,20 @@
package seng302.server.messages;
/**
* Enum containing the types of XML messages
*/
public enum XMLMessageSubType {
REGATTA(5),
RACE(6),
BOAT(7);
private int type;
XMLMessageSubType(int type){
this.type = type;
}
public int getType(){
return this.type;
}
}