diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index a2bf131..b09929f 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,6 @@
+#Sun Apr 12 12:33:03 CEST 2020
+distributionUrl=https\://services.gradle.org/distributions/gradle-6.2.2-all.zip
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-6.2.2-bin.zip
-zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
+zipStoreBase=GRADLE_USER_HOME
diff --git a/socket_test.html b/socket_test.html
index 687388b..91399e5 100644
--- a/socket_test.html
+++ b/socket_test.html
@@ -1,40 +1,43 @@
-
+
-
-
-
-
Waiting for authentication...
-
-
-
+
+
+
+
diff --git a/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/controller/MotionSensorController.java b/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/controller/MotionSensorController.java
index 59c0343..c349aa2 100644
--- a/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/controller/MotionSensorController.java
+++ b/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/controller/MotionSensorController.java
@@ -53,7 +53,7 @@ public class MotionSensorController {
sensor.setDetected(detected);
final MotionSensor toReturn = motionSensorService.save(sensor);
- sensorSocketEndpoint.broadcast(sensor, motionSensorService.findUser(sensor.getId()));
+ sensorSocketEndpoint.queueDeviceUpdate(sensor, motionSensorService.findUser(sensor.getId()));
return toReturn;
}
diff --git a/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/controller/SensorController.java b/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/controller/SensorController.java
index ee1be81..5c415dd 100644
--- a/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/controller/SensorController.java
+++ b/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/controller/SensorController.java
@@ -56,7 +56,7 @@ public class SensorController {
sensor.setValue(value);
final Sensor toReturn = sensorRepository.save(sensor);
- sensorSocketEndpoint.broadcast(sensor, sensorRepository.findUser(sensor.getId()));
+ sensorSocketEndpoint.queueDeviceUpdate(sensor, sensorRepository.findUser(sensor.getId()));
return toReturn;
}
diff --git a/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/scheduled/UpdateTasks.java b/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/scheduled/UpdateTasks.java
index 932db5c..9cdb838 100644
--- a/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/scheduled/UpdateTasks.java
+++ b/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/scheduled/UpdateTasks.java
@@ -43,8 +43,7 @@ public class UpdateTasks {
Sensor.TYPICAL_VALUES
.get(sensor.getSensor())
.multiply(
- new BigDecimal(
- 0.9875 + Math.random() / 40))));
+ BigDecimal.valueOf(0.9875 + Math.random() / 40))));
}
/**
@@ -72,6 +71,12 @@ public class UpdateTasks {
public void smartPlugConsumptionFakeUpdate() {
smartPlugRepository.updateTotalConsumption(SmartPlug.AVERAGE_CONSUMPTION_KW);
final Collection c = smartPlugRepository.findByOn(true);
- c.forEach(s -> sensorSocketEndpoint.broadcast(s, sensorRepository.findUser(s.getId())));
+ c.forEach(s -> sensorSocketEndpoint.queueDeviceUpdate(s, sensorRepository.findUser(s.getId())));
+ }
+
+ /** Sends device updates through sensor socket in batch every one second */
+ @Scheduled(fixedDelay = 1000)
+ public void socketFlush() {
+ sensorSocketEndpoint.flushDeviceUpdates();
}
}
diff --git a/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/socket/AuthenticationMessageListener.java b/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/socket/AuthenticationMessageListener.java
deleted file mode 100644
index 89415aa..0000000
--- a/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/socket/AuthenticationMessageListener.java
+++ /dev/null
@@ -1,95 +0,0 @@
-package ch.usi.inf.sa4.sanmarinoes.smarthut.socket;
-
-import ch.usi.inf.sa4.sanmarinoes.smarthut.config.GsonConfig;
-import ch.usi.inf.sa4.sanmarinoes.smarthut.config.JWTTokenUtils;
-import ch.usi.inf.sa4.sanmarinoes.smarthut.models.User;
-import ch.usi.inf.sa4.sanmarinoes.smarthut.models.UserRepository;
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-import io.jsonwebtoken.ExpiredJwtException;
-import java.io.IOException;
-import java.util.Map;
-import java.util.function.BiConsumer;
-import javax.websocket.MessageHandler;
-import javax.websocket.Session;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/** Generates MessageHandlers for unauthenticated socket sessions */
-@Component
-public class AuthenticationMessageListener {
-
- private Gson gson = GsonConfig.gson();
-
- private JWTTokenUtils jwtTokenUtils;
-
- private UserRepository userRepository;
-
- @Autowired
- public AuthenticationMessageListener(
- JWTTokenUtils jwtTokenUtils, UserRepository userRepository) {
- this.jwtTokenUtils = jwtTokenUtils;
- this.userRepository = userRepository;
- }
-
- /**
- * Generates a new message handler to handle socket authentication
- *
- * @param session the session to which authentication must be checked
- * @param authorizedSetter function to call once user is authenticated
- * @return a new message handler to handle socket authentication
- */
- MessageHandler.Whole newHandler(
- final Session session, BiConsumer authorizedSetter) {
- return new MessageHandler.Whole<>() {
- @Override
- public void onMessage(final String message) {
- if (message == null) {
- acknowledge(false);
- return;
- }
-
- String token;
- String username;
-
- try {
- token = gson.fromJson(message, JsonObject.class).get("token").getAsString();
- username = jwtTokenUtils.getUsernameFromToken(token);
- } catch (ExpiredJwtException e) {
- System.err.println(e.getMessage());
- acknowledge(false);
- return;
- } catch (Throwable ignored) {
- System.out.println("Token format not valid");
- acknowledge(false);
- return;
- }
-
- final User user = userRepository.findByUsername(username);
- if (user == null || jwtTokenUtils.isTokenExpired(token)) {
- System.out.println("Token not valid");
- acknowledge(false);
- return;
- }
-
- // Here user is authenticated
- session.removeMessageHandler(this);
-
- // Add user-session pair in authorized list
- authorizedSetter.accept(user, session);
-
- // update client to acknowledge authentication
- acknowledge(true);
- }
-
- private void acknowledge(boolean success) {
- try {
- session.getBasicRemote()
- .sendText(gson.toJson(Map.of("authenticated", success)));
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- };
- }
-}
diff --git a/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/socket/SensorSocketEndpoint.java b/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/socket/SensorSocketEndpoint.java
index 600bdf4..655a773 100644
--- a/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/socket/SensorSocketEndpoint.java
+++ b/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/socket/SensorSocketEndpoint.java
@@ -2,67 +2,87 @@ package ch.usi.inf.sa4.sanmarinoes.smarthut.socket;
import ch.usi.inf.sa4.sanmarinoes.smarthut.config.GsonConfig;
+import ch.usi.inf.sa4.sanmarinoes.smarthut.config.JWTTokenUtils;
+import ch.usi.inf.sa4.sanmarinoes.smarthut.models.Device;
import ch.usi.inf.sa4.sanmarinoes.smarthut.models.User;
+import ch.usi.inf.sa4.sanmarinoes.smarthut.models.UserRepository;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.gson.Gson;
+
import java.io.IOException;
import java.util.*;
import javax.websocket.*;
+
+import com.google.gson.JsonObject;
+import io.jsonwebtoken.ExpiredJwtException;
+import org.hibernate.annotations.common.reflection.XProperty;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.security.core.parameters.P;
import org.springframework.stereotype.Component;
-/** Endpoint of socket at URL /sensor-socket used to update the client with sensor information */
+/**
+ * Endpoint of socket at URL /sensor-socket used to update the client with sensor information
+ */
@Component
public class SensorSocketEndpoint extends Endpoint {
private Gson gson = GsonConfig.gson();
- private AuthenticationMessageListener authenticationMessageListener;
+ private UserRepository userRepository;
- private Set unauthorizedClients = Collections.synchronizedSet(new HashSet<>());
+ private JWTTokenUtils jwtTokenUtils;
private Multimap authorizedClients =
Multimaps.synchronizedMultimap(HashMultimap.create());
+ private final Map> messages = new HashMap<>();
+
@Autowired
- public SensorSocketEndpoint(AuthenticationMessageListener authenticationMessageListener) {
- this.authenticationMessageListener = authenticationMessageListener;
+ public SensorSocketEndpoint(UserRepository userRepository, JWTTokenUtils jwtTokenUtils) {
+ this.jwtTokenUtils = jwtTokenUtils;
+ this.userRepository = userRepository;
}
/**
- * Returns a synchronized set of socket sessions not yet authorized with a token
- *
- * @return a synchronized set of socket sessions not yet authorized with a token
+ * Queues a single device update for a certain user to be sent
+ * @param device the device update to be sent
+ * @param u the user the device belongs
*/
- public Set getUnauthorizedClients() {
- return unauthorizedClients;
+ public void queueDeviceUpdate(Device device, User u) {
+ synchronized (messages) {
+ messages.putIfAbsent(u, new HashMap<>());
+ messages.get(u).put(device.getId(), device);
+ }
}
/**
- * Returns a synchronized User to Session multimap with authorized sessions
- *
- * @return a synchronized User to Session multimap with authorized sessions
+ * Sends all device updates queued to be sent in a unique WebSocket message
*/
- public Multimap getAuthorizedClients() {
- return authorizedClients;
+ public void flushDeviceUpdates() {
+ synchronized (messages) {
+ for (Map.Entry> batchForUser : messages.entrySet()) {
+ broadcast(batchForUser.getKey(), batchForUser.getValue().values());
+ batchForUser.getValue().clear();
+ }
+ }
}
/**
- * Given a message and a user, broadcasts that message in json to all associated clients and
- * returns the number of successful transfers
+ * Given a collection of messages and a user, broadcasts that message in json to all
+ * associated clients
*
- * @param message the message to send
- * @param u the user to which to send the message
- * @return number of successful transfer
+ * @param messages the message batch to send
+ * @param u the user to which to send the message
*/
- public void broadcast(Object message, User u) {
+ private void broadcast(User u, Collection> messages) {
+ if (messages.isEmpty()) return;
final HashSet sessions = new HashSet<>(authorizedClients.get(u));
for (Session s : sessions) {
try {
if (s.isOpen()) {
- s.getBasicRemote().sendText(gson.toJson(message));
+ s.getBasicRemote().sendText(gson.toJson(messages));
} else {
authorizedClients.remove(u, s);
}
@@ -76,17 +96,37 @@ public class SensorSocketEndpoint extends Endpoint {
* Handles the opening of a socket session with a client
*
* @param session the newly born session
- * @param config endpoint configuration
+ * @param config endpoint configuration
*/
@Override
public void onOpen(Session session, EndpointConfig config) {
- unauthorizedClients.add(session);
- session.addMessageHandler(
- authenticationMessageListener.newHandler(
- session,
- (u, s) -> {
- unauthorizedClients.remove(s);
- authorizedClients.put(u, s);
- }));
+ final List tokenQuery = session.getRequestParameterMap().get("token");
+ User u;
+ if (!tokenQuery.isEmpty() && (u = checkToken(tokenQuery.get(0))) != null) {
+ authorizedClients.put(u, session);
+ } else {
+ try {
+ session.close();
+ } catch (IOException ignored) {
+ }
+ }
+ }
+
+ private User checkToken(String protocolString) {
+ String username;
+
+ try {
+ username = jwtTokenUtils.getUsernameFromToken(protocolString);
+ } catch (Throwable ignored) {
+ System.out.println("Token format not valid");
+ return null;
+ }
+
+ final User user = userRepository.findByUsername(username);
+ if (user != null && !jwtTokenUtils.isTokenExpired(protocolString)) {
+ return user;
+ } else {
+ return null;
+ }
}
}