This repository has been archived on 2021-10-31. You can view files and clone it, but cannot push or open issues or pull requests.
NTW/hw2/src/GBNTSender.java

167 lines
4.5 KiB
Java

import transport.TimeoutAction;
import java.util.*;
public class GBNTSender extends transport.Sender implements TimeoutAction {
private static void writeBigEndianChar(byte[] dest, char data) {
dest[0] = (byte) ((data >> 8) & 0xFF);
dest[1] = (byte) (data & 0xFF);
}
private static char readBigEndianChar(byte[] src, int offset) {
return (char) (src[offset] << 8 + src[offset + 1]);
}
private static char incrementAndRollover(char c) {
return (char) ((c + 1) % 65536);
}
private final Map<Character, Packet> packetMap = new HashMap<>();
private class Packet {
private final byte[] contents;
private final char sequence;
private final int length;
private long estimateStart;
Packet(char sequence, byte[] buffer, int offset, int length) {
this.sequence = sequence;
this.length = Math.min(length + 2, MAX_PACKET_SIZE);
contents = new byte[MAX_PACKET_SIZE];
writeBigEndianChar(contents, sequence);
System.arraycopy(buffer, offset, this.contents, 0, this.length);
packetMap.put(this.sequence, this);
}
void send() {
estimateStart = System.currentTimeMillis();
unreliableSend(contents, 0, length);
}
void destroy() {
packetMap.remove(this.sequence);
}
private long getRTT() {
return System.currentTimeMillis() - estimateStart;
}
int getLength() {
return length;
}
char getSequence() { return sequence; }
}
private static final char W = 10;
private char base = (char) (new Random().nextInt() % 65536);
private char waitingACK = 0;
private int timeoutMs = 1000;
private double rttEWMA = 0;
private double devRttEWMA = 0;
private State state = State.SETUP;
private final Queue<Packet> packets = new ArrayDeque<>();
enum State {
SETUP,
SENDING,
CLOSING,
}
public GBNTSender() {
sendSetup();
}
private void computeTimeoutLength(long rttMIllis) {
devRttEWMA = 0.75 * devRttEWMA + 0.25 * (rttEWMA - rttMIllis);
rttEWMA = 0.875 * rttEWMA + 0.125 * rttMIllis;
timeoutMs = (int) (rttEWMA + 4 * devRttEWMA);
}
private void flushAckedPackets() {
for (Packet p = packets.peek();
p != null && p.getSequence() != base;
p = packets.peek()) {
packets.remove();
p.destroy();
}
}
private byte[] createSetupPacket() {
byte[] packet = new byte[2];
writeBigEndianChar(packet, base);
return packet;
}
/**
* Sends the randomly chosen sequence number
*/
private void sendSetup() {
unreliableSend(createSetupPacket(), 0, 2);
blockSender();
}
@Override
protected int reliableSend(byte[] bytes, int offset, int length) {
if (state != State.SENDING) throw new IllegalStateException();
Packet p = new Packet(base, bytes, offset, length);
waitingACK++;
packets.add(p);
p.send();
setTimeout(timeoutMs, this);
if (waitingACK >= W) {
blockSender();
}
return p.getLength();
}
@Override
public void close() {
state = State.CLOSING;
if (waitingACK > 0) {
blockSender();
}
}
@Override
public void timeoutAction() {
for (Packet p : packets) {
p.send();
}
}
@Override
protected void unreliableReceive(byte[] bytes, int offset, int length) {
switch (state) {
case SETUP:
byte[] expected = createSetupPacket();
if (bytes[offset] == expected[0] && bytes[1] == expected[offset + 1]) {
state = State.SENDING;
resumeSender();
} else {
sendSetup();
}
break;
case SENDING:
case CLOSING:
char ackSeq = readBigEndianChar(bytes, offset);
cancelTimeout(this);
computeTimeoutLength(packetMap.get(ackSeq).getRTT());
boolean mustResume = waitingACK == W && state == State.SENDING;
waitingACK -= (ackSeq - base);
base = incrementAndRollover(ackSeq);
flushAckedPackets();
if (mustResume || (waitingACK == 0 && state == State.CLOSING)) {
resumeSender();
}
}
}
}