MessageHandlersTest.java
/*
* Copyright (c) 2013, 2017 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/
package org.glassfish.tyrus.test.standard_config;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Reader;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.DeploymentException;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.MessageHandler;
import javax.websocket.OnMessage;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import org.glassfish.tyrus.client.ClientManager;
import org.glassfish.tyrus.core.Utils;
import org.glassfish.tyrus.server.Server;
import org.glassfish.tyrus.test.tools.TestContainer;
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
/**
* @author Pavel Bucek (pavel.bucek at oracle.com)
*/
public class MessageHandlersTest extends TestContainer {
/**
 * Server endpoint that hands every whole text message straight back as its handler result.
 */
@ServerEndpoint("/whole1")
public static class WholeString {

    /**
     * @param message the complete text message received.
     * @return the very same text; the tests in this class expect it to come back as the reply.
     */
    @OnMessage
    public String onMessage(final String message) {
        return message;
    }
}
/**
 * Server endpoint that consumes text fragment by fragment and sends the re-assembled
 * message back once the last fragment has been seen.
 */
@ServerEndpoint("/partial1")
public static class PartialString {

    /** Fragments collected so far for the message currently being received. */
    private StringBuffer sb = new StringBuffer();

    /**
     * @param session session used to send the assembled reply.
     * @param message the fragment just received.
     * @param isLast  {@code true} when this fragment completes the message.
     * @throws IOException if sending the reply fails.
     */
    @OnMessage
    public void onMessage(Session session, String message, boolean isLast) throws IOException {
        sb.append(message);
        if (!isLast) {
            return;
        }

        // Reset the accumulator before sending so it is clean even if the send throws.
        final String assembled = sb.toString();
        sb = new StringBuffer();
        session.getBasicRemote().sendText(assembled);
    }
}
/**
 * Client sends one whole text message to {@link WholeString} and waits (up to 5 s) for the
 * identical text to arrive on a whole-message handler.
 */
@Test
public void clientWholeServerWhole() throws DeploymentException {
    final String expected = "In my experience, there's no such thing as luck.";
    final Server server = startServer(WholeString.class);
    try {
        messageLatch = new CountDownLatch(1);
        final ClientEndpointConfig config = ClientEndpointConfig.Builder.create().build();

        final Endpoint clientEndpoint = new Endpoint() {
            @Override
            public void onOpen(Session session, EndpointConfig endpointConfig) {
                session.addMessageHandler(new MessageHandler.Whole<String>() {
                    @Override
                    public void onMessage(String message) {
                        // Only the exact expected reply releases the test thread.
                        if (message.equals(expected)) {
                            messageLatch.countDown();
                        }
                    }
                });
                try {
                    session.getBasicRemote().sendText(expected);
                } catch (IOException e) {
                    // don't care
                }
            }
        };

        createClient().connectToServer(clientEndpoint, config, getURI(WholeString.class));
        messageLatch.await(5, TimeUnit.SECONDS);
        assertEquals(0, messageLatch.getCount());
    } catch (Exception e) {
        e.printStackTrace();
        throw new RuntimeException(e.getMessage(), e);
    } finally {
        stopServer(server);
    }
}
/**
 * Client sends one whole text message to {@link PartialString} (which receives it through a
 * partial handler) and waits (up to 5 s) for the identical text to come back.
 */
@Test
public void clientWholeServerPartial() throws DeploymentException {
    final String expected = "In my experience, there's no such thing as luck.";
    final Server server = startServer(PartialString.class);
    try {
        messageLatch = new CountDownLatch(1);
        final ClientEndpointConfig config = ClientEndpointConfig.Builder.create().build();

        final Endpoint clientEndpoint = new Endpoint() {
            @Override
            public void onOpen(Session session, EndpointConfig endpointConfig) {
                session.addMessageHandler(new MessageHandler.Whole<String>() {
                    @Override
                    public void onMessage(String message) {
                        // Only the exact expected reply releases the test thread.
                        if (message.equals(expected)) {
                            messageLatch.countDown();
                        }
                    }
                });
                try {
                    session.getBasicRemote().sendText(expected);
                } catch (IOException e) {
                    // don't care
                }
            }
        };

        createClient().connectToServer(clientEndpoint, config, getURI(PartialString.class));
        messageLatch.await(5, TimeUnit.SECONDS);
        assertEquals(0, messageLatch.getCount());
    } catch (Exception e) {
        e.printStackTrace();
        throw new RuntimeException(e.getMessage(), e);
    } finally {
        stopServer(server);
    }
}
/**
 * Client sends the text in three fragments to {@link WholeString} and waits (up to 5 s) for
 * the complete text to come back on a whole-message handler.
 */
@Test
public void clientPartialServerWhole() throws DeploymentException {
    final String expected = "In my experience, there's no such thing as luck.";
    final String[] fragments = {"In my experience", ", there's no such ", "thing as luck."};
    final Server server = startServer(WholeString.class);
    try {
        messageLatch = new CountDownLatch(1);
        final ClientEndpointConfig config = ClientEndpointConfig.Builder.create().build();

        final Endpoint clientEndpoint = new Endpoint() {
            @Override
            public void onOpen(Session session, EndpointConfig endpointConfig) {
                session.addMessageHandler(new MessageHandler.Whole<String>() {
                    @Override
                    public void onMessage(String message) {
                        if (message.equals(expected)) {
                            messageLatch.countDown();
                        }
                    }
                });
                try {
                    // The final array element is flagged as the last fragment.
                    for (int i = 0; i < fragments.length; i++) {
                        session.getBasicRemote().sendText(fragments[i], i == fragments.length - 1);
                    }
                } catch (IOException e) {
                    // don't care
                }
            }
        };

        createClient().connectToServer(clientEndpoint, config, getURI(WholeString.class));
        messageLatch.await(5, TimeUnit.SECONDS);
        assertEquals(0, messageLatch.getCount());
    } catch (Exception e) {
        e.printStackTrace();
        throw new RuntimeException(e.getMessage(), e);
    } finally {
        stopServer(server);
    }
}
/**
 * Client sends the text in three fragments to {@link PartialString} and waits (up to 1 s)
 * for the complete text to come back on a whole-message handler.
 */
@Test
public void clientPartialServerPartial() throws DeploymentException {
    final String expected = "In my experience, there's no such thing as luck.";
    final String[] fragments = {"In my experience", ", there's no such ", "thing as luck."};
    final Server server = startServer(PartialString.class);
    try {
        messageLatch = new CountDownLatch(1);
        final ClientEndpointConfig config = ClientEndpointConfig.Builder.create().build();

        final Endpoint clientEndpoint = new Endpoint() {
            @Override
            public void onOpen(Session session, EndpointConfig endpointConfig) {
                session.addMessageHandler(new MessageHandler.Whole<String>() {
                    @Override
                    public void onMessage(String message) {
                        if (message.equals(expected)) {
                            messageLatch.countDown();
                        }
                    }
                });
                try {
                    // The final array element is flagged as the last fragment.
                    for (int i = 0; i < fragments.length; i++) {
                        session.getBasicRemote().sendText(fragments[i], i == fragments.length - 1);
                    }
                } catch (IOException e) {
                    // don't care
                }
            }
        };

        createClient().connectToServer(clientEndpoint, config, getURI(PartialString.class));
        messageLatch.await(1, TimeUnit.SECONDS);
        assertEquals(0, messageLatch.getCount());
    } catch (Exception e) {
        e.printStackTrace();
        throw new RuntimeException(e.getMessage(), e);
    } finally {
        stopServer(server);
    }
}
/**
 * Server endpoint that hands every whole binary message (as {@code byte[]}) straight back
 * as its handler result.
 */
@ServerEndpoint("/whole2")
public static class WholeByteArray {

    /**
     * @param message the complete binary payload received.
     * @return the very same array; the tests in this class expect it to come back as the reply.
     */
    @OnMessage
    public byte[] onMessage(final byte[] message) {
        return message;
    }
}
@ServerEndpoint("/partial2")
public static class PartialByteArray {
private List<byte[]> buffer = new ArrayList<byte[]>();
/**
 * Stores each binary fragment and, when the final one arrives, sends the concatenation of
 * all stored fragments back through the session and clears the store.
 *
 * @param session session used to send the assembled reply.
 * @param message the fragment just received.
 * @param isLast  {@code true} when this fragment completes the message.
 */
@OnMessage
public void onMessage(Session session, byte[] message, boolean isLast) {
    buffer.add(message);
    if (!isLast) {
        return;
    }

    try {
        // The list holds at least the fragment added above, so index 0 always exists.
        ByteBuffer joined = ByteBuffer.wrap(buffer.get(0));
        for (int i = 1; i < buffer.size(); i++) {
            joined = joinBuffers(joined, ByteBuffer.wrap(buffer.get(i)));
        }
        session.getBasicRemote().sendBinary(joined);
    } catch (IOException e) {
        //
    }
    buffer.clear();
}
/**
 * Concatenates the remaining bytes of two buffers into a new array-backed buffer.
 * <p>
 * As before, {@code bb1} is consumed (its position advances to its limit) and {@code bb2}
 * is left untouched. The bytes of {@code bb2} are now read relative to its current position
 * through a duplicate, so buffers with a non-zero position or array offset, as well as
 * direct or read-only buffers, are joined correctly instead of copying from index 0 of the
 * backing array (or throwing when there is no accessible backing array).
 *
 * @param bb1 first buffer; its remaining bytes come first in the result.
 * @param bb2 second buffer; its remaining bytes follow those of {@code bb1}.
 * @return a new buffer with position 0 whose limit and capacity equal the combined length.
 */
public static ByteBuffer joinBuffers(ByteBuffer bb1, ByteBuffer bb2) {
    final int remaining1 = bb1.remaining();
    final int remaining2 = bb2.remaining();
    final byte[] array = new byte[remaining1 + remaining2];
    bb1.get(array, 0, remaining1);
    // Read through a duplicate so bb2's own position/limit stay as the caller left them.
    bb2.duplicate().get(array, remaining1, remaining2);
    return ByteBuffer.wrap(array);
}
}
/**
 * Server endpoint that receives every whole binary message as a {@link ByteBuffer} and hands
 * it straight back as its handler result.
 * <p>
 * The handler previously took and returned {@code byte[]}, which made it a duplicate of
 * {@link WholeByteArray}; it now uses {@code ByteBuffer} so the "ByteBuffer" tests exercise
 * the buffer-typed whole-message handler their names refer to.
 */
@ServerEndpoint("/whole3")
public static class WholeByteBuffer {

    /**
     * @param message the complete binary payload received.
     * @return the very same buffer; the tests in this class expect it to come back as the reply.
     */
    @OnMessage
    public ByteBuffer onMessage(ByteBuffer message) {
        return message;
    }
}
/**
 * Server endpoint that consumes binary data fragment by fragment (as {@link ByteBuffer}) and
 * sends the concatenated payload back once the last fragment has been seen.
 */
@ServerEndpoint("/partial3")
public static class PartialByteBuffer {

    /** Copies of the fragments collected so far for the message currently being received. */
    private List<byte[]> buffer = new ArrayList<byte[]>();

    /**
     * @param session session used to send the assembled reply.
     * @param message the fragment just received; only its remaining bytes are used.
     * @param isLast  {@code true} when this fragment completes the message.
     */
    @OnMessage
    public void onMessage(Session session, ByteBuffer message, boolean isLast) {
        // Copy exactly the readable bytes. message.array() would ignore position, limit and
        // array offset, and is unsupported for direct or read-only buffers.
        final ByteBuffer view = message.duplicate();
        final byte[] chunk = new byte[view.remaining()];
        view.get(chunk);
        buffer.add(chunk);

        if (isLast) {
            try {
                ByteBuffer b = null;
                for (byte[] bytes : buffer) {
                    if (b == null) {
                        b = ByteBuffer.wrap(bytes);
                    } else {
                        b = PartialByteArray.joinBuffers(b, ByteBuffer.wrap(bytes));
                    }
                }
                session.getBasicRemote().sendBinary(b);
            } catch (IOException e) {
                //
            }
            buffer.clear();
        }
    }
}
/**
 * Latch shared between a test method and the client message handler it registers: most tests
 * assign a fresh latch before connecting, the handler counts it down when the expected reply
 * arrives, and the test then awaits it and asserts that the count reached zero.
 */
private CountDownLatch messageLatch;
/**
 * Client sends one whole binary message to {@link WholeByteArray} and waits (up to 1 s) for
 * the same bytes to arrive on a whole {@code byte[]} handler.
 */
@Test
public void clientWholeServerWholeByteArray() throws DeploymentException {
    final String expected = "In my experience, there's no such thing as luck.";
    final Server server = startServer(WholeByteArray.class);
    try {
        messageLatch = new CountDownLatch(1);
        final ClientEndpointConfig config = ClientEndpointConfig.Builder.create().build();

        final Endpoint clientEndpoint = new Endpoint() {
            @Override
            public void onOpen(Session session, EndpointConfig endpointConfig) {
                session.addMessageHandler(new MessageHandler.Whole<byte[]>() {
                    @Override
                    public void onMessage(byte[] message) {
                        // Decode the reply and release the test thread only on an exact match.
                        if (new String(message).equals(expected)) {
                            messageLatch.countDown();
                        }
                    }
                });
                try {
                    session.getBasicRemote().sendBinary(ByteBuffer.wrap(expected.getBytes()));
                } catch (IOException e) {
                    // don't care
                }
            }
        };

        createClient().connectToServer(clientEndpoint, config, getURI(WholeByteArray.class));
        messageLatch.await(1, TimeUnit.SECONDS);
        assertEquals(0, messageLatch.getCount());
    } catch (Exception e) {
        e.printStackTrace();
        throw new RuntimeException(e.getMessage(), e);
    } finally {
        stopServer(server);
    }
}
/**
 * Client sends one whole binary message to {@link PartialByteArray} and waits (up to 1 s)
 * for the same bytes to arrive on a whole {@code byte[]} handler.
 */
@Test
public void clientWholeServerPartialByteArray() throws DeploymentException {
    final String expected = "In my experience, there's no such thing as luck.";
    final Server server = startServer(PartialByteArray.class);
    try {
        messageLatch = new CountDownLatch(1);
        final ClientEndpointConfig config = ClientEndpointConfig.Builder.create().build();

        final Endpoint clientEndpoint = new Endpoint() {
            @Override
            public void onOpen(Session session, EndpointConfig endpointConfig) {
                session.addMessageHandler(new MessageHandler.Whole<byte[]>() {
                    @Override
                    public void onMessage(byte[] message) {
                        // Decode the reply and release the test thread only on an exact match.
                        if (new String(message).equals(expected)) {
                            messageLatch.countDown();
                        }
                    }
                });
                try {
                    session.getBasicRemote().sendBinary(ByteBuffer.wrap(expected.getBytes()));
                } catch (IOException e) {
                    // don't care
                }
            }
        };

        createClient().connectToServer(clientEndpoint, config, getURI(PartialByteArray.class));
        messageLatch.await(1, TimeUnit.SECONDS);
        assertEquals(0, messageLatch.getCount());
    } catch (Exception e) {
        e.printStackTrace();
        throw new RuntimeException(e.getMessage(), e);
    } finally {
        stopServer(server);
    }
}
/**
 * Client sends the payload in three binary fragments to {@link WholeByteArray} and waits
 * (up to 1 s) for the complete bytes to arrive on a whole {@code byte[]} handler.
 */
@Test
public void clientPartialServerWholeByteArray() throws DeploymentException {
    final String expected = "In my experience, there's no such thing as luck.";
    final String[] fragments = {"In my experience", ", there's no such ", "thing as luck."};
    final Server server = startServer(WholeByteArray.class);
    try {
        messageLatch = new CountDownLatch(1);
        final ClientEndpointConfig config = ClientEndpointConfig.Builder.create().build();

        final Endpoint clientEndpoint = new Endpoint() {
            @Override
            public void onOpen(Session session, EndpointConfig endpointConfig) {
                session.addMessageHandler(new MessageHandler.Whole<byte[]>() {
                    @Override
                    public void onMessage(byte[] message) {
                        if (new String(message).equals(expected)) {
                            messageLatch.countDown();
                        }
                    }
                });
                try {
                    // The final array element is flagged as the last fragment.
                    for (int i = 0; i < fragments.length; i++) {
                        session.getBasicRemote().sendBinary(
                                ByteBuffer.wrap(fragments[i].getBytes()), i == fragments.length - 1);
                    }
                } catch (IOException e) {
                    // don't care
                }
            }
        };

        createClient().connectToServer(clientEndpoint, config, getURI(WholeByteArray.class));
        messageLatch.await(1, TimeUnit.SECONDS);
        assertEquals(0, messageLatch.getCount());
    } catch (Exception e) {
        e.printStackTrace();
        throw new RuntimeException(e.getMessage(), e);
    } finally {
        stopServer(server);
    }
}
/**
 * Client sends the payload in three binary fragments to {@link PartialByteArray} and waits
 * (up to 1 s) for the complete bytes to arrive on a whole {@code byte[]} handler.
 * This test uses a method-local latch rather than the shared {@code messageLatch} field.
 */
@Test
public void clientPartialServerPartialByteArray() throws DeploymentException {
    final String expected = "In my experience, there's no such thing as luck.";
    final String[] fragments = {"In my experience", ", there's no such ", "thing as luck."};
    final Server server = startServer(PartialByteArray.class);
    try {
        final CountDownLatch replyLatch = new CountDownLatch(1);
        final ClientEndpointConfig config = ClientEndpointConfig.Builder.create().build();

        final Endpoint clientEndpoint = new Endpoint() {
            @Override
            public void onOpen(Session session, EndpointConfig endpointConfig) {
                session.addMessageHandler(new MessageHandler.Whole<byte[]>() {
                    @Override
                    public void onMessage(byte[] message) {
                        if (new String(message).equals(expected)) {
                            replyLatch.countDown();
                        }
                    }
                });
                try {
                    // The final array element is flagged as the last fragment.
                    for (int i = 0; i < fragments.length; i++) {
                        session.getBasicRemote().sendBinary(
                                ByteBuffer.wrap(fragments[i].getBytes()), i == fragments.length - 1);
                    }
                } catch (IOException e) {
                    // don't care
                }
            }
        };

        createClient().connectToServer(clientEndpoint, config, getURI(PartialByteArray.class));
        replyLatch.await(1, TimeUnit.SECONDS);
        assertEquals(0, replyLatch.getCount());
    } catch (Exception e) {
        e.printStackTrace();
        throw new RuntimeException(e.getMessage(), e);
    } finally {
        stopServer(server);
    }
}
/**
 * Client sends one whole binary message to {@link WholeByteBuffer} and waits (up to 1 s) for
 * the same bytes to arrive on a whole {@link ByteBuffer} handler.
 * <p>
 * The reply is compared with {@link ByteBuffer#equals(Object)}, which looks only at the
 * buffer's remaining bytes, instead of decoding {@code message.array()}, which ignores the
 * buffer's position and limit and is unsupported for buffers without an accessible array.
 */
@Test
public void clientWholeServerWholeByteBuffer() throws DeploymentException {
    final String expected = "In my experience, there's no such thing as luck.";
    Server server = startServer(WholeByteBuffer.class);
    try {
        final ClientEndpointConfig cec = ClientEndpointConfig.Builder.create().build();
        messageLatch = new CountDownLatch(1);
        ClientManager client = createClient();
        client.connectToServer(new Endpoint() {
            @Override
            public void onOpen(Session session, EndpointConfig EndpointConfig) {
                session.addMessageHandler(new MessageHandler.Whole<ByteBuffer>() {
                    @Override
                    public void onMessage(ByteBuffer message) {
                        // Same comparison as clientReceivePartialBinaryAsWhole uses.
                        if (message.equals(ByteBuffer.wrap(expected.getBytes()))) {
                            messageLatch.countDown();
                        }
                    }
                });
                try {
                    session.getBasicRemote().sendBinary(ByteBuffer.wrap(expected.getBytes()));
                } catch (IOException e) {
                    // don't care
                }
            }
        }, cec, getURI(WholeByteBuffer.class));
        messageLatch.await(1, TimeUnit.SECONDS);
        assertEquals(0, messageLatch.getCount());
    } catch (Exception e) {
        e.printStackTrace();
        throw new RuntimeException(e.getMessage(), e);
    } finally {
        stopServer(server);
    }
}
/**
 * Client sends one whole binary message to {@link PartialByteBuffer} and waits (up to 1 s)
 * for the same bytes to arrive on a whole {@link ByteBuffer} handler.
 * <p>
 * The reply is compared with {@link ByteBuffer#equals(Object)}, which looks only at the
 * buffer's remaining bytes, instead of decoding {@code message.array()}, which ignores the
 * buffer's position and limit and is unsupported for buffers without an accessible array.
 */
@Test
public void clientWholeServerPartialByteBuffer() throws DeploymentException {
    final String expected = "In my experience, there's no such thing as luck.";
    Server server = startServer(PartialByteBuffer.class);
    try {
        final ClientEndpointConfig cec = ClientEndpointConfig.Builder.create().build();
        messageLatch = new CountDownLatch(1);
        ClientManager client = createClient();
        client.connectToServer(new Endpoint() {
            @Override
            public void onOpen(Session session, EndpointConfig EndpointConfig) {
                session.addMessageHandler(new MessageHandler.Whole<ByteBuffer>() {
                    @Override
                    public void onMessage(ByteBuffer message) {
                        // Same comparison as clientReceivePartialBinaryAsWhole uses.
                        if (message.equals(ByteBuffer.wrap(expected.getBytes()))) {
                            messageLatch.countDown();
                        }
                    }
                });
                try {
                    session.getBasicRemote().sendBinary(ByteBuffer.wrap(expected.getBytes()));
                } catch (IOException e) {
                    // don't care
                }
            }
        }, cec, getURI(PartialByteBuffer.class));
        messageLatch.await(1, TimeUnit.SECONDS);
        assertEquals(0, messageLatch.getCount());
    } catch (Exception e) {
        e.printStackTrace();
        throw new RuntimeException(e.getMessage(), e);
    } finally {
        stopServer(server);
    }
}
/**
 * Client sends the payload in three binary fragments to {@link WholeByteBuffer} and waits
 * (up to 1 s) for the complete bytes to arrive on a whole {@link ByteBuffer} handler.
 * <p>
 * The reply is compared with {@link ByteBuffer#equals(Object)}, which looks only at the
 * buffer's remaining bytes, instead of decoding {@code message.array()}, which ignores the
 * buffer's position and limit and is unsupported for buffers without an accessible array.
 */
@Test
public void clientPartialServerWholeByteBuffer() throws DeploymentException {
    final String expected = "In my experience, there's no such thing as luck.";
    Server server = startServer(WholeByteBuffer.class);
    try {
        final ClientEndpointConfig cec = ClientEndpointConfig.Builder.create().build();
        messageLatch = new CountDownLatch(1);
        ClientManager client = createClient();
        client.connectToServer(new Endpoint() {
            @Override
            public void onOpen(Session session, EndpointConfig EndpointConfig) {
                session.addMessageHandler(new MessageHandler.Whole<ByteBuffer>() {
                    @Override
                    public void onMessage(ByteBuffer message) {
                        // Same comparison as clientReceivePartialBinaryAsWhole uses.
                        if (message.equals(ByteBuffer.wrap(expected.getBytes()))) {
                            messageLatch.countDown();
                        }
                    }
                });
                try {
                    session.getBasicRemote().sendBinary(ByteBuffer.wrap("In my experience".getBytes()), false);
                    session.getBasicRemote().sendBinary(ByteBuffer.wrap(", there's no such ".getBytes()), false);
                    session.getBasicRemote().sendBinary(ByteBuffer.wrap("thing as luck.".getBytes()), true);
                } catch (IOException e) {
                    // don't care
                }
            }
        }, cec, getURI(WholeByteBuffer.class));
        messageLatch.await(1, TimeUnit.SECONDS);
        assertEquals(0, messageLatch.getCount());
    } catch (Exception e) {
        e.printStackTrace();
        throw new RuntimeException(e.getMessage(), e);
    } finally {
        stopServer(server);
    }
}
/**
 * Client sends the payload in three binary fragments to {@link PartialByteBuffer} and waits
 * (up to 1 s) for the complete bytes to arrive on a whole {@link ByteBuffer} handler.
 * <p>
 * The reply is compared with {@link ByteBuffer#equals(Object)}, which looks only at the
 * buffer's remaining bytes, instead of decoding {@code message.array()}, which ignores the
 * buffer's position and limit and is unsupported for buffers without an accessible array.
 */
@Test
public void clientPartialServerPartialByteBuffer() throws DeploymentException {
    final String expected = "In my experience, there's no such thing as luck.";
    Server server = startServer(PartialByteBuffer.class);
    try {
        final ClientEndpointConfig cec = ClientEndpointConfig.Builder.create().build();
        messageLatch = new CountDownLatch(1);
        ClientManager client = createClient();
        client.connectToServer(new Endpoint() {
            @Override
            public void onOpen(Session session, EndpointConfig EndpointConfig) {
                session.addMessageHandler(new MessageHandler.Whole<ByteBuffer>() {
                    @Override
                    public void onMessage(ByteBuffer message) {
                        // Same comparison as clientReceivePartialBinaryAsWhole uses.
                        if (message.equals(ByteBuffer.wrap(expected.getBytes()))) {
                            messageLatch.countDown();
                        }
                    }
                });
                try {
                    session.getBasicRemote().sendBinary(ByteBuffer.wrap("In my experience".getBytes()), false);
                    session.getBasicRemote().sendBinary(ByteBuffer.wrap(", there's no such ".getBytes()), false);
                    session.getBasicRemote().sendBinary(ByteBuffer.wrap("thing as luck.".getBytes()), true);
                } catch (IOException e) {
                    // don't care
                }
            }
        }, cec, getURI(PartialByteBuffer.class));
        messageLatch.await(1, TimeUnit.SECONDS);
        assertEquals(0, messageLatch.getCount());
    } catch (Exception e) {
        e.printStackTrace();
        throw new RuntimeException(e.getMessage(), e);
    } finally {
        stopServer(server);
    }
}
/**
 * Server endpoint that answers any incoming text message with a fixed sentence sent as
 * three text fragments; the incoming message content is not inspected.
 */
@ServerEndpoint("/clientPartialText")
public static class ClientPartialText {

    /**
     * @param session session used to send the fragmented reply.
     * @param message the received text (unused).
     * @throws IOException if sending any fragment fails.
     */
    @OnMessage
    public void onMessage(Session session, String message) throws IOException {
        final String[] fragments = {"In my experience", ", there's no such ", "thing as luck."};
        final int lastIndex = fragments.length - 1;
        for (int i = 0; i <= lastIndex; i++) {
            session.getBasicRemote().sendText(fragments[i], i == lastIndex);
        }
    }
}
/**
 * Server endpoint that answers any incoming text message with a fixed sentence sent as
 * three binary fragments; the incoming message content is not inspected.
 */
@ServerEndpoint("/clientPartialBinary")
public static class ClientPartialBinary {

    /**
     * @param session session used to send the fragmented reply.
     * @param message the received text (unused).
     * @throws IOException if sending any fragment fails.
     */
    @OnMessage
    public void onMessage(Session session, String message) throws IOException {
        final String[] fragments = {"In my experience", ", there's no such ", "thing as luck."};
        final int lastIndex = fragments.length - 1;
        for (int i = 0; i <= lastIndex; i++) {
            session.getBasicRemote().sendBinary(ByteBuffer.wrap(fragments[i].getBytes()), i == lastIndex);
        }
    }
}
/**
 * Client triggers {@link ClientPartialText}, which replies in three text fragments, and
 * waits (up to 1 s) for the full sentence to arrive on a whole-message {@code String} handler.
 */
@Test
public void clientReceivePartialTextAsWhole() throws DeploymentException {
    final String expected = "In my experience, there's no such thing as luck.";
    final Server server = startServer(ClientPartialText.class);
    try {
        messageLatch = new CountDownLatch(1);
        final ClientEndpointConfig config = ClientEndpointConfig.Builder.create().build();

        final Endpoint clientEndpoint = new Endpoint() {
            @Override
            public void onOpen(Session session, EndpointConfig endpointConfig) {
                session.addMessageHandler(new MessageHandler.Whole<String>() {
                    @Override
                    public void onMessage(String message) {
                        if (message.equals(expected)) {
                            messageLatch.countDown();
                        }
                    }
                });
                try {
                    // Any text message makes the server send its fragmented reply.
                    session.getBasicRemote().sendText(expected);
                } catch (IOException e) {
                    // don't care
                }
            }
        };

        createClient().connectToServer(clientEndpoint, config, getURI(ClientPartialText.class));
        messageLatch.await(1, TimeUnit.SECONDS);
        assertEquals(0, messageLatch.getCount());
    } catch (Exception e) {
        e.printStackTrace();
        throw new RuntimeException(e.getMessage(), e);
    } finally {
        stopServer(server);
    }
}
/**
 * Client triggers {@link ClientPartialBinary}, which replies in three binary fragments, and
 * waits (up to 1 s) for the full payload to arrive on a whole-message {@link ByteBuffer} handler.
 */
@Test
public void clientReceivePartialBinaryAsWhole() throws DeploymentException {
    final String expected = "In my experience, there's no such thing as luck.";
    final Server server = startServer(ClientPartialBinary.class);
    try {
        messageLatch = new CountDownLatch(1);
        final ClientEndpointConfig config = ClientEndpointConfig.Builder.create().build();

        final Endpoint clientEndpoint = new Endpoint() {
            @Override
            public void onOpen(Session session, EndpointConfig endpointConfig) {
                session.addMessageHandler(new MessageHandler.Whole<ByteBuffer>() {
                    @Override
                    public void onMessage(ByteBuffer message) {
                        // Buffer equality compares the remaining bytes of both buffers.
                        if (message.equals(ByteBuffer.wrap(expected.getBytes()))) {
                            messageLatch.countDown();
                        }
                    }
                });
                try {
                    // Any text message makes the server send its fragmented reply.
                    session.getBasicRemote().sendText(expected);
                } catch (IOException e) {
                    // don't care
                }
            }
        };

        createClient().connectToServer(clientEndpoint, config, getURI(ClientPartialBinary.class));
        messageLatch.await(1, TimeUnit.SECONDS);
        assertEquals(0, messageLatch.getCount());
    } catch (Exception e) {
        e.printStackTrace();
        throw new RuntimeException(e.getMessage(), e);
    } finally {
        stopServer(server);
    }
}
/**
 * Server endpoint that receives a text message through a {@link Reader}, drains it
 * character by character and sends the collected text back.
 */
@ServerEndpoint("/reader")
public static class WholeReader {

    /**
     * Counted down each time {@link #onMessage} is entered. Test code replaces this latch
     * between sends and awaits it, so the field is {@code volatile} to make a replacement
     * visible to whichever thread runs the handler.
     */
    public static volatile CountDownLatch receivedMessageLatch = new CountDownLatch(1);

    /**
     * @param session session used to send the collected text back.
     * @param reader  source of the incoming message characters; always closed before returning.
     * @throws IOException if reading or sending fails.
     */
    @OnMessage
    public void onMessage(Session session, Reader reader) throws IOException {
        // Signal first, so a waiting sender can continue with its next fragment.
        receivedMessageLatch.countDown();

        final StringBuilder sb = new StringBuilder();
        try {
            int i;
            while ((i = reader.read()) != -1) {
                sb.append((char) i);
            }
        } finally {
            // Close even when read() fails part-way through.
            reader.close();
        }
        session.getBasicRemote().sendText(sb.toString());
    }
}
/**
 * Sends the same three-fragment text message to {@link WholeReader} twice and expects the
 * complete sentence back both times.
 * <p>
 * The first message is sent from {@code onOpen}; the second is sent from the client message
 * handler the first time it is invoked. {@code messageLatch} starts at 2 and is counted down
 * once per matching reply; the test waits up to 3 seconds for both.
 */
@Test
public void clientPartialServerWholeReader() throws DeploymentException {
Server server = startServer(WholeReader.class);
try {
final ClientEndpointConfig cec = ClientEndpointConfig.Builder.create().build();
messageLatch = new CountDownLatch(2);
ClientManager client = createClient();
client.connectToServer(new Endpoint() {
// True until the handler has sent the second message.
boolean first = true;
@Override
public void onOpen(final Session session, EndpointConfig EndpointConfig) {
session.addMessageHandler(new MessageHandler.Whole<String>() {
@Override
public void onMessage(String message) {
System.out.println("Client received message: " + message);
if (message.equals("In my experience, there's no such thing as luck.")) {
messageLatch.countDown();
}
// On the first invocation (whatever the message content) send the message a second time.
if (first) {
try {
// Before each fragment a fresh latch is installed in WholeReader; after each non-final
// fragment the client waits up to 1s for WholeReader.onMessage to count it down.
// The await result is ignored, so a timeout simply proceeds to the next fragment.
WholeReader.receivedMessageLatch = new CountDownLatch(1);
session.getBasicRemote().sendText("In my experience", false);
WholeReader.receivedMessageLatch.await(1, TimeUnit.SECONDS);
WholeReader.receivedMessageLatch = new CountDownLatch(1);
session.getBasicRemote().sendText(", there's no such ", false);
WholeReader.receivedMessageLatch.await(1, TimeUnit.SECONDS);
WholeReader.receivedMessageLatch = new CountDownLatch(1);
session.getBasicRemote().sendText("thing as luck.", true);
} catch (Exception e) {
e.printStackTrace();
}
first = false;
}
}
});
try {
// First message. Unlike the handler path above, no fresh latch is installed before the
// first fragment here; the first await uses whatever latch the static field currently holds.
session.getBasicRemote().sendText("In my experience", false);
WholeReader.receivedMessageLatch.await(1, TimeUnit.SECONDS);
WholeReader.receivedMessageLatch = new CountDownLatch(1);
session.getBasicRemote().sendText(", there's no such ", false);
WholeReader.receivedMessageLatch.await(1, TimeUnit.SECONDS);
WholeReader.receivedMessageLatch = new CountDownLatch(1);
session.getBasicRemote().sendText("thing as luck.", true);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void onError(Session session, Throwable thr) {
thr.printStackTrace();
}
}, cec, getURI(WholeReader.class));
messageLatch.await(3, TimeUnit.SECONDS);
assertEquals(0, messageLatch.getCount());
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage(), e);
} finally {
stopServer(server);
}
}
/**
 * Server endpoint that receives a binary message through an {@link InputStream}, drains it
 * byte by byte and sends the collected bytes back.
 */
@ServerEndpoint("/inputstream")
public static class WholeInputStream {

    /**
     * Counted down each time {@link #onMessage} is entered. Test code replaces this latch
     * between sends and awaits it, so the field is {@code volatile} to make a replacement
     * visible to whichever thread runs the handler.
     */
    public static volatile CountDownLatch receivedMessageLatch = new CountDownLatch(1);

    /**
     * @param session session used to send the collected bytes back.
     * @param is      source of the incoming message bytes; always closed before returning.
     * @throws IOException if reading or sending fails.
     */
    @OnMessage
    public void onMessage(Session session, InputStream is) throws IOException {
        // Signal first, so a waiting sender can continue with its next fragment.
        receivedMessageLatch.countDown();

        // A byte-array stream avoids boxing every byte into a list and copying it out again.
        final ByteArrayOutputStream received = new ByteArrayOutputStream();
        try {
            int i;
            while ((i = is.read()) != -1) {
                received.write(i);
            }
        } finally {
            // Close even when read() fails part-way through.
            is.close();
        }
        session.getBasicRemote().sendBinary(ByteBuffer.wrap(received.toByteArray()));
    }
}
/**
 * Sends the same three-fragment binary message to {@link WholeInputStream} twice and expects
 * the nine-byte concatenation back both times.
 * <p>
 * The first message is sent from {@code onOpen}; the second is sent from the client message
 * handler the first time it is invoked. {@code messageLatch} starts at 2 and is counted down
 * once per reply; the test waits up to 3 seconds for both.
 */
@Test
public void clientPartialServerWholeInputStream() throws DeploymentException {
Server server = startServer(WholeInputStream.class);
try {
final ClientEndpointConfig cec = ClientEndpointConfig.Builder.create().build();
messageLatch = new CountDownLatch(2);
ClientManager client = createClient();
client.connectToServer(new Endpoint() {
// True until the handler has sent the second message.
boolean first = true;
// The three fragments and the payload expected back (their concatenation).
byte[] buf1 = {1, 2, 3};
byte[] buf2 = {4, 5, 6};
byte[] buf3 = {7, 8, 9};
byte[] result = {1, 2, 3, 4, 5, 6, 7, 8, 9};
@Override
public void onOpen(final Session session, EndpointConfig EndpointConfig) {
session.addMessageHandler(new MessageHandler.Whole<byte[]>() {
@Override
public void onMessage(byte[] message) {
System.out.println("Client received message: " + Utils.toString(message));
// The assertion runs inside the handler; when it fails, the countDown below is skipped.
assertArrayEquals(result, message);
messageLatch.countDown();
// On the first invocation send the message a second time.
if (first) {
try {
// Before each fragment a fresh latch is installed in WholeInputStream; after each non-final
// fragment the client waits up to 1s for WholeInputStream.onMessage to count it down.
// The await result is ignored, so a timeout simply proceeds to the next fragment.
WholeInputStream.receivedMessageLatch = new CountDownLatch(1);
session.getBasicRemote().sendBinary(ByteBuffer.wrap(buf1), false);
WholeInputStream.receivedMessageLatch.await(1, TimeUnit.SECONDS);
WholeInputStream.receivedMessageLatch = new CountDownLatch(1);
session.getBasicRemote().sendBinary(ByteBuffer.wrap(buf2), false);
WholeInputStream.receivedMessageLatch.await(1, TimeUnit.SECONDS);
WholeInputStream.receivedMessageLatch = new CountDownLatch(1);
session.getBasicRemote().sendBinary(ByteBuffer.wrap(buf3), true);
} catch (Exception e) {
e.printStackTrace();
}
first = false;
}
}
});
try {
// First message: same install-latch / send / await sequence as in the handler above.
WholeInputStream.receivedMessageLatch = new CountDownLatch(1);
session.getBasicRemote().sendBinary(ByteBuffer.wrap(buf1), false);
WholeInputStream.receivedMessageLatch.await(1, TimeUnit.SECONDS);
WholeInputStream.receivedMessageLatch = new CountDownLatch(1);
session.getBasicRemote().sendBinary(ByteBuffer.wrap(buf2), false);
WholeInputStream.receivedMessageLatch.await(1, TimeUnit.SECONDS);
WholeInputStream.receivedMessageLatch = new CountDownLatch(1);
session.getBasicRemote().sendBinary(ByteBuffer.wrap(buf3), true);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void onError(Session session, Throwable thr) {
thr.printStackTrace();
}
}, cec, getURI(WholeInputStream.class));
messageLatch.await(3, TimeUnit.SECONDS);
assertEquals(0, messageLatch.getCount());
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage(), e);
} finally {
stopServer(server);
}
}
/**
 * Client writes three byte arrays to the session's binary send stream, closes it, and waits
 * (up to 3 s) for {@link WholeInputStream} to send the nine-byte concatenation back.
 * <p>
 * The reply is checked with {@code assertArrayEquals}, as in
 * {@code clientPartialServerWholeInputStream}. The previous element-by-element loop compared
 * only the first {@code result.length} bytes, so a longer reply was accepted and a shorter
 * one failed with an index error rather than an assertion message.
 */
@Test
public void clientSendStreamServerWholeInputStream() throws DeploymentException {
    Server server = startServer(WholeInputStream.class);
    try {
        final ClientEndpointConfig cec = ClientEndpointConfig.Builder.create().build();
        messageLatch = new CountDownLatch(1);
        ClientManager client = createClient();
        client.connectToServer(new Endpoint() {
            // The three chunks written to the stream and the payload expected back.
            byte[] buf1 = {1, 2, 3};
            byte[] buf2 = {4, 5, 6};
            byte[] buf3 = {7, 8, 9};
            byte[] result = {1, 2, 3, 4, 5, 6, 7, 8, 9};

            @Override
            public void onOpen(final Session session, EndpointConfig EndpointConfig) {
                session.addMessageHandler(new MessageHandler.Whole<byte[]>() {
                    @Override
                    public void onMessage(byte[] message) {
                        // Checks length and content; on failure the countDown below is skipped.
                        assertArrayEquals(result, message);
                        messageLatch.countDown();
                    }
                });
                try {
                    final OutputStream sendStream = session.getBasicRemote().getSendStream();
                    sendStream.write(buf1);
                    sendStream.write(buf2);
                    sendStream.write(buf3);
                    sendStream.flush();
                    sendStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void onError(Session session, Throwable thr) {
                thr.printStackTrace();
            }
        }, cec, getURI(WholeInputStream.class));
        messageLatch.await(3, TimeUnit.SECONDS);
        assertEquals(0, messageLatch.getCount());
    } catch (Exception e) {
        e.printStackTrace();
        throw new RuntimeException(e.getMessage(), e);
    } finally {
        stopServer(server);
    }
}
}