001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2014-2015 ForgeRock AS. 025 */ 026package org.opends.server.backends; 027 028import static org.opends.messages.BackendMessages.*; 029import static org.opends.messages.ReplicationMessages.*; 030import static org.opends.server.config.ConfigConstants.*; 031import static org.opends.server.replication.plugin.MultimasterReplication.*; 032import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; 033import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; 034import static org.opends.server.util.LDIFWriter.*; 035import static org.opends.server.util.ServerConstants.*; 036import static org.opends.server.util.StaticUtils.*; 037 038import java.text.SimpleDateFormat; 039import java.util.Collection; 040import java.util.Collections; 041import java.util.Date; 042import java.util.Iterator; 043import java.util.LinkedHashMap; 044import java.util.List; 045import java.util.Map; 046import java.util.Set; 047import java.util.TimeZone; 048import java.util.concurrent.ConcurrentLinkedQueue; 049import 
java.util.concurrent.ConcurrentSkipListMap; 050import java.util.concurrent.atomic.AtomicReference; 051 052import org.forgerock.i18n.LocalizableMessage; 053import org.forgerock.i18n.slf4j.LocalizedLogger; 054import org.forgerock.opendj.config.server.ConfigException; 055import org.forgerock.opendj.ldap.ByteString; 056import org.forgerock.opendj.ldap.ConditionResult; 057import org.forgerock.opendj.ldap.ModificationType; 058import org.forgerock.opendj.ldap.ResultCode; 059import org.forgerock.opendj.ldap.SearchScope; 060import org.opends.server.admin.Configuration; 061import org.opends.server.api.Backend; 062import org.opends.server.config.ConfigConstants; 063import org.opends.server.controls.EntryChangelogNotificationControl; 064import org.opends.server.controls.ExternalChangelogRequestControl; 065import org.opends.server.core.AddOperation; 066import org.opends.server.core.DeleteOperation; 067import org.opends.server.core.DirectoryServer; 068import org.opends.server.core.ModifyDNOperation; 069import org.opends.server.core.ModifyOperation; 070import org.opends.server.core.PersistentSearch; 071import org.opends.server.core.SearchOperation; 072import org.opends.server.core.ServerContext; 073import org.opends.server.replication.common.CSN; 074import org.opends.server.replication.common.MultiDomainServerState; 075import org.opends.server.replication.common.ServerState; 076import org.opends.server.replication.protocol.AddMsg; 077import org.opends.server.replication.protocol.DeleteMsg; 078import org.opends.server.replication.protocol.LDAPUpdateMsg; 079import org.opends.server.replication.protocol.ModifyCommonMsg; 080import org.opends.server.replication.protocol.ModifyDNMsg; 081import org.opends.server.replication.protocol.UpdateMsg; 082import org.opends.server.replication.server.ReplicationServer; 083import org.opends.server.replication.server.ReplicationServerDomain; 084import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB; 085import 
org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; 086import org.opends.server.replication.server.changelog.api.ChangelogDB; 087import org.opends.server.replication.server.changelog.api.ChangelogException; 088import org.opends.server.replication.server.changelog.api.DBCursor; 089import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions; 090import org.opends.server.replication.server.changelog.api.ReplicaId; 091import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; 092import org.opends.server.replication.server.changelog.file.ECLEnabledDomainPredicate; 093import org.opends.server.replication.server.changelog.file.ECLMultiDomainDBCursor; 094import org.opends.server.replication.server.changelog.file.MultiDomainDBCursor; 095import org.opends.server.types.Attribute; 096import org.opends.server.types.AttributeType; 097import org.opends.server.types.Attributes; 098import org.opends.server.types.BackupConfig; 099import org.opends.server.types.BackupDirectory; 100import org.opends.server.types.CanceledOperationException; 101import org.opends.server.types.Control; 102import org.opends.server.types.DN; 103import org.opends.server.types.DirectoryException; 104import org.opends.server.types.Entry; 105import org.opends.server.types.FilterType; 106import org.opends.server.types.IndexType; 107import org.opends.server.types.InitializationException; 108import org.opends.server.types.LDIFExportConfig; 109import org.opends.server.types.LDIFImportConfig; 110import org.opends.server.types.LDIFImportResult; 111import org.opends.server.types.Modification; 112import org.opends.server.types.ObjectClass; 113import org.opends.server.types.Privilege; 114import org.opends.server.types.RDN; 115import org.opends.server.types.RawAttribute; 116import org.opends.server.types.RestoreConfig; 117import org.opends.server.types.SearchFilter; 118import org.opends.server.types.WritabilityMode; 119import 
org.opends.server.util.StaticUtils; 120 121/** 122 * A backend that provides access to the changelog, i.e. the "cn=changelog" 123 * suffix. It is a read-only backend that is created by a 124 * {@link ReplicationServer} and is not configurable. 125 * <p> 126 * There are two modes to search the changelog: 127 * <ul> 128 * <li>Cookie mode: when a "ECL Cookie Exchange Control" is provided with the 129 * request. The cookie provided in the control is used to retrieve entries from 130 * the ReplicaDBs. The <code>changeNumber</code> attribute is not returned with 131 * the entries.</li> 132 * <li>Change number mode: when no "ECL Cookie Exchange Control" is provided 133 * with the request. The entries are retrieved using the ChangeNumberIndexDB and 134 * their attributes are set with the information from the ReplicasDBs. The 135 * <code>changeNumber</code> attribute value is set from the content of 136 * ChangeNumberIndexDB.</li> 137 * </ul> 138 * <h3>Searches flow</h3> 139 * <p> 140 * Here is the flow of searches within the changelog backend APIs: 141 * <ul> 142 * <li>Normal searches only go through: 143 * <ol> 144 * <li>{@link ChangelogBackend#search(SearchOperation)} (once, single threaded)</li> 145 * </ol> 146 * </li> 147 * <li>Persistent searches with <code>changesOnly=false</code> go through: 148 * <ol> 149 * <li>{@link ChangelogBackend#registerPersistentSearch(PersistentSearch)} 150 * (once, single threaded),</li> 151 * <li> 152 * {@link ChangelogBackend#search(SearchOperation)} (once, single threaded)</li> 153 * <li>{@link ChangelogBackend#notify*EntryAdded()} (multiple times, multi 154 * threaded)</li> 155 * </ol> 156 * </li> 157 * <li>Persistent searches with <code>changesOnly=true</code> go through: 158 * <ol> 159 * <li>{@link ChangelogBackend#registerPersistentSearch(PersistentSearch)} 160 * (once, single threaded)</li> 161 * <li> 162 * {@link ChangelogBackend#notify*EntryAdded()} (multiple times, multi 163 * threaded)</li> 164 * </ol> 165 * </li> 166 * </ul> 
167 * 168 * @see ReplicationServer 169 */ 170public class ChangelogBackend extends Backend<Configuration> 171{ 172 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 173 174 /** The id of this backend. */ 175 public static final String BACKEND_ID = "changelog"; 176 177 private static final long CHANGE_NUMBER_FOR_EMPTY_CURSOR = 0L; 178 179 private static final String CHANGE_NUMBER_ATTR = "changeNumber"; 180 private static final String CHANGE_NUMBER_ATTR_LC = CHANGE_NUMBER_ATTR.toLowerCase(); 181 private static final String ENTRY_SENDER_ATTACHMENT = OID_ECL_COOKIE_EXCHANGE_CONTROL + ".entrySender"; 182 183 /** The set of objectclasses that will be used in root entry. */ 184 private static final Map<ObjectClass, String> 185 CHANGELOG_ROOT_OBJECT_CLASSES = new LinkedHashMap<>(2); 186 static 187 { 188 CHANGELOG_ROOT_OBJECT_CLASSES.put(DirectoryServer.getObjectClass(OC_TOP, true), OC_TOP); 189 CHANGELOG_ROOT_OBJECT_CLASSES.put(DirectoryServer.getObjectClass("container", true), "container"); 190 } 191 192 /** The set of objectclasses that will be used in ECL entries. */ 193 private static final Map<ObjectClass, String> 194 CHANGELOG_ENTRY_OBJECT_CLASSES = new LinkedHashMap<>(2); 195 static 196 { 197 CHANGELOG_ENTRY_OBJECT_CLASSES.put(DirectoryServer.getObjectClass(OC_TOP, true), OC_TOP); 198 CHANGELOG_ENTRY_OBJECT_CLASSES.put(DirectoryServer.getObjectClass(OC_CHANGELOG_ENTRY, true), OC_CHANGELOG_ENTRY); 199 } 200 201 /** The attribute type for the "creatorsName" attribute. */ 202 private static final AttributeType CREATORS_NAME_TYPE = 203 DirectoryServer.getAttributeTypeOrDefault(OP_ATTR_CREATORS_NAME_LC); 204 /** The attribute type for the "modifiersName" attribute. */ 205 private static final AttributeType MODIFIERS_NAME_TYPE = 206 DirectoryServer.getAttributeTypeOrDefault(OP_ATTR_MODIFIERS_NAME_LC); 207 208 /** The base DN for the external change log. 
*/ 209 public static final DN CHANGELOG_BASE_DN; 210 211 static 212 { 213 try 214 { 215 CHANGELOG_BASE_DN = DN.valueOf(DN_EXTERNAL_CHANGELOG_ROOT); 216 } 217 catch (DirectoryException e) 218 { 219 throw new RuntimeException(e); 220 } 221 } 222 223 /** The set of base DNs for this backend. */ 224 private DN[] baseDNs; 225 /** The set of supported controls for this backend. */ 226 private final Set<String> supportedControls = Collections.singleton(OID_ECL_COOKIE_EXCHANGE_CONTROL); 227 /** Whether the base changelog entry has subordinates. */ 228 private Boolean baseEntryHasSubordinates; 229 230 /** The replication server on which the changelog is read. */ 231 private final ReplicationServer replicationServer; 232 private final ECLEnabledDomainPredicate domainPredicate; 233 234 /** The set of cookie-based persistent searches registered with this backend. */ 235 private final ConcurrentLinkedQueue<PersistentSearch> cookieBasedPersistentSearches = new ConcurrentLinkedQueue<>(); 236 /** The set of change number-based persistent searches registered with this backend. */ 237 private final ConcurrentLinkedQueue<PersistentSearch> changeNumberBasedPersistentSearches = 238 new ConcurrentLinkedQueue<>(); 239 240 /** 241 * Creates a new backend with the provided replication server. 242 * 243 * @param replicationServer 244 * The replication server on which the changes are read. 245 * @param domainPredicate 246 * Returns whether a domain is enabled for the external changelog. 
247 */ 248 public ChangelogBackend(final ReplicationServer replicationServer, final ECLEnabledDomainPredicate domainPredicate) 249 { 250 this.replicationServer = replicationServer; 251 this.domainPredicate = domainPredicate; 252 setBackendID(BACKEND_ID); 253 setWritabilityMode(WritabilityMode.DISABLED); 254 setPrivateBackend(true); 255 } 256 257 private ChangelogDB getChangelogDB() 258 { 259 return replicationServer.getChangelogDB(); 260 } 261 262 /** 263 * Returns the ChangelogBackend configured for "cn=changelog" in this directory server. 264 * 265 * @return the ChangelogBackend configured for "cn=changelog" in this directory server 266 * @deprecated instead inject the required object where needed 267 */ 268 @Deprecated 269 public static ChangelogBackend getInstance() 270 { 271 return (ChangelogBackend) DirectoryServer.getBackend(CHANGELOG_BASE_DN); 272 } 273 274 /** {@inheritDoc} */ 275 @Override 276 public void configureBackend(final Configuration config, ServerContext serverContext) throws ConfigException 277 { 278 throw new UnsupportedOperationException("The changelog backend is not configurable"); 279 } 280 281 /** {@inheritDoc} */ 282 @Override 283 public void openBackend() throws InitializationException 284 { 285 baseDNs = new DN[] { CHANGELOG_BASE_DN }; 286 287 try 288 { 289 DirectoryServer.registerBaseDN(CHANGELOG_BASE_DN, this, true); 290 } 291 catch (final DirectoryException e) 292 { 293 throw new InitializationException( 294 ERR_BACKEND_CANNOT_REGISTER_BASEDN.get(DN_EXTERNAL_CHANGELOG_ROOT, getExceptionMessage(e)), e); 295 } 296 } 297 298 /** {@inheritDoc} */ 299 @Override 300 public void closeBackend() 301 { 302 try 303 { 304 DirectoryServer.deregisterBaseDN(CHANGELOG_BASE_DN); 305 } 306 catch (final DirectoryException e) 307 { 308 logger.traceException(e); 309 } 310 } 311 312 /** {@inheritDoc} */ 313 @Override 314 public DN[] getBaseDNs() 315 { 316 return baseDNs; 317 } 318 319 /** {@inheritDoc} */ 320 @Override 321 public boolean isIndexed(final 
AttributeType attributeType, final IndexType indexType) 322 { 323 return true; 324 } 325 326 /** {@inheritDoc} */ 327 @Override 328 public Entry getEntry(final DN entryDN) throws DirectoryException 329 { 330 if (entryDN == null) 331 { 332 throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), 333 ERR_BACKEND_GET_ENTRY_NULL.get(getBackendID())); 334 } 335 throw new RuntimeException("Not implemented"); 336 } 337 338 /** {@inheritDoc} */ 339 @Override 340 public ConditionResult hasSubordinates(final DN entryDN) throws DirectoryException 341 { 342 if (CHANGELOG_BASE_DN.equals(entryDN)) 343 { 344 final Boolean hasSubs = baseChangelogHasSubordinates(); 345 if (hasSubs == null) 346 { 347 return ConditionResult.UNDEFINED; 348 } 349 return ConditionResult.valueOf(hasSubs); 350 } 351 return ConditionResult.FALSE; 352 } 353 354 private Boolean baseChangelogHasSubordinates() throws DirectoryException 355 { 356 if (baseEntryHasSubordinates == null) 357 { 358 // compute its value 359 try 360 { 361 final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB(); 362 CursorOptions options = new CursorOptions(GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY); 363 final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom( 364 new MultiDomainServerState(), options, getExcludedBaseDNs()); 365 try 366 { 367 baseEntryHasSubordinates = cursor.next(); 368 } 369 finally 370 { 371 close(cursor); 372 } 373 } 374 catch (ChangelogException e) 375 { 376 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_CHANGELOG_BACKEND_ATTRIBUTE.get( 377 "hasSubordinates", DN_EXTERNAL_CHANGELOG_ROOT, stackTraceToSingleLineString(e))); 378 } 379 } 380 return baseEntryHasSubordinates; 381 } 382 383 /** {@inheritDoc} */ 384 @Override 385 public long getNumberOfEntriesInBaseDN(final DN baseDN) throws DirectoryException 386 { 387 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_NUM_SUBORDINATES_NOT_SUPPORTED.get()); 388 } 389 390 
/** {@inheritDoc} */ 391 @Override 392 public long getNumberOfChildren(final DN parentDN) throws DirectoryException 393 { 394 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_NUM_SUBORDINATES_NOT_SUPPORTED.get()); 395 } 396 397 /** 398 * Notifies persistent searches of this backend that a new cookie entry was added to it. 399 * <p> 400 * Note: This method correspond to the "persistent search" phase. 401 * It is executed multiple times per persistent search, multi-threaded, until the persistent search is cancelled. 402 * <p> 403 * This method must only be called after the provided data have been persisted to disk. 404 * 405 * @param baseDN 406 * the baseDN of the newly added entry. 407 * @param updateMsg 408 * the update message of the newly added entry 409 * @throws ChangelogException 410 * If a problem occurs while notifying of the newly added entry. 411 */ 412 public void notifyCookieEntryAdded(DN baseDN, UpdateMsg updateMsg) throws ChangelogException 413 { 414 if (!(updateMsg instanceof LDAPUpdateMsg)) 415 { 416 return; 417 } 418 419 try 420 { 421 for (PersistentSearch pSearch : cookieBasedPersistentSearches) 422 { 423 final SearchOperation searchOp = pSearch.getSearchOperation(); 424 final CookieEntrySender entrySender = searchOp.getAttachment(ENTRY_SENDER_ATTACHMENT); 425 entrySender.persistentSearchSendEntry(baseDN, updateMsg); 426 } 427 } 428 catch (DirectoryException e) 429 { 430 throw new ChangelogException(e.getMessageObject(), e); 431 } 432 } 433 434 /** 435 * Notifies persistent searches of this backend that a new change number entry was added to it. 436 * <p> 437 * Note: This method correspond to the "persistent search" phase. 438 * It is executed multiple times per persistent search, multi-threaded, until the persistent search is cancelled. 439 * <p> 440 * This method must only be called after the provided data have been persisted to disk. 441 * 442 * @param baseDN 443 * the baseDN of the newly added entry. 
444 * @param changeNumber 445 * the change number of the newly added entry. It will be greater 446 * than zero for entries added to the change number index and less 447 * than or equal to zero for entries added to any replica DB 448 * @param cookieString 449 * a string representing the cookie of the newly added entry. 450 * This is only meaningful for entries added to the change number index 451 * @param updateMsg 452 * the update message of the newly added entry 453 * @throws ChangelogException 454 * If a problem occurs while notifying of the newly added entry. 455 */ 456 public void notifyChangeNumberEntryAdded(DN baseDN, long changeNumber, String cookieString, UpdateMsg updateMsg) 457 throws ChangelogException 458 { 459 if (!(updateMsg instanceof LDAPUpdateMsg) 460 || changeNumberBasedPersistentSearches.isEmpty()) 461 { 462 return; 463 } 464 465 try 466 { 467 // changeNumber entry can be shared with multiple persistent searches 468 final Entry changeNumberEntry = createEntryFromMsg(baseDN, changeNumber, cookieString, updateMsg); 469 for (PersistentSearch pSearch : changeNumberBasedPersistentSearches) 470 { 471 final SearchOperation searchOp = pSearch.getSearchOperation(); 472 final ChangeNumberEntrySender entrySender = searchOp.getAttachment(ENTRY_SENDER_ATTACHMENT); 473 entrySender.persistentSearchSendEntry(changeNumber, changeNumberEntry); 474 } 475 } 476 catch (DirectoryException e) 477 { 478 throw new ChangelogException(e.getMessageObject(), e); 479 } 480 } 481 482 private boolean isCookieBased(final SearchOperation searchOp) 483 { 484 for (Control c : searchOp.getRequestControls()) 485 { 486 if (OID_ECL_COOKIE_EXCHANGE_CONTROL.equals(c.getOID())) 487 { 488 return true; 489 } 490 } 491 return false; 492 } 493 494 /** {@inheritDoc} */ 495 @Override 496 public void addEntry(Entry entry, AddOperation addOperation) 497 throws DirectoryException, CanceledOperationException 498 { 499 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 500 
ERR_BACKEND_ADD_NOT_SUPPORTED.get(String.valueOf(entry.getName()), getBackendID())); 501 } 502 503 /** {@inheritDoc} */ 504 @Override 505 public void deleteEntry(DN entryDN, DeleteOperation deleteOperation) 506 throws DirectoryException, CanceledOperationException 507 { 508 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 509 ERR_BACKEND_DELETE_NOT_SUPPORTED.get(String.valueOf(entryDN), getBackendID())); 510 } 511 512 /** {@inheritDoc} */ 513 @Override 514 public void replaceEntry(Entry oldEntry, Entry newEntry, 515 ModifyOperation modifyOperation) throws DirectoryException, 516 CanceledOperationException 517 { 518 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 519 ERR_BACKEND_MODIFY_NOT_SUPPORTED.get(String.valueOf(newEntry.getName()), getBackendID())); 520 } 521 522 /** {@inheritDoc} */ 523 @Override 524 public void renameEntry(DN currentDN, Entry entry, 525 ModifyDNOperation modifyDNOperation) throws DirectoryException, 526 CanceledOperationException 527 { 528 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 529 ERR_BACKEND_MODIFY_DN_NOT_SUPPORTED.get(String.valueOf(currentDN), getBackendID())); 530 } 531 532 /** 533 * {@inheritDoc} 534 * <p> 535 * Runs the "initial search" phase (as opposed to a "persistent search" 536 * phase). The "initial search" phase is the only search run by normal 537 * searches, but it is also run by persistent searches with 538 * <code>changesOnly=false</code>. Persistent searches with 539 * <code>changesOnly=true</code> never execute this code. 540 * <p> 541 * Note: this method is executed only once per persistent search, single 542 * threaded. 
543 */ 544 @Override 545 public void search(final SearchOperation searchOperation) throws DirectoryException 546 { 547 checkChangelogReadPrivilege(searchOperation); 548 549 final Set<DN> excludedBaseDNs = getExcludedBaseDNs(); 550 final MultiDomainServerState cookie = getCookieFromControl(searchOperation, excludedBaseDNs); 551 552 final ChangeNumberRange range = optimizeSearch(searchOperation.getBaseDN(), searchOperation.getFilter()); 553 try 554 { 555 final boolean isPersistentSearch = isPersistentSearch(searchOperation); 556 if (cookie != null) 557 { 558 initialSearchFromCookie( 559 getCookieEntrySender(SearchPhase.INITIAL, searchOperation, cookie, excludedBaseDNs, isPersistentSearch)); 560 } 561 else 562 { 563 initialSearchFromChangeNumber( 564 getChangeNumberEntrySender(SearchPhase.INITIAL, searchOperation, range, isPersistentSearch)); 565 } 566 } 567 catch (ChangelogException e) 568 { 569 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_CHANGELOG_BACKEND_SEARCH.get( 570 searchOperation.getBaseDN(), searchOperation.getFilter(), stackTraceToSingleLineString(e))); 571 } 572 } 573 574 private MultiDomainServerState getCookieFromControl(final SearchOperation searchOperation, Set<DN> excludedBaseDNs) 575 throws DirectoryException 576 { 577 final ExternalChangelogRequestControl eclRequestControl = 578 searchOperation.getRequestControl(ExternalChangelogRequestControl.DECODER); 579 if (eclRequestControl != null) 580 { 581 final MultiDomainServerState cookie = eclRequestControl.getCookie(); 582 validateProvidedCookie(cookie, excludedBaseDNs); 583 return cookie; 584 } 585 return null; 586 } 587 588 /** {@inheritDoc} */ 589 @Override 590 public Set<String> getSupportedControls() 591 { 592 return supportedControls; 593 } 594 595 /** {@inheritDoc} */ 596 @Override 597 public Set<String> getSupportedFeatures() 598 { 599 return Collections.emptySet(); 600 } 601 602 /** {@inheritDoc} */ 603 @Override 604 public boolean supports(BackendOperation 
backendOperation) 605 { 606 return false; 607 } 608 609 /** {@inheritDoc} */ 610 @Override 611 public void exportLDIF(final LDIFExportConfig exportConfig) 612 throws DirectoryException 613 { 614 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 615 ERR_BACKEND_IMPORT_AND_EXPORT_NOT_SUPPORTED.get(getBackendID())); 616 } 617 618 /** {@inheritDoc} */ 619 @Override 620 public LDIFImportResult importLDIF(LDIFImportConfig importConfig, ServerContext serverContext) 621 throws DirectoryException 622 { 623 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 624 ERR_BACKEND_IMPORT_AND_EXPORT_NOT_SUPPORTED.get(getBackendID())); 625 } 626 627 /** {@inheritDoc} */ 628 @Override 629 public void createBackup(BackupConfig backupConfig) throws DirectoryException 630 { 631 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 632 ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID())); 633 } 634 635 /** {@inheritDoc} */ 636 @Override 637 public void removeBackup(BackupDirectory backupDirectory, String backupID) throws DirectoryException 638 { 639 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 640 ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID())); 641 } 642 643 /** {@inheritDoc} */ 644 @Override 645 public void restoreBackup(RestoreConfig restoreConfig) throws DirectoryException 646 { 647 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 648 ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID())); 649 } 650 651 /** {@inheritDoc} */ 652 @Override 653 public long getEntryCount() 654 { 655 try 656 { 657 return getNumberOfEntriesInBaseDN(CHANGELOG_BASE_DN) + 1; 658 } 659 catch (DirectoryException e) 660 { 661 logger.traceException(e); 662 return -1; 663 } 664 } 665 666 /** 667 * Represent the change number range targeted by a search operation. 668 * <p> 669 * This class should be visible for tests. 
670 */ 671 static final class ChangeNumberRange 672 { 673 private long lowerBound = -1; 674 private long upperBound = -1; 675 676 /** 677 * Returns the lowest change number to retrieve (inclusive). 678 * 679 * @return the lowest change number 680 */ 681 long getLowerBound() 682 { 683 return lowerBound; 684 } 685 686 /** 687 * Returns the highest change number to retrieve (inclusive). 688 * 689 * @return the highest change number 690 */ 691 long getUpperBound() 692 { 693 return upperBound; 694 } 695 } 696 697 /** 698 * Returns the set of DNs to exclude from the search. 699 * 700 * @return the DNs corresponding to domains to exclude from the search. 701 * @throws DirectoryException 702 * If a DN can't be decoded. 703 */ 704 private static Set<DN> getExcludedBaseDNs() throws DirectoryException 705 { 706 return getExcludedChangelogDomains(); 707 } 708 709 /** 710 * Optimize the search parameters by analyzing the DN and filter. 711 * It also performs validation on some search parameters 712 * for both cookie and change number based changelogs. 713 * 714 * @param baseDN the provided search baseDN. 715 * @param userFilter the provided search filter. 716 * @return the optimized change number range 717 * @throws DirectoryException when an exception occurs. 718 */ 719 ChangeNumberRange optimizeSearch(final DN baseDN, final SearchFilter userFilter) throws DirectoryException 720 { 721 SearchFilter equalityFilter = null; 722 switch (baseDN.size()) 723 { 724 case 1: 725 // "cn=changelog" : use user-provided search filter. 
726 break; 727 case 2: 728 // It is probably "changeNumber=xxx,cn=changelog", use equality filter 729 // But it also could be "<service-id>,cn=changelog" so need to check on attribute 730 equalityFilter = buildSearchFilterFrom(baseDN, CHANGE_NUMBER_ATTR_LC, CHANGE_NUMBER_ATTR); 731 break; 732 default: 733 // "replicationCSN=xxx,<service-id>,cn=changelog" : use equality filter 734 equalityFilter = buildSearchFilterFrom(baseDN, "replicationcsn", "replicationCSN"); 735 break; 736 } 737 738 return optimizeSearchUsingFilter(equalityFilter != null ? equalityFilter : userFilter); 739 } 740 741 /** 742 * Build a search filter from given DN and attribute. 743 * 744 * @return the search filter or {@code null} if attribute is not present in 745 * the provided DN 746 */ 747 private SearchFilter buildSearchFilterFrom(final DN baseDN, final String lowerCaseAttr, final String upperCaseAttr) 748 { 749 final RDN rdn = baseDN.rdn(); 750 AttributeType attrType = DirectoryServer.getAttributeTypeOrDefault(lowerCaseAttr, upperCaseAttr); 751 final ByteString attrValue = rdn.getAttributeValue(attrType); 752 if (attrValue != null) 753 { 754 return SearchFilter.createEqualityFilter(attrType, attrValue); 755 } 756 return null; 757 } 758 759 private ChangeNumberRange optimizeSearchUsingFilter(final SearchFilter filter) throws DirectoryException 760 { 761 final ChangeNumberRange range = new ChangeNumberRange(); 762 if (filter == null) 763 { 764 return range; 765 } 766 767 if (matches(filter, FilterType.GREATER_OR_EQUAL, CHANGE_NUMBER_ATTR)) 768 { 769 range.lowerBound = decodeChangeNumber(filter.getAssertionValue()); 770 } 771 else if (matches(filter, FilterType.LESS_OR_EQUAL, CHANGE_NUMBER_ATTR)) 772 { 773 range.upperBound = decodeChangeNumber(filter.getAssertionValue()); 774 } 775 else if (matches(filter, FilterType.EQUALITY, CHANGE_NUMBER_ATTR)) 776 { 777 final long number = decodeChangeNumber(filter.getAssertionValue()); 778 range.lowerBound = number; 779 range.upperBound = number; 780 } 
781 else if (matches(filter, FilterType.EQUALITY, "replicationcsn")) 782 { 783 // == exact CSN 784 // validate provided CSN is correct 785 new CSN(filter.getAssertionValue().toString()); 786 } 787 else if (filter.getFilterType() == FilterType.AND) 788 { 789 // TODO: it looks like it could be generalized to N components, not only two 790 final Collection<SearchFilter> components = filter.getFilterComponents(); 791 final SearchFilter filters[] = components.toArray(new SearchFilter[0]); 792 long upper1 = -1; 793 long lower1 = -1; 794 long upper2 = -1; 795 long lower2 = -1; 796 if (filters.length > 0) 797 { 798 ChangeNumberRange range1 = optimizeSearchUsingFilter(filters[0]); 799 upper1 = range1.upperBound; 800 lower1 = range1.lowerBound; 801 } 802 if (filters.length > 1) 803 { 804 ChangeNumberRange range2 = optimizeSearchUsingFilter(filters[1]); 805 upper2 = range2.upperBound; 806 lower2 = range2.lowerBound; 807 } 808 if (upper1 == -1) 809 { 810 range.upperBound = upper2; 811 } 812 else if (upper2 == -1) 813 { 814 range.upperBound = upper1; 815 } 816 else 817 { 818 range.upperBound = Math.min(upper1, upper2); 819 } 820 821 range.lowerBound = Math.max(lower1, lower2); 822 } 823 return range; 824 } 825 826 private static long decodeChangeNumber(final ByteString assertionValue) 827 throws DirectoryException 828 { 829 try 830 { 831 return Long.decode(assertionValue.toString()); 832 } 833 catch (NumberFormatException e) 834 { 835 throw new DirectoryException(ResultCode.INVALID_ATTRIBUTE_SYNTAX, 836 LocalizableMessage.raw("Could not convert value '%s' to long", assertionValue)); 837 } 838 } 839 840 private boolean matches(SearchFilter filter, FilterType filterType, String primaryName) 841 { 842 return filter.getFilterType() == filterType 843 && filter.getAttributeType() != null 844 && filter.getAttributeType().getPrimaryName().equalsIgnoreCase(primaryName); 845 } 846 847 /** 848 * Search the changelog when a cookie control is provided. 
849 */ 850 private void initialSearchFromCookie(final CookieEntrySender entrySender) 851 throws DirectoryException, ChangelogException 852 { 853 if (!sendBaseChangelogEntry(entrySender.searchOp)) 854 { // only return the base entry: stop here 855 return; 856 } 857 858 ECLMultiDomainDBCursor replicaUpdatesCursor = null; 859 try 860 { 861 final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB(); 862 CursorOptions options = new CursorOptions(GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); 863 final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom( 864 entrySender.cookie, options, entrySender.excludedBaseDNs); 865 replicaUpdatesCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor); 866 867 if (sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor)) 868 { 869 entrySender.transitioningToPersistentSearchPhase(); 870 sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor); 871 } 872 } 873 finally 874 { 875 entrySender.finalizeInitialSearch(); 876 StaticUtils.close(replicaUpdatesCursor); 877 } 878 } 879 880 private CookieEntrySender getCookieEntrySender(SearchPhase startPhase, final SearchOperation searchOperation, 881 MultiDomainServerState cookie, Set<DN> excludedBaseDNs, boolean isPersistentSearch) 882 { 883 if (isPersistentSearch && SearchPhase.INITIAL.equals(startPhase)) 884 { 885 return searchOperation.getAttachment(ENTRY_SENDER_ATTACHMENT); 886 } 887 return new CookieEntrySender(searchOperation, startPhase, cookie, excludedBaseDNs); 888 } 889 890 private boolean sendCookieEntriesFromCursor(final CookieEntrySender entrySender, 891 final ECLMultiDomainDBCursor replicaUpdatesCursor) throws ChangelogException, DirectoryException 892 { 893 boolean continueSearch = true; 894 while (continueSearch && replicaUpdatesCursor.next()) 895 { 896 final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord(); 897 final DN domainBaseDN = replicaUpdatesCursor.getData(); 898 continueSearch = 
entrySender.initialSearchSendEntry(updateMsg, domainBaseDN); 899 } 900 return continueSearch; 901 } 902 903 private boolean isPersistentSearch(SearchOperation op) 904 { 905 for (PersistentSearch pSearch : getPersistentSearches()) 906 { 907 if (op == pSearch.getSearchOperation()) 908 { 909 return true; 910 } 911 } 912 return false; 913 } 914 915 /** {@inheritDoc} */ 916 @Override 917 public void registerPersistentSearch(PersistentSearch pSearch) throws DirectoryException 918 { 919 initializePersistentSearch(pSearch); 920 921 if (isCookieBased(pSearch.getSearchOperation())) 922 { 923 cookieBasedPersistentSearches.add(pSearch); 924 } 925 else 926 { 927 changeNumberBasedPersistentSearches.add(pSearch); 928 } 929 super.registerPersistentSearch(pSearch); 930 } 931 932 private void initializePersistentSearch(PersistentSearch pSearch) throws DirectoryException 933 { 934 final SearchOperation searchOp = pSearch.getSearchOperation(); 935 936 // Validation must be done during registration for changes only persistent searches. 937 // Otherwise, when there is an initial search phase, 938 // validation is performed by the search() method. 939 if (pSearch.isChangesOnly()) 940 { 941 checkChangelogReadPrivilege(searchOp); 942 } 943 final ChangeNumberRange range = optimizeSearch(searchOp.getBaseDN(), searchOp.getFilter()); 944 945 final SearchPhase startPhase = pSearch.isChangesOnly() ? 
SearchPhase.PERSISTENT : SearchPhase.INITIAL; 946 if (isCookieBased(searchOp)) 947 { 948 final Set<DN> excludedBaseDNs = getExcludedBaseDNs(); 949 final MultiDomainServerState cookie = getCookie(pSearch.isChangesOnly(), searchOp, excludedBaseDNs); 950 searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, 951 new CookieEntrySender(searchOp, startPhase, cookie, excludedBaseDNs)); 952 } 953 else 954 { 955 searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, 956 new ChangeNumberEntrySender(searchOp, startPhase, range)); 957 } 958 } 959 960 private MultiDomainServerState getCookie(boolean isChangesOnly, SearchOperation searchOp, Set<DN> excludedBaseDNs) 961 throws DirectoryException 962 { 963 if (isChangesOnly) 964 { 965 // this changesOnly persistent search will not go through #initialSearch() 966 // so we must initialize the cookie here 967 return getNewestCookie(searchOp); 968 } 969 return getCookieFromControl(searchOp, excludedBaseDNs); 970 } 971 972 private MultiDomainServerState getNewestCookie(SearchOperation searchOp) 973 { 974 if (!isCookieBased(searchOp)) 975 { 976 return null; 977 } 978 979 final MultiDomainServerState cookie = new MultiDomainServerState(); 980 for (final Iterator<ReplicationServerDomain> it = 981 replicationServer.getDomainIterator(); it.hasNext();) 982 { 983 final DN baseDN = it.next().getBaseDN(); 984 final ServerState state = getChangelogDB().getReplicationDomainDB().getDomainNewestCSNs(baseDN); 985 cookie.update(baseDN, state); 986 } 987 return cookie; 988 } 989 990 /** 991 * Validates the cookie contained in search parameters by checking its content 992 * with the actual replication server state. 
993 * 994 * @throws DirectoryException 995 * If the state is not valid 996 */ 997 private void validateProvidedCookie(final MultiDomainServerState cookie, Set<DN> excludedBaseDNs) 998 throws DirectoryException 999 { 1000 if (cookie != null && !cookie.isEmpty()) 1001 { 1002 replicationServer.validateCookie(cookie, excludedBaseDNs); 1003 } 1004 } 1005 1006 /** 1007 * Search the changelog using change number(s). 1008 */ 1009 private void initialSearchFromChangeNumber(final ChangeNumberEntrySender entrySender) 1010 throws ChangelogException, DirectoryException 1011 { 1012 if (!sendBaseChangelogEntry(entrySender.searchOp)) 1013 { // only return the base entry: stop here 1014 return; 1015 } 1016 1017 DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor = null; 1018 final AtomicReference<MultiDomainDBCursor> replicaUpdatesCursor = new AtomicReference<>(); 1019 try 1020 { 1021 cnIndexDBCursor = getCNIndexDBCursor(entrySender.lowestChangeNumber); 1022 final MultiDomainServerState cookie = new MultiDomainServerState(); 1023 1024 if (sendChangeNumberEntriesFromCursors(entrySender, cnIndexDBCursor, replicaUpdatesCursor, cookie)) 1025 { 1026 entrySender.transitioningToPersistentSearchPhase(); 1027 sendChangeNumberEntriesFromCursors(entrySender, cnIndexDBCursor, replicaUpdatesCursor, cookie); 1028 } 1029 } 1030 finally 1031 { 1032 entrySender.finalizeInitialSearch(); 1033 StaticUtils.close(cnIndexDBCursor, replicaUpdatesCursor.get()); 1034 } 1035 } 1036 1037 private ChangeNumberEntrySender getChangeNumberEntrySender(SearchPhase startPhase, 1038 final SearchOperation searchOperation, ChangeNumberRange range, boolean isPersistentSearch) 1039 { 1040 if (isPersistentSearch && SearchPhase.INITIAL.equals(startPhase)) 1041 { 1042 return searchOperation.getAttachment(ENTRY_SENDER_ATTACHMENT); 1043 } 1044 return new ChangeNumberEntrySender(searchOperation, SearchPhase.INITIAL, range); 1045 } 1046 1047 private boolean sendChangeNumberEntriesFromCursors(final ChangeNumberEntrySender 
entrySender, 1048 DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor, AtomicReference<MultiDomainDBCursor> replicaUpdatesCursor, 1049 MultiDomainServerState cookie) throws ChangelogException, DirectoryException 1050 { 1051 boolean continueSearch = true; 1052 while (continueSearch && cnIndexDBCursor.next()) 1053 { 1054 // Handle the current cnIndex record 1055 final ChangeNumberIndexRecord cnIndexRecord = cnIndexDBCursor.getRecord(); 1056 if (replicaUpdatesCursor.get() == null) 1057 { 1058 replicaUpdatesCursor.set(initializeReplicaUpdatesCursor(cnIndexRecord)); 1059 initializeCookieForChangeNumberMode(cookie, cnIndexRecord); 1060 } 1061 else 1062 { 1063 cookie.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN()); 1064 } 1065 continueSearch = entrySender.changeNumberIsInRange(cnIndexRecord.getChangeNumber()); 1066 if (continueSearch) 1067 { 1068 final UpdateMsg updateMsg = findReplicaUpdateMessage(replicaUpdatesCursor.get(), cnIndexRecord.getCSN()); 1069 if (updateMsg != null) 1070 { 1071 continueSearch = entrySender.initialSearchSendEntry(cnIndexRecord, updateMsg, cookie); 1072 replicaUpdatesCursor.get().next(); 1073 } 1074 } 1075 } 1076 return continueSearch; 1077 } 1078 1079 /** Initialize the provided cookie from the provided change number index record. */ 1080 private void initializeCookieForChangeNumberMode( 1081 MultiDomainServerState cookie, final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException 1082 { 1083 // Initialize the multi domain cursor only from the change number index record. 1084 // The cookie is always empty at this stage. 
1085 CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, cnIndexRecord.getCSN()); 1086 MultiDomainServerState unused = new MultiDomainServerState(); 1087 MultiDomainDBCursor cursor = getChangelogDB().getReplicationDomainDB().getCursorFrom(unused, options); 1088 try (ECLMultiDomainDBCursor eclCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor)) 1089 { 1090 updateCookieToMediumConsistencyPoint(cookie, eclCursor, cnIndexRecord); 1091 } 1092 } 1093 1094 /** 1095 * Rebuilds the changelogcookie starting at the newest change number index record. 1096 * <p> 1097 * It updates the provided cookie with the changes from the provided ECL cursor, 1098 * up to (and including) the provided change number index record. 1099 * <p> 1100 * Therefore, after calling this method, the cursor is positioned 1101 * to the change immediately following the provided change number index record. 1102 * 1103 * @param cookie the cookie to update 1104 * @param cursor the cursor where to read changes from 1105 * @param cnIndexRecord the change number index record to go right after 1106 * @throws ChangelogException if any problem occurs 1107 */ 1108 public static void updateCookieToMediumConsistencyPoint( 1109 MultiDomainServerState cookie, ECLMultiDomainDBCursor cursor, ChangeNumberIndexRecord cnIndexRecord) 1110 throws ChangelogException 1111 { 1112 if (cnIndexRecord == null) 1113 { 1114 return; 1115 } 1116 1117 while (cursor.next()) 1118 { 1119 UpdateMsg updateMsg = cursor.getRecord(); 1120 if (updateMsg.getCSN().compareTo(cnIndexRecord.getCSN()) > 0) 1121 { 1122 break; 1123 } 1124 cookie.update(cursor.getData(), updateMsg.getCSN()); 1125 } 1126 } 1127 1128 private MultiDomainDBCursor initializeReplicaUpdatesCursor( 1129 final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException 1130 { 1131 final MultiDomainServerState state = new MultiDomainServerState(); 1132 state.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN()); 1133 1134 // No need 
for ECLMultiDomainDBCursor in this case 1135 // as updateMsg will be matched with cnIndexRecord 1136 CursorOptions options = new CursorOptions(GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY); 1137 final MultiDomainDBCursor replicaUpdatesCursor = 1138 getChangelogDB().getReplicationDomainDB().getCursorFrom(state, options); 1139 replicaUpdatesCursor.next(); 1140 return replicaUpdatesCursor; 1141 } 1142 1143 /** 1144 * Returns the replica update message corresponding to the provided 1145 * cnIndexRecord. 1146 * 1147 * @return the update message, which may be {@code null} if the update message 1148 * could not be found because it was purged or because corresponding 1149 * baseDN was removed from the changelog 1150 * @throws DirectoryException 1151 * If inconsistency is detected between the available update 1152 * messages and the provided cnIndexRecord 1153 */ 1154 private UpdateMsg findReplicaUpdateMessage(final MultiDomainDBCursor replicaUpdatesCursor, CSN csn) 1155 throws ChangelogException, DirectoryException 1156 { 1157 while (true) 1158 { 1159 final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord(); 1160 final int compareIndexWithUpdateMsg = csn.compareTo(updateMsg.getCSN()); 1161 if (compareIndexWithUpdateMsg < 0) { 1162 // Either update message has been purged or baseDN has been removed from changelogDB, 1163 // ignore current index record and go to the next one 1164 return null; 1165 } 1166 else if (compareIndexWithUpdateMsg == 0) 1167 { 1168 // Found the matching update message 1169 return updateMsg; 1170 } 1171 // Case compareIndexWithUpdateMsg > 0 : the update message has not bean reached yet 1172 if (!replicaUpdatesCursor.next()) 1173 { 1174 // Should never happen, as it means some messages have disappeared 1175 // TODO : put the correct I18N message 1176 throw new DirectoryException(ResultCode.OPERATIONS_ERROR, 1177 LocalizableMessage.raw("Could not find replica update message matching index record. 
" + 1178 "No more replica update messages with a csn newer than " + updateMsg.getCSN() + " exist.")); 1179 } 1180 } 1181 } 1182 1183 /** Returns a cursor on CNIndexDB for the provided first change number. */ 1184 private DBCursor<ChangeNumberIndexRecord> getCNIndexDBCursor( 1185 final long firstChangeNumber) throws ChangelogException 1186 { 1187 final ChangeNumberIndexDB cnIndexDB = getChangelogDB().getChangeNumberIndexDB(); 1188 long changeNumberToUse = firstChangeNumber; 1189 if (changeNumberToUse <= 1) 1190 { 1191 final ChangeNumberIndexRecord oldestRecord = cnIndexDB.getOldestRecord(); 1192 changeNumberToUse = oldestRecord == null ? CHANGE_NUMBER_FOR_EMPTY_CURSOR : oldestRecord.getChangeNumber(); 1193 } 1194 return cnIndexDB.getCursorFrom(changeNumberToUse); 1195 } 1196 1197 /** 1198 * Creates a changelog entry. 1199 */ 1200 private static Entry createEntryFromMsg(final DN baseDN, final long changeNumber, final String cookie, 1201 final UpdateMsg msg) throws DirectoryException 1202 { 1203 if (msg instanceof AddMsg) 1204 { 1205 return createAddMsg(baseDN, changeNumber, cookie, msg); 1206 } 1207 else if (msg instanceof ModifyCommonMsg) 1208 { 1209 return createModifyMsg(baseDN, changeNumber, cookie, msg); 1210 } 1211 else if (msg instanceof DeleteMsg) 1212 { 1213 final DeleteMsg delMsg = (DeleteMsg) msg; 1214 return createChangelogEntry(baseDN, changeNumber, cookie, delMsg, null, "delete", delMsg.getInitiatorsName()); 1215 } 1216 throw new DirectoryException(ResultCode.OPERATIONS_ERROR, 1217 LocalizableMessage.raw("Unexpected message type when trying to create changelog entry for dn %s : %s", baseDN, 1218 msg.getClass())); 1219 } 1220 1221 /** 1222 * Creates an entry from an add message. 1223 * <p> 1224 * Map addMsg to an LDIF string for the 'changes' attribute, and pull out 1225 * change initiators name if available which is contained in the creatorsName 1226 * attribute. 
1227 */ 1228 private static Entry createAddMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg) 1229 throws DirectoryException 1230 { 1231 final AddMsg addMsg = (AddMsg) msg; 1232 String changeInitiatorsName = null; 1233 String ldifChanges = null; 1234 try 1235 { 1236 final StringBuilder builder = new StringBuilder(256); 1237 for (Attribute attr : addMsg.getAttributes()) 1238 { 1239 if (attr.getAttributeType().equals(CREATORS_NAME_TYPE) && !attr.isEmpty()) 1240 { 1241 // This attribute is not multi-valued. 1242 changeInitiatorsName = attr.iterator().next().toString(); 1243 } 1244 final String attrName = attr.getNameWithOptions(); 1245 for (ByteString value : attr) 1246 { 1247 builder.append(attrName); 1248 appendLDIFSeparatorAndValue(builder, value); 1249 builder.append('\n'); 1250 } 1251 } 1252 ldifChanges = builder.toString(); 1253 } 1254 catch (Exception e) 1255 { 1256 logEncodingMessageError("add", addMsg.getDN(), e); 1257 } 1258 1259 return createChangelogEntry(baseDN, changeNumber, cookie, addMsg, ldifChanges, "add", changeInitiatorsName); 1260 } 1261 1262 /** 1263 * Creates an entry from a modify message. 1264 * <p> 1265 * Map the modifyMsg to an LDIF string for the 'changes' attribute, and pull 1266 * out change initiators name if available which is contained in the 1267 * modifiersName attribute. 
1268 */ 1269 private static Entry createModifyMsg(final DN baseDN, final long changeNumber, final String cookie, 1270 final UpdateMsg msg) throws DirectoryException 1271 { 1272 final ModifyCommonMsg modifyMsg = (ModifyCommonMsg) msg; 1273 String changeInitiatorsName = null; 1274 String ldifChanges = null; 1275 try 1276 { 1277 final StringBuilder builder = new StringBuilder(128); 1278 for (Modification mod : modifyMsg.getMods()) 1279 { 1280 final Attribute attr = mod.getAttribute(); 1281 if (mod.getModificationType() == ModificationType.REPLACE 1282 && attr.getAttributeType().equals(MODIFIERS_NAME_TYPE) 1283 && !attr.isEmpty()) 1284 { 1285 // This attribute is not multi-valued. 1286 changeInitiatorsName = attr.iterator().next().toString(); 1287 } 1288 final String attrName = attr.getNameWithOptions(); 1289 builder.append(mod.getModificationType()); 1290 builder.append(": "); 1291 builder.append(attrName); 1292 builder.append('\n'); 1293 1294 for (ByteString value : attr) 1295 { 1296 builder.append(attrName); 1297 appendLDIFSeparatorAndValue(builder, value); 1298 builder.append('\n'); 1299 } 1300 builder.append("-\n"); 1301 } 1302 ldifChanges = builder.toString(); 1303 } 1304 catch (Exception e) 1305 { 1306 logEncodingMessageError("modify", modifyMsg.getDN(), e); 1307 } 1308 1309 final boolean isModifyDNMsg = modifyMsg instanceof ModifyDNMsg; 1310 final Entry entry = createChangelogEntry(baseDN, changeNumber, cookie, modifyMsg, ldifChanges, 1311 isModifyDNMsg ? "modrdn" : "modify", changeInitiatorsName); 1312 1313 if (isModifyDNMsg) 1314 { 1315 final ModifyDNMsg modDNMsg = (ModifyDNMsg) modifyMsg; 1316 addAttribute(entry, "newrdn", modDNMsg.getNewRDN()); 1317 if (modDNMsg.getNewSuperior() != null) 1318 { 1319 addAttribute(entry, "newsuperior", modDNMsg.getNewSuperior()); 1320 } 1321 addAttribute(entry, "deleteoldrdn", String.valueOf(modDNMsg.deleteOldRdn())); 1322 } 1323 return entry; 1324 } 1325 1326 /** 1327 * Log an encoding message error. 
1328 * 1329 * @param messageType 1330 * String identifying type of message. Should be "add" or "modify". 1331 * @param entryDN 1332 * DN of original entry 1333 */ 1334 private static void logEncodingMessageError(String messageType, DN entryDN, Exception exception) 1335 { 1336 logger.traceException(exception); 1337 logger.error(LocalizableMessage.raw( 1338 "An exception was encountered while trying to encode a replication " + messageType + " message for entry \"" 1339 + entryDN + "\" into an External Change Log entry: " + exception.getMessage())); 1340 } 1341 1342 private void checkChangelogReadPrivilege(SearchOperation searchOp) throws DirectoryException 1343 { 1344 if (!searchOp.getClientConnection().hasPrivilege(Privilege.CHANGELOG_READ, searchOp)) 1345 { 1346 throw new DirectoryException(ResultCode.INSUFFICIENT_ACCESS_RIGHTS, 1347 NOTE_SEARCH_CHANGELOG_INSUFFICIENT_PRIVILEGES.get()); 1348 } 1349 } 1350 1351 /** 1352 * Create a changelog entry from a set of provided information. This is the part of 1353 * entry creation common to all types of msgs (ADD, DEL, MOD, MODDN). 
1354 */ 1355 private static Entry createChangelogEntry(final DN baseDN, final long changeNumber, final String cookie, 1356 final LDAPUpdateMsg msg, final String ldifChanges, final String changeType, 1357 final String changeInitiatorsName) throws DirectoryException 1358 { 1359 final CSN csn = msg.getCSN(); 1360 String dnString; 1361 if (changeNumber > 0) 1362 { 1363 // change number mode 1364 dnString = "changeNumber=" + changeNumber + "," + DN_EXTERNAL_CHANGELOG_ROOT; 1365 } 1366 else 1367 { 1368 // Cookie mode 1369 dnString = "replicationCSN=" + csn + "," + baseDN + "," + DN_EXTERNAL_CHANGELOG_ROOT; 1370 } 1371 1372 final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<>(); 1373 final Map<AttributeType, List<Attribute>> opAttrs = new LinkedHashMap<>(); 1374 1375 // Operational standard attributes 1376 addAttributeByType(ATTR_SUBSCHEMA_SUBENTRY_LC, ATTR_SUBSCHEMA_SUBENTRY_LC, 1377 ConfigConstants.DN_DEFAULT_SCHEMA_ROOT, userAttrs, opAttrs); 1378 addAttributeByType("numsubordinates", "numSubordinates", "0", userAttrs, opAttrs); 1379 addAttributeByType("hassubordinates", "hasSubordinates", "false", userAttrs, opAttrs); 1380 addAttributeByType("entrydn", "entryDN", dnString, userAttrs, opAttrs); 1381 1382 // REQUIRED attributes 1383 if (changeNumber > 0) 1384 { 1385 addAttributeByType("changenumber", "changeNumber", String.valueOf(changeNumber), userAttrs, opAttrs); 1386 } 1387 SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT_GMT_TIME); 1388 dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); // ?? 
1389 final String format = dateFormat.format(new Date(csn.getTime())); 1390 addAttributeByType("changetime", "changeTime", format, userAttrs, opAttrs); 1391 addAttributeByType("changetype", "changeType", changeType, userAttrs, opAttrs); 1392 addAttributeByType("targetdn", "targetDN", msg.getDN().toString(), userAttrs, opAttrs); 1393 1394 // NON REQUESTED attributes 1395 addAttributeByType("replicationcsn", "replicationCSN", csn.toString(), userAttrs, opAttrs); 1396 addAttributeByType("replicaidentifier", "replicaIdentifier", Integer.toString(csn.getServerId()), 1397 userAttrs, opAttrs); 1398 1399 if (ldifChanges != null) 1400 { 1401 addAttributeByType("changes", "changes", ldifChanges, userAttrs, opAttrs); 1402 } 1403 if (changeInitiatorsName != null) 1404 { 1405 addAttributeByType("changeinitiatorsname", "changeInitiatorsName", changeInitiatorsName, userAttrs, opAttrs); 1406 } 1407 1408 final String targetUUID = msg.getEntryUUID(); 1409 if (targetUUID != null) 1410 { 1411 addAttributeByType("targetentryuuid", "targetEntryUUID", targetUUID, userAttrs, opAttrs); 1412 } 1413 final String cookie2 = cookie != null ? 
cookie : ""; 1414 addAttributeByType("changelogcookie", "changeLogCookie", cookie2, userAttrs, opAttrs); 1415 1416 final List<RawAttribute> includedAttributes = msg.getEclIncludes(); 1417 if (includedAttributes != null && !includedAttributes.isEmpty()) 1418 { 1419 final StringBuilder builder = new StringBuilder(256); 1420 for (final RawAttribute includedAttribute : includedAttributes) 1421 { 1422 final String name = includedAttribute.getAttributeType(); 1423 for (final ByteString value : includedAttribute.getValues()) 1424 { 1425 builder.append(name); 1426 appendLDIFSeparatorAndValue(builder, value); 1427 builder.append('\n'); 1428 } 1429 } 1430 final String includedAttributesLDIF = builder.toString(); 1431 addAttributeByType("includedattributes", "includedAttributes", includedAttributesLDIF, userAttrs, opAttrs); 1432 } 1433 1434 return new Entry(DN.valueOf(dnString), CHANGELOG_ENTRY_OBJECT_CLASSES, userAttrs, opAttrs); 1435 } 1436 1437 /** 1438 * Sends the entry if it matches the base, scope and filter of the current search operation. 1439 * It will also send the base changelog entry if it needs to be sent and was not sent before. 1440 * 1441 * @return {@code true} if search should continue, {@code false} otherwise 1442 */ 1443 private static boolean sendEntryIfMatches(SearchOperation searchOp, Entry entry, String cookie) 1444 throws DirectoryException 1445 { 1446 if (matchBaseAndScopeAndFilter(searchOp, entry)) 1447 { 1448 return searchOp.returnEntry(entry, getControls(cookie)); 1449 } 1450 // maybe the next entry will match? 1451 return true; 1452 } 1453 1454 /** Indicates if the provided entry matches the filter, base and scope. 
*/ 1455 private static boolean matchBaseAndScopeAndFilter(SearchOperation searchOp, Entry entry) throws DirectoryException 1456 { 1457 return entry.matchesBaseAndScope(searchOp.getBaseDN(), searchOp.getScope()) 1458 && searchOp.getFilter().matchesEntry(entry); 1459 } 1460 1461 private static List<Control> getControls(String cookie) 1462 { 1463 if (cookie != null) 1464 { 1465 final Control c = new EntryChangelogNotificationControl(true, cookie); 1466 return Collections.singletonList(c); 1467 } 1468 return Collections.emptyList(); 1469 } 1470 1471 /** 1472 * Create and returns the base changelog entry to the underlying search operation. 1473 * <p> 1474 * "initial search" phase must return the base entry immediately. 1475 * 1476 * @return {@code true} if search should continue, {@code false} otherwise 1477 */ 1478 private boolean sendBaseChangelogEntry(SearchOperation searchOp) throws DirectoryException 1479 { 1480 final DN baseDN = searchOp.getBaseDN(); 1481 final SearchFilter filter = searchOp.getFilter(); 1482 final SearchScope scope = searchOp.getScope(); 1483 1484 if (ChangelogBackend.CHANGELOG_BASE_DN.matchesBaseAndScope(baseDN, scope)) 1485 { 1486 final Entry entry = buildBaseChangelogEntry(); 1487 if (filter.matchesEntry(entry) && !searchOp.returnEntry(entry, null)) 1488 { 1489 // Abandon, size limit reached. 
1490 return false; 1491 } 1492 } 1493 return !baseDN.equals(ChangelogBackend.CHANGELOG_BASE_DN) 1494 || !scope.equals(SearchScope.BASE_OBJECT); 1495 } 1496 1497 private Entry buildBaseChangelogEntry() throws DirectoryException 1498 { 1499 final String hasSubordinatesStr = Boolean.toString(baseChangelogHasSubordinates()); 1500 1501 final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<>(); 1502 final Map<AttributeType, List<Attribute>> operationalAttrs = new LinkedHashMap<>(); 1503 1504 // We never return the numSubordinates attribute for the base changelog entry 1505 // and there is a very good reason for that: 1506 // - Either we compute it before sending the entries, 1507 // -- then we risk returning more entries if new entries come in after we computed numSubordinates 1508 // -- or we risk returning less entries if purge kicks in after we computed numSubordinates 1509 // - Or we accumulate all the entries that must be returned before sending them => OutOfMemoryError 1510 1511 addAttributeByUppercaseName(ATTR_COMMON_NAME, ATTR_COMMON_NAME, BACKEND_ID, userAttrs, operationalAttrs); 1512 addAttributeByUppercaseName(ATTR_SUBSCHEMA_SUBENTRY_LC, ATTR_SUBSCHEMA_SUBENTRY, 1513 ConfigConstants.DN_DEFAULT_SCHEMA_ROOT, userAttrs, operationalAttrs); 1514 addAttributeByUppercaseName("hassubordinates", "hasSubordinates", hasSubordinatesStr, userAttrs, operationalAttrs); 1515 addAttributeByUppercaseName("entrydn", "entryDN", DN_EXTERNAL_CHANGELOG_ROOT, userAttrs, operationalAttrs); 1516 return new Entry(CHANGELOG_BASE_DN, CHANGELOG_ROOT_OBJECT_CLASSES, userAttrs, operationalAttrs); 1517 } 1518 1519 private static void addAttribute(final Entry e, final String attrType, final String attrValue) 1520 { 1521 e.addAttribute(Attributes.create(attrType, attrValue), null); 1522 } 1523 1524 private static void addAttributeByType(String attrNameLowercase, 1525 String attrNameUppercase, String attrValue, 1526 Map<AttributeType, List<Attribute>> userAttrs, 1527 
Map<AttributeType, List<Attribute>> operationalAttrs) 1528 { 1529 addAttribute(attrNameLowercase, attrNameUppercase, attrValue, userAttrs, operationalAttrs, true); 1530 } 1531 1532 private static void addAttributeByUppercaseName(String attrNameLowercase, 1533 String attrNameUppercase, String attrValue, 1534 Map<AttributeType, List<Attribute>> userAttrs, 1535 Map<AttributeType, List<Attribute>> operationalAttrs) 1536 { 1537 addAttribute(attrNameLowercase, attrNameUppercase, attrValue, userAttrs, operationalAttrs, false); 1538 } 1539 1540 private static void addAttribute(final String attrNameLowercase, 1541 final String attrNameUppercase, final String attrValue, 1542 final Map<AttributeType, List<Attribute>> userAttrs, 1543 final Map<AttributeType, List<Attribute>> operationalAttrs, final boolean addByType) 1544 { 1545 AttributeType attrType = DirectoryServer.getAttributeTypeOrDefault(attrNameLowercase, attrNameUppercase); 1546 final Attribute a = addByType 1547 ? Attributes.create(attrType, attrValue) 1548 : Attributes.create(attrNameUppercase, attrValue); 1549 final List<Attribute> attrList = Collections.singletonList(a); 1550 if (attrType.isOperational()) 1551 { 1552 operationalAttrs.put(attrType, attrList); 1553 } 1554 else 1555 { 1556 userAttrs.put(attrType, attrList); 1557 } 1558 } 1559 1560 /** 1561 * Describes the current search phase. 1562 */ 1563 private enum SearchPhase 1564 { 1565 /** 1566 * "Initial search" phase. The "initial search" phase is running 1567 * concurrently. All update notifications are ignored. 1568 */ 1569 INITIAL, 1570 /** 1571 * Transitioning from the "initial search" phase to the "persistent search" 1572 * phase. "Initial search" phase has finished reading from the DB. It now 1573 * verifies if any more updates have been persisted to the DB since stopping 1574 * and send them. All update notifications are blocked. 1575 */ 1576 TRANSITIONING, 1577 /** 1578 * "Persistent search" phase. "Initial search" phase has completed. 
All 1579 * update notifications are published. 1580 */ 1581 PERSISTENT; 1582 } 1583 1584 /** 1585 * Contains data to ensure that the same change is not sent twice to clients 1586 * because of race conditions between the "initial search" phase and the 1587 * "persistent search" phase. 1588 */ 1589 private static class SendEntryData<K extends Comparable<K>> 1590 { 1591 private final AtomicReference<SearchPhase> searchPhase = new AtomicReference<>(SearchPhase.INITIAL); 1592 private final Object transitioningLock = new Object(); 1593 private volatile K lastKeySentByInitialSearch; 1594 1595 private SendEntryData(SearchPhase startPhase) 1596 { 1597 searchPhase.set(startPhase); 1598 } 1599 1600 private void finalizeInitialSearch() 1601 { 1602 searchPhase.set(SearchPhase.PERSISTENT); 1603 synchronized (transitioningLock) 1604 { // initial search phase has completed, release all persistent searches 1605 transitioningLock.notifyAll(); 1606 } 1607 } 1608 1609 public void transitioningToPersistentSearchPhase() 1610 { 1611 searchPhase.set(SearchPhase.TRANSITIONING); 1612 } 1613 1614 private void initialSearchSendsEntry(final K key) 1615 { 1616 lastKeySentByInitialSearch = key; 1617 } 1618 1619 private boolean persistentSearchCanSendEntry(K key) 1620 { 1621 final SearchPhase stateValue = searchPhase.get(); 1622 switch (stateValue) 1623 { 1624 case INITIAL: 1625 return false; 1626 case TRANSITIONING: 1627 synchronized (transitioningLock) 1628 { 1629 while (SearchPhase.TRANSITIONING.equals(searchPhase.get())) 1630 { 1631 // "initial search" phase is over, and is now verifying whether new 1632 // changes have been published to the DB. 1633 // Wait for this check to complete 1634 try 1635 { 1636 transitioningLock.wait(); 1637 } 1638 catch (InterruptedException e) 1639 { 1640 Thread.currentThread().interrupt(); 1641 // Shutdown must have been called. Stop sending entries. 
1642 return false; 1643 } 1644 } 1645 } 1646 return key.compareTo(lastKeySentByInitialSearch) > 0; 1647 case PERSISTENT: 1648 return true; 1649 default: 1650 throw new RuntimeException("Not implemented for " + stateValue); 1651 } 1652 } 1653 } 1654 1655 /** Sends entries to clients for change number searches. */ 1656 private static class ChangeNumberEntrySender 1657 { 1658 private final SearchOperation searchOp; 1659 private final long lowestChangeNumber; 1660 private final long highestChangeNumber; 1661 private final SendEntryData<Long> sendEntryData; 1662 1663 private ChangeNumberEntrySender(SearchOperation searchOp, SearchPhase startPhase, ChangeNumberRange range) 1664 { 1665 this.searchOp = searchOp; 1666 this.sendEntryData = new SendEntryData<>(startPhase); 1667 this.lowestChangeNumber = range.lowerBound; 1668 this.highestChangeNumber = range.upperBound; 1669 } 1670 1671 /** 1672 * Indicates if provided change number is compatible with last change 1673 * number. 1674 * 1675 * @param changeNumber 1676 * The change number to test. 1677 * @return {@code true} if and only if the provided change number is in the 1678 * range of the last change number. 
1679 */ 1680 boolean changeNumberIsInRange(long changeNumber) 1681 { 1682 return highestChangeNumber == -1 || changeNumber <= highestChangeNumber; 1683 } 1684 1685 private void finalizeInitialSearch() 1686 { 1687 sendEntryData.finalizeInitialSearch(); 1688 } 1689 1690 private void transitioningToPersistentSearchPhase() 1691 { 1692 sendEntryData.transitioningToPersistentSearchPhase(); 1693 } 1694 1695 /** 1696 * @return {@code true} if search should continue, {@code false} otherwise 1697 */ 1698 private boolean initialSearchSendEntry(ChangeNumberIndexRecord cnIndexRecord, UpdateMsg updateMsg, 1699 MultiDomainServerState cookie) throws DirectoryException 1700 { 1701 final DN baseDN = cnIndexRecord.getBaseDN(); 1702 sendEntryData.initialSearchSendsEntry(cnIndexRecord.getChangeNumber()); 1703 final Entry entry = createEntryFromMsg(baseDN, cnIndexRecord.getChangeNumber(), cookie.toString(), updateMsg); 1704 return sendEntryIfMatches(searchOp, entry, null); 1705 } 1706 1707 private void persistentSearchSendEntry(long changeNumber, Entry entry) throws DirectoryException 1708 { 1709 if (sendEntryData.persistentSearchCanSendEntry(changeNumber)) 1710 { 1711 sendEntryIfMatches(searchOp, entry, null); 1712 } 1713 } 1714 } 1715 1716 /** Sends entries to clients for cookie-based searches. 
*/
  private static class CookieEntrySender
  {
    private final SearchOperation searchOp;
    private final SearchPhase startPhase;
    private final Set<DN> excludedBaseDNs;
    /** Cookie updated with each change sent; updates are guarded by synchronizing on it. */
    private final MultiDomainServerState cookie;
    /** One duplicate-detection state per replica, created on first use. */
    private final ConcurrentSkipListMap<ReplicaId, SendEntryData<CSN>> replicaIdToSendEntryData =
        new ConcurrentSkipListMap<>();

    private CookieEntrySender(SearchOperation searchOp, SearchPhase startPhase, MultiDomainServerState cookie,
        Set<DN> excludedBaseDNs)
    {
      this.searchOp = searchOp;
      this.startPhase = startPhase;
      this.cookie = cookie;
      this.excludedBaseDNs = excludedBaseDNs;
    }

    /** Finalizes the initial search for every replica known so far. */
    private void finalizeInitialSearch()
    {
      for (SendEntryData<CSN> replicaData : replicaIdToSendEntryData.values())
      {
        replicaData.finalizeInitialSearch();
      }
    }

    /** Switches every replica known so far to the transitioning phase. */
    private void transitioningToPersistentSearchPhase()
    {
      for (SendEntryData<CSN> replicaData : replicaIdToSendEntryData.values())
      {
        replicaData.transitioningToPersistentSearchPhase();
      }
    }

    /**
     * Returns the data of the replica identified by the provided base DN and
     * the server id of the provided CSN, creating it with the start phase if
     * it does not exist yet.
     */
    private SendEntryData<CSN> getSendEntryData(DN baseDN, CSN csn)
    {
      final ReplicaId replicaId = ReplicaId.of(baseDN, csn.getServerId());
      final SendEntryData<CSN> existing = replicaIdToSendEntryData.get(replicaId);
      if (existing != null)
      {
        return existing;
      }

      final SendEntryData<CSN> created = new SendEntryData<>(startPhase);
      // Another thread may have registered its own instance in the meantime: it wins
      final SendEntryData<CSN> concurrentlyAdded = replicaIdToSendEntryData.putIfAbsent(replicaId, created);
      if (concurrentlyAdded != null)
      {
        return concurrentlyAdded;
      }
      return created;
    }

    /**
     * Sends, during the "initial search" phase, the entry built from the
     * provided update message.
     *
     * @return {@code true} if search should continue, {@code false} otherwise
     */
    private boolean initialSearchSendEntry(final UpdateMsg updateMsg, final DN baseDN) throws DirectoryException
    {
      final CSN csn = updateMsg.getCSN();
      getSendEntryData(baseDN, csn).initialSearchSendsEntry(csn);

      final String cookieString = updateCookie(baseDN, updateMsg.getCSN());
      final Entry entry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg);
      return sendEntryIfMatches(searchOp, entry, cookieString);
    }

    /** Sends the entry built from the provided update message on behalf of the persistent search. */
    private void persistentSearchSendEntry(DN baseDN, UpdateMsg updateMsg)
        throws DirectoryException
    {
      final CSN csn = updateMsg.getCSN();
      if (!getSendEntryData(baseDN, csn).persistentSearchCanSendEntry(csn))
      {
        return;
      }

      // multi threaded case: wait for the "initial search" phase to set the cookie
      final String cookieString = updateCookie(baseDN, updateMsg.getCSN());
      final Entry cookieEntry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg);
      // FIXME JNR use this instead of previous line:
      // entry.replaceAttribute(Attributes.create("changelogcookie", cookieString));
      sendEntryIfMatches(searchOp, cookieEntry, cookieString);
    }

    /** Updates the cookie with the provided change and returns its new string representation. */
    private String updateCookie(DN baseDN, final CSN csn)
    {
      synchronized (cookie)
      { // forbid concurrent updates to the cookie
        cookie.update(baseDN, csn);
        return cookie.toString();
      }
    }
  }
}