SelfStompClient.groovy
import com.alibaba.fastjson.JSON
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.messaging.simp.stomp.StompFrameHandler
import org.springframework.messaging.simp.stomp.StompHeaders
import org.springframework.messaging.simp.stomp.StompSession
import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter
import org.springframework.util.concurrent.ListenableFuture
import org.springframework.web.socket.WebSocketHttpHeaders
import org.springframework.web.socket.client.standard.StandardWebSocketClient
import org.springframework.web.socket.messaging.WebSocketStompClient
import org.springframework.web.socket.sockjs.client.SockJsClient
import org.springframework.web.socket.sockjs.client.Transport
import org.springframework.web.socket.sockjs.client.WebSocketTransport
import org.springframework.web.socket.sockjs.frame.Jackson2SockJsMessageCodec
import java.lang.reflect.Type
import java.util.concurrent.ExecutionException
class SelfStompClient {
static Logger logger = LoggerFactory.getLogger("Test")
private final static WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
private websocketUrl
SelfStompClient(wsUrl){
this.websocketUrl = wsUrl
}
private class MyHandler extends StompSessionHandlerAdapter {
public void afterConnected(StompSession stompSession, StompHeaders stompHeaders) {
logger.info("Now connected");
}
}
public ListenableFuture<StompSession> connect() {
Transport webSocketTransport = new WebSocketTransport(new StandardWebSocketClient());
List<Transport> transports = Collections.singletonList(webSocketTransport);
SockJsClient sockJsClient = new SockJsClient(transports);
sockJsClient.setMessageCodec(new Jackson2SockJsMessageCodec());
WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
return stompClient.connect(websocketUrl, headers, new MyHandler());
}
public void subscribeGreetings(String topic, StompSession stompSession, Closure closure) throws ExecutionException, InterruptedException {
stompSession.subscribe(topic, new StompFrameHandler() {
public Type getPayloadType(StompHeaders stompHeaders) {
return byte[].class;
}
public void handleFrame(StompHeaders stompHeaders, Object content) {
String value = new String((byte[]) content)
logger.info("content" + value)
def contentJson = JSON.parse(value)
closure.call(contentJson)
// logger.info("aaaa" + s.data.robotStates)
}
});
}
@Test
void testSelfStompClient(){
def websocketUrl = "ws://0.0.0.0:9500/ws"
String topic = "/topic/robot"
def robotStates
SelfStompClient selfStompClient = new SelfStompClient(websocketUrl)
StompSession session = selfStompClient.connect().get()
selfStompClient.subscribeGreetings(topic, session){ contentJson->
robotStates = contentJson.data.robotStates
}
Tools.waitForResult(10, 1000){
robotStates == 1
}
}
}