WIP: Worked on new server thread class that would create and store multiple THREADS of connections. Researched Authorative server structures.

Fixed the current structure of the server to work with the old StreamReciever style and hook up to the Stream Parser

tags: #story[1047] pair[wmu16, mra106]
This commit is contained in:
William Muir
2017-07-14 17:09:33 +12:00
parent 5b908ec355
commit 77e7db79cc
8 changed files with 244 additions and 161 deletions
-2
View File
@@ -30,8 +30,6 @@ public class App extends Application {
System.exit(0); System.exit(0);
}); });
} }
public static void main(String[] args) { public static void main(String[] args) {
@@ -2,13 +2,13 @@ package seng302.controllers;
import javafx.fxml.FXML; import javafx.fxml.FXML;
import javafx.fxml.FXMLLoader; import javafx.fxml.FXMLLoader;
import javafx.scene.Parent;
import javafx.scene.control.TextField; import javafx.scene.control.TextField;
import javafx.scene.layout.AnchorPane; import javafx.scene.layout.AnchorPane;
import javafx.scene.layout.GridPane; import javafx.scene.layout.GridPane;
import javafx.scene.layout.Pane; import javafx.scene.layout.Pane;
import seng302.gameServer.GameServerThread; import seng302.gameServer.GameServerThread;
import seng302.gameServer.GameState; import seng302.gameServer.GameState;
import seng302.gameServerWithThreading.MainServerThread;
import seng302.models.stream.StreamReceiver; import seng302.models.stream.StreamReceiver;
import java.io.IOException; import java.io.IOException;
@@ -61,12 +61,10 @@ public class StartScreenController {
try { try {
String ipAddress = InetAddress.getLocalHost().getHostAddress(); String ipAddress = InetAddress.getLocalHost().getHostAddress();
new GameState(ipAddress); new GameState(ipAddress);
GameServerThread gameServerThread = new GameServerThread("Game Server"); new MainServerThread().start();
System.out.println("Server thread started");
// get the lobby controller so that we can pass the game server thread to it // get the lobby controller so that we can pass the game server thread to it
LobbyController lobbyController = (LobbyController) setContentPane("/views/LobbyView.fxml"); setContentPane("/views/LobbyView.fxml");
lobbyController.setGameServerThread(gameServerThread);
} catch (UnknownHostException e) { } catch (UnknownHostException e) {
System.err.println("COULD NOT FIND YOUR IP ADDRESS!"); System.err.println("COULD NOT FIND YOUR IP ADDRESS!");
@@ -2,31 +2,40 @@ package seng302.gameServerWithThreading;
import seng302.gameServer.GameStages; import seng302.gameServer.GameStages;
import seng302.gameServer.GameState; import seng302.gameServer.GameState;
import seng302.models.stream.PacketBufferDelegate;
import seng302.models.stream.StreamParser;
import seng302.models.stream.packets.StreamPacket;
import java.io.IOException; import java.io.IOException;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.PriorityBlockingQueue;
/** /**
* A class describing the overall server, which creates and collects server threads for each client * A class describing the overall server, which creates and collects server threads for each client
* Created by wmu16 on 13/07/17. * Created by wmu16 on 13/07/17.
*/ */
public class ServerThreadHandler extends Thread { public class MainServerThread extends Thread implements PacketBufferDelegate{
private static final int PORT = 4950; private static final int PORT = 4950;
private static final Integer MAX_NUM_PLAYERS = 10; private static final Integer MAX_NUM_PLAYERS = 10;
private ServerSocket serverSocket = null; private ServerSocket serverSocket = null;
private Socket socket; private Socket socket;
private ArrayList<ServerThread> serverThreads = new ArrayList<>(); private ArrayList<ServerToClientThread> serverToClientThreads = new ArrayList<>();
public ServerThreadHandler() { private PriorityBlockingQueue<StreamPacket> packetBuffer;
public MainServerThread() {
try { try {
serverSocket = new ServerSocket(PORT); serverSocket = new ServerSocket(PORT);
} catch (IOException e) { } catch (IOException e) {
System.out.println("IO error in server thread handler upon trying to make new server socket"); System.out.println("IO error in server thread handler upon trying to make new server socket");
} }
packetBuffer = new PriorityBlockingQueue<>();
} }
@@ -38,30 +47,61 @@ public class ServerThreadHandler extends Thread {
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
//LOBBYING
if (GameState.getCurrentStage() == GameStages.LOBBYING && GameState.getPlayers().size() < MAX_NUM_PLAYERS) { if (GameState.getCurrentStage() == GameStages.LOBBYING && GameState.getPlayers().size() < MAX_NUM_PLAYERS) {
try { try {
// TODO: 14/07/17 wmu16 - Get out of blocking call somehow after a time
socket = serverSocket.accept(); socket = serverSocket.accept();
} catch (IOException e) { } catch (IOException e) {
System.out.println("IO error in server thread handler upon trying to accept connection"); System.out.println("IO error in server thread handler upon trying to accept connection");
} }
ServerThread thread = new ServerThread(socket); ServerToClientThread thread = new ServerToClientThread(socket, this);
serverThreads.add(thread); serverToClientThreads.add(thread);
thread.start(); thread.start();
} }
updateClients(); //RACING
else if (GameState.getCurrentStage() == GameStages.RACING) {
} }
//FINISHED
else if (GameState.getCurrentStage() == GameStages.FINISHED) {
}
updateClients();
while (!packetBuffer.isEmpty()){
try {
StreamPacket packet = packetBuffer.take();
StreamParser.parsePacket(packet);
} catch (InterruptedException e) {
continue;
}
}
}
// TODO: 14/07/17 wmu16 - Send out disconnect packet to clients
try { try {
serverSocket.close(); serverSocket.close();
return;
} catch (IOException e) { } catch (IOException e) {
System.out.println("IO error in server thread handler upon closing socket"); System.out.println("IO error in server thread handler upon closing socket");
} }
} }
public void updateClients() { public void updateClients() {
for (ServerThread serverThread : serverThreads) { for (ServerToClientThread serverToClientThread : serverToClientThreads) {
serverThread.updateClient(); serverToClientThread.updateClient();
} }
} }
@Override
public boolean addToBuffer(StreamPacket streamPacket) {
return packetBuffer.add(streamPacket);
}
} }
@@ -1,105 +0,0 @@
package seng302.gameServerWithThreading;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
/**
* A class describing a single connection to a Client for the purposes of sending and receiving on its own thread.
* All server threads created and owned by the server thread handler which can trigger client updates on its threads
* Created by wmu16 on 13/07/17.
*/
public class ServerThread extends Thread {
private static final Integer MAX_ID_ATTEMPTS = 10;
private InputStream is;
private OutputStream os;
private Socket socket;
private Boolean userIdentified = false;
private Boolean connected = true;
private Boolean updateClient = true;
public ServerThread(Socket socket) {
this.socket = socket;
}
public void run() {
try {
is = socket.getInputStream();
os = socket.getOutputStream();
} catch (IOException e) {
System.out.println("IO error in server thread upon grabbing streams");
}
threeWayHandshake();
// TODO: 13/07/17 wmu16 - Some way of knowing if the client is still connected. perhaps when we read disconnect message switch this bool?
while (connected) {
//Perform a read and update game state
try {
Integer userInput = is.read();
} catch (IOException e) {
System.out.println("IO error in server thread upon reading input stream");
}
//Perform a write if it is time to as delegated by the ServerThreadHandler
if (updateClient) {
// TODO: 13/07/17 wmu16 - Write out game state - some function that would write all appropriate messages to this output stream
// try {
// GameState.outputState(os);
// } catch (IOException e) {
// System.out.println("IO error in server thread upon writing to output stream");
// }
updateClient = false;
}
}
closeSocket();
}
public void updateClient() {
updateClient = true;
}
/**
* Tries to confirm the connection just accepted.
* Sends ID, expects that ID echoed for confirmation,
* if so, sends a confirmation packet back to that connection
* Creates a player instance with that ID and this thread and adds it to the GameState
* If not, close the socket and end the threads execution
*/
private void threeWayHandshake() {
// TODO: 13/07/17 Finish using AC35
// Integer playerID = GameState.getUniquePlayerID();
// Integer confirmationID = null;
// Integer identificationAttempt = 0
// while (!userIdentified) {
// os.write(playerID); //Send out new ID looking for echo
// confirmationID = is.read();
// if (playerID == idConfirmation) { //ID is echoed back. Connection is a client
// os.write( some determined confirmation message ); //Confirm to client
// GameState.addPlayer(new Player(playerID, this)); //Create a player in game state for client
// userIdentified = true;
// } else if (identificationAttempt > MAX_ID_ATTEMPTS) { //No response. not a client. tidy up and go home.
// closeSocket();
// return;
// }
// identificationAttempt++;
// }
}
public void closeSocket() {
try {
socket.close();
} catch (IOException e) {
System.out.println("IO error in server thread upon trying to close socket");
}
}
}
@@ -0,0 +1,163 @@
package seng302.gameServerWithThreading;
import seng302.models.stream.PacketBufferDelegate;
import seng302.models.stream.packets.StreamPacket;
import seng302.server.messages.Message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
/**
* A class describing a single connection to a Client for the purposes of sending and receiving on its own thread.
* All server threads created and owned by the server thread handler which can trigger client updates on its threads
* Created by wmu16 on 13/07/17.
*/
public class ServerToClientThread extends Thread {
private static final Integer MAX_ID_ATTEMPTS = 10;
private InputStream is;
private OutputStream os;
private Socket socket;
private ByteArrayOutputStream crcBuffer;
private final PacketBufferDelegate packetBufferDelegate;
private Boolean userIdentified = false;
private Boolean connected = true;
private Boolean updateClient = true;
public ServerToClientThread(Socket socket, PacketBufferDelegate packetBufferDelegate) {
this.socket = socket;
this.packetBufferDelegate = packetBufferDelegate;
}
public void run() {
try {
is = socket.getInputStream();
os = socket.getOutputStream();
} catch (IOException e) {
System.out.println("IO error in server thread upon grabbing streams");
}
threeWayHandshake();
int sync1;
int sync2;
// TODO: 14/07/17 wmu16 - Work out how to fix this while loop
while(true) {
try {
//Perform a write if it is time to as delegated by the MainServerThread
if (updateClient) {
// TODO: 13/07/17 wmu16 - Write out game state - some function that would write all appropriate messages to this output stream
// try {
// GameState.outputState(os);
// } catch (IOException e) {
// System.out.println("IO error in server thread upon writing to output stream");
// }
updateClient = false;
}
crcBuffer = new ByteArrayOutputStream();
sync1 = readByte();
sync2 = readByte();
//checking if it is the start of the packet
if(sync1 == 0x47 && sync2 == 0x83) {
int type = readByte();
//No. of milliseconds since Jan 1st 1970
long timeStamp = Message.bytesToLong(getBytes(6));
skipBytes(4);
long payloadLength = Message.bytesToLong(getBytes(2));
byte[] payload = getBytes((int) payloadLength);
Checksum checksum = new CRC32();
checksum.update(crcBuffer.toByteArray(), 0, crcBuffer.size());
long computedCrc = checksum.getValue();
long packetCrc = Message.bytesToLong(getBytes(4));
if (computedCrc == packetCrc) {
packetBufferDelegate.addToBuffer(new StreamPacket(type, payloadLength, timeStamp, payload));
} else {
System.err.println("Packet has been dropped");
}
}
} catch (Exception e) {
closeSocket();
return;
}
}
}
public void updateClient() {
updateClient = true;
}
/**
* Tries to confirm the connection just accepted.
* Sends ID, expects that ID echoed for confirmation,
* if so, sends a confirmation packet back to that connection
* Creates a player instance with that ID and this thread and adds it to the GameState
* If not, close the socket and end the threads execution
*/
private void threeWayHandshake() {
// // TODO: 13/07/17 Finish using AC35
// Integer playerID = GameState.getUniquePlayerID();
// Integer confirmationID = null;
// Integer identificationAttempt = 0
// while (!userIdentified) {
// os.write(playerID); //Send out new ID looking for echo
// confirmationID = is.read();
// if (playerID == idConfirmation) { //ID is echoed back. Connection is a client
// os.write( some determined confirmation message ); //Confirm to client
// GameState.addPlayer(new Player(playerID, this)); //Create a player in game state for client
// userIdentified = true;
// } else if (identificationAttempt > MAX_ID_ATTEMPTS) { //No response. not a client. tidy up and go home.
// closeSocket();
// return;
// }
// identificationAttempt++;
// }
}
public void closeSocket() {
try {
socket.close();
} catch (IOException e) {
System.out.println("IO error in server thread upon trying to close socket");
}
}
private int readByte() throws Exception {
int currentByte = -1;
try {
currentByte = is.read();
crcBuffer.write(currentByte);
} catch (IOException e) {
e.printStackTrace();
}
if (currentByte == -1){
throw new Exception();
}
return currentByte;
}
private byte[] getBytes(int n) throws Exception{
byte[] bytes = new byte[n];
for (int i = 0; i < n; i++){
bytes[i] = (byte) readByte();
}
return bytes;
}
private void skipBytes(long n) throws Exception{
for (int i=0; i < n; i++){
readByte();
}
}
}
@@ -0,0 +1,7 @@
package seng302.models.stream;
import seng302.models.stream.packets.StreamPacket;
public interface PacketBufferDelegate {
boolean addToBuffer(StreamPacket streamPacket);
}
@@ -32,7 +32,7 @@ import seng302.models.stream.packets.StreamPacket;
* that are threadsafe so the visualiser can always access the latest speed and position available * that are threadsafe so the visualiser can always access the latest speed and position available
* Created by kre39 on 23/04/17. * Created by kre39 on 23/04/17.
*/ */
public class StreamParser extends Thread { public class StreamParser{
public static ConcurrentHashMap<Long, PriorityBlockingQueue<BoatPositionPacket>> markLocations = new ConcurrentHashMap<>(); public static ConcurrentHashMap<Long, PriorityBlockingQueue<BoatPositionPacket>> markLocations = new ConcurrentHashMap<>();
public static ConcurrentHashMap<Long, PriorityBlockingQueue<BoatPositionPacket>> boatLocations = new ConcurrentHashMap<>(); public static ConcurrentHashMap<Long, PriorityBlockingQueue<BoatPositionPacket>> boatLocations = new ConcurrentHashMap<>();
@@ -58,54 +58,16 @@ public class StreamParser extends Thread {
/** /**
* Used to initialise the thread name and stream parser object so a thread can be executed * Used to initialise the thread name and stream parser object so a thread can be executed
*
* @param threadName name of the thread
*/ */
public StreamParser(String threadName) { public StreamParser() {
this.threadName = threadName;
} }
/**
* Used to within threading so when the stream parser thread runs, it will keep looking for a
* packet to process until it is unable to find anymore packets
*/
public void run() {
appRunning = true;
try {
streamStatus = true;
xmlObject = new XMLParser();
while (StreamReceiver.packetBuffer == null || StreamReceiver.packetBuffer.size() < 1) {
Thread.sleep(1);
}
while (appRunning) {
StreamPacket packet = StreamReceiver.packetBuffer.take();
parsePacket(packet);
Thread.sleep(1);
while (StreamReceiver.packetBuffer.peek() == null) {
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* Used to start the stream parser thread when multithreading
*/
public void start() {
if (t == null) {
t = new Thread(this, threadName);
t.start();
}
}
/** /**
* Looks at the type of the packet then sends it to the appropriate parser to extract the * Looks at the type of the packet then sends it to the appropriate parser to extract the
* specific data associated with that packet type * specific data associated with that packet type
* *
* @param packet the packet to be looked at and processed * @param packet the packet to be looked at and processed
*/ */
private static void parsePacket(StreamPacket packet) { public static void parsePacket(StreamPacket packet) {
try { try {
switch (packet.getType()) { switch (packet.getType()) {
case HEARTBEAT: case HEARTBEAT:
@@ -194,6 +194,25 @@ public abstract class Message {
return data; return data;
} }
/**
* takes an array of up to 7 bytes in little endian format and
* returns a positive long constructed from the input bytes
*
* @return a positive long if there is less than 8 bytes -1 otherwise
*/
public static long bytesToLong(byte[] bytes){
long partialLong = 0;
int index = 0;
for (byte b: bytes){
if (index > 6){
return -1;
}
partialLong = partialLong | (b & 0xFFL) << (index * 8);
index++;
}
return partialLong;
}
/** /**
* Reverse an array of bytes * Reverse an array of bytes
* @param data The byte[] to reverse * @param data The byte[] to reverse
@@ -205,4 +224,5 @@ public abstract class Message {
data[right] = (byte) (temp & 0xff); data[right] = (byte) (temp & 0xff);
} }
} }
} }