From 68f9b352cdf5b2e9190cb889eaeef31b27fb74d5 Mon Sep 17 00:00:00 2001 From: "Claudio Maggioni (maggicl)" Date: Sun, 12 Apr 2020 17:47:03 +0200 Subject: [PATCH] Updaed websocket to perform batch updates --- .../controller/MotionSensorController.java | 2 +- .../smarthut/controller/SensorController.java | 2 +- .../smarthut/scheduled/UpdateTasks.java | 11 ++++-- .../smarthut/socket/SensorSocketEndpoint.java | 39 ++++++++++++++++--- 4 files changed, 44 insertions(+), 10 deletions(-) diff --git a/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/controller/MotionSensorController.java b/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/controller/MotionSensorController.java index 59c0343..c349aa2 100644 --- a/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/controller/MotionSensorController.java +++ b/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/controller/MotionSensorController.java @@ -53,7 +53,7 @@ public class MotionSensorController { sensor.setDetected(detected); final MotionSensor toReturn = motionSensorService.save(sensor); - sensorSocketEndpoint.broadcast(sensor, motionSensorService.findUser(sensor.getId())); + sensorSocketEndpoint.queueDeviceUpdate(sensor, motionSensorService.findUser(sensor.getId())); return toReturn; } diff --git a/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/controller/SensorController.java b/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/controller/SensorController.java index ee1be81..5c415dd 100644 --- a/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/controller/SensorController.java +++ b/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/controller/SensorController.java @@ -56,7 +56,7 @@ public class SensorController { sensor.setValue(value); final Sensor toReturn = sensorRepository.save(sensor); - sensorSocketEndpoint.broadcast(sensor, sensorRepository.findUser(sensor.getId())); + sensorSocketEndpoint.queueDeviceUpdate(sensor, sensorRepository.findUser(sensor.getId())); return toReturn; } diff --git a/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/scheduled/UpdateTasks.java b/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/scheduled/UpdateTasks.java index 932db5c..9cdb838 100644 --- a/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/scheduled/UpdateTasks.java +++ b/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/scheduled/UpdateTasks.java @@ -43,8 +43,7 @@ public class UpdateTasks { Sensor.TYPICAL_VALUES .get(sensor.getSensor()) .multiply( - new BigDecimal( - 0.9875 + Math.random() / 40)))); + BigDecimal.valueOf(0.9875 + Math.random() / 40)))); } /** @@ -72,6 +71,12 @@ public class UpdateTasks { public void smartPlugConsumptionFakeUpdate() { smartPlugRepository.updateTotalConsumption(SmartPlug.AVERAGE_CONSUMPTION_KW); final Collection c = smartPlugRepository.findByOn(true); - c.forEach(s -> sensorSocketEndpoint.broadcast(s, sensorRepository.findUser(s.getId()))); + c.forEach(s -> sensorSocketEndpoint.queueDeviceUpdate(s, sensorRepository.findUser(s.getId()))); + } + + /** Sends device updates through sensor socket in batch every one second */ + @Scheduled(fixedDelay = 1000) + public void socketFlush() { + sensorSocketEndpoint.flushDeviceUpdates(); } } diff --git a/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/socket/SensorSocketEndpoint.java b/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/socket/SensorSocketEndpoint.java index 44c754b..655a773 100644 --- a/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/socket/SensorSocketEndpoint.java +++ b/src/main/java/ch/usi/inf/sa4/sanmarinoes/smarthut/socket/SensorSocketEndpoint.java @@ -3,6 +3,7 @@ package ch.usi.inf.sa4.sanmarinoes.smarthut.socket; import ch.usi.inf.sa4.sanmarinoes.smarthut.config.GsonConfig; import ch.usi.inf.sa4.sanmarinoes.smarthut.config.JWTTokenUtils; +import ch.usi.inf.sa4.sanmarinoes.smarthut.models.Device; import ch.usi.inf.sa4.sanmarinoes.smarthut.models.User; import ch.usi.inf.sa4.sanmarinoes.smarthut.models.UserRepository; import com.google.common.collect.HashMultimap; @@ -18,6 +19,7 @@ import com.google.gson.JsonObject; import io.jsonwebtoken.ExpiredJwtException; import org.hibernate.annotations.common.reflection.XProperty; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.security.core.parameters.P; import org.springframework.stereotype.Component; /** @@ -35,6 +37,8 @@ public class SensorSocketEndpoint extends Endpoint { private Multimap authorizedClients = Multimaps.synchronizedMultimap(HashMultimap.create()); + private final Map> messages = new HashMap<>(); + @Autowired public SensorSocketEndpoint(UserRepository userRepository, JWTTokenUtils jwtTokenUtils) { this.jwtTokenUtils = jwtTokenUtils; @@ -42,18 +46,43 @@ public class SensorSocketEndpoint extends Endpoint { } /** - * Given a message and a user, broadcasts that message in json to all associated clients and - * returns the number of successful transfers + * Queues a single device update for a certain user to be sent + * @param device the device update to be sent + * @param u the user the device belongs + */ + public void queueDeviceUpdate(Device device, User u) { + synchronized (messages) { + messages.putIfAbsent(u, new HashMap<>()); + messages.get(u).put(device.getId(), device); + } + } + + /** + * Sends all device updates queued to be sent in a unique WebSocket message + */ + public void flushDeviceUpdates() { + synchronized (messages) { + for (Map.Entry> batchForUser : messages.entrySet()) { + broadcast(batchForUser.getKey(), batchForUser.getValue().values()); + batchForUser.getValue().clear(); + } + } + } + + /** + * Given a collection of messages and a user, broadcasts that message in json to all + * associated clients * - * @param message the message to send + * @param messages the message batch to send * @param u the user to which to send the message */ - public void broadcast(Object message, User u) { + private void broadcast(User u, Collection messages) { + if (messages.isEmpty()) return; final HashSet sessions = new HashSet<>(authorizedClients.get(u)); for (Session s : sessions) { try { if (s.isOpen()) { - s.getBasicRemote().sendText(gson.toJson(message)); + s.getBasicRemote().sendText(gson.toJson(messages)); } else { authorizedClients.remove(u, s); }