From 1e1e482b79f25f6b99012e3f7dec3f1ef2f536fc Mon Sep 17 00:00:00 2001 From: Peter Galloway Date: Sat, 29 Apr 2017 18:56:41 +1200 Subject: [PATCH] Added a delay for reading packets from the packet buffer so packets that are recieved out of order have time to order by timestamp in the priority queue #story[820] --- .../java/seng302/controllers/Controller.java | 1 - .../seng302/models/parsers/StreamParser.java | 31 +++++++++++++------ .../models/parsers/StreamReceiver.java | 26 ++++------------ src/main/resources/views/MainView.fxml | 2 +- 4 files changed, 29 insertions(+), 31 deletions(-) diff --git a/src/main/java/seng302/controllers/Controller.java b/src/main/java/seng302/controllers/Controller.java index 1892d472..a788054a 100644 --- a/src/main/java/seng302/controllers/Controller.java +++ b/src/main/java/seng302/controllers/Controller.java @@ -33,6 +33,5 @@ public class Controller implements Initializable { @Override public void initialize(URL location, ResourceBundle resources) { - setContentPane("/views/RaceView.fxml"); } } diff --git a/src/main/java/seng302/models/parsers/StreamParser.java b/src/main/java/seng302/models/parsers/StreamParser.java index 766296a7..7cbfc90d 100644 --- a/src/main/java/seng302/models/parsers/StreamParser.java +++ b/src/main/java/seng302/models/parsers/StreamParser.java @@ -22,14 +22,14 @@ import java.util.concurrent.ConcurrentHashMap; /** * The purpose of this class is to take in the stream of divided packets so they can be read * and parsed in by turning the byte arrays into useful data. There are two public static hashmaps - * that are threadsafe so the visualiser can always access the latest speed and position avaible + * that are threadsafe so the visualiser can always access the latest speed and position available * Created by kre39 on 23/04/17. */ public class StreamParser extends Thread{ public static ConcurrentHashMap boatPositions = new ConcurrentHashMap<>(); public static ConcurrentHashMap boatSpeeds = new ConcurrentHashMap<>(); - private String threadName; + private String threadName; private Thread t; private static boolean raceStarted = false; @@ -48,9 +48,23 @@ public class StreamParser extends Thread{ Thread.sleep(1); } while (StreamReceiver.packetBuffer.peek() != null){ - StreamPacket packet = StreamReceiver.packetBuffer.take(); + StreamPacket packet = StreamReceiver.packetBuffer.peek(); + int delayTime = 1000; + int loopTime = delayTime + 1000; + long sleepTime = 0; + long transitTime = (System.currentTimeMillis()%loopTime - packet.getTimeStamp()%loopTime); + if (transitTime < 0){ + transitTime = loopTime + delayTime; + } + if (transitTime < delayTime) { + sleepTime = delayTime - (transitTime); + Thread.sleep(sleepTime); + } + System.out.println(sleepTime); + + packet = StreamReceiver.packetBuffer.take(); parsePacket(packet); - Thread.sleep(10); + Thread.sleep(1); while (StreamReceiver.packetBuffer.peek() == null) { Thread.sleep(1); } @@ -152,7 +166,6 @@ public class StreamParser extends Thread{ } if (timeTillStart % 10 == 0){ System.out.println("Time since start: " + -1 * timeTillStart + " Seconds"); - } } long windDir = bytesToLong(Arrays.copyOfRange(payload,18,20)); @@ -316,10 +329,10 @@ public class StreamParser extends Thread{ // System.out.println("seq = " + seq); //needs to be validated Point3D point = new Point3D(((180d * (double)lat)/Math.pow(2,31)),((180d *(double)lon)/Math.pow(2,31)),(double)heading); - boatPositions.putIfAbsent(boatId, point); - boatSpeeds.putIfAbsent(boatId, groundSpeed); - boatPositions.replace(boatId, point); - boatSpeeds.replace(boatId, groundSpeed); + boatPositions.put(boatId, point); + boatSpeeds.put(boatId, groundSpeed); +// boatPositions.replace(boatId, point); +// boatSpeeds.replace(boatId, groundSpeed); // System.out.println("lon = " + ((180d * (double)lon)/Math.pow(2,31))); // System.out.println("lat = " + ((180d *(double)lat)/Math.pow(2,31))); } diff --git a/src/main/java/seng302/models/parsers/StreamReceiver.java b/src/main/java/seng302/models/parsers/StreamReceiver.java index b6c1e658..74ce4748 100644 --- a/src/main/java/seng302/models/parsers/StreamReceiver.java +++ b/src/main/java/seng302/models/parsers/StreamReceiver.java @@ -4,8 +4,6 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.net.Socket; -import java.util.ArrayList; -import java.util.Collection; import java.util.Comparator; import java.util.concurrent.PriorityBlockingQueue; import java.util.zip.CRC32; @@ -15,8 +13,8 @@ import java.util.zip.Checksum; public class StreamReceiver extends Thread { private InputStream stream; private Socket host; - private ByteArrayOutputStream crcBuffer; - private Thread t; + private ByteArrayOutputStream crcBuffer; + private Thread thread; private String threadName; public static PriorityBlockingQueue packetBuffer; @@ -31,21 +29,20 @@ public class StreamReceiver extends Thread { } public void run(){ - PriorityBlockingQueue pq = new PriorityBlockingQueue<>(256, new Comparator() { + packetBuffer = new PriorityBlockingQueue<>(256, new Comparator() { @Override public int compare(StreamPacket s1, StreamPacket s2) { return (int) (s1.getTimeStamp() - s2.getTimeStamp()); } }); - packetBuffer = pq; connect(); } public void start () { System.out.println("Starting " + threadName ); - if (t == null) { - t = new Thread (this, threadName); - t.start (); + if (thread == null) { + thread = new Thread (this, threadName); + thread.start (); } } @@ -141,15 +138,4 @@ public class StreamReceiver extends Thread { } return partialLong; } - - - - - public static void main(String[] args) { - - StreamReceiver sr = new StreamReceiver("csse-s302staff.canterbury.ac.nz", 4941,"TestThread1"); - //StreamReceiver sr = new StreamReceiver("livedata.americascup.com", 4941, pq); - sr.start(); - - } } diff --git a/src/main/resources/views/MainView.fxml b/src/main/resources/views/MainView.fxml index ac0b944e..4a3d7298 100644 --- a/src/main/resources/views/MainView.fxml +++ b/src/main/resources/views/MainView.fxml @@ -6,6 +6,6 @@ - +