hw2: done

This commit is contained in:
Claudio Maggioni (maggicl) 2020-05-08 14:25:09 +02:00
parent ea830e85bd
commit 059c627b29
14 changed files with 709 additions and 25 deletions

View file

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project version="4"> <project version="4">
<component name="ProjectRootManager" version="2" languageLevel="JDK_13" project-jdk-name="13" project-jdk-type="JavaSDK"> <component name="ProjectRootManager" version="2" languageLevel="JDK_13" default="false" project-jdk-name="13" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" /> <output url="file://$PROJECT_DIR$/out" />
</component> </component>
</project> </project>

113
hw2/ciao.txt Normal file
View file

@ -0,0 +1,113 @@
Siena (Scalable Internet Event Notification Architectures) is a research project aimed at
designing and constructing a generic scalable publish/subscribe event-notification service.
The technical basis of Siena is an innovative type of network service called content-based networking.
Overview
The asynchrony, heterogeneity, and inherent loose coupling that characterize applications in a wide-area
network promote event interaction as a natural design abstraction for a growing class of software systems.
An emerging building block for such systems is an infrastructure called publish/subscribe event notification
service.
Software systems of a significant dimension, especially those that are distributed over a computer network,
are often engineered by means of the integration of components. A promising approach to support component-based
software architectures is the so-called event-based style whereby the interaction of components is modeled with
events. Components emit events to inform other components of a change in their internal state or to request services
from other components. Upon detecting the occurrence of events, components react by executing some actions and possibly
emitting other events. The glue that ties components together in an event-based architecture is an infrastructure that
we call event service. The event service registers the interests of components and then dispatches event notifications
accordingly. The advantage of using an event service instead of other "classical" integration mechanisms
such as direct or remote invocation is that this method increases the degree of de-coupling among components
thus eliminating static dependencies and improving interoperability.
We envision a ubiquitous event notification service accessible from every site on a wide-area network and suitable
for supporting highly distributed applications requiring component interactions ranging in granularity from fine to
coarse. Conceptually, the service is implemented as a network of servers that provide access points to clients. Clients
use the access points to advertise the information about events that they generate and to publish notifications
containing that information. They also use the access points to subscribe for notifications of interest.
The service uses the access points to then notify clients by delivering any notifications of interest. Clearly, an
event notification service complements other general-purpose middleware services, such as point-to-point and multicast
communication mechanisms, by offering a many-to-many communication and integration facility.
Given that the primary purpose of an event notification service is to support notification selection and delivery,
the challenge we face in a wide-area setting is maximizing expressiveness in the selection mechanism without sacrificing
scalability in the delivery mechanism. Expressiveness refers to the ability of the event notification service to provide
a powerful data model with which to capture information about events, to express filters and patterns on notifications
of interest, and to use that data model as the basis for optimizing notification delivery. In terms of scalability,
we are referring not simply to the number of event generators, the number of event notifications, and the number
of notification recipients, but also to the need to discard many of the assumptions made for local-area networks,
such as low latency, abundant bandwidth, homogeneous platforms, continuous and reliable connectivity, and
centralized control.
Intuitively, a simple event notification service that provides no selection mechanism can be reduced to a multicast
routing and transport mechanism for which there are numerous scalable implementations. However, once the service provides
a selection mechanism, then the overall efficiency of the service and its routing of notifications are affected by the
power of the language used to construct notifications and to express filters and patterns. As the power of the language
increases, so does the complexity of the processing. Thus, in practice, scalability and expressiveness are two
conflicting goals that must be traded off.
Siena is an event notification service that we have designed and implemented to maximize both expressiveness and
scalability. A prototype implementation of Siena is available.
People
Siena is primarily the work of Antonio Carzaniga and Alexander L. Wolf. D.S. Rosenblum also made significant
contributions to the initial design of Siena.
Others also contributed directly to the development of Siena. Among them are John Giacomoni, Mauro Caporuscio,
Matthew J. Rutherford, Cyrus P. Hall, Yanyan Wang, Giovanni Toffetti, and Amir Malekpour.
Documents
Articles are available in Portable Document Format (PDF) or PostScript® format and some of them are compressed
with gzip. Downloading any one of these documents indicates that you agree to abide by a copyright notice.
Selected Papers
Design and Evaluation of a Wide-Area Event Notification Service
A. Carzaniga, D.S. Rosenblum, and A.L. Wolf
ACM Transactions on Computer Systems, 19(3):332-383, August 2001.
Abstract, BibTeX ref., PDF, DOI
End-to-End Reliability for Best-Effort Content-Based Publish/Subscribe Networks
A. Malekpour, A. Carzaniga, F. Pedone, and G. Toffetti Carughi
In Proceedings of the 5th ACM International Conference on Distributed Event-Based Systems (DEBS 2011). New York, New York, July 2011.
Abstract, BibTeX ref., PDF, DOI
A Routing Scheme for Content-Based Networking
A. Carzaniga, M.J. Rutherford, and A.L. Wolf
Proceedings of IEEE INFOCOM 2004. Hong Kong, China. March 2004.
Abstract, BibTeX ref., PDF, DOI
Forwarding in a Content-Based Network
A. Carzaniga and A.L. Wolf
Proceedings of ACM SIGCOMM 2003. p. 163-174. Karlsruhe, Germany. August 2003.
Abstract, BibTeX ref., PDF, DOI
Design and Evaluation of a Support Service for Mobile, Wireless Publish/Subscribe Applications
M. Caporuscio, A. Carzaniga, and A.L. Wolf
IEEE Transactions on Software Engineering, 29(12):1059-1071, December 2003.
Abstract, BibTeX ref., PDF, DOI
Content-based Networking: A New Communication Infrastructure
A. Carzaniga and A.L. Wolf
In NSF Workshop on an Infrastructure for Mobile and Wireless Systems. Lecture Notes in Computer Science n. 2538 p. 59-68. Springer-Verlag. Scottsdale, Arizona. October 2001.
Abstract, BibTeX ref., PDF
Achieving Expressiveness and Scalability in an Internet-Scale Event Notification Service
A. Carzaniga, D.S. Rosenblum, and A.L. Wolf
Nineteenth ACM Symposium on Principles of Distributed Computing (PODC2000). Portland, Oregon, July 2000.
Abstract, BibTeX ref., PDF, DOI
Content-Based Addressing and Routing: A General Model and its Application
A. Carzaniga, D.S. Rosenblum, and A.L. Wolf
Technical Report CU-CS-902-00, Department of Computer Science, University of Colorado, January 2000.
Abstract, BibTeX ref., PDF
Architectures for an Event Notification Service Scalable to Wide-area Networks
A. Carzaniga
PhD Thesis. Politecnico di Milano. December, 1998.
Abstract, BibTeX ref., PDF
A Design Framework for Internet-Scale Event Observation and Notification
D.S. Rosenblum and A.L. Wolf
6th European Software Engineering Conference. Lecture Notes in Computer Science 1301, Springer, Berlin, 1997.
Acknowledgments
This work was supported in part by the Air Force Materiel Command, Rome Laboratory, and the Defense Advanced
Research Projects Agency under Contract Numbers F30602-94-C-0253, F30602-97-2-0021, F30602-98-2-0163,
and F30602-99-C-0174; by the Air Force Office of Scientific Research, Air Force Materiel Command, USAF, u
nder grant number F49620-98-1-0061; and by the National Science Foundation under Grant Number CCR-9701973.
The content of the information does not necessarily reflect the position or the policy of the US Governmen
t and no official endorsement should be inferred.
We thank Dennis Heimbigner, Richard S. Hall, and André van der Hoek of the Software Engineering Research Laboratory,
University of Colorado at Boulder (circa 1997), and Giampaolo Cugola, Elisabetta Di Nitto, and Alfonso Fuggetta of
Politecnico di Milano for their contributions to this work.
this page is maintained by Antonio Carzaniga and was updated on September 15, 2019

113
hw2/mamma.txt Normal file
View file

@ -0,0 +1,113 @@
Siena (Scalable Internet Event Notification Architectures) is a research project aimed at
designing and constructing a generic scalable publish/subscribe event-notification service.
The technical basis of Siena is an innovative type of network service called content-based networking.
Overview
The asynchrony, heterogeneity, and inherent loose coupling that characterize applications in a wide-area
network promote event interaction as a natural design abstraction for a growing class of software systems.
An emerging building block for such systems is an infrastructure called publish/subscribe event notification
service.
Software systems of a significant dimension, especially those that are distributed over a computer network,
are often engineered by means of the integration of components. A promising approach to support component-based
software architectures is the so-called event-based style whereby the interaction of components is modeled with
events. Components emit events to inform other components of a change in their internal state or to request services
from other components. Upon detecting the occurrence of events, components react by executing some actions and possibly
emitting other events. The glue that ties components together in an event-based architecture is an infrastructure that
we call event service. The event service registers the interests of components and then dispatches event notifications
accordingly. The advantage of using an event service instead of other "classical" integration mechanisms
such as direct or remote invocation is that this method increases the degree of de-coupling among components
thus eliminating static dependencies and improving interoperability.
We envision a ubiquitous event notification service accessible from every site on a wide-area network and suitable
for supporting highly distributed applications requiring component interactions ranging in granularity from fine to
coarse. Conceptually, the service is implemented as a network of servers that provide access points to clients. Clients
use the access points to advertise the information about events that they generate and to publish notifications
containing that information. They also use the access points to subscribe for notifications of interest.
The service uses the access points to then notify clients by delivering any notifications of interest. Clearly, an
event notification service complements other general-purpose middleware services, such as point-to-point and multicast
communication mechanisms, by offering a many-to-many communication and integration facility.
Given that the primary purpose of an event notification service is to support notification selection and delivery,
the challenge we face in a wide-area setting is maximizing expressiveness in the selection mechanism without sacrificing
scalability in the delivery mechanism. Expressiveness refers to the ability of the event notification service to provide
a powerful data model with which to capture information about events, to express filters and patterns on notifications
of interest, and to use that data model as the basis for optimizing notification delivery. In terms of scalability,
we are referring not simply to the number of event generators, the number of event notifications, and the number
of notification recipients, but also to the need to discard many of the assumptions made for local-area networks,
such as low latency, abundant bandwidth, homogeneous platforms, continuous and reliable connectivity, and
centralized control.
Intuitively, a simple event notification service that provides no selection mechanism can be reduced to a multicast
routing and transport mechanism for which there are numerous scalable implementations. However, once the service provides
a selection mechanism, then the overall efficiency of the service and its routing of notifications are affected by the
power of the language used to construct notifications and to express filters and patterns. As the power of the language
increases, so does the complexity of the processing. Thus, in practice, scalability and expressiveness are two
conflicting goals that must be traded off.
Siena is an event notification service that we have designed and implemented to maximize both expressiveness and
scalability. A prototype implementation of Siena is available.
People
Siena is primarily the work of Antonio Carzaniga and Alexander L. Wolf. D.S. Rosenblum also made significant
contributions to the initial design of Siena.
Others also contributed directly to the development of Siena. Among them are John Giacomoni, Mauro Caporuscio,
Matthew J. Rutherford, Cyrus P. Hall, Yanyan Wang, Giovanni Toffetti, and Amir Malekpour.
Documents
Articles are available in Portable Document Format (PDF) or PostScript® format and some of them are compressed
with gzip. Downloading any one of these documents indicates that you agree to abide by a copyright notice.
Selected Papers
Design and Evaluation of a Wide-Area Event Notification Service
A. Carzaniga, D.S. Rosenblum, and A.L. Wolf
ACM Transactions on Computer Systems, 19(3):332-383, August 2001.
Abstract, BibTeX ref., PDF, DOI
End-to-End Reliability for Best-Effort Content-Based Publish/Subscribe Networks
A. Malekpour, A. Carzaniga, F. Pedone, and G. Toffetti Carughi
In Proceedings of the 5th ACM International Conference on Distributed Event-Based Systems (DEBS 2011). New York, New York, July 2011.
Abstract, BibTeX ref., PDF, DOI
A Routing Scheme for Content-Based Networking
A. Carzaniga, M.J. Rutherford, and A.L. Wolf
Proceedings of IEEE INFOCOM 2004. Hong Kong, China. March 2004.
Abstract, BibTeX ref., PDF, DOI
Forwarding in a Content-Based Network
A. Carzaniga and A.L. Wolf
Proceedings of ACM SIGCOMM 2003. p. 163-174. Karlsruhe, Germany. August 2003.
Abstract, BibTeX ref., PDF, DOI
Design and Evaluation of a Support Service for Mobile, Wireless Publish/Subscribe Applications
M. Caporuscio, A. Carzaniga, and A.L. Wolf
IEEE Transactions on Software Engineering, 29(12):1059-1071, December 2003.
Abstract, BibTeX ref., PDF, DOI
Content-based Networking: A New Communication Infrastructure
A. Carzaniga and A.L. Wolf
In NSF Workshop on an Infrastructure for Mobile and Wireless Systems. Lecture Notes in Computer Science n. 2538 p. 59-68. Springer-Verlag. Scottsdale, Arizona. October 2001.
Abstract, BibTeX ref., PDF
Achieving Expressiveness and Scalability in an Internet-Scale Event Notification Service
A. Carzaniga, D.S. Rosenblum, and A.L. Wolf
Nineteenth ACM Symposium on Principles of Distributed Computing (PODC2000). Portland, Oregon, July 2000.
Abstract, BibTeX ref., PDF, DOI
Content-Based Addressing and Routing: A General Model and its Application
A. Carzaniga, D.S. Rosenblum, and A.L. Wolf
Technical Report CU-CS-902-00, Department of Computer Science, University of Colorado, January 2000.
Abstract, BibTeX ref., PDF
Architectures for an Event Notification Service Scalable to Wide-area Networks
A. Carzaniga
PhD Thesis. Politecnico di Milano. December, 1998.
Abstract, BibTeX ref., PDF
A Design Framework for Internet-Scale Event Observation and Notification
D.S. Rosenblum and A.L. Wolf
6th European Software Engineering Conference. Lecture Notes in Computer Science 1301, Springer, Berlin, 1997.
Acknowledgments
This work was supported in part by the Air Force Materiel Command, Rome Laboratory, and the Defense Advanced
Research Projects Agency under Contract Numbers F30602-94-C-0253, F30602-97-2-0021, F30602-98-2-0163,
and F30602-99-C-0174; by the Air Force Office of Scientific Research, Air Force Materiel Command, USAF, u
nder grant number F49620-98-1-0061; and by the National Science Foundation under Grant Number CCR-9701973.
The content of the information does not necessarily reflect the position or the policy of the US Governmen
t and no official endorsement should be inferred.
We thank Dennis Heimbigner, Richard S. Hall, and André van der Hoek of the Software Engineering Research Laboratory,
University of Colorado at Boulder (circa 1997), and Giampaolo Cugola, Elisabetta Di Nitto, and Alfonso Fuggetta of
Politecnico di Milano for their contributions to this work.
this page is maintained by Antonio Carzaniga and was updated on September 15, 2019

View file

@ -1,3 +1,5 @@
package impl;
import transport.Receiver; import transport.Receiver;
import transport.TimeoutAction; import transport.TimeoutAction;
@ -5,12 +7,13 @@ public class GBNTReceiver extends Receiver implements TimeoutAction {
private char sequence; private char sequence;
boolean firstPacket = true; boolean firstPacket = true;
private static final int RECEIVER_TIMEOUT_MS = 1000; private static final int RECEIVER_TIMEOUT_MS = 5000;
private State state = State.SETUP; private State state = State.SETUP;
enum State { enum State {
SETUP, SETUP,
RECEIVING RECEIVING,
CLOSED,
} }
private static byte[] generateACK(char sequence) { private static byte[] generateACK(char sequence) {
@ -21,7 +24,7 @@ public class GBNTReceiver extends Receiver implements TimeoutAction {
} }
private static char readBigEndianChar(byte[] src, int offset) { private static char readBigEndianChar(byte[] src, int offset) {
return (char) (src[offset] << 8 + src[offset + 1]); return (char) (((src[offset] << 8) & 0xFF00) | (src[offset + 1] & 0xFF));
} }
private static char incrementAndRollover(char c) { private static char incrementAndRollover(char c) {
@ -30,8 +33,11 @@ public class GBNTReceiver extends Receiver implements TimeoutAction {
@Override @Override
public void timeoutAction() { public void timeoutAction() {
System.out.println("Disconnect timeout triggered");
try { try {
disconnect(); disconnect();
deliver(null, 0, END_OF_STREAM);
state = State.CLOSED;
} catch (java.lang.InterruptedException ex) { } catch (java.lang.InterruptedException ex) {
System.out.println("Thread interrupted. Exiting directly."); System.out.println("Thread interrupted. Exiting directly.");
System.exit(0); System.exit(0);
@ -40,31 +46,45 @@ public class GBNTReceiver extends Receiver implements TimeoutAction {
@Override @Override
protected void unreliableReceive(byte[] bytes, int offset, int length) { protected void unreliableReceive(byte[] bytes, int offset, int length) {
if (state == State.CLOSED) return;
//System.out.println("State: " + state + " receiving: " + length);
cancelTimeout(this);
setTimeout(RECEIVER_TIMEOUT_MS, this);
switch(this.state) { switch(this.state) {
case SETUP: case SETUP:
if (length != 2) { if (length != 2) {
if (firstPacket) return; if (!firstPacket) {
else {
state = State.RECEIVING; state = State.RECEIVING;
this.unreliableReceive(bytes, offset, length); this.unreliableReceive(bytes, offset, length);
return;
} }
return;
} }
sequence = readBigEndianChar(bytes, 0); sequence = readBigEndianChar(bytes, offset);
System.out.println("Synchronization ACK: " + (int) sequence);
unreliableSend(bytes, offset, 2); unreliableSend(bytes, offset, 2);
firstPacket = false; firstPacket = false;
break; break;
case RECEIVING: case RECEIVING:
if (length == 2) { // if packet is a close packet if (length == 2) { // if packet is a close packet
if (incrementAndRollover(this.sequence) != readBigEndianChar(bytes, offset)) return; // ignore if not synchronized char a = readBigEndianChar(bytes, offset);
if (incrementAndRollover(this.sequence) != a) return; // ignore if not synchronized
this.unreliableSend(bytes, offset, 2); this.unreliableSend(bytes, offset, 2);
System.out.println("Received valid FIN packet");
timeoutAction(); timeoutAction();
} }
if (length < 3) return; if (length < 3) return;
char seq = readBigEndianChar(bytes, 0); char seq = readBigEndianChar(bytes, offset);
if (seq != incrementAndRollover(sequence)) return; //drop the packet System.out.print("Read sequence: " + (int) seq + " -> ");
if (seq != incrementAndRollover(sequence)) {
System.out.println("Dropping");
return;
} //drop the packet
else { else {
this.unreliableReceive(bytes, offset + 2, length); this.deliver(bytes, offset + 2, length - 2);
sequence = incrementAndRollover(sequence); sequence = incrementAndRollover(sequence);
System.out.println("Sending ack: "+ (int) sequence);
this.unreliableSend(generateACK(sequence), 0, 2); this.unreliableSend(generateACK(sequence), 0, 2);
} }
break; break;

View file

@ -1,6 +1,9 @@
package impl;
import transport.TimeoutAction; import transport.TimeoutAction;
import java.util.*; import java.util.*;
import java.util.concurrent.Semaphore;
public class GBNTSender extends transport.Sender implements TimeoutAction { public class GBNTSender extends transport.Sender implements TimeoutAction {
@ -10,7 +13,7 @@ public class GBNTSender extends transport.Sender implements TimeoutAction {
} }
private static char readBigEndianChar(byte[] src, int offset) { private static char readBigEndianChar(byte[] src, int offset) {
return (char) (src[offset] << 8 + src[offset + 1]); return (char) (((src[offset] << 8) & 0xFF00) | (src[offset + 1] & 0xFF));
} }
private static char addAndRollover(char c, char i) { private static char addAndRollover(char c, char i) {
@ -22,6 +25,9 @@ public class GBNTSender extends transport.Sender implements TimeoutAction {
} }
private final Map<Character, Packet> packetMap = new HashMap<>(); private final Map<Character, Packet> packetMap = new HashMap<>();
private Semaphore s = new Semaphore(0);
private static final int MAX_PACKET_SIZE = 128;
private class Packet { private class Packet {
private final byte[] contents; private final byte[] contents;
@ -31,16 +37,16 @@ public class GBNTSender extends transport.Sender implements TimeoutAction {
Packet(char sequence, byte[] buffer, int offset, int length) { Packet(char sequence, byte[] buffer, int offset, int length) {
this.sequence = sequence; this.sequence = sequence;
this.length = Math.min(length + 2, MAX_PACKET_SIZE); this.length = Math.min(length, MAX_PACKET_SIZE - 2);
contents = new byte[MAX_PACKET_SIZE]; contents = new byte[this.length + 2];
writeBigEndianChar(contents, sequence); writeBigEndianChar(contents, sequence);
System.arraycopy(buffer, offset, this.contents, 0, this.length); System.arraycopy(buffer, offset + 2 - 2, contents, 2, contents.length - 2);
packetMap.put(this.sequence, this); packetMap.put(this.sequence, this);
} }
void send() { void send() {
estimateStart = System.currentTimeMillis(); estimateStart = System.currentTimeMillis();
unreliableSend(contents, 0, length); unreliableSend(contents, 0, contents.length);
} }
void destroy() { void destroy() {
@ -62,8 +68,8 @@ public class GBNTSender extends transport.Sender implements TimeoutAction {
private char base = (char) (new Random().nextInt() % 65536); private char base = (char) (new Random().nextInt() % 65536);
private char waitingACK = 0; private char waitingACK = 0;
private int timeoutMs = 500; private int timeoutMs = 500;
private double rttEWMA = 0; private double rttEWMA = 100;
private double devRttEWMA = 0; private double devRttEWMA = 100;
private State state = State.SETUP; private State state = State.SETUP;
private final Queue<Packet> packets = new ArrayDeque<>(); private final Queue<Packet> packets = new ArrayDeque<>();
@ -74,14 +80,11 @@ public class GBNTSender extends transport.Sender implements TimeoutAction {
CLOSING, CLOSING,
} }
public GBNTSender() {
sendSetup();
}
private void computeTimeoutLength(long rttMIllis) { private void computeTimeoutLength(long rttMIllis) {
devRttEWMA = 0.75 * devRttEWMA + 0.25 * (rttEWMA - rttMIllis); devRttEWMA = 0.75 * devRttEWMA + 0.25 * (rttEWMA - rttMIllis);
rttEWMA = 0.875 * rttEWMA + 0.125 * rttMIllis; rttEWMA = 0.875 * rttEWMA + 0.125 * rttMIllis;
timeoutMs = (int) (rttEWMA + 4 * devRttEWMA); timeoutMs = (int) (rttEWMA + 4 * devRttEWMA);
//System.out.println("devRTT: " + devRttEWMA + " RTT: " + rttEWMA + " timeout: " + timeoutMs);
} }
private void flushAckedPackets() { private void flushAckedPackets() {
@ -109,15 +112,20 @@ public class GBNTSender extends transport.Sender implements TimeoutAction {
@Override @Override
protected int reliableSend(byte[] bytes, int offset, int length) { protected int reliableSend(byte[] bytes, int offset, int length) {
if (state != State.SENDING) throw new IllegalStateException(); if (state == State.SETUP) {
sendSetup();
return 0;
}
Packet p = new Packet(addAndRollover(base, waitingACK), bytes, offset, length); Packet p = new Packet(addAndRollover(base, waitingACK), bytes, offset, length);
waitingACK++; waitingACK++;
packets.add(p); packets.add(p);
System.out.println("Sending seq: " + (int) p.sequence + " - (" + p.length + " + 2 bytes)");
p.send(); p.send();
setTimeout(timeoutMs, this); setTimeout(timeoutMs, this);
if (waitingACK >= W) { if (waitingACK >= W) {
System.out.println("Window full " + (int) p.sequence);
blockSender(); blockSender();
} }
@ -127,16 +135,27 @@ public class GBNTSender extends transport.Sender implements TimeoutAction {
@Override @Override
public void close() { public void close() {
state = State.CLOSING; state = State.CLOSING;
System.out.print("FIN packet: ");
reliableSend(new byte[0], 0, 0);
if (waitingACK > 0) { if (waitingACK > 0) {
blockSender(); blockSender();
try {
s.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
System.exit(255);
}
} }
} }
@Override @Override
public void timeoutAction() { public void timeoutAction() {
if (state == State.SENDING)
for (Packet p : packets) { for (Packet p : packets) {
p.send(); p.send();
} }
else if (state == State.SETUP)
sendSetup();
} }
@Override @Override
@ -147,24 +166,34 @@ public class GBNTSender extends transport.Sender implements TimeoutAction {
if (bytes[offset] == expected[0] && bytes[1] == expected[offset + 1]) { if (bytes[offset] == expected[0] && bytes[1] == expected[offset + 1]) {
state = State.SENDING; state = State.SENDING;
base = incrementAndRollover(base); base = incrementAndRollover(base);
System.out.println("Synchronization successful");
resumeSender(); resumeSender();
} else { } else {
System.out.println("Synchronization seq: " + (int) base);
sendSetup(); sendSetup();
} }
break; break;
case SENDING: case SENDING:
case CLOSING: case CLOSING:
char ackSeq = readBigEndianChar(bytes, offset); char ackSeq = readBigEndianChar(bytes, offset);
System.out.println("Received ACK: " + (int) ackSeq);
cancelTimeout(this); cancelTimeout(this);
computeTimeoutLength(packetMap.get(ackSeq).getRTT()); Packet p = packetMap.get(ackSeq);
if (p != null) computeTimeoutLength(p.getRTT());
boolean mustResume = waitingACK == W && state == State.SENDING; boolean mustResume = waitingACK == W && state == State.SENDING;
waitingACK -= (ackSeq - base);
waitingACK -= (ackSeq + 1 - base);
base = incrementAndRollover(ackSeq); base = incrementAndRollover(ackSeq);
flushAckedPackets(); flushAckedPackets();
if (mustResume || (waitingACK == 0 && state == State.CLOSING)) { if (mustResume || (waitingACK == 0 && state == State.CLOSING)) {
resumeSender(); resumeSender();
if (state == State.CLOSING) {
System.out.println("Closing");
s.release();
}
} }
} }
} }

View file

@ -0,0 +1,42 @@
package transport;
import impl.GBNTReceiver;
import java.io.FileOutputStream;
import java.net.InetAddress;
public class FileReceiver {
public static void main(String[] var0) throws Exception {
int var1 = 0;
switch(var0.length) {
case 6:
var1 = Integer.parseInt(var0[5]);
case 5:
String var6 = var0[0];
int var2 = Integer.parseInt(var0[1]);
String var3 = var0[2];
int var4 = Integer.parseInt(var0[3]);
String var5 = var0[4];
//Class var7 = ClassLoader.getSystemClassLoader().loadClass(var6);
Class<?> var7 = GBNTReceiver.class;
Receiver var8 = (Receiver)var7.newInstance();
var8.setErrorPercentage((double)var1);
var8.connect(var2, InetAddress.getByName(var3), var4);
byte[] var9 = new byte[1024];
FileOutputStream var11 = new FileOutputStream(var5);
int var10;
while((var10 = var8.receive(var9, 0, 1024)) > 0) {
var11.write(var9, 0, var10);
}
var11.close();
var11 = null;
var8.disconnect();
return;
default:
System.err.println("Usage: FileReceiver <ReceiverClass> <local port> <destination host> <destination port> <filename> [error%]");
}
}
}

View file

@ -0,0 +1,53 @@
package transport;
import java.io.File;
import java.io.FileInputStream;
import java.net.InetAddress;
import impl.GBNTSender;
public class FileSender {
public static void main(String[] var0) throws Exception {
int var1 = 0;
switch(var0.length) {
case 6:
var1 = Integer.parseInt(var0[5]);
case 5:
String var6 = var0[0];
int var2 = Integer.parseInt(var0[1]);
String var3 = var0[2];
int var4 = Integer.parseInt(var0[3]);
String var5 = var0[4];
//Class var7 = ClassLoader.getSystemClassLoader().loadClass(var6);
Class<?> var7 = GBNTSender.class;
Sender var8 = (Sender)var7.newInstance();
var8.setErrorPercentage((double)var1);
var8.connect(var2, InetAddress.getByName(var3), var4);
File var9 = new File(var5);
try {
FileInputStream var10 = new FileInputStream(var9);
byte[] var11 = new byte[4000];
int var12;
int var14;
while((var12 = var10.read(var11)) != -1) {
for(int var13 = 0; var12 > 0; var13 += var14) {
var14 = var8.send(var11, var13, var12);
var12 -= var14;
}
}
var10.close();
} catch (Exception var15) {
var15.printStackTrace();
}
var8.close();
var8.disconnect();
return;
default:
System.err.println("Usage: FileSender <SenderClass> <local port> <destination host> <destination port> <filename> [error%]");
}
}
}

View file

@ -0,0 +1,81 @@
package transport;
import java.util.Vector;
class MyTimer implements Runnable {
Object monitor;
Vector<ScheduledAction> actions;
boolean active;
public MyTimer(Object var1) {
this.monitor = var1;
this.actions = new Vector();
this.active = true;
Thread var2 = new Thread(this);
var2.setDaemon(true);
var2.start();
}
public synchronized void stop() {
this.active = false;
}
public synchronized void schedule(TimeoutAction var1, long var2) {
long var4 = System.currentTimeMillis() + var2;
int var6;
for(var6 = 0; var6 < this.actions.size() && ((ScheduledAction)this.actions.elementAt(var6)).time < var4; ++var6) {
}
this.actions.insertElementAt(new ScheduledAction(var4, var1), var6);
this.notify();
}
synchronized TimeoutAction getFirst() throws InterruptedException {
while(true) {
if (this.active) {
if (this.actions.isEmpty()) {
this.wait();
continue;
}
long var1 = ((ScheduledAction)this.actions.elementAt(0)).time - System.currentTimeMillis();
if (var1 > 0L) {
this.wait(var1);
continue;
}
return ((ScheduledAction)this.actions.remove(0)).action;
}
return null;
}
}
public synchronized void cancel(TimeoutAction var1) {
for(int var2 = 0; var2 < this.actions.size(); ++var2) {
if (((ScheduledAction)this.actions.elementAt(var2)).action == var1) {
this.actions.remove(var2);
break;
}
}
}
public void run() {
try {
while(true) {
TimeoutAction var1 = this.getFirst();
if (var1 == null) {
return;
}
synchronized(this.monitor) {
var1.timeoutAction();
}
}
} catch (Exception var5) {
System.err.println("MyTimer: " + var5);
}
}
}

View file

@ -0,0 +1,65 @@
package transport;
public abstract class Receiver extends Transport {
private boolean writing_allowed = true;
private byte[] rcv_buffer = new byte[1024];
private int rcv_offset;
private int rcv_length;
public Receiver() {
}
public final synchronized int receive(byte[] var1, int var2, int var3) throws InterruptedException {
while(this.rcv_length == 0) {
this.wait();
}
if (this.rcv_length < 0) {
return this.rcv_length;
} else if (this.rcv_length > var3) {
for(int var4 = 0; var4 < var3; ++var4) {
var1[var2 + var4] = this.rcv_buffer[this.rcv_offset + var4];
}
this.rcv_offset += var3;
this.rcv_length -= var3;
this.notifyAll();
return var3;
} else {
var3 = this.rcv_length;
do {
--this.rcv_length;
var1[var2 + this.rcv_length] = this.rcv_buffer[this.rcv_offset + this.rcv_length];
} while(this.rcv_length > 0);
this.notify();
return var3;
}
}
protected final void deliver(byte[] bytes, int offset, int length) {
if (length == 0) {
System.err.println("Receiver: error: deliver was called with zero length.");
} else {
synchronized(this) {
try {
while(this.rcv_length != 0) {
this.wait();
}
for(int i = 0; i < length; ++i) {
this.rcv_buffer[i] = bytes[i + offset];
}
this.rcv_offset = 0;
this.rcv_length = length;
this.notifyAll();
} catch (InterruptedException ignored) {
System.err.println("Receiver.deliver: warning: interrupted while delivering to application. Some data might be lost.");
}
}
}
}
}

View file

@ -0,0 +1,11 @@
package transport;
class ScheduledAction {
long time;
TimeoutAction action;
public ScheduledAction(long var1, TimeoutAction var3) {
this.time = var1;
this.action = var3;
}
}

View file

@ -0,0 +1,34 @@
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//
package transport;
public abstract class Sender extends Transport {
private boolean sending_allowed = true;
public Sender() {
}
public final synchronized int send(byte[] var1, int var2, int var3) throws InterruptedException {
while(!this.sending_allowed) {
this.wait();
}
return this.reliableSend(var1, var2, var3);
}
protected final synchronized void blockSender() {
this.sending_allowed = false;
}
protected final synchronized void resumeSender() {
this.sending_allowed = true;
this.notifyAll();
}
protected abstract int reliableSend(byte[] var1, int var2, int var3);
public abstract void close();
}

View file

@ -0,0 +1,5 @@
package transport;
public interface TimeoutAction {
void timeoutAction();
}

View file

@ -0,0 +1,118 @@
package transport;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.util.Random;
public abstract class Transport implements Runnable {
private DatagramSocket socket = null;
private DatagramPacket rcv_packet = null;
private DatagramPacket snd_packet = null;
private static Random generator = new Random();
private double error = 0.0D;
private boolean disconnect_allowed = true;
private MyTimer timer = new MyTimer(this);
private Thread listener;
public static final int MAX_PACKET_SIZE = 1024;
public static final int END_OF_STREAM = -1;
public Transport() {
}
public final synchronized void connect(int var1, InetAddress var2, int var3) throws IllegalArgumentException, SocketException {
this.socket = new DatagramSocket(new InetSocketAddress(var1));
this.socket.connect(var2, var3);
this.socket.setSoTimeout(60000);
if (this.snd_packet == null) {
this.snd_packet = new DatagramPacket(new byte[1024], 1024, var2, var3);
} else {
this.snd_packet.setAddress(var2);
this.snd_packet.setPort(var3);
}
if (this.rcv_packet == null) {
this.rcv_packet = new DatagramPacket(new byte[1024], 1024);
}
if (this.listener == null) {
this.listener = new Thread(this);
this.listener.setDaemon(true);
this.listener.start();
}
}
protected final synchronized void allowDisconnect() {
this.disconnect_allowed = true;
this.notifyAll();
}
protected final synchronized void blockDisconnect() {
this.disconnect_allowed = false;
}
public final synchronized void disconnect() throws InterruptedException {
while(!this.disconnect_allowed) {
this.wait();
}
if (this.socket != null) {
this.socket.close();
this.listener.interrupt();
this.socket = null;
this.listener = null;
this.timer.stop();
}
}
protected abstract void unreliableReceive(byte[] var1, int var2, int var3);
protected final void unreliableSend(byte[] var1, int var2, int var3) {
if (generator.nextDouble() >= this.error) {
try {
this.snd_packet.setData(var1, var2, var3);
this.socket.send(this.snd_packet);
} catch (Exception var5) {
System.err.println("unreliableSend: error sending packet: " + var5);
var5.printStackTrace();
}
}
}
public final void setErrorPercentage(double var1) {
this.error = var1 / 100.0D;
}
public final synchronized void setTimeout(long var1, TimeoutAction var3) {
this.timer.schedule(var3, var1);
}
public final synchronized void cancelTimeout(TimeoutAction var1) {
this.timer.cancel(var1);
}
public final void run() {
while(this.socket != null) {
try {
this.socket.receive(this.rcv_packet);
synchronized(this) {
this.unreliableReceive(this.rcv_packet.getData(), this.rcv_packet.getOffset(), this.rcv_packet.getLength());
}
} catch (IOException var6) {
synchronized(this) {
if (this.socket == null) {
return;
}
}
System.err.println("run: error receiving data from the network: " + var6);
System.err.println("Terminating.");
return;
}
}
}
}

Binary file not shown.