SessionMonitor.java

/*
 * Copyright (c) 2014, 2017 Oracle and/or its affiliates. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0, which is available at
 * http://www.eclipse.org/legal/epl-2.0.
 *
 * This Source Code may also be made available under the following Secondary
 * Licenses when the conditions for such availability set forth in the
 * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
 * version 2 with the GNU Classpath Exception, which is available at
 * https://www.gnu.org/software/classpath/license.html.
 *
 * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
 */

package org.glassfish.tyrus.ext.monitoring.jmx;

import java.util.concurrent.atomic.AtomicLong;

/**
 * Per-session message monitor.
 * <p>
 * Keeps separate sent/received statistics for text, binary and control messages of a single session and, after
 * recording each event, forwards it to the {@link MessageListener} handed to the constructor. On construction the
 * monitor builds the MXBeans that expose these statistics, stores the session MXBean in the supplied
 * {@link EndpointMXBeanImpl} and publishes the beans through {@link MBeanPublisher}; {@link #unregister()} undoes
 * both steps.
 *
 * @author Petr Janouch
 * @see org.glassfish.tyrus.core.monitoring.MessageEventListener
 */
class SessionMonitor extends BaseMonitor implements MessageListener {

    // Outgoing traffic, one statistics holder per message type.
    private final MessageStatistics textSent = new MessageStatistics();
    private final MessageStatistics binarySent = new MessageStatistics();
    private final MessageStatistics controlSent = new MessageStatistics();

    // Incoming traffic, one statistics holder per message type.
    private final MessageStatistics textReceived = new MessageStatistics();
    private final MessageStatistics binaryReceived = new MessageStatistics();
    private final MessageStatistics controlReceived = new MessageStatistics();

    // Identify the MXBeans of this session; needed again in unregister().
    private final String applicationName;
    private final String endpointPath;
    private final String sessionId;
    // Receives every event after it has been counted here.
    private final MessageListener messageListener;
    private final EndpointMXBeanImpl endpointMXBean;

    /**
     * Creates the monitor, wires up its MXBeans and makes them visible.
     *
     * @param applicationName name passed to {@link MBeanPublisher} when (un)registering the session MXBeans.
     * @param endpointPath    endpoint path passed to {@link MBeanPublisher} when (un)registering the session MXBeans.
     * @param sessionId       key under which the session MXBean is stored in {@code endpointMXBean} and published.
     * @param messageListener listener every message event is forwarded to.
     * @param endpointMXBean  endpoint-level bean that receives the newly created session MXBean.
     */
    SessionMonitor(String applicationName, String endpointPath, String sessionId, MessageListener messageListener,
                   EndpointMXBeanImpl endpointMXBean) {
        this.applicationName = applicationName;
        this.endpointPath = endpointPath;
        this.sessionId = sessionId;
        this.messageListener = messageListener;
        this.endpointMXBean = endpointMXBean;

        // One MXBean per message type, each backed by the matching sent/received pair.
        final MessageStatisticsMXBean textBean = new MessageStatisticsMXBeanImpl(textSent, textReceived);
        final MessageStatisticsMXBean binaryBean = new MessageStatisticsMXBeanImpl(binarySent, binaryReceived);
        final MessageStatisticsMXBean controlBean = new MessageStatisticsMXBeanImpl(controlSent, controlReceived);

        // Aggregators over the three per-type statistics, one per direction.
        final MessageStatisticsAggregator allSent =
                new MessageStatisticsAggregator(textSent, binarySent, controlSent);
        final MessageStatisticsAggregator allReceived =
                new MessageStatisticsAggregator(textReceived, binaryReceived, controlReceived);

        final SessionMXBeanImpl sessionBean =
                new SessionMXBeanImpl(allSent, allReceived, getErrorCounts(), textBean, binaryBean, controlBean,
                                      sessionId);

        // Attach to the endpoint bean first, then publish.
        this.endpointMXBean.putSessionMXBean(this.sessionId, sessionBean);
        MBeanPublisher.registerSessionMXBeans(this.applicationName, this.endpointPath, this.sessionId, sessionBean,
                                              textBean, binaryBean, controlBean);
    }

    /**
     * Unpublishes the MXBeans of this session and removes the session MXBean from the endpoint bean.
     */
    void unregister() {
        MBeanPublisher.unregisterSessionMXBeans(applicationName, endpointPath, sessionId);
        endpointMXBean.removeSessionMXBean(sessionId);
    }

    /* Each callback below records the event locally and then passes it on to the wrapped listener. */

    @Override
    public void onTextMessageSent(long length) {
        textSent.onMessage(length);
        messageListener.onTextMessageSent(length);
    }

    @Override
    public void onBinaryMessageSent(long length) {
        binarySent.onMessage(length);
        messageListener.onBinaryMessageSent(length);
    }

    @Override
    public void onControlMessageSent(long length) {
        controlSent.onMessage(length);
        messageListener.onControlMessageSent(length);
    }

    @Override
    public void onTextMessageReceived(long length) {
        textReceived.onMessage(length);
        messageListener.onTextMessageReceived(length);
    }

    @Override
    public void onBinaryMessageReceived(long length) {
        binaryReceived.onMessage(length);
        messageListener.onBinaryMessageReceived(length);
    }

    @Override
    public void onControlMessageReceived(long length) {
        controlReceived.onMessage(length);
        messageListener.onControlMessageReceived(length);
    }

    /**
     * Running count, total size, minimum and maximum size of the messages reported through
     * {@link #onMessage(long)}.
     */
    private static class MessageStatistics implements MessageStatisticsSource {

        /*
         * The size fields are volatile longs updated with a plain (non-atomic) read-modify-write. This relies on the
         * assumption that only one thread can send or receive a message in a session at a time; nothing in this
         * class enforces it. NOTE(review): confirm the assumption against the callers.
         */
        private final AtomicLong count = new AtomicLong();
        private volatile long totalSize;
        // Long.MAX_VALUE marks "no message seen yet"; see getMinMessageSize().
        private volatile long smallest = Long.MAX_VALUE;
        private volatile long largest;

        /**
         * Records one message.
         *
         * @param size size of the message as reported by the caller.
         */
        void onMessage(long size) {
            count.incrementAndGet();
            totalSize = totalSize + size;

            final long knownSmallest = smallest;
            if (size < knownSmallest) {
                smallest = size;
            }

            final long knownLargest = largest;
            if (size > knownLargest) {
                largest = size;
            }
        }

        @Override
        public long getMessagesCount() {
            return count.get();
        }

        @Override
        public long getMessagesSize() {
            return totalSize;
        }

        /**
         * @return the smallest recorded size, or 0 while the minimum still holds its initial sentinel value.
         */
        @Override
        public long getMinMessageSize() {
            final long current = smallest;
            return current == Long.MAX_VALUE ? 0 : current;
        }

        @Override
        public long getMaxMessageSize() {
            return largest;
        }
    }
}