diff --git a/hw2/.idea/.gitignore b/hw2/.idea/.gitignore new file mode 100644 index 0000000..26d3352 --- /dev/null +++ b/hw2/.idea/.gitignore @@ -0,0 +1,3 @@ +# Default ignored files +/shelf/ +/workspace.xml diff --git a/hw2/.idea/dictionaries/maggicl.xml b/hw2/.idea/dictionaries/maggicl.xml new file mode 100644 index 0000000..575e258 --- /dev/null +++ b/hw2/.idea/dictionaries/maggicl.xml @@ -0,0 +1,7 @@ + + + + ewma + + + \ No newline at end of file diff --git a/hw2/.idea/libraries/transport.xml b/hw2/.idea/libraries/transport.xml new file mode 100644 index 0000000..16596b2 --- /dev/null +++ b/hw2/.idea/libraries/transport.xml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/hw2/.idea/misc.xml b/hw2/.idea/misc.xml new file mode 100644 index 0000000..37e641e --- /dev/null +++ b/hw2/.idea/misc.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/hw2/.idea/modules.xml b/hw2/.idea/modules.xml new file mode 100644 index 0000000..87db25b --- /dev/null +++ b/hw2/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/hw2/.idea/uiDesigner.xml b/hw2/.idea/uiDesigner.xml new file mode 100644 index 0000000..e96534f --- /dev/null +++ b/hw2/.idea/uiDesigner.xml @@ -0,0 +1,124 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/hw2/.idea/vcs.xml b/hw2/.idea/vcs.xml new file mode 100644 index 0000000..6c0b863 --- /dev/null +++ b/hw2/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/hw2/hw2.iml b/hw2/hw2.iml new file mode 100644 index 0000000..c583ea4 --- /dev/null +++ b/hw2/hw2.iml @@ -0,0 +1,12 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/hw2/src/GBNTReceiver.java b/hw2/src/GBNTReceiver.java new file mode 100644 index 0000000..bf3cf83 --- /dev/null +++ b/hw2/src/GBNTReceiver.java @@ -0,0 +1,14 @@ +import transport.Receiver; +import transport.TimeoutAction; + +public class GBNTReceiver extends Receiver implements TimeoutAction { + @Override + public void timeoutAction() { + + } + + @Override + protected void unreliableReceive(byte[] bytes, int i, int i1) { + + } +} diff --git a/hw2/src/GBNTSender.java b/hw2/src/GBNTSender.java new file mode 100644 index 0000000..06fa398 --- /dev/null +++ b/hw2/src/GBNTSender.java @@ -0,0 +1,166 @@ +import transport.TimeoutAction; + +import java.util.*; + +public class GBNTSender extends transport.Sender implements TimeoutAction { + + private static void writeBigEndianChar(byte[] dest, char data) { + dest[0] = (byte) ((data >> 8) & 0xFF); + dest[1] = (byte) (data & 0xFF); + } + + private static char readBigEndianChar(byte[] src, int offset) { + return (char) (src[offset] << 8 + src[offset + 1]); + } + + private static char incrementAndRollover(char c) { + return (char) ((c + 1) % 65536); + } + + private final Map packetMap = new HashMap<>(); + + private class Packet { + private final byte[] contents; + private final char sequence; + private final int length; + private long estimateStart; + + Packet(char sequence, byte[] buffer, int offset, int length) { + this.sequence = sequence; + this.length = Math.min(length + 2, MAX_PACKET_SIZE); + contents = new byte[MAX_PACKET_SIZE]; + writeBigEndianChar(contents, sequence); + System.arraycopy(buffer, offset, this.contents, 0, this.length); + packetMap.put(this.sequence, this); + } + + void send() { + estimateStart = System.currentTimeMillis(); + unreliableSend(contents, 0, length); + } + + void destroy() { + packetMap.remove(this.sequence); + } + + private long getRTT() { + return System.currentTimeMillis() - estimateStart; + } + + int getLength() { + return length; + } + + char getSequence() { return sequence; } + } + + private static final char W = 10; + private char base = (char) (new Random().nextInt() % 65536); + private char waitingACK = 0; + private int timeoutMs = 1000; + private double rttEWMA = 0; + private double devRttEWMA = 0; + + private State state = State.SETUP; + private final Queue packets = new ArrayDeque<>(); + + enum State { + SETUP, + SENDING, + CLOSING, + } + + public GBNTSender() { + sendSetup(); + } + + private void computeTimeoutLength(long rttMIllis) { + devRttEWMA = 0.75 * devRttEWMA + 0.25 * (rttEWMA - rttMIllis); + rttEWMA = 0.875 * rttEWMA + 0.125 * rttMIllis; + timeoutMs = (int) (rttEWMA + 4 * devRttEWMA); + } + + private void flushAckedPackets() { + for (Packet p = packets.peek(); + p != null && p.getSequence() != base; + p = packets.peek()) { + packets.remove(); + p.destroy(); + } + } + + private byte[] createSetupPacket() { + byte[] packet = new byte[2]; + writeBigEndianChar(packet, base); + return packet; + } + + /** + * Sends the randomly chosen sequence number + */ + private void sendSetup() { + unreliableSend(createSetupPacket(), 0, 2); + blockSender(); + } + + @Override + protected int reliableSend(byte[] bytes, int offset, int length) { + if (state != State.SENDING) throw new IllegalStateException(); + + Packet p = new Packet(base, bytes, offset, length); + waitingACK++; + packets.add(p); + p.send(); + setTimeout(timeoutMs, this); + + if (waitingACK >= W) { + blockSender(); + } + + return p.getLength(); + } + + @Override + public void close() { + state = State.CLOSING; + if (waitingACK > 0) { + blockSender(); + } + } + + @Override + public void timeoutAction() { + for (Packet p : packets) { + p.send(); + } + } + + @Override + protected void unreliableReceive(byte[] bytes, int offset, int length) { + switch (state) { + case SETUP: + byte[] expected = createSetupPacket(); + if (bytes[offset] == expected[0] && bytes[1] == expected[offset + 1]) { + state = State.SENDING; + resumeSender(); + } else { + sendSetup(); + } + break; + case SENDING: + case CLOSING: + char ackSeq = readBigEndianChar(bytes, offset); + + cancelTimeout(this); + computeTimeoutLength(packetMap.get(ackSeq).getRTT()); + + boolean mustResume = waitingACK == W && state == State.SENDING; + waitingACK -= (ackSeq - base); + base = incrementAndRollover(ackSeq); + flushAckedPackets(); + if (mustResume || (waitingACK == 0 && state == State.CLOSING)) { + resumeSender(); + } + } + } +} diff --git a/hw2/src/SAWReceiver2.java b/hw2/src/SAWReceiver2.java new file mode 100644 index 0000000..350bd25 --- /dev/null +++ b/hw2/src/SAWReceiver2.java @@ -0,0 +1,89 @@ +import transport.*; + +import java.net.*; +import java.io.*; + +public class SAWReceiver2 extends Receiver implements TimeoutAction { + private static final int R0 = 0; + private static final int R1 = 1; + private static final int CLOSED = 2; + + private byte[] ack1; + private byte[] ack0; + private byte[] ackfin; + private int state; + + public SAWReceiver2() { + ack0 = new byte[1]; + ack0[0] = 0; + + ack1 = new byte[1]; + ack1[0] = 1; + + ackfin = new byte[1]; + ackfin[0] = 2; + + state = R0; + } + + public void timeoutAction() { + try { + disconnect(); + } catch (java.lang.InterruptedException ex) { + System.out.println("Thread interrupted. Exiting directly."); + System.exit(0); + } + } + + private static void print_packet(byte[] buffer, int offset, int length) { + int end = offset + length; + System.out.print("\n["); + while (offset < end) + System.out.print(buffer[offset++] + " "); + System.out.print("]"); + } + + public void unreliableReceive(byte[] buffer, int offset, int length) { + // print_packet(buffer, offset, length); + cancelTimeout(this); + setTimeout(20000, this); + // try { Thread.sleep(1000); } catch (InterruptedException ignored) { } + switch (state) { + case R0: + System.out.print("R0:"); + if (length > 0 && buffer[offset] == 0) { + System.out.print("->R1 "); + state = R1; + deliver(buffer, offset + 1, length - 1); + unreliableSend(ack0, 0, 1); + } else if (length > 0 && buffer[offset] == 2) { + System.out.print("->CLOSED "); + state = CLOSED; + deliver(buffer, offset + 1, length - 1); + unreliableSend(ackfin, 0, 1); + deliver(null, 0, END_OF_STREAM); + } else { + System.out.print("!->R0 "); + unreliableSend(ack1, 0, 1); + } + return; + case R1: + System.out.print("R1:"); + if (length > 0 && buffer[offset] == 1) { + System.out.print("->R0 "); + state = R0; + deliver(buffer, offset + 1, length - 1); + unreliableSend(ack1, 0, 1); + } else if (length > 0 && buffer[offset] == 2) { + System.out.print("->CLOSED "); + state = CLOSED; + unreliableSend(ackfin, 0, 1); + deliver(null, 0, END_OF_STREAM); + } else { + System.out.print("!->R1 "); + unreliableSend(ack0, 0, 1); + } + return; + } + } +} diff --git a/hw2/src/SAWSender2.java b/hw2/src/SAWSender2.java new file mode 100644 index 0000000..b03f1b4 --- /dev/null +++ b/hw2/src/SAWSender2.java @@ -0,0 +1,226 @@ +import transport.*; + +public class SAWSender2 extends Sender implements TimeoutAction { + + private static final int S0 = 0; + private static final int ACK0 = 1; + private static final int S1 = 2; + private static final int ACK1 = 3; + private static final int ACK0FIN = 4; // waiting for ack0, then go to FIN + private static final int ACK1FIN = 5; // waiting for ack1, then go to FIN + private static final int FIN = 6; // waiting for final ack, then go to CLOSE + private static final int CLOSED = 7; + + private static final int SENDER_TIMEOUT_MS = 5000; + + private int state; + private byte[] data_pkt; + private int data_pkt_len; + + public SAWSender2() { + state = S0; + data_pkt = new byte[MAX_PACKET_SIZE]; + data_pkt_len = 0; + } + + private void make_packet(int seq, byte[] buffer, int offset, int length) { + if (length + 1 > MAX_PACKET_SIZE) { + data_pkt_len = MAX_PACKET_SIZE; + } else { + data_pkt_len = length + 1; + } + data_pkt[0] = (byte) seq; + for (int i = 1; i < data_pkt_len; ++i) + data_pkt[i] = buffer[offset++]; + } + + private static void print_packet(byte[] buffer, int offset, int length) { + int end = offset + length; + System.out.print("\n["); + while (offset < end) + System.out.print(buffer[offset++] + " "); + System.out.print("]"); + } + + public void unreliableReceive(byte[] buffer, int offset, int length) { + // print_packet(buffer, offset, length); + switch (state) { + case S0: + System.err.println("SAWSender.unreliableReceive: received packet in S0. Ignoring."); + return; + + case S1: + System.err.println("SAWSender.unreliableReceive: received packet in S1. Ignoring."); + return; + + case ACK0: + System.out.print("ACK0:"); + if (length > 0 && buffer[offset] == 0) { + System.out.print("->S1 "); + state = S1; + cancelTimeout(this); + allowDisconnect(); + resumeSender(); + } else { + System.out.print("!->ACK0 "); + cancelTimeout(this); + setTimeout(SENDER_TIMEOUT_MS, this); + unreliableSend(data_pkt, 0, data_pkt_len); + } + return; + + case ACK0FIN: + System.out.print("ACK0FIN:"); + if (length > 0 && buffer[offset] == 0) { + cancelTimeout(this); + goToFinState(); + } else { + System.out.print("!->ACK1FIN "); + cancelTimeout(this); + setTimeout(SENDER_TIMEOUT_MS, this); + unreliableSend(data_pkt, 0, data_pkt_len); + } + return; + + case ACK1: + System.out.print("ACK1:"); + if (length > 0 && buffer[offset] == 1) { + System.out.print("->S0 "); + state = S0; + cancelTimeout(this); + allowDisconnect(); + resumeSender(); + } else { + System.out.print("!->ACK1 "); + cancelTimeout(this); + setTimeout(SENDER_TIMEOUT_MS, this); + unreliableSend(data_pkt, 0, data_pkt_len); + } + return; + + case ACK1FIN: + System.out.print("ACK1FIN:"); + if (length > 0 && buffer[offset] == 1) { + cancelTimeout(this); + goToFinState(); + } else { + System.out.print("!->ACK1FIN "); + cancelTimeout(this); + setTimeout(SENDER_TIMEOUT_MS, this); + unreliableSend(data_pkt, 0, data_pkt_len); + } + return; + + case FIN: + System.out.print("FIN:"); + if (length > 0 && buffer[offset] == 2) { + System.out.print("->CLOSED "); + state = CLOSED; + cancelTimeout(this); + allowDisconnect(); + resumeSender(); + } else { + System.out.print("!->FIN "); + cancelTimeout(this); + setTimeout(SENDER_TIMEOUT_MS, this); + unreliableSend(data_pkt, 0, data_pkt_len); + } + return; + } + } + + public void timeoutAction() { + switch (state) { + case S0: + System.err.println("SAWSender.timeoutAction: timeout in S0. Ignoring timeout."); + return; + + case S1: + System.err.println("SAWSender.timeoutAction: timeout in S1. Ignoring timeout."); + return; + + case FIN: + case ACK0: + case ACK1: + case ACK0FIN: + case ACK1FIN: + System.out.print("T! "); + cancelTimeout(this); + setTimeout(SENDER_TIMEOUT_MS, this); + unreliableSend(data_pkt, 0, data_pkt_len); + } + } + + public int reliableSend(byte[] buffer, int offset, int length) { + switch (state) { + case S0: + System.out.print("S0:->ACK0 "); + blockSender(); + blockDisconnect(); + make_packet(0, buffer, offset, length); + state = ACK0; + setTimeout(SENDER_TIMEOUT_MS, this); + unreliableSend(data_pkt, 0, data_pkt_len); + return data_pkt_len - 1; + + case S1: + System.out.print("S1:->ACK1 "); + blockSender(); + blockDisconnect(); + make_packet(1, buffer, offset, length); + state = ACK1; + setTimeout(SENDER_TIMEOUT_MS, this); + unreliableSend(data_pkt, 0, data_pkt_len); + return data_pkt_len - 1; + + case ACK0: + case ACK1: + case ACK0FIN: + case ACK1FIN: + case FIN: + default: + System.err.println("SAWSender.reliableSend: sender should be blocked. Ignoring request"); + return 0; + } + } + + private void goToFinState() { + System.out.print("->FIN "); + blockSender(); + blockDisconnect(); + make_packet(2, null, 0, 0); + state = FIN; + setTimeout(SENDER_TIMEOUT_MS, this); + unreliableSend(data_pkt, 0, data_pkt_len); + } + + public void close() { + switch (state) { + case S0: + System.out.print("S0:"); + goToFinState(); + return; + + case S1: + System.out.print("S1:"); + goToFinState(); + return; + + case ACK0: + System.out.print("ACK0:->ACK0FIN "); + state = ACK0FIN; + return; + + case ACK1: + System.out.print("ACK1:->ACK1FIN "); + state = ACK1FIN; + return; + + case FIN: + case CLOSED: + default: + System.err.println("SAWSender.closeConnection: connection already closed or in closing state. Ignoring request"); + return; + } + } +} diff --git a/hw2/transport.jar b/hw2/transport.jar new file mode 100644 index 0000000..f4b2dbc Binary files /dev/null and b/hw2/transport.jar differ