This repository has been archived on 2021-10-31. You can view files and clone it, but cannot push or open issues or pull requests.
NTW/hw2/src/impl/GBNTSender.java

244 lines
7.0 KiB
Java

/*
* GBNTSender - Claudio Maggioni (2020)
* NTW2020 HW2
*
* No external source was used to create this file, other than regular course material.
*
* Note on implementation: change value of constant W to change the size of the otherwise
* fixed window.
*/
import transport.TimeoutAction;
import java.util.*;
import java.util.concurrent.Semaphore;
public class GBNTSender extends transport.Sender implements TimeoutAction {
private static void writeBigEndianChar(byte[] dest, char data) {
dest[0] = (byte) ((data >> 8) & 0xFF);
dest[1] = (byte) (data & 0xFF);
}
private static char readBigEndianChar(byte[] src, int offset) {
return (char) (((src[offset] << 8) & 0xFF00) | (src[offset + 1] & 0xFF));
}
private static char addAndRollover(char c, char i) {
return (char) ((c + i) % 65536);
}
private static char incrementAndRollover(char c) {
return (char) ((c + 1) % 65536);
}
private final Map<Character, Packet> packetMap = new HashMap<>();
private Semaphore s = new Semaphore(0);
private class Packet {
private final byte[] contents;
private final char sequence;
private final int length;
private long estimateStart;
Packet(char sequence, byte[] buffer, int offset, int length) {
this.sequence = sequence;
this.length = Math.min(length, MAX_PACKET_SIZE - 2);
contents = new byte[this.length + 2];
writeBigEndianChar(contents, sequence);
System.arraycopy(buffer, offset, contents, 2, contents.length - 2);
packetMap.put(this.sequence, this);
}
void send() {
System.out.println("Sending seq: " + (int) sequence + " - (" + length + " + 2 bytes)");
estimateStart = System.currentTimeMillis();
unreliableSend(contents, 0, contents.length);
}
void destroy() {
packetMap.remove(this.sequence);
}
private long getRTT() {
return System.currentTimeMillis() - estimateStart;
}
int getLength() {
return length;
}
char getSequence() { return sequence; }
}
/**
* Change this parameter to change the window size
*/
private static final char W = 16;
private char base = (char) (new Random().nextInt() % 65536);
private char waitingACK = 0;
private int timeoutMs = 500;
private double rttEWMA = 100;
private double devRttEWMA = 100;
private void resetTimeoutEstimation() {
timeoutMs = 500;
rttEWMA = 100;
devRttEWMA = 100;
}
private State state = State.SETUP;
private final Queue<Packet> packets = new ArrayDeque<>();
enum State {
SETUP,
SENDING,
CLOSING,
}
private void computeTimeoutLength(long rttMIllis) {
devRttEWMA = 0.75 * devRttEWMA + 0.25 * (rttEWMA - rttMIllis);
rttEWMA = 0.875 * rttEWMA + 0.125 * rttMIllis;
timeoutMs = (int) (rttEWMA + 4 * devRttEWMA);
System.out.println("devRTT: " + devRttEWMA + " RTT: " + rttEWMA + " timeout: " + timeoutMs);
}
private void flushAckedPackets() {
for (Packet p = packets.peek();
p != null && p.getSequence() != base;
p = packets.peek()) {
packets.remove();
p.destroy();
}
}
private byte[] createSetupPacket() {
byte[] packet = new byte[2];
System.out.println("Sending setup: " + (int) base);
writeBigEndianChar(packet, base);
return packet;
}
/**
* Sends the randomly chosen sequence number
*/
private void sendSetup() {
unreliableSend(createSetupPacket(), 0, 2);
blockSender();
resetTimeout();
}
private boolean timeoutSet = false;
private void resetTimeout() {
if (timeoutSet) cancelTimeout(this);
if (waitingACK > 0 || state == State.SETUP) {
this.setTimeout(timeoutMs, this);
timeoutSet = true;
}
}
@Override
protected int reliableSend(byte[] bytes, int offset, int length) {
if (state == State.SETUP) {
sendSetup();
return 0;
}
Packet p = new Packet(addAndRollover(base, waitingACK), bytes, offset, length);
waitingACK++;
packets.add(p);
p.send();
resetTimeout();
if (waitingACK >= W) {
System.out.println("Window full " + (int) p.sequence);
blockSender();
}
return p.getLength();
}
@Override
public void close() {
state = State.CLOSING;
System.out.print("FIN packet: ");
reliableSend(new byte[0], 0, 0);
if (waitingACK > 0) {
blockSender();
try {
s.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
System.exit(255);
}
}
}
private volatile boolean abortTimeout = false;
@Override
public void timeoutAction() {
abortTimeout = true;
timeoutActionImpl();
}
private synchronized void timeoutActionImpl() {
abortTimeout = false;
timeoutSet = false;
if (state != State.SETUP) {
for (Packet p : packets) {
p.send();
if (abortTimeout) {
break;
}
}
}
else sendSetup();
resetTimeoutEstimation();
resetTimeout();
}
@Override
protected void unreliableReceive(byte[] bytes, int offset, int length) {
switch (state) {
case SETUP:
byte[] expected = createSetupPacket();
if (bytes[offset] == expected[0] && bytes[1] == expected[offset + 1]) {
state = State.SENDING;
base = incrementAndRollover(base);
System.out.println("Synchronization successful");
resumeSender();
} else {
System.out.println("Synchronization seq: " + (int) base);
sendSetup();
}
break;
case SENDING:
case CLOSING:
char ackSeq = readBigEndianChar(bytes, offset);
System.out.println("Received ACK: " + (int) ackSeq);
Packet p = packetMap.get(ackSeq);
if (p != null) computeTimeoutLength(p.getRTT());
boolean mustResume = waitingACK < W && state == State.SENDING;
waitingACK -= (ackSeq + 1 - base);
base = incrementAndRollover(ackSeq);
resetTimeout();
flushAckedPackets();
if (mustResume || (waitingACK == 0 && state == State.CLOSING)) {
resumeSender();
if (state == State.CLOSING) {
System.out.println("Closing");
s.release();
}
}
}
}
}