/*
* Copyright 2006 Simon Raess
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.sf.beep4j.internal.session;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import net.sf.beep4j.ChannelFilterChainBuilder;
import net.sf.beep4j.ChannelHandler;
import net.sf.beep4j.ChannelHandlerFactory;
import net.sf.beep4j.Message;
import net.sf.beep4j.ProfileInfo;
import net.sf.beep4j.ProtocolException;
import net.sf.beep4j.ReplyHandler;
import net.sf.beep4j.SessionHandler;
import net.sf.beep4j.internal.NullChannelFilterChainBuilder;
import net.sf.beep4j.internal.SessionListener;
import net.sf.beep4j.internal.SessionManager;
import net.sf.beep4j.internal.StartChannelResponse;
import net.sf.beep4j.internal.TransportHandler;
import net.sf.beep4j.internal.management.BEEPError;
import net.sf.beep4j.internal.management.CloseCallback;
import net.sf.beep4j.internal.management.Greeting;
import net.sf.beep4j.internal.management.ManagementProfile;
import net.sf.beep4j.internal.management.ManagementProfileImpl;
import net.sf.beep4j.internal.management.StartChannelCallback;
import net.sf.beep4j.internal.stream.BeepStream;
import net.sf.beep4j.internal.stream.MessageHandler;
import net.sf.beep4j.internal.util.Assert;
import net.sf.beep4j.internal.util.IntegerSequence;
import net.sf.beep4j.internal.util.Sequence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Default implementation of the Session interface. Objects of this class are
* the central object in a BEEP session.
*
*
* - dispatch incoming messages
* - send outgoing messages
* - manage channel start and close
*
*
* @author Simon Raess
*/
public class SessionImpl
implements MessageHandler, SessionManager, InternalSession, TransportHandler {
private static final int MANAGEMENT_CHANNEL = 0;
private final Logger LOG = LoggerFactory.getLogger(SessionImpl.class);
private final boolean initiator;
private final Map channels = new HashMap();
private final ManagementProfile channelManagementProfile;
private final BeepStream beepStream;
private final SessionHandler sessionHandler;
private final Sequence channelNumberSequence;
private ChannelFilterChainBuilder filterChainBuilder = new NullChannelFilterChainBuilder();
private final ReentrantLock sessionLock = new ReentrantLock();
private final List eventListeners = Collections.synchronizedList(new LinkedList());
private SessionState currentState;
private SessionState initialState;
private SessionState aliveState;
private SessionState closeInitiatedState;
private SessionState deadState;
/**
* The greeting received from the other peer.
*/
private Greeting greeting;
public SessionImpl(boolean initiator, SessionHandler sessionHandler, BeepStream beepStream) {
Assert.notNull("sessionHandler", sessionHandler);
Assert.notNull("beepStream", beepStream);
this.initiator = initiator;
this.sessionHandler = new UnlockingSessionHandler(sessionHandler, sessionLock);
this.beepStream = beepStream;
addSessionListener(beepStream);
this.channelManagementProfile = createManagementProfile(initiator);
initChannelManagementProfile();
this.channelNumberSequence = new IntegerSequence(initiator ? 1 : 2, 2);
initialState = createInitialState();
aliveState = createAliveState();
closeInitiatedState = createCloseInitiatedState();
deadState = createDeadState();
currentState = initialState;
}
public void setChannelFilterChainBuilder(ChannelFilterChainBuilder filterChainBuilder) {
this.filterChainBuilder = filterChainBuilder == null ? new NullChannelFilterChainBuilder() : filterChainBuilder;
}
protected DeadState createDeadState() {
return new DeadState();
}
protected SessionState createCloseInitiatedState() {
return new CloseInitiatedState();
}
protected SessionState createAliveState() {
return new AliveState();
}
protected SessionState createInitialState() {
return new InitialState();
}
protected ManagementProfile createManagementProfile(boolean initiator) {
return new ManagementProfileImpl(initiator);
}
protected void initChannelManagementProfile() {
InternalChannel channel = new ChannelImpl(this, null, MANAGEMENT_CHANNEL, filterChainBuilder, null);
ChannelHandler channelHandler = channelManagementProfile.createChannelHandler(this, channel);
registerChannel(MANAGEMENT_CHANNEL, channel);
channel.channelOpened(channelHandler);
}
protected InternalChannel createChannel(InternalSession session, String profileUri, int channelNumber) {
return new ChannelImpl(session, profileUri, channelNumber, filterChainBuilder, sessionLock);
}
private ChannelHandler initChannel(InternalChannel channel, ChannelHandler handler) {
return new UnlockingChannelHandler(handler, sessionLock);
}
protected void lock() {
sessionLock.lock();
}
protected void unlock() {
sessionLock.unlock();
}
private String traceInfo() {
StringBuilder builder = new StringBuilder();
builder.append("[").append(System.identityHashCode(this));
builder.append("|").append(currentState);
builder.append("|").append(initiator).append("] ");
return builder.toString();
}
private void debug(Object... messages) {
if (LOG.isDebugEnabled()) {
StringBuffer buffer = new StringBuffer();
for (Object message : messages) {
buffer.append(message);
}
LOG.debug(buffer.toString());
}
}
private void info(String message) {
if (LOG.isInfoEnabled()) {
LOG.info(traceInfo() + message);
}
}
private void warn(String message, Exception e) {
LOG.warn(traceInfo() + message, e);
}
private void setCurrentState(SessionState currentState) {
debug("setting session state from ", this.currentState, " to ", currentState);
this.currentState = currentState;
}
private SessionState getCurrentState() {
return currentState;
}
public void addSessionListener(SessionListener l) {
eventListeners.add(l);
}
protected void fireChannelStarted(int channel) {
SessionListener[] listeners = eventListeners.toArray(new SessionListener[eventListeners.size()]);
for (SessionListener listener : listeners) {
listener.channelStarted(channel);
}
}
protected void fireChannelClosed(int channel) {
SessionListener[] listeners = eventListeners.toArray(new SessionListener[eventListeners.size()]);
for (SessionListener listener : listeners) {
listener.channelClosed(channel);
}
}
private int getNextChannelNumber() {
return channelNumberSequence.next();
}
protected boolean hasOpenChannels() {
return channels.size() > 1;
}
protected void registerChannel(int channelNumber, InternalChannel channel) {
channels.put(channelNumber, channel);
fireChannelStarted(channelNumber);
}
protected InternalChannel getChannel(int channelNumber) {
InternalChannel channel = channels.get(channelNumber);
if (channel == null) {
throw new ProtocolException("channel " + channelNumber + " is not known by session");
}
return channel;
}
protected void removeChannel(int channelNumber) {
channels.remove(channelNumber);
fireChannelClosed(channelNumber);
}
protected void checkInitialAliveTransition() {
if (greeting != null) {
setCurrentState(aliveState);
sessionHandler.sessionOpened(SessionImpl.this);
}
}
// --> start of Session methods <--
public String[] getProfiles() {
if (greeting == null) {
throw new IllegalStateException("greeting has not yet been received");
}
return greeting.getProfiles();
}
public void startChannel(String profileUri, ChannelHandler handler) {
startChannel(new ProfileInfo(profileUri), handler);
}
public void startChannel(final ProfileInfo profile, final ChannelHandler channelHandler) {
startChannel(new ProfileInfo[] { profile }, new ChannelHandlerFactory() {
public ChannelHandler createChannelHandler(ProfileInfo info) {
if (!profile.getUri().equals(info.getUri())) {
throw new IllegalArgumentException("profile URIs do not match: "
+ profile.getUri() + " | " + info.getUri());
}
return channelHandler;
}
public void startChannelFailed(int code, String message) {
unlock();
try {
sessionHandler.channelStartFailed(profile.getUri(), channelHandler, code, message);
} finally {
lock();
}
}
});
}
public void startChannel(ProfileInfo[] profiles, ChannelHandlerFactory factory) {
lock();
try {
getCurrentState().startChannel(profiles, factory);
} finally {
unlock();
}
}
public void close() {
lock();
try {
getCurrentState().closeSession();
} finally {
unlock();
}
}
// --> end of Session methods <--
// --> start of InternalSession methods <--
/*
* This method is called by the channel implementation to send a message on
* a particular channel to the other peer.
*/
public void sendMSG(int channelNumber, int messageNumber, Message message, ReplyHandler replyHandler) {
lock();
try {
getCurrentState().sendMessage(channelNumber, messageNumber, message, replyHandler);
} finally {
unlock();
}
}
public void sendANS(int channelNumber, int messageNumber, int answerNumber, Message message) {
lock();
try {
getCurrentState().sendANS(channelNumber, messageNumber, answerNumber, message);
} finally {
unlock();
}
}
public void sendERR(int channelNumber, int messageNumber, Message message) {
lock();
try {
getCurrentState().sendERR(channelNumber, messageNumber, message);
} finally {
unlock();
}
}
public void sendNUL(int channelNumber, int messageNumber) {
lock();
try {
getCurrentState().sendNUL(channelNumber, messageNumber);
} finally {
unlock();
}
}
public void sendRPY(int channelNumber, int messageNumber, Message message) {
lock();
try {
getCurrentState().sendRPY(channelNumber, messageNumber, message);
} finally {
unlock();
}
}
/*
* This method is called by the channel implementation to send a close channel
* request to the other peer.
*/
public void requestChannelClose(final int channelNumber, final CloseCallback callback) {
Assert.notNull("callback", callback);
lock();
try {
channelManagementProfile.closeChannel(channelNumber, new CloseCallback() {
public void closeDeclined(int code, String message) {
callback.closeDeclined(code, message);
}
public void closeAccepted() {
callback.closeAccepted();
removeChannel(channelNumber);
}
});
} finally {
unlock();
}
}
// --> end of InternalSession methods <--
// --> start of SessionManager methods <--
//
// - these methods are invoked while holding the session lock
// - they are called by the ChannelManagementProfile
// - thus, it is not necessary to lock / unlock the session lock here
/*
* This method is invoked by the ChannelManagementProfile when the other
* peer requests creating a new channel.
*/
public StartChannelResponse channelStartRequested(int channelNumber, ProfileInfo[] profiles) {
Assert.holdsLock("session", sessionLock);
return getCurrentState().channelStartRequested(channelNumber, profiles);
}
/*
* This method is invoked by the ChannelManagement profile when a channel
* close request is received. This request is passed on to the ChannelHandler,
* that is the application, which decides what to do with the request to
* close the channel.
*/
public void channelCloseRequested(final int channelNumber, final CloseCallback callback) {
Assert.holdsLock("session", sessionLock);
getCurrentState().channelCloseRequested(channelNumber, callback);
}
/*
* This method is invoked by the ChannelManagement profile when a session
* close request is received.
*/
public void sessionCloseRequested(CloseCallback callback) {
Assert.holdsLock("session", sessionLock);
getCurrentState().sessionCloseRequested(callback);
}
// --> end of SessionManager methods <--
// --> start of MessageHandler methods <--
public final void receiveMSG(int channelNumber, int messageNumber, Message message) {
lock();
try {
getCurrentState().receiveMSG(channelNumber, messageNumber, message);
} finally {
unlock();
}
}
public final void receiveANS(int channelNumber, int messageNumber, int answerNumber, Message message) {
lock();
try {
getCurrentState().receiveANS(channelNumber, messageNumber, answerNumber, message);
} finally {
unlock();
}
}
public final void receiveNUL(int channelNumber, int messageNumber) {
lock();
try {
getCurrentState().receiveNUL(channelNumber, messageNumber);
} finally {
unlock();
}
}
public final void receiveERR(int channelNumber, int messageNumber, Message message) {
lock();
try {
getCurrentState().receiveERR(channelNumber, messageNumber, message);
} finally {
unlock();
}
}
public final void receiveRPY(int channelNumber, int messageNumber, Message message) {
lock();
try {
getCurrentState().receiveRPY(channelNumber, messageNumber, message);
} finally {
unlock();
}
}
// --> end of MessageHandler methods <--
/*
* Notifies the ChannelManagementProfile about this event. The
* ChannelManagementProfile then asks the application (SessionHandler)
* whether to accept the connection and sends the appropriate response.
*/
public void connectionEstablished(SocketAddress address) {
lock();
try {
getCurrentState().connectionEstablished(address);
} finally {
unlock();
}
}
public void exceptionCaught(Throwable cause) {
lock();
try {
getCurrentState().exceptionCaught(cause);
} finally {
unlock();
}
}
public void connectionClosed() {
lock();
try {
getCurrentState().connectionClosed();
} finally {
unlock();
}
}
// --> end of TransportHandler methods <--
/**
* Interface of session states. The whole implementation of the SessionImpl
* class is based on the state pattern. All the important methods are
* delegated to an implementation of SessionState. A BEEP session is
* inherently state dependent. Some actions are not supported in
* certain states.
*/
protected static interface SessionState extends MessageHandler {
void connectionEstablished(SocketAddress address);
void sendRPY(int channelNumber, int messageNumber, Message message);
void sendNUL(int channelNumber, int messageNumber);
void sendERR(int channelNumber, int messageNumber, Message message);
void sendANS(int channelNumber, int messageNumber, int answerNumber, Message message);
void exceptionCaught(Throwable cause);
void startChannel(ProfileInfo[] profiles, ChannelHandlerFactory factory);
void sendMessage(int channelNumber, int messageNumber, Message message, ReplyHandler listener);
void closeSession();
StartChannelResponse channelStartRequested(int channelNumber, ProfileInfo[] profiles);
void channelCloseRequested(int channelNumber, CloseCallback callback);
void sessionCloseRequested(CloseCallback callback);
void connectionClosed();
}
protected abstract class AbstractSessionState implements SessionState {
public abstract String getName();
public void exceptionCaught(Throwable cause) {
// TODO: how to handle other exceptions?
if (cause instanceof ProtocolException) {
handleProtocolException((ProtocolException) cause);
}
}
private void handleProtocolException(ProtocolException cause) {
warn("dropping connection because of a protocol exception", cause);
try {
sessionHandler.sessionClosed();
} finally {
setCurrentState(deadState);
beepStream.closeTransport();
}
}
public void connectionEstablished(SocketAddress address) {
throw new IllegalStateException("connection already established, state=<"
+ getName() + ">");
}
public void startChannel(ProfileInfo[] profiles, ChannelHandlerFactory factory) {
throw new IllegalStateException("" +
"cannot start channel in state <" + getName() + ">");
}
public void sendMessage(int channelNumber, int messageNumber, Message message, ReplyHandler listener) {
throw new IllegalStateException(
"cannot send messages in state <" + getName() + ">: channel="
+ channelNumber);
}
public void sendANS(int channelNumber, int messageNumber, int answerNumber, Message message) {
throw new IllegalStateException(
"cannot send ANS message in state <" + getName() + ">: channel="
+ channelNumber);
}
public void sendERR(int channelNumber, int messageNumber, Message message) {
throw new IllegalStateException(
"cannot send ERR message in state <" + getName() + ">: channel="
+ channelNumber);
}
public void sendNUL(int channelNumber, int messageNumber) {
throw new IllegalStateException(
"cannot send NUL message in state <" + getName() + ">: channel="
+ channelNumber);
}
public void sendRPY(int channelNumber, int messageNumber, Message message) {
throw new IllegalStateException(
"cannot send RPY message in state <" + getName() + ">: channel="
+ channelNumber);
}
public StartChannelResponse channelStartRequested(int channelNumber, ProfileInfo[] profiles) {
return StartChannelResponse.createCancelledResponse(550, "cannot start channel");
}
public void receiveANS(int channelNumber, int messageNumber, int answerNumber, Message message) {
throw new IllegalStateException(
"internal error: unexpected method invocation in state <" + getName() + ">: "
+ "message ANS, channel=" + channelNumber
+ ",message=" + messageNumber
+ ",answerNumber=" + answerNumber);
}
public void receiveERR(int channelNumber, int messageNumber, Message message) {
throw new IllegalStateException(
"internal error: unexpected method invocation in state <" + getName() + ">: "
+ "message ERR, channel=" + channelNumber + ",message=" + messageNumber);
}
public void receiveMSG(int channelNumber, int messageNumber, Message message) {
throw new IllegalStateException(
"internal error: unexpected method invocation in state <" + getName() + ">: "
+ "message MSG, channel=" + channelNumber + ",message=" + messageNumber);
}
public void receiveNUL(int channelNumber, int messageNumber) {
throw new IllegalStateException(
"internal error: unexpected method invocation in state <" + getName() + ">: "
+ "message NUL, channel=" + channelNumber + ",message=" + messageNumber);
}
public void receiveRPY(int channelNumber, int messageNumber, Message message) {
throw new IllegalStateException(
"internal error: unexpected method invocation in state <" + getName() + ">: "
+ "message RPY, channel=" + channelNumber + ",message=" + messageNumber);
}
public void closeSession() {
throw new IllegalStateException("cannot close session");
}
public void channelCloseRequested(int channelNumber, CloseCallback callback) {
throw new IllegalStateException("cannot close channel");
}
public void sessionCloseRequested(CloseCallback callback) {
throw new IllegalStateException("cannot close session");
}
public void connectionClosed() {
try {
sessionHandler.sessionClosed();
} finally {
setCurrentState(deadState);
}
}
}
protected class InitialState extends AbstractSessionState {
@Override
public String getName() {
return "initial";
}
public void connectionEstablished(SocketAddress address) {
DefaultStartSessionRequest request = new DefaultStartSessionRequest(!initiator);
sessionHandler.connectionEstablished(request);
if (request.isCancelled()) {
beepStream.sendERR(MANAGEMENT_CHANNEL, 0, channelManagementProfile.createSessionStartDeclined(request.getReplyCode(), request.getMessage()));
setCurrentState(deadState);
beepStream.closeTransport();
} else {
beepStream.sendRPY(MANAGEMENT_CHANNEL, 0, channelManagementProfile.createGreeting(request.getProfiles()));
}
}
public void receiveMSG(int channelNumber, int messageNumber, Message message) {
throw new ProtocolException(
"first message in a session must be RPY or ERR on channel 0: "
+ "was MSG channel=" + channelNumber + ",message=" + messageNumber);
}
public void receiveANS(int channelNumber, int messageNumber, int answerNumber, Message message) {
throw new ProtocolException(
"first message in a session must be RPY or ERR on channel 0: "
+ "was ANS channel=" + channelNumber + ",message=" + messageNumber);
}
public void receiveNUL(int channelNumber, int messageNumber) {
throw new ProtocolException(
"first message in a session must be RPY or ERR on channel 0: "
+ "was NUL channel=" + channelNumber + ",message=" + messageNumber);
}
public void receiveRPY(final int channelNumber, final int messageNumber, final Message message) {
validateMessage(channelNumber, messageNumber);
greeting = channelManagementProfile.receivedGreeting(message);
checkInitialAliveTransition();
}
public void receiveERR(int channelNumber, int messageNumber, Message message) {
validateMessage(channelNumber, messageNumber);
BEEPError error = channelManagementProfile.receivedError(message);
info("received error, session start failed: " + error.getCode() + ":" + error.getMessage());
sessionHandler.sessionStartDeclined(error.getCode(), error.getMessage());
setCurrentState(deadState);
beepStream.closeTransport();
}
private void validateMessage(int channelNumber, int messageNumber) {
if (channelNumber != MANAGEMENT_CHANNEL || messageNumber != 0) {
throw new ProtocolException("first message in session must be sent on "
+ "channel 0 with message number 0: was channel " + channelNumber
+ ", message=" + messageNumber);
}
}
@Override
public String toString() {
return "";
}
}
protected class AliveState extends AbstractSessionState {
@Override
public String getName() {
return "alive";
}
@Override
public void startChannel(final ProfileInfo[] profiles, final ChannelHandlerFactory factory) {
final int channelNumber = getNextChannelNumber();
channelManagementProfile.startChannel(channelNumber, profiles, new StartChannelCallback() {
public void channelCreated(ProfileInfo info) {
lock();
try {
ChannelHandler handler = factory.createChannelHandler(info);
InternalChannel channel = createChannel(
SessionImpl.this, info.getUri(), channelNumber);
ChannelHandler channelHandler = initChannel(channel, handler);
registerChannel(channelNumber, channel);
channel.channelOpened(channelHandler);
} finally {
unlock();
}
}
public void channelFailed(int code, String message) {
lock();
try {
factory.startChannelFailed(code, message);
} finally {
unlock();
}
}
});
}
@Override
public void sendMessage(int channelNumber, int messageNumber, Message message, ReplyHandler listener) {
debug("send message: channel=", channelNumber, ",message=", messageNumber);
beepStream.sendMSG(channelNumber, messageNumber, message);
}
@Override
public void sendANS(int channelNumber, int messageNumber, int answerNumber, Message message) {
debug("send ANS: channel=", channelNumber, ",message=", messageNumber, ",answer=", answerNumber);
beepStream.sendANS(channelNumber, messageNumber, answerNumber, message);
}
@Override
public void sendERR(int channelNumber, int messageNumber, Message message) {
debug("send ERR: channel=", channelNumber, ",message=", messageNumber);
beepStream.sendERR(channelNumber, messageNumber, message);
}
@Override
public void sendNUL(int channelNumber, int messageNumber) {
debug("send NUL: channel=", channelNumber, ",message=", messageNumber);
beepStream.sendNUL(channelNumber, messageNumber);
}
@Override
public void sendRPY(int channelNumber, int messageNumber, Message message) {
debug("send RPY: channel=", channelNumber, ",message=", messageNumber);
beepStream.sendRPY(channelNumber, messageNumber, message);
}
@Override
public StartChannelResponse channelStartRequested(int channelNumber, ProfileInfo[] profiles) {
debug("start of channel ", channelNumber, " requested by remote peer: ", Arrays.toString(profiles));
// of course, requesting to start the same channel twice is non-sense, terminate session
if (channels.containsKey(channelNumber)) {
throw new ProtocolException("the given channel with number " + channelNumber + " is already open");
}
DefaultStartChannelRequest request = new DefaultStartChannelRequest(profiles);
sessionHandler.channelStartRequested(request);
StartChannelResponse response = request.getResponse();
if (response.isCancelled()) {
debug("start of channel ", channelNumber, " is cancelled by application: ", response.getCode(),
",'", response.getMessage(), "'");
return response;
}
ProfileInfo info = response.getProfile();
if (info == null) {
throw new ProtocolException("StartChannelRequest must be either cancelled or a profile must be selected");
}
debug("start of channel ", channelNumber, " is accepted by application: ", info.getUri());
InternalChannel channel = createChannel(SessionImpl.this, info.getUri(), channelNumber);
ChannelHandler handler = initChannel(channel, response.getChannelHandler());
registerChannel(channelNumber, channel);
channel.channelOpened(handler);
return response;
}
@Override
public void receiveMSG(int channelNumber, int messageNumber, Message message) {
InternalChannel channel = getChannel(channelNumber);
channel.receiveMSG(messageNumber, message);
}
@Override
public void receiveANS(int channelNumber, int messageNumber, int answerNumber, Message message) {
InternalChannel channel = getChannel(channelNumber);
channel.receiveANS(messageNumber, answerNumber, message);
}
@Override
public void receiveNUL(int channelNumber, int messageNumber) {
InternalChannel channel = getChannel(channelNumber);
channel.receiveNUL(messageNumber);
}
@Override
public void receiveERR(int channelNumber, int messageNumber, Message message) {
InternalChannel channel = getChannel(channelNumber);
channel.receiveERR(messageNumber, message);
}
@Override
public void receiveRPY(int channelNumber, int messageNumber, Message message) {
InternalChannel channel = getChannel(channelNumber);
channel.receiveRPY(messageNumber, message);
}
@Override
public void closeSession() {
// TODO: do not allow session close if there are still open channels
setCurrentState(closeInitiatedState);
channelManagementProfile.closeSession(new CloseCallback() {
public void closeDeclined(int code, String message) {
Assert.holdsLock("session", sessionLock);
performClose();
}
public void closeAccepted() {
Assert.holdsLock("session", sessionLock);
performClose();
}
private void performClose() {
try {
sessionHandler.sessionClosed();
} finally {
setCurrentState(deadState);
beepStream.closeTransport();
}
}
});
}
@Override
public void channelCloseRequested(final int channelNumber, final CloseCallback callback) {
InternalChannel channel = getChannel(channelNumber);
channel.channelCloseRequested(new CloseCallback() {
public void closeDeclined(final int code, final String message) {
callback.closeDeclined(code, message);
}
public void closeAccepted() {
callback.closeAccepted();
removeChannel(channelNumber);
}
});
}
@Override
public void sessionCloseRequested(CloseCallback callback) {
if (hasOpenChannels()) {
callback.closeDeclined(550, "still working");
} else {
callback.closeAccepted();
try {
sessionHandler.sessionClosed();
} finally {
setCurrentState(deadState);
beepStream.closeTransport();
}
}
}
@Override
public String toString() {
return "";
}
}
protected class CloseInitiatedState extends AliveState {
@Override
public String getName() {
return "close-initiated";
}
@Override
public void sessionCloseRequested(CloseCallback callback) {
debug("received session close request while close is already in progress");
try {
sessionHandler.sessionClosed();
} finally {
callback.closeAccepted();
beepStream.closeTransport();
setCurrentState(deadState);
}
}
@Override
public StartChannelResponse channelStartRequested(int channelNumber, ProfileInfo[] profiles) {
return StartChannelResponse.createCancelledResponse(550, "session release in progress");
}
@Override
public String toString() {
return "";
}
}
protected class DeadState extends AbstractSessionState {
@Override
public String getName() {
return "dead";
}
public void connectionClosed() {
// ignore this one
}
@Override
public String toString() {
return "";
}
}
}