/*
 *  Copyright 2006 Simon Raess
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package net.sf.beep4j.internal.tcp;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.LinkedList;

import net.sf.beep4j.Message;
import net.sf.beep4j.ProtocolException;
import net.sf.beep4j.internal.stream.DataHeader;
import net.sf.beep4j.internal.stream.Frame;
import net.sf.beep4j.internal.stream.MessageType;
import net.sf.beep4j.internal.stream.DataHeader.ANSHeader;
import net.sf.beep4j.internal.util.Assert;
import net.sf.beep4j.transport.Transport;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default implementation of the {@link ChannelController} interface for use
 * by the TCP mapping implementation.
 *
 * <p>The controller keeps two {@link SlidingWindow}s for one channel: a
 * receiver window that incoming frames are checked against, and a sender
 * window that limits how much queued payload may be handed to the
 * {@link Transport}. Outgoing frames are queued and only sent (whole or
 * split) while the sender window has room.</p>
 *
 * <p>All public methods are {@code synchronized} on this instance. Note that
 * the transport is invoked while that monitor is held.</p>
 *
 * @author Simon Raess
 */
final class DefaultChannelController implements ChannelController {

    /*
     * The logger is obtained under the interface name, not the name of this
     * class. NOTE(review): possibly unintended, but left as-is so that
     * existing logging configuration keeps matching.
     */
    private static final Logger LOG = LoggerFactory.getLogger(ChannelController.class);

    private static final Charset ASCII_CHARSET = Charset.forName("US-ASCII");

    /** Smallest remaining sender window for which a queued frame is split. */
    private static final int MINIMUM_FRAME_SIZE = 1;

    /** Number of the channel this controller is responsible for. */
    private final int channel;

    /** Receiver side window; incoming frames are validated against it. */
    private final SlidingWindow window;

    /** Sender side window; bounds what may be written to the transport. */
    private final SlidingWindow senderWindow;

    /** Frames waiting for room in the sender window, oldest first. */
    private final LinkedList<Frame> frames = new LinkedList<Frame>();

    private final Transport transport;

    /*
     * Sequence number placed into the next outgoing data header.
     * NOTE(review): this value only ever grows. RFC 3080 restricts seqno to
     * 0..4294967295 (it wraps modulo 2^32) -- confirm whether DataHeader or
     * SlidingWindow apply that modulus, otherwise long-lived channels will
     * emit out-of-range sequence numbers.
     */
    private long seqno;

    /*
     * Not read or written anywhere in this class. Retained because it is
     * package-visible and may be referenced from elsewhere in the package.
     */
    long id;

    /**
     * Creates a controller for the given channel.
     *
     * @param transport the transport frames are sent through; must not be null
     * @param channel the channel number
     * @param window the initial size used for both the sender and the
     *               receiver window
     */
    public DefaultChannelController(Transport transport, int channel, int window) {
        Assert.notNull("transport", transport);
        this.transport = transport;
        this.channel = channel;
        this.senderWindow = new SlidingWindow(window);
        this.window = new SlidingWindow(window);
    }

    /**
     * Slides the sender window to the acknowledged position and then tries
     * to send frames that were waiting for window space.
     *
     * @param ackno the acknowledged sequence number
     * @param size the new window size
     */
    public synchronized void updateSendWindow(long ackno, int size) {
        LOG.debug("update send window: ackno={},window={}", ackno, size);
        senderWindow.slide(ackno, size);
        sendFrames(transport);
    }

    /**
     * Queues an ANS frame carrying the given message and flushes the queue
     * as far as the sender window allows.
     *
     * @param messageNumber the number of the message being answered
     * @param answerNumber the answer number placed into the ANS header
     * @param message the message whose bytes form the payload
     */
    public synchronized void sendANS(int messageNumber, int answerNumber, Message message) {
        // An explicit Object[] compiles against SLF4J releases with and
        // without the varargs overloads.
        LOG.debug("sendANS to message {} with answer number {} on channel {}",
                new Object[] { messageNumber, answerNumber, channel });
        ByteBuffer payload = message.asByteBuffer();
        DataHeader header = new ANSHeader(
                channel, messageNumber, false, seqno, payload.remaining(), answerNumber);
        enqueueAndSend(header, payload);
    }

    /**
     * Queues an ERR frame carrying the given message and flushes the queue
     * as far as the sender window allows.
     *
     * @param messageNumber the number of the message being answered
     * @param message the message whose bytes form the payload
     */
    public synchronized void sendERR(int messageNumber, Message message) {
        LOG.debug("sendERR to message {} on channel {}", messageNumber, channel);
        sendData(MessageType.ERR, messageNumber, message);
    }

    /**
     * Queues a MSG frame carrying the given message and flushes the queue
     * as far as the sender window allows.
     *
     * @param messageNumber the message number placed into the header
     * @param message the message whose bytes form the payload
     */
    public synchronized void sendMSG(int messageNumber, Message message) {
        LOG.debug("sendMSG with message number {} on channel {}", messageNumber, channel);
        sendData(MessageType.MSG, messageNumber, message);
    }

    /**
     * Queues a NUL frame with an empty payload and flushes the queue as far
     * as the sender window allows. The sequence number is not advanced
     * because the payload has zero bytes.
     *
     * @param messageNumber the number of the message being answered
     */
    public synchronized void sendNUL(int messageNumber) {
        LOG.debug("sendNUL to message {} on channel {}", messageNumber, channel);
        DataHeader header = new DataHeader(
                MessageType.NUL, channel, messageNumber, false, seqno, 0);
        enqueueAndSend(header, ByteBuffer.allocate(0));
    }

    /**
     * Queues a RPY frame carrying the given message, flushes the queue as
     * far as the sender window allows and logs how many frames went out.
     *
     * @param messageNumber the number of the message being answered
     * @param message the message whose bytes form the payload
     */
    public synchronized void sendRPY(int messageNumber, Message message) {
        LOG.debug("sendRPY to message {} on channel {}", messageNumber, channel);
        int count = sendData(MessageType.RPY, messageNumber, message);
        LOG.debug("sendRPY caused {} frames to be sent", count);
    }

    /**
     * Verifies that an incoming frame fits the receiver window without
     * changing the window.
     *
     * @param seqno the sequence number of the incoming frame
     * @param payloadSize the payload size of the incoming frame
     * @throws ProtocolException if {@code seqno} is not the current window
     *         position or the payload is larger than the remaining window
     */
    public synchronized void checkFrame(long seqno, int payloadSize) {
        if (seqno != window.getPosition()) {
            throw new ProtocolException(sequenceMismatch(seqno));
        }
        if (window.remaining() < payloadSize) {
            throw new ProtocolException("message larger than remaining window size (remaining="
                    + window.remaining() + ",payload size=" + payloadSize + ")");
        }
    }

    /**
     * Records that a frame has been received: advances the receiver window
     * by {@code size} and, once at most half of the window is left, slides
     * the window back to its full size and sends a SEQ frame announcing it.
     *
     * @param seqno the sequence number of the received frame
     * @param size the payload size of the received frame
     * @throws IllegalStateException if {@code seqno} is not the current
     *         position of the receiver window
     */
    public synchronized void frameReceived(long seqno, int size) {
        if (seqno != window.getPosition()) {
            throw new IllegalStateException(sequenceMismatch(seqno));
        }

        LOG.debug("frameReceived on channel {}: seqno={},size={}",
                new Object[] { channel, seqno, size });
        window.moveBy(size);
        LOG.debug("receiver window = {}", window);

        if (window.remaining() <= 0.5 * window.getWindowSize()) {
            long ackno = seqno + size;
            int windowSize = window.getWindowSize();
            window.slide(ackno, windowSize);
            LOG.debug("sending SEQ frame on channel {}: ackno={},window={}",
                    new Object[] { channel, ackno, windowSize });
            LOG.debug("receiver window = {}", window);
            transport.sendBytes(createSEQFrame(channel, ackno, windowSize));
        }
    }

    /**
     * Sends queued frames for as long as {@link #nextFrame()} yields one,
     * advancing the sender window by each frame's size before it is sent.
     * Within this class it is only called from synchronized methods.
     *
     * @param transport the transport the frames are sent through
     * @return the number of frames that were sent
     */
    protected int sendFrames(Transport transport) {
        int count = 0;
        for (Frame frame = nextFrame(); frame != null; frame = nextFrame()) {
            LOG.debug("send frame {}", frame.getHeader());
            senderWindow.moveBy(frame.getSize());
            frame.send(transport);
            LOG.debug("sender window = {}", senderWindow);
            count++;
        }
        return count;
    }

    /** Builds the text used when a sequence number does not match the receiver window position. */
    private String sequenceMismatch(long seqno) {
        return "sequence number " + seqno + " does not "
                + "match expected sequence number " + window.getPosition();
    }

    /**
     * Builds a data header of the given type for the message and queues it.
     * The boolean header argument is always passed as {@code false}
     * (presumably the "more frames follow" flag -- confirm against DataHeader).
     */
    private int sendData(MessageType type, int messageNumber, Message message) {
        ByteBuffer payload = message.asByteBuffer();
        DataHeader header = new DataHeader(
                type, channel, messageNumber, false, seqno, payload.remaining());
        return enqueueAndSend(header, payload);
    }

    /** Advances seqno by the payload size, queues the frame and flushes; returns the number of frames sent. */
    private int enqueueAndSend(DataHeader header, ByteBuffer payload) {
        // The header was built with the old seqno; advance it before the
        // buffer is handed to the frame, as the payload size is read here.
        seqno += payload.remaining();
        enqueueFrame(new Frame(header, payload));
        return sendFrames(transport);
    }

    /** Encodes "TYPE channel ackno window CRLF" as US-ASCII bytes. */
    private ByteBuffer createSEQFrame(int channel, long ackno, int window) {
        StringBuilder buf = new StringBuilder(SEQHeader.TYPE);
        buf.append(" ").append(channel);
        buf.append(" ").append(ackno);
        buf.append(" ").append(window);
        buf.append("\r\n");
        return ASCII_CHARSET.encode(buf.toString());
    }

    /** Appends the frame to the end of the send queue. */
    private void enqueueFrame(Frame frame) {
        frames.addLast(frame);
    }

    /**
     * Takes the next frame that may be sent right now.
     *
     * @return the head of the queue if it fits into the sender window; the
     *         leading part of the head (the rest is put back at the front)
     *         if only part of it fits; {@code null} if the queue is empty or
     *         the sender window has no room
     */
    private Frame nextFrame() {
        if (frames.isEmpty()) {
            return null;
        }

        Frame frame = frames.removeFirst();

        if (frame.getSize() <= senderWindow.remaining()) {
            LOG.debug("sending frame unchanged (channel={})", channel);
            if (frames.isEmpty()) {
                LOG.debug("sending last frame in buffer (channel={})", channel);
            }
            return frame;
        }

        if (senderWindow.remaining() >= MINIMUM_FRAME_SIZE) {
            LOG.debug("split frame at position {} (channel={})",
                    senderWindow.remaining(), channel);
            Frame[] split = frame.split(senderWindow.remaining());
            // The tail goes back to the front so frame order is preserved.
            frames.addFirst(split[1]);
            return split[0];
        }

        // No window space at all: put the frame back untouched.
        frames.addFirst(frame);
        return null;
    }

}