/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.server;

import java.io.IOException;

import org.opends.server.api.DirectoryThread;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.opends.server.replication.protocol.MonitorMsg;

/**
 * This thread regularly publishes monitoring information:
 * - it sends monitoring messages regarding the direct topology (directly
 *   connected DSs and RSs) to the connected RSs
 * - it sends monitoring messages regarding the whole topology (also includes
 *   the local RS) to the connected DSs
 * Note: as of today, monitoring messages mainly contain the server state of
 * the entities.
 * <p>
 * NOTE(review): the {@link #run()} loop in this class only sends the global
 * topology message to the connected DSs; nothing here sends to connected RSs.
 * Confirm whether the first bullet above is stale or handled elsewhere.
 * <p>
 * The thread sleeps on a private lock between two publications so that
 * {@link #shutdown()} can wake it up immediately instead of waiting for the
 * current period to elapse.
 */
public class MonitoringPublisher extends DirectoryThread
{
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();

  /** Maximum time (in ms) {@link #waitForShutdown()} waits for thread death. */
  private static final long SHUTDOWN_JOIN_TIMEOUT_MS = 2000;

  /** The replication domain we send monitoring for. */
  private final ReplicationServerDomain domain;

  /**
   * Sleep time (in ms) before sending new monitoring messages. Volatile
   * because it is written by {@link #setPeriod(long)} from another thread
   * while the publisher thread reads it.
   */
  private volatile long period;

  /** Monitor used to sleep between publications and to be woken on shutdown. */
  private final Object shutdownLock = new Object();

  /**
   * Create a monitoring publisher.
   *
   * @param replicationServerDomain
   *          The ReplicationServerDomain the monitoring publisher is for.
   * @param period
   *          The sleep time to use, in milliseconds. It is handed as is to
   *          {@link Object#wait(long)}: zero means "wait until notified" and
   *          a negative value makes the wait throw
   *          {@link IllegalArgumentException}.
   */
  public MonitoringPublisher(ReplicationServerDomain replicationServerDomain,
      long period)
  {
    super("Replication server RS("
        + replicationServerDomain.getLocalRSServerId()
        + ") monitor publisher for domain \""
        + replicationServerDomain.getBaseDN() + "\"");

    this.domain = replicationServerDomain;
    this.period = period;
  }

  /**
   * Run method for the monitoring publisher: until shutdown is initiated,
   * builds a global topology monitoring message, sends it to every connected
   * DS, then sleeps for the configured period.
   */
  @Override
  public void run()
  {
    if (logger.isTraceEnabled())
    {
      logger.trace(getMessage("Monitoring publisher starting."));
    }

    try
    {
      while (!isShutdownInitiated())
      {
        publishGlobalTopologyToConnectedDSs();

        synchronized (shutdownLock)
        {
          // double check to ensure the call to notify() was not missed:
          // shutdown() raises the flag before notifying under this same lock
          if (!isShutdownInitiated())
          {
            shutdownLock.wait(period);
          }
        }
      }
    }
    catch (InterruptedException e)
    {
      // The thread is about to terminate: nobody is left to observe the
      // interrupt status, so it is deliberately not restored here.
      if (logger.isTraceEnabled())
      {
        logger.trace(getMessage(
            "Monitoring publisher has been interrupted while sleeping."));
      }
    }

    if (logger.isTraceEnabled())
    {
      logger.trace(getMessage("Monitoring publisher is terminated."));
    }
  }

  /**
   * Sends one global topology monitoring message to each connected DS,
   * stopping early if shutdown is initiated. Failures to send are traced and
   * otherwise ignored.
   */
  private void publishGlobalTopologyToConnectedDSs()
  {
    // Send global topology information to peer DSs
    final int senderId = domain.getLocalRSServerId();
    // NOTE(review): the second argument (0) is presumably the destination
    // id, unused for this kind of message - confirm against
    // ReplicationServerDomain.createGlobalTopologyMonitorMsg()
    final MonitorMsg monitorMsg =
        domain.createGlobalTopologyMonitorMsg(senderId, 0);

    for (ServerHandler serverHandler : domain.getConnectedDSs().values())
    {
      // send() can be long operation, check for shutdown between each calls
      if (isShutdownInitiated())
      {
        break;
      }
      try
      {
        serverHandler.send(monitorMsg);
      }
      catch (IOException e)
      {
        // Best effort: the server is probably disconnecting, so carry on
        // with the remaining DSs, but leave a trace for troubleshooting.
        if (logger.isTraceEnabled())
        {
          logger.trace(getMessage(
              "Unable to send monitoring message to a connected DS: " + e));
        }
      }
    }
  }

  /**
   * Stops the thread: flags the shutdown and wakes the publisher up if it is
   * currently sleeping between two publications. This method does not wait
   * for the thread to terminate, see {@link #waitForShutdown()}.
   */
  public void shutdown()
  {
    initiateShutdown();
    synchronized (shutdownLock)
    {
      shutdownLock.notifyAll();
    }
    if (logger.isTraceEnabled())
    {
      logger.trace(getMessage("Shutting down monitoring publisher."));
    }
  }

  /**
   * Waits for thread death, for at most 2 seconds. The publisher thread is
   * not interrupted if it is still alive once the timeout expires. If the
   * calling thread is interrupted while waiting, the wait is abandoned and
   * the caller's interrupt status is restored.
   */
  public void waitForShutdown()
  {
    try
    {
      // Here, "this" is the monitoring publisher thread
      join(SHUTDOWN_JOIN_TIMEOUT_MS);
    }
    catch (InterruptedException e)
    {
      // Stop waiting, but do not hide the interruption from the caller.
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Sets the period value. The new value is picked up the next time the
   * publisher thread goes to sleep; a sleep already in progress is not
   * shortened or extended.
   *
   * @param period
   *          The new period value, in milliseconds.
   */
  public void setPeriod(long period)
  {
    if (logger.isTraceEnabled())
    {
      logger.trace(getMessage(
          "Monitoring publisher changing period value to " + period));
    }

    this.period = period;
  }

  /**
   * Prefixes a trace message with the local RS server id and the base DN of
   * the domain this publisher works for.
   *
   * @param message
   *          The message to decorate.
   * @return The decorated message.
   */
  private String getMessage(String message)
  {
    return "In RS " + domain.getLocalRSServerId() + ", for base dn "
        + domain.getBaseDN() + ": " + message;
  }
}