53066191
8/16/2018 - 10:05 AM

SelfStompClient.groovy

SelfStompClient.groovy


import com.alibaba.fastjson.JSON
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.messaging.simp.stomp.StompCommand
import org.springframework.messaging.simp.stomp.StompFrameHandler
import org.springframework.messaging.simp.stomp.StompHeaders
import org.springframework.messaging.simp.stomp.StompSession
import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter
import org.springframework.util.concurrent.ListenableFuture
import org.springframework.web.socket.WebSocketHttpHeaders
import org.springframework.web.socket.client.standard.StandardWebSocketClient
import org.springframework.web.socket.messaging.WebSocketStompClient
import org.springframework.web.socket.sockjs.client.SockJsClient
import org.springframework.web.socket.sockjs.client.Transport
import org.springframework.web.socket.sockjs.client.WebSocketTransport
import org.springframework.web.socket.sockjs.frame.Jackson2SockJsMessageCodec

import java.lang.reflect.Type
import java.nio.charset.StandardCharsets
import java.util.concurrent.ExecutionException

/**
 * Small STOMP client that connects to a SockJS endpoint through a single
 * WebSocket transport and hands JSON-decoded frame payloads to a caller-supplied
 * closure.
 */
class SelfStompClient {
    static Logger logger = LoggerFactory.getLogger("Test")
    // Handshake headers shared by every connection opened through this class.
    private final static WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
    // Endpoint passed to WebSocketStompClient.connect(); left untyped as in the original API.
    private final websocketUrl

    /**
     * @param wsUrl the endpoint URL handed to {@code WebSocketStompClient.connect}
     */
    SelfStompClient(wsUrl){
        this.websocketUrl = wsUrl
    }

    /**
     * Session handler that logs the connection lifecycle.
     *
     * The adapter's default error callbacks do nothing, so without the overrides
     * below any failure raised while processing a frame (for example a payload
     * that is not valid JSON, or an exception thrown by the subscriber closure)
     * and any transport failure would disappear without a trace.
     */
    private static class MyHandler extends StompSessionHandlerAdapter {
        @Override
        public void afterConnected(StompSession stompSession, StompHeaders stompHeaders) {
            logger.info("Now connected");
        }

        @Override
        public void handleException(StompSession stompSession, StompCommand command, StompHeaders stompHeaders,
                                    byte[] payload, Throwable exception) {
            logger.error("Failed to process STOMP frame (command={})", command, exception)
        }

        @Override
        public void handleTransportError(StompSession stompSession, Throwable exception) {
            logger.error("STOMP transport error", exception)
        }
    }

    /**
     * Starts an asynchronous connection to {@code websocketUrl}.
     *
     * A new SockJS client with one WebSocket transport and a Jackson message
     * codec is created for every call.
     *
     * @return a future that yields the session once the connection is established
     */
    public ListenableFuture<StompSession> connect() {

        Transport webSocketTransport = new WebSocketTransport(new StandardWebSocketClient());
        List<Transport> transports = Collections.singletonList(webSocketTransport);

        SockJsClient sockJsClient = new SockJsClient(transports);
        sockJsClient.setMessageCodec(new Jackson2SockJsMessageCodec());

        WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);

        return stompClient.connect(websocketUrl, headers, new MyHandler());
    }

    /**
     * Subscribes to {@code topic} and invokes {@code closure} with the parsed
     * JSON of every frame received on it.
     *
     * @param topic        destination to subscribe to
     * @param stompSession session on which the subscription is registered
     * @param closure      callback receiving the result of {@code JSON.parse} for each frame
     */
    public void subscribeGreetings(String topic, StompSession stompSession, Closure closure) throws ExecutionException, InterruptedException {
        stompSession.subscribe(topic, new StompFrameHandler() {

            @Override
            public Type getPayloadType(StompHeaders stompHeaders) {
                // Ask for the raw bytes; decoding and JSON parsing happen in handleFrame.
                return byte[].class;
            }

            @Override
            public void handleFrame(StompHeaders stompHeaders, Object content) {
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                String value = new String((byte[]) content, StandardCharsets.UTF_8)
                logger.info("content{}", value)
                def contentJson = JSON.parse(value)
                closure.call(contentJson)
            }

        });
    }

    /**
     * Connects to a locally running endpoint, subscribes to the robot topic and
     * waits until a frame reports {@code data.robotStates == 1}.
     */
    @Test
    void testSelfStompClient(){
        def websocketUrl = "ws://0.0.0.0:9500/ws"
        String topic = "/topic/robot"

        def robotStates

        SelfStompClient selfStompClient = new SelfStompClient(websocketUrl)
        StompSession session = selfStompClient.connect().get()

        try {
            selfStompClient.subscribeGreetings(topic, session){ contentJson->
                robotStates = contentJson.data.robotStates
            }

            Tools.waitForResult(10, 1000){
                robotStates == 1
            }
        } finally {
            // Always release the connection; skip if it is already gone so that a
            // disconnect failure cannot mask the original test failure.
            if (session.isConnected()) {
                session.disconnect()
            }
        }

    }
}