001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2006-2010 Sun Microsystems, Inc. 025 * Portions Copyright 2011-2015 ForgeRock AS 026 */ 027package org.opends.server.replication.server; 028 029import java.net.SocketException; 030 031import org.forgerock.i18n.LocalizableMessage; 032import org.opends.server.api.DirectoryThread; 033import org.forgerock.i18n.slf4j.LocalizedLogger; 034import org.opends.server.replication.common.ServerStatus; 035import org.opends.server.replication.protocol.*; 036 037import static org.opends.messages.ReplicationMessages.*; 038import static org.opends.server.replication.common.ServerStatus.*; 039import static org.opends.server.util.StaticUtils.*; 040 041/** 042 * This class implement the part of the replicationServer that is reading 043 * the connection from the LDAP servers to get all the updates that 044 * were done on this replica and forward them to other servers. 045 * 046 * A single thread is dedicated to this work. 047 * It waits in a blocking mode on the connection from the LDAP server 048 * and upon receiving an update puts in into the replicationServer cache 049 * from where the other servers will grab it. 050 */ 051public class ServerReader extends DirectoryThread 052{ 053 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 054 private final Session session; 055 private final ServerHandler handler; 056 057 /** 058 * Constructor for the LDAP server reader part of the replicationServer. 059 * 060 * @param session 061 * The Session from which to read the data. 062 * @param handler 063 * The server handler for this server reader. 064 */ 065 public ServerReader(Session session, ServerHandler handler) 066 { 067 super("Replication server RS(" + handler.getReplicationServerId() 068 + ") reading from " + handler + " at " 069 + session.getReadableRemoteAddress()); 070 this.session = session; 071 this.handler = handler; 072 } 073 074 /** 075 * Create a loop that reads changes and hands them off to be processed. 076 */ 077 @Override 078 public void run() 079 { 080 if (logger.isTraceEnabled()) 081 { 082 logger.trace(getName() + " starting"); 083 } 084 /* 085 * wait on input stream 086 * grab all incoming messages and publish them to the 087 * replicationServerDomain 088 */ 089 LocalizableMessage errMessage = null; 090 try 091 { 092 while (true) 093 { 094 try 095 { 096 final ReplicationMsg msg = session.receive(); 097 098 if (logger.isTraceEnabled()) 099 { 100 logger.trace("In " + getName() + " receives " + msg); 101 } 102 103 if (msg instanceof AckMsg) 104 { 105 handler.checkWindow(); 106 handler.processAck((AckMsg) msg); 107 } 108 else if (msg instanceof UpdateMsg) 109 { 110 final UpdateMsg updateMsg = (UpdateMsg) msg; 111 if (!isUpdateMsgFiltered(updateMsg)) 112 { 113 handler.put(updateMsg); 114 } 115 } 116 else if (msg instanceof WindowMsg) 117 { 118 handler.updateWindow((WindowMsg) msg); 119 } 120 else if (msg instanceof MonitorRequestMsg) 121 { 122 handler.processMonitorRequestMsg((MonitorRequestMsg) msg); 123 } 124 else if (msg instanceof MonitorMsg) 125 { 126 handler.processMonitorMsg((MonitorMsg) msg); 127 } 128 else if (msg instanceof RoutableMsg) 129 { 130 /* 131 * Note that we handle monitor messages separately since they in 132 * fact never need "routing" and are instead sent directly between 133 * connected peers. Doing so allows us to more clearly decouple 134 * write IO from the reader thread (see OPENDJ-1354). 135 */ 136 handler.process((RoutableMsg) msg); 137 } 138 else if (msg instanceof ResetGenerationIdMsg) 139 { 140 handler.processResetGenId((ResetGenerationIdMsg) msg); 141 } 142 else if (msg instanceof WindowProbeMsg) 143 { 144 handler.replyToWindowProbe(); 145 } 146 else if (msg instanceof TopologyMsg) 147 { 148 ReplicationServerHandler rsh = (ReplicationServerHandler) handler; 149 rsh.receiveTopoInfoFromRS((TopologyMsg) msg); 150 } 151 else if (msg instanceof ChangeStatusMsg) 152 { 153 ChangeStatusMsg csMsg = (ChangeStatusMsg) msg; 154 try 155 { 156 DataServerHandler dsh = (DataServerHandler) handler; 157 dsh.receiveNewStatus(csMsg); 158 } 159 catch (Exception e) 160 { 161 errMessage = ERR_RECEIVED_CHANGE_STATUS_NOT_FROM_DS.get( 162 handler.getBaseDN(), handler.getServerId(), csMsg); 163 logger.error(errMessage); 164 } 165 } 166 else if (msg instanceof ChangeTimeHeartbeatMsg) 167 { 168 handler.process((ChangeTimeHeartbeatMsg) msg); 169 } 170 else if (msg instanceof StopMsg) 171 { 172 /* 173 * Peer server is properly disconnecting: go out of here to properly 174 * close the server handler going to finally block. 175 */ 176 if (logger.isTraceEnabled()) 177 { 178 logger.trace(handler 179 + " has properly disconnected from this replication server " 180 + handler.getReplicationServerId()); 181 } 182 return; 183 } 184 else if (msg == null) 185 { 186 /* 187 * The remote server has sent an unknown message, close the 188 * connection. 189 */ 190 errMessage = NOTE_READER_NULL_MSG.get(handler); 191 logger.info(errMessage); 192 return; 193 } 194 } 195 catch (NotSupportedOldVersionPDUException e) 196 { 197 /* 198 * Received a V1 PDU we do not need to support: we just trash the 199 * message and log the event for debug purpose, then continue 200 * receiving messages. 201 */ 202 logException(e); 203 } 204 } 205 } 206 catch (SocketException e) 207 { 208 /* 209 * The connection has been broken Log a message and exit from this loop So 210 * that this handler is stopped. 211 */ 212 logException(e); 213 if (!handler.shuttingDown()) 214 { 215 errMessage = handler.getBadlyDisconnectedErrorMessage(); 216 logger.error(errMessage); 217 } 218 } 219 catch (Exception e) 220 { 221 /* 222 * The remote server has sent an unknown message, close the connection. 223 */ 224 errMessage = NOTE_READER_EXCEPTION.get(handler, 225 stackTraceToSingleLineString(e)); 226 logger.info(errMessage); 227 } 228 finally 229 { 230 /* 231 * The thread only exits the loop above if some error condition happen. 232 * Attempt to close the socket and stop the server handler. 233 */ 234 if (logger.isTraceEnabled()) 235 { 236 logger.trace("In " + getName() + " closing the session"); 237 } 238 session.close(); 239 handler.doStop(); 240 if (logger.isTraceEnabled()) 241 { 242 logger.trace(getName() + " stopped: " + errMessage); 243 } 244 } 245 } 246 247 /** 248 * Returns whether the update message is filtered in one of those cases: 249 * <ul> 250 * <li>Ignore updates from DS in bad BAD_GENID_STATUS or FULL_UPDATE_STATUS</li> 251 * <li>Ignore updates from RS with bad gen id</li> 252 * </ul> 253 */ 254 private boolean isUpdateMsgFiltered(UpdateMsg updateMsg) 255 { 256 if (handler.isDataServer()) 257 { 258 /** 259 * Ignore updates from DS in bad BAD_GENID_STATUS or 260 * FULL_UPDATE_STATUS 261 * 262 * The RSD lock should not be taken here as it is acceptable to 263 * have a delay between the time the server has a wrong status and 264 * the fact we detect it: the updates that succeed to pass during 265 * this time will have no impact on remote server. But it is 266 * interesting to not saturate uselessly the network if the 267 * updates are not necessary so this check to stop sending updates 268 * is interesting anyway. Not taking the RSD lock allows to have 269 * better performances in normal mode (most of the time). 270 */ 271 final ServerStatus dsStatus = handler.getStatus(); 272 if (dsStatus == BAD_GEN_ID_STATUS) 273 { 274 logger.warn(WARN_IGNORING_UPDATE_FROM_DS_BADGENID, 275 handler.getReplicationServerId(), updateMsg.getCSN(), 276 handler.getBaseDN(), handler.getServerId(), 277 session.getReadableRemoteAddress(), 278 handler.getGenerationId(), handler.getReferenceGenId()); 279 return true; 280 } 281 else if (dsStatus == FULL_UPDATE_STATUS) 282 { 283 logger.warn(WARN_IGNORING_UPDATE_FROM_DS_FULLUP, 284 handler.getReplicationServerId(), updateMsg.getCSN(), 285 handler.getBaseDN(), handler.getServerId(), 286 session.getReadableRemoteAddress()); 287 return true; 288 } 289 } 290 else 291 { 292 /** 293 * Ignore updates from RS with bad gen id 294 * (no system managed status for a RS) 295 */ 296 long referenceGenerationId = handler.getReferenceGenId(); 297 if (referenceGenerationId > 0 298 && referenceGenerationId != handler.getGenerationId()) 299 { 300 logger.error(WARN_IGNORING_UPDATE_FROM_RS, 301 handler.getReplicationServerId(), updateMsg.getCSN(), 302 handler.getBaseDN(), handler.getServerId(), 303 session.getReadableRemoteAddress(), 304 handler.getGenerationId(), referenceGenerationId); 305 return true; 306 } 307 } 308 return false; 309 } 310 311 private void logException(Exception e) 312 { 313 if (logger.isTraceEnabled()) 314 { 315 logger.trace("In " + getName() + " " + stackTraceToSingleLineString(e)); 316 } 317 } 318}