Implementing a Service Registry using Websockets
In this post I will explain how to create a service registry using websockets.
The final result is published in this github repository.
Outline
- Initial structure
- Generate server & client projects
- Implementation of the server
- Implementation of the client
- Conclusion
Initial structure (Aggregator)
First of all, let’s start by creating our project structure, which is a simple multi-module maven
project
websocket-service-registry
├── client
├── pom.xml
└── server
pom.xml
is only aggregator configuration for server and client apps.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ferdisonmezay.websocket</groupId>
<artifactId>service-rigistry</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>pom</packaging>
<name>websocket-service-registry</name>
<description>Websocket Service Registry</description>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<modules>
<module>client</module>
<module>server</module>
</modules>
</project>
Generate server & client projects
To keep it simple we will generate server
and client
projects using spring initializr.
For both applications we will use same configurations and dependencies:
- Project: Maven
- Language: Java
- Spring Boot: 2.6.1
- Packaging: Jar
- Java Jvm Version: 17
- Dependencies: WebSocket, Lombok
If you want to check the configurations on spring initializr and download applications yourself this link is for server, and this is for client.
Imlementation of the server
Clients will regester themselves to our server application via websockets, and they will send the updates of their fields to the server.
Server stores websocket sessions and information about clients. When a client gets connected or disconnected server publishes the latest information with manager-ui
, a frontend application, to manage and see connected clients.
The structure of the server application will be as follows:
# src/main/java/com/ferdisonmezay/websocket/server
├── configuration
│ └── WebsocketConfig.java
├── controller
│ └── WebsocketController.java
├── dto
│ └── ClientDto.java
├── listeners
│ ├── SessionDisconnectEventListener.java
│ └── SessionSubscribeEventListener.java
├── ServerApplication.java # main class
└── service
└── ClientService.java
1. Create websocket configuration file:
package com.ferdisonmezay.websocket.server.configuration;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/client");
config.setApplicationDestinationPrefixes("/app");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/server").withSockJS();
}
}
Now let’s create event listeners for the websocket. We will have only two listeners for the simplicity of this application:
- Subscribe Event Listener, to register new clients
- Session Disconnect Event Listener, to remove disconnected clients
Please note that ClientService.java dependency is also provided after the listeners.
2. Create SessionSubscribeEventListener:
This listener will register new client when they subscribe to a topic, and usind ClientService we will publish current client list with the frontend.
package com.ferdisonmezay.websocket.server.listeners;
import com.ferdisonmezay.websocket.server.service.ClientService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.messaging.SessionSubscribeEvent;
import java.util.List;
import java.util.Map;
@Slf4j
@Component
public class SessionSubscribeEventListener implements ApplicationListener<SessionSubscribeEvent> {
private final ClientService clientService;
public SessionSubscribeEventListener(ClientService clientService) {
this.clientService = clientService;
}
@Override
public void onApplicationEvent(SessionSubscribeEvent event) {
MessageHeaders headers = event.getMessage().getHeaders();
String sessionId = (String) headers.get("simpSessionId");
Map<String, Object> nativeHeaders = (Map<String, Object>) headers.get("nativeHeaders");
List<String> destinations = (List<String>) nativeHeaders.get("destination");
String clientId = destinations.get(0).substring(8);
log.info("Received subscribe event, clientId: {}, sessionId: {}", clientId, sessionId);
clientService.registerNewClient(sessionId, clientId);
clientService.publishClientList();
}
}
3. Create SessionDisconnectEventListener:
This listener will remove client when they disconnect from the server, and usind ClientService we will publish current client list with the frontend.
package com.ferdisonmezay.websocket.server.listeners;
import com.ferdisonmezay.websocket.server.service.ClientService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.messaging.SessionDisconnectEvent;
@Slf4j
@Component
public class SessionDisconnectEventListener implements ApplicationListener<SessionDisconnectEvent> {
private final ClientService clientService;
public SessionDisconnectEventListener(ClientService clientService) {
this.clientService = clientService;
}
@Override
public void onApplicationEvent(SessionDisconnectEvent event) {
log.info("Received disconnect event for session {}", event.getSessionId());
clientService.removeClient(event.getSessionId());
clientService.publishClientList();
}
}
4. Create ClientService:
We will use this service class to implement our registery feature. Information about clients are stored in a HashMap with sessionId as key and ClientDto as value.
We will also publish messages to the clients using this service.
package com.ferdisonmezay.websocket.server.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ferdisonmezay.websocket.server.dto.ClientDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
@Slf4j
@Service
public class ClientService {
private final Map<String, ClientDto> clientRegistry = new HashMap<>();
private final SimpMessagingTemplate messageTemplate;
private final ObjectMapper objectMapper;
public ClientService(SimpMessagingTemplate messageTemplate, ObjectMapper objectMapper) {
this.messageTemplate = messageTemplate;
this.objectMapper = objectMapper;
}
public void publishToClient(String client, String data) {
messageTemplate.convertAndSend(createDestination(client), data);
}
public void registerNewClient(String connectionSessionId, String clientId) {
clientRegistry.put(connectionSessionId, new ClientDto(clientId));
log.info("Client registered, total number of clients: {}", clientRegistry.size());
}
public void removeClient(String connectionSessionId) {
log.info("Removing client with id {}", clientRegistry.get(connectionSessionId).getClientId());
clientRegistry.remove(connectionSessionId);
}
public void publishClientList() {
String data;
try {
data = objectMapper.writeValueAsString(new ArrayList<>(clientRegistry.values()));
publishToClient("manager-ui", data);
} catch (Exception e) {
log.error("Error creating json!");
}
}
public ClientDto getClientBySessionId(String sessionId) {
if (!clientRegistry.containsKey(sessionId)) {
throw new NoSuchElementException();
}
return clientRegistry.get(sessionId);
}
public ClientDto getClientByClientId(String clientId) {
return clientRegistry.values().stream()
.filter(i -> i.getClientId().equals(clientId))
.findAny().orElseThrow(NoSuchElementException::new);
}
public String createDestination(String client) {
return String.join("/", "", "client", client);
}
}
5. Create WebsocketController :
This class is responsible to receive websocket messages. There are two endpoints in this controller:
- the first one is
/{clientId}
endpoint, which is responsible for receiving commands frommanager-ui
and send it to the client. - the second one is
/update
endpoint, which receives message from clients and updates information stored in HashMap for that client.
package com.ferdisonmezay.websocket.server.controller;
import com.ferdisonmezay.websocket.server.dto.ClientDto;
import com.ferdisonmezay.websocket.server.service.ClientService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.stereotype.Controller;
import java.util.NoSuchElementException;
@Slf4j
@Controller
public class WebsocketController {
private final ClientService clientService;
public WebsocketController(ClientService clientService) {
this.clientService = clientService;
}
@MessageMapping("/{clientId}")
public void processMessageFromClient(@Payload String message, @DestinationVariable String clientId) {
log.info("Received: {} for client: {}", message, clientId);
try {
ClientDto clientDto = clientService.getClientByClientId(clientId);
clientService.publishToClient(clientDto.getClientId(), message);
}
catch (NoSuchElementException e) {
log.error("Client {} could not be found!", clientId);
}
}
@MessageMapping("/update")
public void updateClientStatus(@Payload String message, SimpMessageHeaderAccessor headerAccessor) {
String sessionId = (String) headerAccessor.getHeader("simpSessionId");
try {
ClientDto clientDto = clientService.getClientBySessionId(sessionId);
clientDto.setActive(Boolean.parseBoolean(message));
clientService.publishClientList();
log.info("Received: {} from client: {}", message, clientDto.getClientId());
}
catch (NoSuchElementException e) {
log.error("Session {} could not be found!", sessionId);
}
}
}
Now our server application is ready to receive connections, and we’re done with java for the server application.
The only missing part for the server is to create the ‘manager-ui’ frontend application.
6. Create manager-ui
frontend application:
Now it’s time to create the frontend application where we will see the connected clients, and send commands to them.
File structure of the frontend application will be as follows:
# /websocket-service-registry/server/src/main/resources
static
├── assets
│ ├── css
│ │ ├── bootstrap.min.css
│ │ ├── bootstrap.min.css.map
│ │ └── main.css #overrede some css
│ └── js
│ ├── app.js # our main frontend application
│ ├── bootstrap.bundle.min.js # other dependencies
│ ├── jquery.min.js
│ ├── sockjs.min.js
│ └── stomp.min.js
└── index.html
To keep it simple I will only add index.html
and app.js
files.
Index html only contains necessary js and css files. does not have anything special.
<!-- index.html -->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>WebSocket Service Registry Management UI</title>
<link rel="stylesheet" type="text/css" href="./assets/css/bootstrap.min.css">
<link rel="stylesheet" type="text/css" href="./assets/css/main.css">
<script src="./assets/js/jquery.min.js"></script>
<script src="./assets/js/sockjs.min.js"></script>
<script src="./assets/js/stomp.min.js"></script>
<script src="./assets/js/bootstrap.bundle.min.js"></script>
<script src="./assets/js/app.js"></script>
</head>
<body>
<nav class="navbar navbar-light bg-white">
<div class="container py-2">
<a class="navbar-brand" href="/">Websocket Client Manager</a>
<div>
<button id="disconnect" class="btn btn-danger" type="submit" disabled="disabled">Disconnect</button>
<button id="connect" class="btn btn-outline-success ml-3" type="submit">Connect</button>
</div>
</div>
</nav>
<div id="client-container" class="container mt-5">
<h3 class="mb-4">Clients</h3>
<div class="row" id="clients"></div>
</div>
</body>
</html>
app.js file is the file where we manage the clients and connections. For each client connected to the server, we will have one box, which contains information about the connected client and a form to send a command.
In the UI application we will be able to send commands:
- directly to the client by using
Send > Direct to Client
- through server by using
Send > Through Server
command.
In the second case manager ui will send the message/command to the server and server will process/forward that command to the client.
Again for the simplicity we will only have one boolean property (status) in each client and one command INVERT
. Whenever client receives an ‘INVERT’ command it will change its state.
//app.js
var stompClient = null;
$(function () {
setConnected(false);
$("#connect").click(function() {
connectToWebsocket();
});
$("#disconnect").click(function() {
disconnect();
});
});
function setConnected(connected) {
$("#connect").prop("disabled", connected);
$("#disconnect").prop("disabled", !connected);
if (connected) {
$("#client-container").show();
updateButtonParams('#connect', 'Connected!', 'btn-success', 'btn-outline-success');
updateButtonParams('#disconnect', 'Disconnect', 'btn-outline-danger', 'btn-danger');
}
else {
$("#client-container").hide();
updateButtonParams('#connect', 'Connect', 'btn-outline-success', 'btn-success');
updateButtonParams('#disconnect', 'Disconnected!', 'btn-danger', 'btn-outline-danger');
}
$("#messages").html("");
}
function updateButtonParams(elementId, buttonText, classToRemove, classToAdd) {
$(elementId).html(buttonText);
$(elementId).removeClass(classToRemove);
$(elementId).addClass(classToAdd);
}
function connectToWebsocket() {
var endpoint = 'http://localhost:8080/server'; // WebsocketConfig
var socket = new SockJS(endpoint);
stompClient = Stomp.over(socket);
stompClient.connect({}, function (frame) {
setConnected(true);
console.log('Connected: ' + frame);
stompClient.subscribe('/client/manager-ui', function (data) { // ClientService.publishClientList
buildClientList(data.body);
});
});
}
function buildClientList(_clients) {
var clients = JSON.parse(_clients);
$("#clients").empty();
for(var i = 0; i < clients.length; i++) {
var client = clients[i];
var clientCard = createClientCard(i+1, client.clientId, client.active, client.connectionSessionId);
$("#clients").append(clientCard);
}
}
function createClientCard(index, clientId, status, sessionId) {
if(clientId === 'manager-ui') {
return '<div class="col-12 col-sm-6 col-md-4 p-3">'+
'<div class="h-100 p-4 shadow rounded-3 bg-warning bg-opacity-50 bg-gradient">'+
clientId +
'<p class="text-muted">This application</p>'+
'</div>'+
'</div>';
}
return '<div class="col-12 col-sm-6 col-md-4 p-3">'+
'<div class="h-100 bg-white p-4 shadow rounded-3">'+
'<h5 class="card-title text-bold">Client ' + index + '</h5>'+
'<p class="card-subtitle mb-2 text-muted">' + clientId + '</p>'+
'<p class="card-text mt-4"><strong>Status: </strong>'+ status +'</p>'+
'<div class="input-group">'+
'<input id="client'+index+'" type="text" class="form-control form-control-sm" value="INVERT" >'+
'<button class="btn btn-sm btn-outline-primary dropdown-toggle" type="button" data-bs-toggle="dropdown" aria-expanded="false">Send</button>'+
'<ul class="dropdown-menu dropdown-menu-end">'+
'<li><a class="dropdown-item" href="#" onClick=\'sendCommand(\"'+clientId+'\", \"#client' + index + '\")\'>Direct to Client</a></li>'+
'<li><a class="dropdown-item" href="#" onClick=\'sendCommandThroughServer(\"'+clientId+'\", \"#client' + index + '\")\'>Through Server</a></li>'+
'</ul>'+
'</div>'+
'</div>'+
'</div>';
}
function sendCommand(clientId, inputId) {
var command = $(inputId).val();
stompClient.send('/client/'+clientId, {}, command);
}
function sendCommandThroughServer(clientId, inputId) {
var command = $(inputId).val();
stompClient.send('/app/'+clientId, {}, command);
}
function disconnect() {
if (stompClient !== null) {
stompClient.disconnect();
}
setConnected(false);
buildClientList('[]');
}
Completed server application source code can be found here
Imlementation of the client
Client application is a simple spring boot application, where we connnect to server and notify the server about our updates.
Since we will be running multiple instances of the client application, the first thing I want to do is to change the port number to -1
, which will pick an available random port.
#client/application.properties
server.port=-1
I also want to check the connection with the server and connect if the client is not connected, so I will use @EnableScheduling
annotation to execute scheduled function.
package com.ferdisonmezay.websocket.client;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class ClientApplication {
public static void main(String[] args) {
SpringApplication.run(ClientApplication.class, args);
}
}
We will have only one class to manage websocket connection to server. For simlisity, we will generate a UUID for each connection, and regirter the client with that UUID to the server.
package com.ferdisonmezay.websocket.client.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.simp.stomp.StompFrameHandler;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.socket.WebSocketHttpHeaders;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient;
import org.springframework.web.socket.sockjs.client.SockJsClient;
import org.springframework.web.socket.sockjs.client.Transport;
import org.springframework.web.socket.sockjs.client.WebSocketTransport;
import org.springframework.web.socket.sockjs.frame.Jackson2SockJsMessageCodec;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
@Slf4j
@Service
public class WebsocketClient {
private static final String SERVER_URL = "ws://localhost:8080/server";
private static boolean connectedToServer = false;
private static boolean enabled = false; //we will change value of this variable using management UI
@Scheduled(fixedDelay = (5 * 1000)) // 5 seconds
public void scheduledConnectionCheck() {
log.info("Scheduled check for connection! Current value of 'enabled' flag is {}", enabled);
if(!connectedToServer) {
connectToServer();
}
}
public void connectToServer() {
log.info("Trying to connect to the server!");
String clientId = UUID.randomUUID().toString();
StompSession stompSession;
try {
stompSession = connect().get();
subscribe(stompSession, clientId);
connectedToServer = true;
log.info("Connection established to {}", SERVER_URL);
} catch (Exception e) {
connectedToServer = false;
log.error("Failed to connect to {}", SERVER_URL);
}
}
private void subscribe(StompSession stompSession, String clientId) {
stompSession.subscribe("/client/" + clientId, new StompFrameHandler() {
public Type getPayloadType(StompHeaders stompHeaders) {
return byte[].class;
}
@Override
public void handleFrame(StompHeaders stompHeaders, Object o) {
String command = new String((byte[]) o);
log.info("Received: {}", command);
if (command.equals("INVERT")) {
enabled = !enabled;
notifyServer(stompSession);
}
}
});
}
private void notifyServer(StompSession stompSession) {
stompSession.send("/app/update", (enabled ? "true" : "false").getBytes(StandardCharsets.UTF_8));
}
private ListenableFuture<StompSession> connect() {
Transport webSocketTransport = new WebSocketTransport(new StandardWebSocketClient());
List<Transport> transports = Collections.singletonList(webSocketTransport);
SockJsClient sockJsClient = new SockJsClient(transports);
sockJsClient.setMessageCodec(new Jackson2SockJsMessageCodec());
WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
return stompClient.connect(SERVER_URL, new WebSocketHttpHeaders(), new CustomHandler());
}
private static class CustomHandler extends StompSessionHandlerAdapter {
@Override
public void handleTransportError(StompSession session, Throwable exception) {
log.warn("Connection closed!");
connectedToServer = false;
super.handleTransportError(session, exception);
}
}
}
Completed client application source code can be found here
Conclusion
In my current project, we have recently migrated around 10M users.
By using this approach, we were able to control our migration instances and migration velocity by managing those clients during the migration.
I hope this application can solve some of your problems or at least it gives you some idia about websockets in spring-boot.
Link to the project at github is here
Stay tuned…