SteamStream!WEB-INF!src!steamStream!ReStreamer!!java

From Red5Wiki

Jump to: navigation, search
/*
 * SteamStream - Red5 stream transcoding application
 * Copyright (c) 2009 by Jeremy Morton - All rights reserved.
 * 
 * This application is free software; you can redistribute it and/or modify it under the
 * terms of the GNU Lesser General Public License as published by the Free Software
 * Foundation; either version 2.1 of the License, or (at your option) any later version.
 * Text of this license version 2.1 can be found at the following web address:
 * http://www.gnu.org/licenses/lgpl-2.1.txt
 */
package steamStream;
 
import java.io.File;
import java.io.IOException;
import java.util.Hashtable;
import java.util.concurrent.locks.ReentrantLock;
 
import org.red5.logging.Red5LoggerFactory;
import org.red5.server.api.IContext;
import org.red5.server.api.IScope;
import org.red5.server.net.rtmp.event.IRTMPEvent;
import org.red5.server.stream.BroadcastScope;
import org.red5.server.stream.IBroadcastScope;
import org.red5.server.stream.IProviderService;
import org.slf4j.Logger;
 
import com.xuggle.red5.io.BroadcastStream;
import com.xuggle.red5.io.IRTMPEventIOHandler;
import com.xuggle.red5.io.Red5HandlerFactory;
import com.xuggle.red5.io.Red5Message;
import com.xuggle.xuggler.ICodec;
import com.xuggle.xuggler.IContainer;
import com.xuggle.xuggler.IContainerFormat;
import com.xuggle.xuggler.IPacket;
import com.xuggle.xuggler.IRational;
import com.xuggle.xuggler.ISimpleMediaFile;
import com.xuggle.xuggler.IStream;
import com.xuggle.xuggler.IStreamCoder;
import com.xuggle.xuggler.IVideoPicture;
 
public class ReStreamer implements Runnable {
	private class VidQualityProfile {
		private int gopValue;
		// ^ The default GOP value for the output stream coder when outputting FLV/Sorenson Spark video was 12.
		//   The higher this is, the lower the outgoing bitrate tends to be, but the lower the quality and also
		//   the longer the decoder usually needs to wait (for an I-frame) before it can start buffering.
		private int nrValue;
		// ^ Noise reduction.  Values over 1000 (roughly) cease to be meaningful.  The higher the number, the more
		//   'noise' (and therefore detail) is removed, and the lower quality the video is.  However this does
		//   drastically reduce the bandwidth taken up by the outgoing stream.  Default value was 0.
		private int mbdValue;
		// ^ Macroblock decision.
		//   '0' FF_MB_DECISION_SIMPLE: Use mb_cmp (cannot change it yet in FFmpeg).
		//   '1' FF_MB_DECISION_BITS: Choose the one which needs the fewest bits.
		//   '2' FF_MB_DECISION_RD: rate distortion.
		//   Default value for FLV/Sorenson Spark video was 0.
 
		public VidQualityProfile() {
			// Np specified VQP values; set to defaults
			this.gopValue = 12;
			this.nrValue = 0;
			this.mbdValue = 0;
		}
		public VidQualityProfile(int gopValue, int nrValue, int mbdValue) {
			this.gopValue = gopValue;
			this.nrValue = nrValue;
			this.mbdValue = mbdValue;
		}
 
		public int GetGopValue() { return this.gopValue; }
		public void SetGopValue(int gopValue) { this.gopValue = gopValue; }
 
		public int GetNrValue() { return this.nrValue; }
		public void SetNrValue(int nrValue) { this.nrValue = nrValue; }
 
		public int GetMbdValue() { return this.mbdValue; }
		public void SetMbdValie(int mbdValue) { this.mbdValue = mbdValue; }
	}
 
	private static Logger log = Red5LoggerFactory.getLogger(Application.class, "steamStream");
	private IScope streamScope;
	private BroadcastStream outputStream;
	private ISimpleMediaFile outputStreamInfo;
	private String inputStreamName;
	private File sdpFile;
	final static private Red5HandlerFactory r5HandlerFact = Red5HandlerFactory.getFactory();
	// ^ This initializes the FFMPEG IO libraries and gets a Red5 handler factory we can register streams with.
	private IRTMPEventIOHandler handlerOutp;
	private String urlOutp;
	private IContainerFormat contInpFormat;
	private IContainerFormat contOutpFormat;
 
	// Transcoding-related vars
	private int inpIndex = 0;
	IPacket pktInp;
	IPacket pktOutp;
	private final long tsInterval = 100000;
	// ^ Manual forced incrementing of output video frame timestamps.  FLV timestamps should increment
	//   by 1000000/(framerate).  eg. with outgoing video at 10FPS, this should be 100,000.  Set to 0 to
	//   disable manual forced incrementing.
	private final int frNumerator = 10;
	// ^ Framerate numerator; we'll keep this the same between input and output streams.
	private final int frDenominator = 1;
	// ^ Framerate denominator; we'll keep this the same between input and output streams.
 
	// To change which Video Quality Profile the outputted stream will have, assign the relevant VQP instance
	// to the member named 'vqp'.
	private final VidQualityProfile vqpLow = new VidQualityProfile(100, 1500, 1);
	//private final VidQualityProfile vqpMedium = new VidQualityProfile(70, 999, 2);
	//private final VidQualityProfile vqpHigh = new VidQualityProfile(40, 0, 0);
	//private final VidQualityProfile vqpDefault = new VidQualityProfile();
 
	private final VidQualityProfile vqp = this.vqpLow;
	//private final VidQualityProfile vqp = this.vqpMedium;
	//private final VidQualityProfile vqp = this.vqpHigh;
	//private final VidQualityProfile vqp = this.vqpDefault;
 
	private IContainer contInp = null;
	private IContainer contOutp = null;
	private IStream strInp = null;
	private IStreamCoder strmCoderInp = null;
	private IStream strOutp = null;
	private IStreamCoder strmCoderOutp = null;
	private IVideoPicture vidpicInp = null;
	private int numStreams = 0;
	private IRational num;
	private long timestampCount = 0;
	private long frameNumber = 1;
	private boolean firstFrame = true;
 
	// Constructor
	/**
	 * ReStreamer class's constructor
	 * 
	 * @param streamScope The scope for which to publish the broadcast stream
	 * @param inputStreamName The filename (sans the .sdp extension) of the file in the SDP descriptors directory that describes the RTP input stream to read
	 * @param outputStreamInfo A description of the nature of the output stream to be broadcast
	 */
	public ReStreamer(IScope streamScope, String inputStreamName, ISimpleMediaFile outputStreamInfo) {
		log.info("ReStreamer constructor called.");
 
		this.inputStreamName = inputStreamName;
 
		// Grab SDP file with the specified input stream SDP name
		sdpFile = null;
 
		// Even if this actual file doesn't exist on the filesystem, the File can still be created via .getFile(); it will simply point
		// to the non-existant file location.  If/when Xuggler tries to open a non-existant file, it will fail and we'll handle it
		// gracefully with an exception.
		try {
			sdpFile = streamScope.getContext().getResource("sdpdescriptors/" + inputStreamName + ".sdp").getFile();
		} catch (IOException ex) {
			log.error("I/O exception trying to get SDP file: " + ex.toString());
		}
 
		this.streamScope = streamScope;
		this.outputStreamInfo = outputStreamInfo;
	}
 
	public void run() {
		// Start restreaming thread
		log.info("ReStreamer thread starting.  ID: " + Thread.currentThread().getId());
 
		// First, we need to set up an anonymous output handler class to do something when the Red5 handler
		// comes back to us with some Red5 RTMP-style data
		this.handlerOutp = new IRTMPEventIOHandler() {
			/**
			 * Reading not supported on this handler.
			 */
			public Red5Message read() throws InterruptedException {
				return null;
			}
 
			/**
			 * Write/dispatch an RTMP event.
			 */
			public void write(Red5Message r5Msg) throws InterruptedException {
				IRTMPEvent event = r5Msg.getData();
				if (event != null) {
					// if (event instanceof VideoData) {
						// VideoData dataPacket = (VideoData)event;
					// }
 
					outputStream.dispatchEvent(event);
					event.release();
				}
			}
		};
 
		try {
			int retVal = 0;
 
			IContext streamContext = this.streamScope.getContext();
 
			// Set up the actual output stream we're going to broadcast to.
			this.outputStream = new BroadcastStream(this.inputStreamName);
			this.outputStream.setPublishedName(this.inputStreamName);
			this.outputStream.setScope(this.streamScope);
 
			log.debug("ReStreamer: About to register output stream with provider service: " + this.inputStreamName);
 
			IProviderService providerService = (IProviderService)streamContext.getBean(IProviderService.BEAN_NAME);
			if (providerService.registerBroadcastStream(this.streamScope, this.inputStreamName, this.outputStream)) {
				IBroadcastScope bs = (BroadcastScope)providerService.getLiveProviderInput(this.streamScope, this.inputStreamName, true);
				bs.setAttribute(IBroadcastScope.STREAM_ATTRIBUTE, this.outputStream);
			}
			else {
				log.error("Got a fatal error; could not register broadcast stream: " + this.inputStreamName);
			}
			outputStream.start();
 
			log.debug("ReStreamer: Successfully registered and started output stream: " + this.inputStreamName);
 
			// Now, get processing the SDP-described input, and sending the result to said output stream.
			contInp = IContainer.make();
			contOutp = IContainer.make();
 
			// We tell FFMPEG to listen for and demux/transcode incoming RTP data by pointing it at an
			// SDP descriptor file.
			// WARNING!!!  If the SDP file specified here is invalid (as far as FFMPEG is concerned), which
			// inludes its having (old) Mac-style line endings (ie. 0D instead of 0A), the incoming stream
			// will not be listened for, and Xuggler may even lock up and max out the CPU when you
			// call .readNextPacket().  Therefore, MAKE SURE this specified SDP is *VALID*!!!
			// NB. Xuggler 2.0 and later should not have this problem of locking up when the SDP is invalid.
			contInpFormat = IContainerFormat.make();
			contInpFormat.setInputFormat("sdp");
			log.info("Publishing stream from input: " + sdpFile.getCanonicalPath());
 
			if (contInp.open(sdpFile.getCanonicalPath(), IContainer.Type.READ, contInpFormat, true, false) < 0) {
				throw new RuntimeException("Xuggler container.open failed!  Stream: " + this.inputStreamName + ", Path: " + sdpFile.getCanonicalPath());
			}
 
			// We tell Xuggler's IContainer to output to a Red5 RTMP-formatted stream by pointing it at the
			// 'redfive' protocol, and the stream name (same as the BasicScope name).
			urlOutp = Red5HandlerFactory.DEFAULT_PROTOCOL + ":" + outputStream.getName();
			outputStreamInfo.setURL(urlOutp);
			contOutpFormat = IContainerFormat.make();
			contOutpFormat.setOutputFormat("flv", urlOutp, null);
			r5HandlerFact.registerStream(this.handlerOutp, outputStreamInfo);
			// ^ If we'd set the IContainerFormat to null, we'd be telling Xuggler to guess the output container
			// format based on the outputURL.  As we're not outputting to a file here, and hence have no '.flv'
			// extension, though, we need to specify an output container format.
			log.debug("Publishing stream to output: " + urlOutp);
 
			// Packet object needed for the current input data
			pktInp = IPacket.make();
 
			// We need to wait until we receive a useful data packet to be able to dynamically setup our
			// input/output coders
			boolean gotKeyframePkt = false;
			boolean keepDecoding = true;
 
			// Because of something causing weird IPacket timestamps for the first few packets, and the knock-on
			// effect of this causing Xuggler's IVideoPicture timestamps to be invalid which will screw up the
			// FLV output, we must manually set the FLV timestamps.  They must increment monotonically.  The
			// amount by which they increment is IMPORTANT.  They should increment by 1000000/(framerate), as
			// IVideoPicture timestamps are in microseconds.
			timestampCount = 0;
 
			// Main transcoding loop...
			while (contInp.readNextPacket(pktInp) >= 0 && keepDecoding) {
				if (!gotKeyframePkt) {
					// No keyframe packet yet.  Unless this one is, drop the packet and get the next one.
					if (pktInp.isKeyPacket()) {
						// Dynamically setup our input/output coders.
						gotKeyframePkt = true;
						log.debug("Got packet with key frame data; setting up coders: " + this.inputStreamName);
						this.setupInOutCoders();
 
						// Input/output coders should now be set up and we can get on with the transcoding.
					}
				}
				else {
					// Already got keyframe packet.  Process this packet.
					keepDecoding = processPacket(pktInp);
				}
			}
 
			// Flush video encoder to make sure it knows there's no more data
			log.debug("Flushing video encoder: " + this.inputStreamName);
			strmCoderOutp.encodeVideo(pktOutp, null, 0);
 
			// Write trailer if necessary
			log.debug("Writing trailer if necessary: " + this.inputStreamName);
			retVal = contOutp.writeTrailer();
			if (retVal < 0) {
				throw new RuntimeException("Could not write trailer to output file!");
			}
		}
		catch (Exception ex) {
			log.error("Exception occurred during restreaming: " + ex.toString());
		}
		finally {
			if (strmCoderOutp != null) { strmCoderOutp.close(); }
			if (strmCoderInp != null) { strmCoderInp.close(); }
 
			if (contOutp != null) { contOutp.close(); }
			if (contInp != null) { contInp.close(); }
 
			ReentrantLock streamSubCountLock = ((ReentrantLock)this.streamScope.getAttribute("streamSubCountLock"));
			if (streamSubCountLock.isHeldByCurrentThread()) {
				// Now that we've fully stopped transcoding, unlock streamSubCount's lock...
				streamSubCountLock.unlock();
			}
		}
 
		log.info("ReStreamer thread exiting.  ID: " + Thread.currentThread().getId());
	}
 
	private void setupInOutCoders() {
		int retVal = 0;
 
		numStreams = contInp.getNumStreams();
		// ^ .getNumStreams() causes .queryStreamMetaData() to be called too, detecting additional
		//   info about our input stream.
 
		if (numStreams < 0) { 
			throw new RuntimeException("Couldn't query input stream meta data: " + this.inputStreamName);
		}
 
		// Iterate through the streams...
		strInp = null;
		strmCoderInp = null;
		for (int i = 0; i < numStreams; i++) {
			// Find the stream object
			strInp = contInp.getStream(i);
 
			// Get the pre-configured decoder that can decode this stream
			strmCoderInp = strInp.getStreamCoder();
 
			// We want to just grab the first video stream...
			if (strmCoderInp.getCodecType() == ICodec.Type.CODEC_TYPE_VIDEO) {
				break;
			}
		}
 
		if (strmCoderInp == null) {
			throw new RuntimeException("Couldn't find a video stream in incoming RTP data: " + this.inputStreamName);
		}
 
		inpIndex = strInp.getIndex();
 
		// Framerate and timebase for input...
		num = IRational.make(frNumerator, frDenominator);
		strmCoderInp.setFrameRate(num);
		// ... for FLV, the timebase should be the inverse of the framerate.
		strmCoderInp.setTimeBase(IRational.make(num.getDenominator(), num.getNumerator()));
		// ... and the pixel type seems to be YUV420P (this is required for FLV anyway!)
		strmCoderInp.setPixelType(com.xuggle.xuggler.IPixelFormat.Type.YUV420P);
		num = null;
 
		// Now that we have some useful info in our input stream coder, we can setup out output stream coder.
		// Begin by setting up its container.
 
		// Open the output container for writing.
		retVal = contOutp.open(urlOutp, IContainer.Type.WRITE, contOutpFormat);
		if (retVal < 0) {
			throw new RuntimeException("Could not open output container at URL: " + urlOutp);
		}
 
		// We're only trying to output one video stream, so create it in the container
		strOutp = contOutp.addNewStream(0);
		strmCoderOutp = strOutp.getStreamCoder();
 
		// Setup output stream coder now.  After demuxing and decoding, we encode video data into
		// Sorenson Spark (FLV1) format, ready for RTMP packets to be read by a receiving Flash client.
		strmCoderOutp.setCodec(ICodec.ID.CODEC_ID_FLV1);
		int origWidth = strmCoderInp.getWidth();
		int origHeight = strmCoderInp.getHeight();
		if (origWidth <= 0 || origHeight <= 0) {
			throw new RuntimeException("Couldn't find width or height in original video stream: " + this.inputStreamName);
		}
		strmCoderOutp.setWidth(origWidth);
		strmCoderOutp.setHeight(origHeight);
		strmCoderOutp.setPixelType(strmCoderInp.getPixelType());
 
		strmCoderOutp.setFlag(com.xuggle.xuggler.IStreamCoder.Flags.FLAG_QSCALE, true);
		strmCoderOutp.setNumPicturesInGroupOfPictures(this.vqp.GetGopValue());
		strmCoderOutp.setProperty("nr", this.vqp.GetNrValue());
		strmCoderOutp.setProperty("mbd", this.vqp.GetMbdValue());
 
		// Framerate and timebase for output...
		num = strmCoderInp.getFrameRate();
		strmCoderOutp.setFrameRate(num);
		// ... and for FLV, the timebase should be the inverse of the framerate.
		strmCoderOutp.setTimeBase(IRational.make(num.getDenominator(), num.getNumerator()));
		num = null;
 
		// Allocate buffer for us to store decoded video pictures.
		vidpicInp = IVideoPicture.make(strmCoderInp.getPixelType(), strmCoderInp.getWidth(), strmCoderInp.getHeight());
 
		retVal = strmCoderOutp.open();
		if (retVal < 0) { 
			throw new RuntimeException("Couldn't open output encoder for stream: " + this.inputStreamName);
		}
		retVal = strmCoderInp.open();
		if (retVal < 0) {
			throw new RuntimeException("Couldn't open input decoder for stream: " + this.inputStreamName);
		}
 
		retVal = contOutp.writeHeader();
		if (retVal < 0) {
			throw new RuntimeException("Couldn't write output FLV header: " + this.inputStreamName);
		}
	}
 
	/**
	 * Process the packet passed as part of our video stream, which must already have been set up.
	 * 
	 * @param pkt The packet to process
	 * @return Whether or not to continue getting packets and decoding the stream.  Note that if this value is false, the calling code MUST unlock the 'streamSubCountLock', or it won't get unlocked.
	 */
	@SuppressWarnings("unchecked")
	private boolean processPacket(IPacket pkt) {
		int retVal = 0;
		int offset = 0;
 
		// If there are no clients still subscribing to this stream, we need to stop transcoding now.
		Hashtable<String, Integer> streamSubCount = ((Hashtable<String, Integer>)this.streamScope.getAttribute("streamSubCount"));
		ReentrantLock streamSubCountLock = ((ReentrantLock)this.streamScope.getAttribute("streamSubCountLock"));
		boolean doingReturn = false;
		streamSubCountLock.lock();
		try {
			if (
				streamSubCount.containsKey(this.inputStreamName) &&
				streamSubCount.get(this.inputStreamName) == 0
			) {
				doingReturn = true;
				log.info("ReStreamer: No clients are now subscribed to stream; terminating transcoding.  Stream: " + this.inputStreamName);
 
				return false;
			}
		}
		finally {
			// IIF we aren't returning and closing the stream, release the lock at this point.
			if (!doingReturn) {
				streamSubCountLock.unlock();
			}
		}
 
		// Packet object needed for the current output data
		pktOutp = IPacket.make();
 
		// If packet belongs to the stream we're interested in...
		int pktIndex = pkt.getStreamIndex();
		if (pktIndex == inpIndex) {
			// Read the packet; if the frame is complete, transcode it.
			IVideoPicture vidpicOutp = null;
 
			while (offset < pkt.getSize()) {
				retVal = strmCoderInp.decodeVideo(vidpicInp, pkt, offset);
				if (retVal <= 0) {
					// It seems this error will sometimes occur when receiving the cameras' RTP
					// video data, as FFMPEG can't decode it perfectly.  We need to just drop this
					// packet, not throw an exception.
 
					log.trace("Could not decode this video packet!  Dropping, and carrying on: " + this.inputStreamName);
					break;
				}
 
				offset += retVal;
 
				if (vidpicInp.isComplete()) {
					// Frame complete.  Write it out via the encoder.
					vidpicOutp = vidpicInp;
 
					if (this.tsInterval > 0) {
						if (firstFrame) {
							firstFrame = false;
							timestampCount = ((int)vidpicOutp.getTimeStamp());
						}
						else {
							timestampCount += this.tsInterval;
							vidpicOutp.setTimeStamp(timestampCount);
						}
					}
 
					vidpicOutp.setQuality(0);
					retVal = strmCoderOutp.encodeVideo(pktOutp, vidpicOutp, 0);
					if (retVal < 0) {
						throw new RuntimeException("Could not encode video: " + this.inputStreamName);
					}
 
					if (pktOutp.isComplete()) {
						retVal = contOutp.writePacket(pktOutp, true);
						if (retVal < 0) {
							throw new RuntimeException("Could not write video packet: " + this.inputStreamName);
						}
 
						if (this.frameNumber++ < 8) {
							// Close/open encoder 7 times to force a keyframe for the first 8 frames (encoder
							// always starts with first frame as a keyframe)
							strmCoderOutp.close();
 
							retVal = strmCoderOutp.open();
							if (retVal < 0) {
								throw new RuntimeException("Couldn't open output encoder for stream: " + this.inputStreamName);
							}
						}
					}
				}
			}
		}
 
		return true;
	}
}
Personal tools