diff --git a/hw2/.idea/misc.xml b/hw2/.idea/misc.xml
index 37e641e..8f5f4fa 100644
--- a/hw2/.idea/misc.xml
+++ b/hw2/.idea/misc.xml
@@ -1,6 +1,6 @@
-
+
\ No newline at end of file
diff --git a/hw2/ciao.txt b/hw2/ciao.txt
new file mode 100644
index 0000000..a4e075b
--- /dev/null
+++ b/hw2/ciao.txt
@@ -0,0 +1,113 @@
+ Siena (Scalable Internet Event Notification Architectures) is a research project aimed at
+ designing and constructing a generic scalable publish/subscribe event-notification service.
+ The technical basis of Siena is an innovative type of network service called content-based networking.
+
+Overview
+The asynchrony, heterogeneity, and inherent loose coupling that characterize applications in a wide-area
+ network promote event interaction as a natural design abstraction for a growing class of software systems.
+ An emerging building block for such systems is an infrastructure called publish/subscribe event notification
+ service.
+
+Software systems of a significant dimension, especially those that are distributed over a computer network,
+are often engineered by means of the integration of components. A promising approach to support component-based
+software architectures is the so-called event-based style whereby the interaction of components is modeled with
+events. Components emit events to inform other components of a change in their internal state or to request services
+from other components. Upon detecting the occurrence of events, components react by executing some actions and possibly
+ emitting other events. The glue that ties components together in an event-based architecture is an infrastructure that
+ we call event service. The event service registers the interests of components and then dispatches event notifications
+ accordingly. The advantage of using an event service instead of other "classical" integration mechanisms
+ such as direct or remote invocation is that this method increases the degree of de-coupling among components
+ thus eliminating static dependencies and improving interoperability.
+
+We envision a ubiquitous event notification service accessible from every site on a wide-area network and suitable
+for supporting highly distributed applications requiring component interactions ranging in granularity from fine to
+ coarse. Conceptually, the service is implemented as a network of servers that provide access points to clients. Clients
+ use the access points to advertise the information about events that they generate and to publish notifications
+ containing that information. They also use the access points to subscribe for notifications of interest.
+ The service uses the access points to then notify clients by delivering any notifications of interest. Clearly, an
+ event notification service complements other general-purpose middleware services, such as point-to-point and multicast
+ communication mechanisms, by offering a many-to-many communication and integration facility.
+
+Given that the primary purpose of an event notification service is to support notification selection and delivery,
+ the challenge we face in a wide-area setting is maximizing expressiveness in the selection mechanism without sacrificing
+ scalability in the delivery mechanism. Expressiveness refers to the ability of the event notification service to provide
+ a powerful data model with which to capture information about events, to express filters and patterns on notifications
+ of interest, and to use that data model as the basis for optimizing notification delivery. In terms of scalability,
+ we are referring not simply to the number of event generators, the number of event notifications, and the number
+ of notification recipients, but also to the need to discard many of the assumptions made for local-area networks,
+ such as low latency, abundant bandwidth, homogeneous platforms, continuous and reliable connectivity, and
+ centralized control.
+
+
+Intuitively, a simple event notification service that provides no selection mechanism can be reduced to a multicast
+routing and transport mechanism for which there are numerous scalable implementations. However, once the service provides
+ a selection mechanism, then the overall efficiency of the service and its routing of notifications are affected by the
+ power of the language used to construct notifications and to express filters and patterns. As the power of the language
+ increases, so does the complexity of the processing. Thus, in practice, scalability and expressiveness are two
+ conflicting goals that must be traded off.
+
+Siena is an event notification service that we have designed and implemented to maximize both expressiveness and
+scalability. A prototype implementation of Siena is available.
+People
+Siena is primarily the work of Antonio Carzaniga and Alexander L. Wolf. D.S. Rosenblum also made significant
+contributions to the initial design of Siena.
+
+Others also contributed directly to the development of Siena. Among them are John Giacomoni, Mauro Caporuscio,
+Matthew J. Rutherford, Cyrus P. Hall, Yanyan Wang, Giovanni Toffetti, and Amir Malekpour.
+Documents
+Articles are available in Portable Document Format (PDF) or PostScript® format and some of them are compressed
+with gzip. Downloading any one of these documents indicates that you agree to abide by a copyright notice.
+Selected Papers
+
+ Design and Evaluation of a Wide-Area Event Notification Service
+ A. Carzaniga, D.S. Rosenblum, and A.L. Wolf
+ ACM Transactions on Computer Systems, 19(3):332-383, August 2001.
+ Abstract, BibTeX ref., PDF, DOI
+ End-to-End Reliability for Best-Effort Content-Based Publish/Subscribe Networks
+ A. Malekpour, A. Carzaniga, F. Pedone, and G. Toffetti Carughi
+ In Proceedings of the 5th ACM International Conference on Distributed Event-Based Systems (DEBS 2011). New York, New York, July 2011.
+ Abstract, BibTeX ref., PDF, DOI
+ A Routing Scheme for Content-Based Networking
+ A. Carzaniga, M.J. Rutherford, and A.L. Wolf
+ Proceedings of IEEE INFOCOM 2004. Hong Kong, China. March 2004.
+ Abstract, BibTeX ref., PDF, DOI
+ Forwarding in a Content-Based Network
+ A. Carzaniga and A.L. Wolf
+ Proceedings of ACM SIGCOMM 2003. p. 163-174. Karlsruhe, Germany. August 2003.
+ Abstract, BibTeX ref., PDF, DOI
+ Design and Evaluation of a Support Service for Mobile, Wireless Publish/Subscribe Applications
+ M. Caporuscio, A. Carzaniga, and A.L. Wolf
+ IEEE Transactions on Software Engineering, 29(12):1059-1071, December 2003.
+ Abstract, BibTeX ref., PDF, DOI
+ Content-based Networking: A New Communication Infrastructure
+ A. Carzaniga and A.L. Wolf
+ In NSF Workshop on an Infrastructure for Mobile and Wireless Systems. Lecture Notes in Computer Science n. 2538 p. 59-68. Springer-Verlag. Scottsdale, Arizona. October 2001.
+ Abstract, BibTeX ref., PDF
+ Achieving Expressiveness and Scalability in an Internet-Scale Event Notification Service
+ A. Carzaniga, D.S. Rosenblum, and A.L. Wolf
+ Nineteenth ACM Symposium on Principles of Distributed Computing (PODC2000). Portland, Oregon, July 2000.
+ Abstract, BibTeX ref., PDF, DOI
+ Content-Based Addressing and Routing: A General Model and its Application
+ A. Carzaniga, D.S. Rosenblum, and A.L. Wolf
+ Technical Report CU-CS-902-00, Department of Computer Science, University of Colorado, January 2000.
+ Abstract, BibTeX ref., PDF
+ Architectures for an Event Notification Service Scalable to Wide-area Networks
+ A. Carzaniga
+ PhD Thesis. Politecnico di Milano. December, 1998.
+ Abstract, BibTeX ref., PDF
+ A Design Framework for Internet-Scale Event Observation and Notification
+ D.S. Rosenblum and A.L. Wolf
+ 6th European Software Engineering Conference. Lecture Notes in Computer Science 1301, Springer, Berlin, 1997.
+
+Acknowledgments
+This work was supported in part by the Air Force Materiel Command, Rome Laboratory, and the Defense Advanced
+Research Projects Agency under Contract Numbers F30602-94-C-0253, F30602-97-2-0021, F30602-98-2-0163,
+and F30602-99-C-0174; by the Air Force Office of Scientific Research, Air Force Materiel Command, USAF, u
+nder grant number F49620-98-1-0061; and by the National Science Foundation under Grant Number CCR-9701973.
+The content of the information does not necessarily reflect the position or the policy of the US Governmen
+t and no official endorsement should be inferred.
+
+We thank Dennis Heimbigner, Richard S. Hall, and André van der Hoek of the Software Engineering Research Laboratory,
+University of Colorado at Boulder (circa 1997), and Giampaolo Cugola, Elisabetta Di Nitto, and Alfonso Fuggetta of
+ Politecnico di Milano for their contributions to this work.
+this page is maintained by Antonio Carzaniga and was updated on September 15, 2019
\ No newline at end of file
diff --git a/hw2/mamma.txt b/hw2/mamma.txt
new file mode 100644
index 0000000..a4e075b
--- /dev/null
+++ b/hw2/mamma.txt
@@ -0,0 +1,113 @@
+ Siena (Scalable Internet Event Notification Architectures) is a research project aimed at
+ designing and constructing a generic scalable publish/subscribe event-notification service.
+ The technical basis of Siena is an innovative type of network service called content-based networking.
+
+Overview
+The asynchrony, heterogeneity, and inherent loose coupling that characterize applications in a wide-area
+ network promote event interaction as a natural design abstraction for a growing class of software systems.
+ An emerging building block for such systems is an infrastructure called publish/subscribe event notification
+ service.
+
+Software systems of a significant dimension, especially those that are distributed over a computer network,
+are often engineered by means of the integration of components. A promising approach to support component-based
+software architectures is the so-called event-based style whereby the interaction of components is modeled with
+events. Components emit events to inform other components of a change in their internal state or to request services
+from other components. Upon detecting the occurrence of events, components react by executing some actions and possibly
+ emitting other events. The glue that ties components together in an event-based architecture is an infrastructure that
+ we call event service. The event service registers the interests of components and then dispatches event notifications
+ accordingly. The advantage of using an event service instead of other "classical" integration mechanisms
+ such as direct or remote invocation is that this method increases the degree of de-coupling among components
+ thus eliminating static dependencies and improving interoperability.
+
+We envision a ubiquitous event notification service accessible from every site on a wide-area network and suitable
+for supporting highly distributed applications requiring component interactions ranging in granularity from fine to
+ coarse. Conceptually, the service is implemented as a network of servers that provide access points to clients. Clients
+ use the access points to advertise the information about events that they generate and to publish notifications
+ containing that information. They also use the access points to subscribe for notifications of interest.
+ The service uses the access points to then notify clients by delivering any notifications of interest. Clearly, an
+ event notification service complements other general-purpose middleware services, such as point-to-point and multicast
+ communication mechanisms, by offering a many-to-many communication and integration facility.
+
+Given that the primary purpose of an event notification service is to support notification selection and delivery,
+ the challenge we face in a wide-area setting is maximizing expressiveness in the selection mechanism without sacrificing
+ scalability in the delivery mechanism. Expressiveness refers to the ability of the event notification service to provide
+ a powerful data model with which to capture information about events, to express filters and patterns on notifications
+ of interest, and to use that data model as the basis for optimizing notification delivery. In terms of scalability,
+ we are referring not simply to the number of event generators, the number of event notifications, and the number
+ of notification recipients, but also to the need to discard many of the assumptions made for local-area networks,
+ such as low latency, abundant bandwidth, homogeneous platforms, continuous and reliable connectivity, and
+ centralized control.
+
+
+Intuitively, a simple event notification service that provides no selection mechanism can be reduced to a multicast
+routing and transport mechanism for which there are numerous scalable implementations. However, once the service provides
+ a selection mechanism, then the overall efficiency of the service and its routing of notifications are affected by the
+ power of the language used to construct notifications and to express filters and patterns. As the power of the language
+ increases, so does the complexity of the processing. Thus, in practice, scalability and expressiveness are two
+ conflicting goals that must be traded off.
+
+Siena is an event notification service that we have designed and implemented to maximize both expressiveness and
+scalability. A prototype implementation of Siena is available.
+People
+Siena is primarily the work of Antonio Carzaniga and Alexander L. Wolf. D.S. Rosenblum also made significant
+contributions to the initial design of Siena.
+
+Others also contributed directly to the development of Siena. Among them are John Giacomoni, Mauro Caporuscio,
+Matthew J. Rutherford, Cyrus P. Hall, Yanyan Wang, Giovanni Toffetti, and Amir Malekpour.
+Documents
+Articles are available in Portable Document Format (PDF) or PostScript® format and some of them are compressed
+with gzip. Downloading any one of these documents indicates that you agree to abide by a copyright notice.
+Selected Papers
+
+ Design and Evaluation of a Wide-Area Event Notification Service
+ A. Carzaniga, D.S. Rosenblum, and A.L. Wolf
+ ACM Transactions on Computer Systems, 19(3):332-383, August 2001.
+ Abstract, BibTeX ref., PDF, DOI
+ End-to-End Reliability for Best-Effort Content-Based Publish/Subscribe Networks
+ A. Malekpour, A. Carzaniga, F. Pedone, and G. Toffetti Carughi
+ In Proceedings of the 5th ACM International Conference on Distributed Event-Based Systems (DEBS 2011). New York, New York, July 2011.
+ Abstract, BibTeX ref., PDF, DOI
+ A Routing Scheme for Content-Based Networking
+ A. Carzaniga, M.J. Rutherford, and A.L. Wolf
+ Proceedings of IEEE INFOCOM 2004. Hong Kong, China. March 2004.
+ Abstract, BibTeX ref., PDF, DOI
+ Forwarding in a Content-Based Network
+ A. Carzaniga and A.L. Wolf
+ Proceedings of ACM SIGCOMM 2003. p. 163-174. Karlsruhe, Germany. August 2003.
+ Abstract, BibTeX ref., PDF, DOI
+ Design and Evaluation of a Support Service for Mobile, Wireless Publish/Subscribe Applications
+ M. Caporuscio, A. Carzaniga, and A.L. Wolf
+ IEEE Transactions on Software Engineering, 29(12):1059-1071, December 2003.
+ Abstract, BibTeX ref., PDF, DOI
+ Content-based Networking: A New Communication Infrastructure
+ A. Carzaniga and A.L. Wolf
+ In NSF Workshop on an Infrastructure for Mobile and Wireless Systems. Lecture Notes in Computer Science n. 2538 p. 59-68. Springer-Verlag. Scottsdale, Arizona. October 2001.
+ Abstract, BibTeX ref., PDF
+ Achieving Expressiveness and Scalability in an Internet-Scale Event Notification Service
+ A. Carzaniga, D.S. Rosenblum, and A.L. Wolf
+ Nineteenth ACM Symposium on Principles of Distributed Computing (PODC2000). Portland, Oregon, July 2000.
+ Abstract, BibTeX ref., PDF, DOI
+ Content-Based Addressing and Routing: A General Model and its Application
+ A. Carzaniga, D.S. Rosenblum, and A.L. Wolf
+ Technical Report CU-CS-902-00, Department of Computer Science, University of Colorado, January 2000.
+ Abstract, BibTeX ref., PDF
+ Architectures for an Event Notification Service Scalable to Wide-area Networks
+ A. Carzaniga
+ PhD Thesis. Politecnico di Milano. December, 1998.
+ Abstract, BibTeX ref., PDF
+ A Design Framework for Internet-Scale Event Observation and Notification
+ D.S. Rosenblum and A.L. Wolf
+ 6th European Software Engineering Conference. Lecture Notes in Computer Science 1301, Springer, Berlin, 1997.
+
+Acknowledgments
+This work was supported in part by the Air Force Materiel Command, Rome Laboratory, and the Defense Advanced
+Research Projects Agency under Contract Numbers F30602-94-C-0253, F30602-97-2-0021, F30602-98-2-0163,
+and F30602-99-C-0174; by the Air Force Office of Scientific Research, Air Force Materiel Command, USAF, u
+nder grant number F49620-98-1-0061; and by the National Science Foundation under Grant Number CCR-9701973.
+The content of the information does not necessarily reflect the position or the policy of the US Governmen
+t and no official endorsement should be inferred.
+
+We thank Dennis Heimbigner, Richard S. Hall, and André van der Hoek of the Software Engineering Research Laboratory,
+University of Colorado at Boulder (circa 1997), and Giampaolo Cugola, Elisabetta Di Nitto, and Alfonso Fuggetta of
+ Politecnico di Milano for their contributions to this work.
+this page is maintained by Antonio Carzaniga and was updated on September 15, 2019
\ No newline at end of file
diff --git a/hw2/src/GBNTReceiver.java b/hw2/src/impl/GBNTReceiver.java
similarity index 55%
rename from hw2/src/GBNTReceiver.java
rename to hw2/src/impl/GBNTReceiver.java
index 363f781..a45f83d 100644
--- a/hw2/src/GBNTReceiver.java
+++ b/hw2/src/impl/GBNTReceiver.java
@@ -1,3 +1,5 @@
+package impl;
+
import transport.Receiver;
import transport.TimeoutAction;
@@ -5,12 +7,13 @@ public class GBNTReceiver extends Receiver implements TimeoutAction {
private char sequence;
boolean firstPacket = true;
- private static final int RECEIVER_TIMEOUT_MS = 1000;
+ private static final int RECEIVER_TIMEOUT_MS = 5000;
private State state = State.SETUP;
enum State {
SETUP,
- RECEIVING
+ RECEIVING,
+ CLOSED,
}
private static byte[] generateACK(char sequence) {
@@ -21,7 +24,7 @@ public class GBNTReceiver extends Receiver implements TimeoutAction {
}
private static char readBigEndianChar(byte[] src, int offset) {
- return (char) (src[offset] << 8 + src[offset + 1]);
+ return (char) (((src[offset] << 8) & 0xFF00) | (src[offset + 1] & 0xFF));
}
private static char incrementAndRollover(char c) {
@@ -30,8 +33,11 @@ public class GBNTReceiver extends Receiver implements TimeoutAction {
@Override
public void timeoutAction() {
+ System.out.println("Disconnect timeout triggered");
try {
disconnect();
+ deliver(null, 0, END_OF_STREAM);
+ state = State.CLOSED;
} catch (java.lang.InterruptedException ex) {
System.out.println("Thread interrupted. Exiting directly.");
System.exit(0);
@@ -40,31 +46,45 @@ public class GBNTReceiver extends Receiver implements TimeoutAction {
@Override
protected void unreliableReceive(byte[] bytes, int offset, int length) {
+ if (state == State.CLOSED) return;
+ //System.out.println("State: " + state + " receiving: " + length);
+ cancelTimeout(this);
+ setTimeout(RECEIVER_TIMEOUT_MS, this);
switch(this.state) {
case SETUP:
if (length != 2) {
- if (firstPacket) return;
- else {
+ if (!firstPacket) {
state = State.RECEIVING;
this.unreliableReceive(bytes, offset, length);
+ return;
}
+ return;
}
- sequence = readBigEndianChar(bytes, 0);
+ sequence = readBigEndianChar(bytes, offset);
+ System.out.println("Synchronization ACK: " + (int) sequence);
unreliableSend(bytes, offset, 2);
firstPacket = false;
break;
case RECEIVING:
if (length == 2) { // if packet is a close packet
- if (incrementAndRollover(this.sequence) != readBigEndianChar(bytes, offset)) return; // ignore if not synchronized
+ char a = readBigEndianChar(bytes, offset);
+ if (incrementAndRollover(this.sequence) != a) return; // ignore if not synchronized
this.unreliableSend(bytes, offset, 2);
+ System.out.println("Received valid FIN packet");
timeoutAction();
}
if (length < 3) return;
- char seq = readBigEndianChar(bytes, 0);
- if (seq != incrementAndRollover(sequence)) return; //drop the packet
+ char seq = readBigEndianChar(bytes, offset);
+ System.out.print("Read sequence: " + (int) seq + " -> ");
+
+ if (seq != incrementAndRollover(sequence)) {
+ System.out.println("Dropping");
+ return;
+ } //drop the packet
else {
- this.unreliableReceive(bytes, offset + 2, length);
+ this.deliver(bytes, offset + 2, length - 2);
sequence = incrementAndRollover(sequence);
+ System.out.println("Sending ack: "+ (int) sequence);
this.unreliableSend(generateACK(sequence), 0, 2);
}
break;
diff --git a/hw2/src/GBNTSender.java b/hw2/src/impl/GBNTSender.java
similarity index 69%
rename from hw2/src/GBNTSender.java
rename to hw2/src/impl/GBNTSender.java
index 612c26c..72192a8 100644
--- a/hw2/src/GBNTSender.java
+++ b/hw2/src/impl/GBNTSender.java
@@ -1,6 +1,9 @@
+package impl;
+
import transport.TimeoutAction;
import java.util.*;
+import java.util.concurrent.Semaphore;
public class GBNTSender extends transport.Sender implements TimeoutAction {
@@ -10,7 +13,7 @@ public class GBNTSender extends transport.Sender implements TimeoutAction {
}
private static char readBigEndianChar(byte[] src, int offset) {
- return (char) (src[offset] << 8 + src[offset + 1]);
+ return (char) (((src[offset] << 8) & 0xFF00) | (src[offset + 1] & 0xFF));
}
private static char addAndRollover(char c, char i) {
@@ -22,6 +25,9 @@ public class GBNTSender extends transport.Sender implements TimeoutAction {
}
private final Map packetMap = new HashMap<>();
+ private Semaphore s = new Semaphore(0);
+
+ private static final int MAX_PACKET_SIZE = 128;
private class Packet {
private final byte[] contents;
@@ -31,16 +37,16 @@ public class GBNTSender extends transport.Sender implements TimeoutAction {
Packet(char sequence, byte[] buffer, int offset, int length) {
this.sequence = sequence;
- this.length = Math.min(length + 2, MAX_PACKET_SIZE);
- contents = new byte[MAX_PACKET_SIZE];
+ this.length = Math.min(length, MAX_PACKET_SIZE - 2);
+ contents = new byte[this.length + 2];
writeBigEndianChar(contents, sequence);
- System.arraycopy(buffer, offset, this.contents, 0, this.length);
+ System.arraycopy(buffer, offset + 2 - 2, contents, 2, contents.length - 2);
packetMap.put(this.sequence, this);
}
void send() {
estimateStart = System.currentTimeMillis();
- unreliableSend(contents, 0, length);
+ unreliableSend(contents, 0, contents.length);
}
void destroy() {
@@ -62,8 +68,8 @@ public class GBNTSender extends transport.Sender implements TimeoutAction {
private char base = (char) (new Random().nextInt() % 65536);
private char waitingACK = 0;
private int timeoutMs = 500;
- private double rttEWMA = 0;
- private double devRttEWMA = 0;
+ private double rttEWMA = 100;
+ private double devRttEWMA = 100;
private State state = State.SETUP;
private final Queue packets = new ArrayDeque<>();
@@ -74,14 +80,11 @@ public class GBNTSender extends transport.Sender implements TimeoutAction {
CLOSING,
}
- public GBNTSender() {
- sendSetup();
- }
-
private void computeTimeoutLength(long rttMIllis) {
devRttEWMA = 0.75 * devRttEWMA + 0.25 * (rttEWMA - rttMIllis);
rttEWMA = 0.875 * rttEWMA + 0.125 * rttMIllis;
timeoutMs = (int) (rttEWMA + 4 * devRttEWMA);
+ //System.out.println("devRTT: " + devRttEWMA + " RTT: " + rttEWMA + " timeout: " + timeoutMs);
}
private void flushAckedPackets() {
@@ -109,15 +112,20 @@ public class GBNTSender extends transport.Sender implements TimeoutAction {
@Override
protected int reliableSend(byte[] bytes, int offset, int length) {
- if (state != State.SENDING) throw new IllegalStateException();
+ if (state == State.SETUP) {
+ sendSetup();
+ return 0;
+ }
Packet p = new Packet(addAndRollover(base, waitingACK), bytes, offset, length);
waitingACK++;
packets.add(p);
+ System.out.println("Sending seq: " + (int) p.sequence + " - (" + p.length + " + 2 bytes)");
p.send();
setTimeout(timeoutMs, this);
if (waitingACK >= W) {
+ System.out.println("Window full " + (int) p.sequence);
blockSender();
}
@@ -127,16 +135,27 @@ public class GBNTSender extends transport.Sender implements TimeoutAction {
@Override
public void close() {
state = State.CLOSING;
+ System.out.print("FIN packet: ");
+ reliableSend(new byte[0], 0, 0);
if (waitingACK > 0) {
blockSender();
+ try {
+ s.acquire();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ System.exit(255);
+ }
}
}
@Override
public void timeoutAction() {
+ if (state == State.SENDING)
for (Packet p : packets) {
p.send();
}
+ else if (state == State.SETUP)
+ sendSetup();
}
@Override
@@ -147,24 +166,34 @@ public class GBNTSender extends transport.Sender implements TimeoutAction {
if (bytes[offset] == expected[0] && bytes[1] == expected[offset + 1]) {
state = State.SENDING;
base = incrementAndRollover(base);
+ System.out.println("Synchronization successful");
resumeSender();
} else {
+ System.out.println("Synchronization seq: " + (int) base);
sendSetup();
}
break;
case SENDING:
case CLOSING:
char ackSeq = readBigEndianChar(bytes, offset);
+ System.out.println("Received ACK: " + (int) ackSeq);
cancelTimeout(this);
- computeTimeoutLength(packetMap.get(ackSeq).getRTT());
+ Packet p = packetMap.get(ackSeq);
+ if (p != null) computeTimeoutLength(p.getRTT());
boolean mustResume = waitingACK == W && state == State.SENDING;
- waitingACK -= (ackSeq - base);
+
+ waitingACK -= (ackSeq + 1 - base);
base = incrementAndRollover(ackSeq);
flushAckedPackets();
+
if (mustResume || (waitingACK == 0 && state == State.CLOSING)) {
resumeSender();
+ if (state == State.CLOSING) {
+ System.out.println("Closing");
+ s.release();
+ }
}
}
}
diff --git a/hw2/src/transport/FileReceiver.java b/hw2/src/transport/FileReceiver.java
new file mode 100644
index 0000000..5e18e31
--- /dev/null
+++ b/hw2/src/transport/FileReceiver.java
@@ -0,0 +1,42 @@
+package transport;
+
+import impl.GBNTReceiver;
+
+import java.io.FileOutputStream;
+import java.net.InetAddress;
+
+public class FileReceiver {
+
+ public static void main(String[] var0) throws Exception {
+ int var1 = 0;
+ switch(var0.length) {
+ case 6:
+ var1 = Integer.parseInt(var0[5]);
+ case 5:
+ String var6 = var0[0];
+ int var2 = Integer.parseInt(var0[1]);
+ String var3 = var0[2];
+ int var4 = Integer.parseInt(var0[3]);
+ String var5 = var0[4];
+ //Class var7 = ClassLoader.getSystemClassLoader().loadClass(var6);
+ Class> var7 = GBNTReceiver.class;
+ Receiver var8 = (Receiver)var7.newInstance();
+ var8.setErrorPercentage((double)var1);
+ var8.connect(var2, InetAddress.getByName(var3), var4);
+ byte[] var9 = new byte[1024];
+ FileOutputStream var11 = new FileOutputStream(var5);
+
+ int var10;
+ while((var10 = var8.receive(var9, 0, 1024)) > 0) {
+ var11.write(var9, 0, var10);
+ }
+
+ var11.close();
+ var11 = null;
+ var8.disconnect();
+ return;
+ default:
+ System.err.println("Usage: FileReceiver [error%]");
+ }
+ }
+}
diff --git a/hw2/src/transport/FileSender.java b/hw2/src/transport/FileSender.java
new file mode 100644
index 0000000..f29de1e
--- /dev/null
+++ b/hw2/src/transport/FileSender.java
@@ -0,0 +1,53 @@
+package transport;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.net.InetAddress;
+import impl.GBNTSender;
+
+public class FileSender {
+
+ public static void main(String[] var0) throws Exception {
+ int var1 = 0;
+ switch(var0.length) {
+ case 6:
+ var1 = Integer.parseInt(var0[5]);
+ case 5:
+ String var6 = var0[0];
+ int var2 = Integer.parseInt(var0[1]);
+ String var3 = var0[2];
+ int var4 = Integer.parseInt(var0[3]);
+ String var5 = var0[4];
+ //Class var7 = ClassLoader.getSystemClassLoader().loadClass(var6);
+ Class> var7 = GBNTSender.class;
+ Sender var8 = (Sender)var7.newInstance();
+ var8.setErrorPercentage((double)var1);
+ var8.connect(var2, InetAddress.getByName(var3), var4);
+ File var9 = new File(var5);
+
+ try {
+ FileInputStream var10 = new FileInputStream(var9);
+ byte[] var11 = new byte[4000];
+
+ int var12;
+ int var14;
+ while((var12 = var10.read(var11)) != -1) {
+ for(int var13 = 0; var12 > 0; var13 += var14) {
+ var14 = var8.send(var11, var13, var12);
+ var12 -= var14;
+ }
+ }
+
+ var10.close();
+ } catch (Exception var15) {
+ var15.printStackTrace();
+ }
+
+ var8.close();
+ var8.disconnect();
+ return;
+ default:
+ System.err.println("Usage: FileSender [error%]");
+ }
+ }
+}
diff --git a/hw2/src/transport/MyTimer.java b/hw2/src/transport/MyTimer.java
new file mode 100644
index 0000000..fbcdc58
--- /dev/null
+++ b/hw2/src/transport/MyTimer.java
@@ -0,0 +1,81 @@
+package transport;
+
+import java.util.Vector;
+
+class MyTimer implements Runnable {
+ Object monitor;
+ Vector actions;
+ boolean active;
+
+ public MyTimer(Object var1) {
+ this.monitor = var1;
+ this.actions = new Vector();
+ this.active = true;
+ Thread var2 = new Thread(this);
+ var2.setDaemon(true);
+ var2.start();
+ }
+
+ public synchronized void stop() {
+ this.active = false;
+ }
+
+ public synchronized void schedule(TimeoutAction var1, long var2) {
+ long var4 = System.currentTimeMillis() + var2;
+
+ int var6;
+ for(var6 = 0; var6 < this.actions.size() && ((ScheduledAction)this.actions.elementAt(var6)).time < var4; ++var6) {
+ }
+
+ this.actions.insertElementAt(new ScheduledAction(var4, var1), var6);
+ this.notify();
+ }
+
+ synchronized TimeoutAction getFirst() throws InterruptedException {
+ while(true) {
+ if (this.active) {
+ if (this.actions.isEmpty()) {
+ this.wait();
+ continue;
+ }
+
+ long var1 = ((ScheduledAction)this.actions.elementAt(0)).time - System.currentTimeMillis();
+ if (var1 > 0L) {
+ this.wait(var1);
+ continue;
+ }
+
+ return ((ScheduledAction)this.actions.remove(0)).action;
+ }
+
+ return null;
+ }
+ }
+
+ public synchronized void cancel(TimeoutAction var1) {
+ for(int var2 = 0; var2 < this.actions.size(); ++var2) {
+ if (((ScheduledAction)this.actions.elementAt(var2)).action == var1) {
+ this.actions.remove(var2);
+ break;
+ }
+ }
+
+ }
+
+ public void run() {
+ try {
+ while(true) {
+ TimeoutAction var1 = this.getFirst();
+ if (var1 == null) {
+ return;
+ }
+
+ synchronized(this.monitor) {
+ var1.timeoutAction();
+ }
+ }
+ } catch (Exception var5) {
+ System.err.println("MyTimer: " + var5);
+ }
+ }
+}
diff --git a/hw2/src/transport/Receiver.java b/hw2/src/transport/Receiver.java
new file mode 100644
index 0000000..9e74f3b
--- /dev/null
+++ b/hw2/src/transport/Receiver.java
@@ -0,0 +1,65 @@
+package transport;
+
+public abstract class Receiver extends Transport {
+ private boolean writing_allowed = true;
+ private byte[] rcv_buffer = new byte[1024];
+ private int rcv_offset;
+ private int rcv_length;
+
+ public Receiver() {
+ }
+
+ public final synchronized int receive(byte[] var1, int var2, int var3) throws InterruptedException {
+ while(this.rcv_length == 0) {
+ this.wait();
+ }
+
+ if (this.rcv_length < 0) {
+ return this.rcv_length;
+ } else if (this.rcv_length > var3) {
+ for(int var4 = 0; var4 < var3; ++var4) {
+ var1[var2 + var4] = this.rcv_buffer[this.rcv_offset + var4];
+ }
+
+ this.rcv_offset += var3;
+ this.rcv_length -= var3;
+ this.notifyAll();
+ return var3;
+ } else {
+ var3 = this.rcv_length;
+
+ do {
+ --this.rcv_length;
+ var1[var2 + this.rcv_length] = this.rcv_buffer[this.rcv_offset + this.rcv_length];
+ } while(this.rcv_length > 0);
+
+ this.notify();
+ return var3;
+ }
+ }
+
+ protected final void deliver(byte[] bytes, int offset, int length) {
+ if (length == 0) {
+ System.err.println("Receiver: error: deliver was called with zero length.");
+ } else {
+ synchronized(this) {
+ try {
+ while(this.rcv_length != 0) {
+ this.wait();
+ }
+
+ for(int i = 0; i < length; ++i) {
+ this.rcv_buffer[i] = bytes[i + offset];
+ }
+
+ this.rcv_offset = 0;
+ this.rcv_length = length;
+ this.notifyAll();
+ } catch (InterruptedException ignored) {
+ System.err.println("Receiver.deliver: warning: interrupted while delivering to application. Some data might be lost.");
+ }
+
+ }
+ }
+ }
+}
diff --git a/hw2/src/transport/ScheduledAction.java b/hw2/src/transport/ScheduledAction.java
new file mode 100644
index 0000000..f5afc15
--- /dev/null
+++ b/hw2/src/transport/ScheduledAction.java
@@ -0,0 +1,11 @@
+package transport;
+
+class ScheduledAction {
+ long time;
+ TimeoutAction action;
+
+ public ScheduledAction(long var1, TimeoutAction var3) {
+ this.time = var1;
+ this.action = var3;
+ }
+}
diff --git a/hw2/src/transport/Sender.java b/hw2/src/transport/Sender.java
new file mode 100644
index 0000000..2847fcc
--- /dev/null
+++ b/hw2/src/transport/Sender.java
@@ -0,0 +1,34 @@
+//
+// Source code recreated from a .class file by IntelliJ IDEA
+// (powered by Fernflower decompiler)
+//
+
+package transport;
+
+public abstract class Sender extends Transport {
+ private boolean sending_allowed = true;
+
+ public Sender() {
+ }
+
+ public final synchronized int send(byte[] var1, int var2, int var3) throws InterruptedException {
+ while(!this.sending_allowed) {
+ this.wait();
+ }
+
+ return this.reliableSend(var1, var2, var3);
+ }
+
+ protected final synchronized void blockSender() {
+ this.sending_allowed = false;
+ }
+
+ protected final synchronized void resumeSender() {
+ this.sending_allowed = true;
+ this.notifyAll();
+ }
+
+ protected abstract int reliableSend(byte[] var1, int var2, int var3);
+
+ public abstract void close();
+}
diff --git a/hw2/src/transport/TimeoutAction.java b/hw2/src/transport/TimeoutAction.java
new file mode 100644
index 0000000..055f43e
--- /dev/null
+++ b/hw2/src/transport/TimeoutAction.java
@@ -0,0 +1,5 @@
+package transport;
+
+public interface TimeoutAction {
+ void timeoutAction();
+}
diff --git a/hw2/src/transport/Transport.java b/hw2/src/transport/Transport.java
new file mode 100644
index 0000000..3fa7779
--- /dev/null
+++ b/hw2/src/transport/Transport.java
@@ -0,0 +1,118 @@
+package transport;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.util.Random;
+
+public abstract class Transport implements Runnable {
+ private DatagramSocket socket = null;
+ private DatagramPacket rcv_packet = null;
+ private DatagramPacket snd_packet = null;
+ private static Random generator = new Random();
+ private double error = 0.0D;
+ private boolean disconnect_allowed = true;
+ private MyTimer timer = new MyTimer(this);
+ private Thread listener;
+ public static final int MAX_PACKET_SIZE = 1024;
+ public static final int END_OF_STREAM = -1;
+
+ public Transport() {
+ }
+
+ public final synchronized void connect(int var1, InetAddress var2, int var3) throws IllegalArgumentException, SocketException {
+ this.socket = new DatagramSocket(new InetSocketAddress(var1));
+ this.socket.connect(var2, var3);
+ this.socket.setSoTimeout(60000);
+ if (this.snd_packet == null) {
+ this.snd_packet = new DatagramPacket(new byte[1024], 1024, var2, var3);
+ } else {
+ this.snd_packet.setAddress(var2);
+ this.snd_packet.setPort(var3);
+ }
+
+ if (this.rcv_packet == null) {
+ this.rcv_packet = new DatagramPacket(new byte[1024], 1024);
+ }
+
+ if (this.listener == null) {
+ this.listener = new Thread(this);
+ this.listener.setDaemon(true);
+ this.listener.start();
+ }
+
+ }
+
+ protected final synchronized void allowDisconnect() {
+ this.disconnect_allowed = true;
+ this.notifyAll();
+ }
+
+ protected final synchronized void blockDisconnect() {
+ this.disconnect_allowed = false;
+ }
+
+ public final synchronized void disconnect() throws InterruptedException {
+ while(!this.disconnect_allowed) {
+ this.wait();
+ }
+
+ if (this.socket != null) {
+ this.socket.close();
+ this.listener.interrupt();
+ this.socket = null;
+ this.listener = null;
+ this.timer.stop();
+ }
+ }
+
+ protected abstract void unreliableReceive(byte[] var1, int var2, int var3);
+
+ protected final void unreliableSend(byte[] var1, int var2, int var3) {
+ if (generator.nextDouble() >= this.error) {
+ try {
+ this.snd_packet.setData(var1, var2, var3);
+ this.socket.send(this.snd_packet);
+ } catch (Exception var5) {
+ System.err.println("unreliableSend: error sending packet: " + var5);
+ var5.printStackTrace();
+ }
+ }
+ }
+
+ public final void setErrorPercentage(double var1) {
+ this.error = var1 / 100.0D;
+ }
+
+ public final synchronized void setTimeout(long var1, TimeoutAction var3) {
+ this.timer.schedule(var3, var1);
+ }
+
+ public final synchronized void cancelTimeout(TimeoutAction var1) {
+ this.timer.cancel(var1);
+ }
+
+ public final void run() {
+ while(this.socket != null) {
+ try {
+ this.socket.receive(this.rcv_packet);
+ synchronized(this) {
+ this.unreliableReceive(this.rcv_packet.getData(), this.rcv_packet.getOffset(), this.rcv_packet.getLength());
+ }
+ } catch (IOException var6) {
+ synchronized(this) {
+ if (this.socket == null) {
+ return;
+ }
+ }
+
+ System.err.println("run: error receiving data from the network: " + var6);
+ System.err.println("Terminating.");
+ return;
+ }
+ }
+ }
+}
diff --git a/hw2/transport.jar b/hw2/transport.jar
deleted file mode 100644
index f4b2dbc..0000000
Binary files a/hw2/transport.jar and /dev/null differ