/* * Copyright 2006 Simon Raess * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package net.sf.beep4j.internal.stream; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import net.sf.beep4j.Message; import net.sf.beep4j.ProtocolException; import net.sf.beep4j.internal.message.DefaultMessageParser; import net.sf.beep4j.internal.message.MessageParser; import net.sf.beep4j.internal.stream.DataHeader.ANSHeader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * MessageAssembler assembles fragmented frames into a Message. * The assembled Messages are passed to a {@link MessageHandler}. * * @author Simon Raess */ public class MessageAssembler implements FrameHandler { private static final Logger LOG = LoggerFactory.getLogger(MessageAssembler.class); private final MessageHandler handler; private State currentState; public MessageAssembler(MessageHandler handler) { this.handler = handler; } private void setCurrentState(State state) { if (LOG.isDebugEnabled()) { LOG.debug("moving from " + currentState + " to " + state); } currentState = state; } // --> start of FrameHandler methods <-- public void handleFrame(Frame frame) { LOG.debug("got frame: " + frame.getHeader()); if (currentState == null) { MessageType type = frame.getHeader().getType(); if (MessageType.ANS == type || MessageType.NUL == type) { setCurrentState(new AnsState()); } else { setCurrentState(new NormalState()); } } // pass on to the state currentState.append(frame, handler); } // --> end of FrameHandler methods <-- protected Message createMessage(List frames) { if (frames.size() == 0) { throw new IllegalArgumentException("cannot create message from 0 fragments"); } LOG.debug("creating message from " + frames.size() + " frames"); int total = 0; for (Frame frame : frames) { long check = total + frame.getSize(); if (check > Integer.MAX_VALUE) { throw new ProtocolException("total message length is longer " + "than supported: " + check); } total += frame.getPayload().remaining(); } LOG.debug("total payload size is " + total); ByteBuffer buffer = ByteBuffer.allocate(total); for (Frame frame : frames) { buffer.put(frame.getPayload()); } buffer.flip(); MessageParser parser = new DefaultMessageParser(); return parser.parse(buffer); } protected void receive(MessageType type, int channelNumber, int messageNumber, Message message) { if (MessageType.ERR == type) { handler.receiveERR(channelNumber, messageNumber, message); } else if (MessageType.MSG == type) { handler.receiveMSG(channelNumber, messageNumber, message); } else if (MessageType.RPY == type) { handler.receiveRPY(channelNumber, messageNumber, message); } else { throw new IllegalArgumentException("unkown type: " + type); } } protected void receive(int channelNumber, int messageNumber, int answerNumber, Message message) { handler.receiveANS(channelNumber, messageNumber, answerNumber, message); } private static interface State { void append(Frame frame, MessageHandler handler); } private class NormalState implements State { private List fragments; private DataHeader last; private NormalState() { this.fragments = new LinkedList(); } private boolean hasPreviousFrame() { return last != null; } public void append(Frame frame, MessageHandler handler) { DataHeader header = (DataHeader) frame.getHeader(); MessageType type = header.getType(); if (hasPreviousFrame()) { validateMessageNumber(header); validateMatchingFragmentTypes(last.getType(), type); } fragments.add(frame); if (header.isIntermediate()) { last = (DataHeader) frame.getHeader(); } else { LOG.debug("got complete message with " + fragments.size() + " fragments"); last = null; List copy = new LinkedList(fragments); fragments.clear(); setCurrentState(null); receive(type, frame.getChannelNumber(), frame.getMessageNumber(), createMessage(copy)); } } /* * Validation of sequencing according to the BEEP specification section * 2.2.1.1. * * A frame is poorly formed, if the continuation indicator of the * previous frame received on the same channel * was intermediate ("*"), and its message number isn't identical to this frame's * message number. */ private void validateMessageNumber(DataHeader header) { if (last.getMessageNumber() != header.getMessageNumber()) { throw new ProtocolException("message number for fragments does not match: was " + header.getMessageNumber() + ", should be " + last.getMessageNumber()); } } /* * Validation of sequencing according to the BEEP specification section * 2.2.1.1. * * A frame is poorly formed if the header starts with "MSG", "RPY", "ERR", * or "ANS", and refers to a message number for which at least one other * frame has been received, and the three-character keyword starting this * frame and the immediately-previous received frame for this message * number are not identical */ private void validateMatchingFragmentTypes(MessageType last, MessageType current) { if (MessageType.ERR == current || MessageType.MSG == current || MessageType.RPY == current) { if (!last.equals(current)) { throw new ProtocolException("header type does not match: expected " + last + " but was " + current); } } } @Override public String toString() { return ""; } } private class AnsState implements State { private Map> fragments; private int messageNumber = -1; private AnsState() { this.fragments = new HashMap>(); } public void append(Frame frame, MessageHandler handler) { MessageType type = frame.getType(); if (messageNumber == -1) { messageNumber = frame.getMessageNumber(); } else { validateMessageNumber(frame.getHeader()); } if (MessageType.ANS == type) { ANSHeader header = (ANSHeader) frame.getHeader(); List frames = fragments.get(header.getAnswerNumber()); if (frames == null) { frames = new LinkedList(); fragments.put(header.getAnswerNumber(), frames); } frames.add(frame); if (!header.isIntermediate()) { fragments.remove(header.getAnswerNumber()); receive(frame.getChannelNumber(), frame.getMessageNumber(), header.getAnswerNumber(), createMessage(frames)); } } else if (MessageType.NUL == type) { if (hasUnfinishedAnsMessages()) { // Validation of sequencing according to the BEEP specification section // 2.2.1.1. // // A frame is poorly formed if the header starts with "NUL", and refers to // a message number for which at least one other frame has been received, // and the keyword of of the immediately-previous received frame for // this reply isn't "ANS". throw new ProtocolException("unfinished ANS messages"); } else if (frame.isIntermediate()) { throw new ProtocolException("NUL reply's continuation indicator is '*'"); } else if (frame.getSize() != 0) { throw new ProtocolException("NUL reply's payload size is non-zero (" + frame.getSize() + ")"); } fragments.clear(); setCurrentState(null); handler.receiveNUL(frame.getChannelNumber(), frame.getMessageNumber()); } else { throw new ProtocolException("expected ANS or NUL message, was " + type.name()); } } /* * Validation of sequencing according to the BEEP specification section * 2.2.1.1. * * A frame is poorly formed, if the continuation indicator of the * previous frame received on the same channel * was intermediate ("*"), and its message number isn't identical to this frame's * message number. */ private void validateMessageNumber(DataHeader current) { if (messageNumber != current.getMessageNumber()) { throw new ProtocolException("message number for fragments does not match: was " + current.getMessageNumber() + ", should be " + messageNumber); } } private boolean hasUnfinishedAnsMessages() { return fragments.size() > 0; } @Override public String toString() { return ""; } } }