From 8629c1f6b4ff494a21bcccf391a698e475b486d1 Mon Sep 17 00:00:00 2001 From: "Claudio Maggioni (maggicl)" Date: Sun, 12 Apr 2020 15:58:40 +0200 Subject: [PATCH 1/2] Adapted websocket code to new redux specification --- gradle/wrapper/gradle-wrapper.properties | 5 +- socket_test.html | 43 +++++---- .../socket/AuthenticationMessageListener.java | 95 ------------------- .../smarthut/socket/SensorSocketEndpoint.java | 79 ++++++++------- 4 files changed, 71 insertions(+), 151 deletions(-) delete mode 100644 src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/socket/AuthenticationMessageListener.java diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index a2bf131..b09929f 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,6 @@ +#Sun Apr 12 12:33:03 CEST 2020 +distributionUrl=https\://services.gradle.org/distributions/gradle-6.2.2-all.zip distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-6.2.2-bin.zip -zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists +zipStoreBase=GRADLE_USER_HOME diff --git a/socket_test.html b/socket_test.html index 687388b..91399e5 100644 --- a/socket_test.html +++ b/socket_test.html @@ -1,40 +1,43 @@ - + - - -
-

Waiting for authentication...

-
- - + + + + diff --git a/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/socket/AuthenticationMessageListener.java b/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/socket/AuthenticationMessageListener.java deleted file mode 100644 index 89415aa..0000000 --- a/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/socket/AuthenticationMessageListener.java +++ /dev/null @@ -1,95 +0,0 @@ -package ch.usi.inf.sa4.sanmarinoes.smarthut.socket; - -import ch.usi.inf.sa4.sanmarinoes.smarthut.config.GsonConfig; -import ch.usi.inf.sa4.sanmarinoes.smarthut.config.JWTTokenUtils; -import ch.usi.inf.sa4.sanmarinoes.smarthut.models.User; -import ch.usi.inf.sa4.sanmarinoes.smarthut.models.UserRepository; -import com.google.gson.Gson; -import com.google.gson.JsonObject; -import io.jsonwebtoken.ExpiredJwtException; -import java.io.IOException; -import java.util.Map; -import java.util.function.BiConsumer; -import javax.websocket.MessageHandler; -import javax.websocket.Session; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -/** Generates MessageHandlers for unauthenticated socket sessions */ -@Component -public class AuthenticationMessageListener { - - private Gson gson = GsonConfig.gson(); - - private JWTTokenUtils jwtTokenUtils; - - private UserRepository userRepository; - - @Autowired - public AuthenticationMessageListener( - JWTTokenUtils jwtTokenUtils, UserRepository userRepository) { - this.jwtTokenUtils = jwtTokenUtils; - this.userRepository = userRepository; - } - - /** - * Generates a new message handler to handle socket authentication - * - * @param session the session to which authentication must be checked - * @param authorizedSetter function to call once user is authenticated - * @return a new message handler to handle socket authentication - */ - MessageHandler.Whole newHandler( - final Session session, BiConsumer authorizedSetter) { - return new MessageHandler.Whole<>() { - @Override - public void onMessage(final String message) { - if (message == null) { - acknowledge(false); - return; - } - - String token; - String username; - - try { - token = gson.fromJson(message, JsonObject.class).get("token").getAsString(); - username = jwtTokenUtils.getUsernameFromToken(token); - } catch (ExpiredJwtException e) { - System.err.println(e.getMessage()); - acknowledge(false); - return; - } catch (Throwable ignored) { - System.out.println("Token format not valid"); - acknowledge(false); - return; - } - - final User user = userRepository.findByUsername(username); - if (user == null || jwtTokenUtils.isTokenExpired(token)) { - System.out.println("Token not valid"); - acknowledge(false); - return; - } - - // Here user is authenticated - session.removeMessageHandler(this); - - // Add user-session pair in authorized list - authorizedSetter.accept(user, session); - - // update client to acknowledge authentication - acknowledge(true); - } - - private void acknowledge(boolean success) { - try { - session.getBasicRemote() - .sendText(gson.toJson(Map.of("authenticated", success))); - } catch (IOException e) { - e.printStackTrace(); - } - } - }; - } -} diff --git a/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/socket/SensorSocketEndpoint.java b/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/socket/SensorSocketEndpoint.java index 600bdf4..44c754b 100644 --- a/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/socket/SensorSocketEndpoint.java +++ b/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/socket/SensorSocketEndpoint.java @@ -2,51 +2,43 @@ package ch.usi.inf.sa4.sanmarinoes.smarthut.socket; import ch.usi.inf.sa4.sanmarinoes.smarthut.config.GsonConfig; +import ch.usi.inf.sa4.sanmarinoes.smarthut.config.JWTTokenUtils; import ch.usi.inf.sa4.sanmarinoes.smarthut.models.User; +import ch.usi.inf.sa4.sanmarinoes.smarthut.models.UserRepository; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; import com.google.gson.Gson; + import java.io.IOException; import java.util.*; import javax.websocket.*; + +import com.google.gson.JsonObject; +import io.jsonwebtoken.ExpiredJwtException; +import org.hibernate.annotations.common.reflection.XProperty; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -/** Endpoint of socket at URL /sensor-socket used to update the client with sensor information */ +/** + * Endpoint of socket at URL /sensor-socket used to update the client with sensor information + */ @Component public class SensorSocketEndpoint extends Endpoint { private Gson gson = GsonConfig.gson(); - private AuthenticationMessageListener authenticationMessageListener; + private UserRepository userRepository; - private Set unauthorizedClients = Collections.synchronizedSet(new HashSet<>()); + private JWTTokenUtils jwtTokenUtils; private Multimap authorizedClients = Multimaps.synchronizedMultimap(HashMultimap.create()); @Autowired - public SensorSocketEndpoint(AuthenticationMessageListener authenticationMessageListener) { - this.authenticationMessageListener = authenticationMessageListener; - } - - /** - * Returns a synchronized set of socket sessions not yet authorized with a token - * - * @return a synchronized set of socket sessions not yet authorized with a token - */ - public Set getUnauthorizedClients() { - return unauthorizedClients; - } - - /** - * Returns a synchronized User to Session multimap with authorized sessions - * - * @return a synchronized User to Session multimap with authorized sessions - */ - public Multimap getAuthorizedClients() { - return authorizedClients; + public SensorSocketEndpoint(UserRepository userRepository, JWTTokenUtils jwtTokenUtils) { + this.jwtTokenUtils = jwtTokenUtils; + this.userRepository = userRepository; } /** @@ -54,8 +46,7 @@ public class SensorSocketEndpoint extends Endpoint { * returns the number of successful transfers * * @param message the message to send - * @param u the user to which to send the message - * @return number of successful transfer + * @param u the user to which to send the message */ public void broadcast(Object message, User u) { final HashSet sessions = new HashSet<>(authorizedClients.get(u)); @@ -76,17 +67,37 @@ public class SensorSocketEndpoint extends Endpoint { * Handles the opening of a socket session with a client * * @param session the newly born session - * @param config endpoint configuration + * @param config endpoint configuration */ @Override public void onOpen(Session session, EndpointConfig config) { - unauthorizedClients.add(session); - session.addMessageHandler( - authenticationMessageListener.newHandler( - session, - (u, s) -> { - unauthorizedClients.remove(s); - authorizedClients.put(u, s); - })); + final List tokenQuery = session.getRequestParameterMap().get("token"); + User u; + if (!tokenQuery.isEmpty() && (u = checkToken(tokenQuery.get(0))) != null) { + authorizedClients.put(u, session); + } else { + try { + session.close(); + } catch (IOException ignored) { + } + } + } + + private User checkToken(String protocolString) { + String username; + + try { + username = jwtTokenUtils.getUsernameFromToken(protocolString); + } catch (Throwable ignored) { + System.out.println("Token format not valid"); + return null; + } + + final User user = userRepository.findByUsername(username); + if (user != null && !jwtTokenUtils.isTokenExpired(protocolString)) { + return user; + } else { + return null; + } } } From 68f9b352cdf5b2e9190cb889eaeef31b27fb74d5 Mon Sep 17 00:00:00 2001 From: "Claudio Maggioni (maggicl)" Date: Sun, 12 Apr 2020 17:47:03 +0200 Subject: [PATCH 2/2] Updaed websocket to perform batch updates --- .../controller/MotionSensorController.java | 2 +- .../smarthut/controller/SensorController.java | 2 +- .../smarthut/scheduled/UpdateTasks.java | 11 ++++-- .../smarthut/socket/SensorSocketEndpoint.java | 39 ++++++++++++++++--- 4 files changed, 44 insertions(+), 10 deletions(-) diff --git a/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/controller/MotionSensorController.java b/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/controller/MotionSensorController.java index 59c0343..c349aa2 100644 --- a/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/controller/MotionSensorController.java +++ b/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/controller/MotionSensorController.java @@ -53,7 +53,7 @@ public class MotionSensorController { sensor.setDetected(detected); final MotionSensor toReturn = motionSensorService.save(sensor); - sensorSocketEndpoint.broadcast(sensor, motionSensorService.findUser(sensor.getId())); + sensorSocketEndpoint.queueDeviceUpdate(sensor, motionSensorService.findUser(sensor.getId())); return toReturn; } diff --git a/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/controller/SensorController.java b/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/controller/SensorController.java index ee1be81..5c415dd 100644 --- a/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/controller/SensorController.java +++ b/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/controller/SensorController.java @@ -56,7 +56,7 @@ public class SensorController { sensor.setValue(value); final Sensor toReturn = sensorRepository.save(sensor); - sensorSocketEndpoint.broadcast(sensor, sensorRepository.findUser(sensor.getId())); + sensorSocketEndpoint.queueDeviceUpdate(sensor, sensorRepository.findUser(sensor.getId())); return toReturn; } diff --git a/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/scheduled/UpdateTasks.java b/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/scheduled/UpdateTasks.java index 932db5c..9cdb838 100644 --- a/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/scheduled/UpdateTasks.java +++ b/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/scheduled/UpdateTasks.java @@ -43,8 +43,7 @@ public class UpdateTasks { Sensor.TYPICAL_VALUES .get(sensor.getSensor()) .multiply( - new BigDecimal( - 0.9875 + Math.random() / 40)))); + BigDecimal.valueOf(0.9875 + Math.random() / 40)))); } /** @@ -72,6 +71,12 @@ public class UpdateTasks { public void smartPlugConsumptionFakeUpdate() { smartPlugRepository.updateTotalConsumption(SmartPlug.AVERAGE_CONSUMPTION_KW); final Collection c = smartPlugRepository.findByOn(true); - c.forEach(s -> sensorSocketEndpoint.broadcast(s, sensorRepository.findUser(s.getId()))); + c.forEach(s -> sensorSocketEndpoint.queueDeviceUpdate(s, sensorRepository.findUser(s.getId()))); + } + + /** Sends device updates through sensor socket in batch every one second */ + @Scheduled(fixedDelay = 1000) + public void socketFlush() { + sensorSocketEndpoint.flushDeviceUpdates(); } } diff --git a/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/socket/SensorSocketEndpoint.java b/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/socket/SensorSocketEndpoint.java index 44c754b..655a773 100644 --- a/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/socket/SensorSocketEndpoint.java +++ b/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/socket/SensorSocketEndpoint.java @@ -3,6 +3,7 @@ package ch.usi.inf.sa4.sanmarinoes.smarthut.socket; import ch.usi.inf.sa4.sanmarinoes.smarthut.config.GsonConfig; import ch.usi.inf.sa4.sanmarinoes.smarthut.config.JWTTokenUtils; +import ch.usi.inf.sa4.sanmarinoes.smarthut.models.Device; import ch.usi.inf.sa4.sanmarinoes.smarthut.models.User; import ch.usi.inf.sa4.sanmarinoes.smarthut.models.UserRepository; import com.google.common.collect.HashMultimap; @@ -18,6 +19,7 @@ import com.google.gson.JsonObject; import io.jsonwebtoken.ExpiredJwtException; import org.hibernate.annotations.common.reflection.XProperty; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.security.core.parameters.P; import org.springframework.stereotype.Component; /** @@ -35,6 +37,8 @@ public class SensorSocketEndpoint extends Endpoint { private Multimap authorizedClients = Multimaps.synchronizedMultimap(HashMultimap.create()); + private final Map> messages = new HashMap<>(); + @Autowired public SensorSocketEndpoint(UserRepository userRepository, JWTTokenUtils jwtTokenUtils) { this.jwtTokenUtils = jwtTokenUtils; @@ -42,18 +46,43 @@ public class SensorSocketEndpoint extends Endpoint { } /** - * Given a message and a user, broadcasts that message in json to all associated clients and - * returns the number of successful transfers + * Queues a single device update for a certain user to be sent + * @param device the device update to be sent + * @param u the user the device belongs + */ + public void queueDeviceUpdate(Device device, User u) { + synchronized (messages) { + messages.putIfAbsent(u, new HashMap<>()); + messages.get(u).put(device.getId(), device); + } + } + + /** + * Sends all device updates queued to be sent in a unique WebSocket message + */ + public void flushDeviceUpdates() { + synchronized (messages) { + for (Map.Entry> batchForUser : messages.entrySet()) { + broadcast(batchForUser.getKey(), batchForUser.getValue().values()); + batchForUser.getValue().clear(); + } + } + } + + /** + * Given a collection of messages and a user, broadcasts that message in json to all + * associated clients * - * @param message the message to send + * @param messages the message batch to send * @param u the user to which to send the message */ - public void broadcast(Object message, User u) { + private void broadcast(User u, Collection messages) { + if (messages.isEmpty()) return; final HashSet sessions = new HashSet<>(authorizedClients.get(u)); for (Session s : sessions) { try { if (s.isOpen()) { - s.getBasicRemote().sendText(gson.toJson(message)); + s.getBasicRemote().sendText(gson.toJson(messages)); } else { authorizedClients.remove(u, s); }