/* * Copyright (c) 2015 Spotify AB * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.spotify.dns; import static java.util.Objects.requireNonNull; import com.google.common.collect.ImmutableSet; import java.util.Set; import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * A {@link ChangeNotifier} that resolves and provides records using a {@link DnsSrvResolver}. * *

The records are refreshable when {@link #run()} is called. */ class ServiceResolvingChangeNotifier extends AbstractChangeNotifier implements ChangeNotifierFactory.RunnableChangeNotifier { private static final Logger log = LoggerFactory.getLogger(ServiceResolvingChangeNotifier.class); private final DnsSrvResolver resolver; private final String fqdn; private final Function resultTransformer; private final ErrorHandler errorHandler; private volatile Set records = ChangeNotifiers.initialEmptyDataInstance(); private volatile boolean waitingForFirstEvent = true; private volatile boolean run = true; /** * Create a {@link ChangeNotifier} that tracks changes from a {@link DnsSrvResolver}. * *

The list of {@link LookupResult}s will be transformed using the provided function * and put into a set. The set will then be compared to the previous set and if a * change is detected, the notifier will fire. * *

An optional {@link ErrorHandler} can be used to react on {@link DnsException}s thrown * by the {@link DnsSrvResolver}. * * @param resolver The resolver to use. * @param fqdn The name to lookup SRV records for * @param resultTransformer The transform function * @param errorHandler The error handler that will receive exceptions (nullable) */ ServiceResolvingChangeNotifier(final DnsSrvResolver resolver, final String fqdn, final Function resultTransformer, final ErrorHandler errorHandler) { this.resolver = requireNonNull(resolver, "resolver"); this.fqdn = requireNonNull(fqdn, "fqdn"); this.resultTransformer = requireNonNull(resultTransformer, "resultTransformer"); this.errorHandler = errorHandler; } @Override protected void closeImplementation() { run = false; } @Override public Set current() { return records; } @Override public void run() { if (!run) { return; } resolver.resolveAsync(fqdn).whenComplete((nodes, e) -> { if (e instanceof DnsException) { if (errorHandler != null) { errorHandler.handle(fqdn, (DnsException) e); } log.error(e.getMessage(), e); fireIfFirstError(); } else if (e != null) { log.error(e.getMessage(), e); fireIfFirstError(); } else { final Set current; try { ImmutableSet.Builder builder = ImmutableSet.builder(); for (LookupResult node : nodes) { T transformed = resultTransformer.apply(node); builder.add(requireNonNull(transformed, "transformed")); } current = builder.build(); } catch (Exception transformerException) { log.error(transformerException.getMessage(), transformerException); fireIfFirstError(); return; } if (ChangeNotifiers.isNoLongerInitial(current, records) || !current.equals(records)) { // This means that any subsequent DNS error will be ignored and the existing result will be kept waitingForFirstEvent = false; final ChangeNotification changeNotification = newChangeNotification(current, records); records = current; fireRecordsUpdated(changeNotification); } } }); } private void fireIfFirstError() { if (waitingForFirstEvent) { waitingForFirstEvent = false; Set previous = current(); records = ImmutableSet.of(); fireRecordsUpdated(newChangeNotification(records, previous)); } } }