SteamStream!WEB-INF!src!steamStream!Application!!java

From Red5Wiki

Jump to: navigation, search
/*
 * SteamStream - Red5 stream transcoding application
 * Copyright (c) 2009 by Jeremy Morton - All rights reserved.
 * 
 * This application is free software; you can redistribute it and/or modify it under the
 * terms of the GNU Lesser General Public License as published by the Free Software
 * Foundation; either version 2.1 of the License, or (at your option) any later version.
 * Text of this license version 2.1 can be found at the following web address:
 * http://www.gnu.org/licenses/lgpl-2.1.txt
 */
package steamStream;
 
import java.util.Hashtable;
import java.util.concurrent.locks.ReentrantLock;
 
import org.red5.logging.Red5LoggerFactory;
import org.red5.server.adapter.MultiThreadedApplicationAdapter;
import org.red5.server.api.IConnection;
import org.red5.server.api.IScope;
import org.red5.server.api.Red5;
import org.red5.server.api.stream.IPlayItem;
import org.red5.server.api.stream.IPlaylistSubscriberStream;
import org.red5.server.api.stream.ISubscriberStream;
import org.slf4j.Logger;
import com.xuggle.xuggler.ICodec;
import com.xuggle.xuggler.SimpleMediaFile;
 
public class Application extends MultiThreadedApplicationAdapter {
	// SteamStream Red5 application.  Is able to take incoming RTP video streams in
	// MPEG4 format, demux and transcode them on-the-fly, and output them to a Red5
	// IBroadcastStream as RTMP data in Sorenson Spark video format for reading by an
	// incoming Flash client.
 
	private final String version = "v1.2";
	private IScope appScope;
	private static Logger log = Red5LoggerFactory.getLogger(Application.class, "steamStream");
 
	@Override
	public boolean appStart(IScope app) {
		log.info("SteamStream " + version + " started.");
 
		// Record current scope
		appScope = app;
 
		// Init our attributes to be thread-shared
		appScope.setAttribute("streamSubCount", new Hashtable<String, Integer>());
		appScope.setAttribute("streamSubCountLock", new ReentrantLock());
 
		// Register our stream publish/playback security classes
		registerStreamPublishSecurity(new SsPublishSecurity());
		registerStreamPlaybackSecurity(new SsPlaybackSecurity());
 
		return true;
	}
 
	@Override
	public void streamSubscriberStart(ISubscriberStream stream) {
		log.info("streamSubscriberStart() called... stream: {}", stream.getName());
 
		super.streamSubscriberStart(stream);
	}
 
	@SuppressWarnings("unchecked")
	@Override
	public void streamPlaylistItemPlay(IPlaylistSubscriberStream stream, IPlayItem item, boolean isLive) {
		// NOTE: Red5 dynamically creates a BroadcastStream for the appScope if/when a Flash client requests
		// a stream using Netstream.play().  By the time this notification method is called, the stream WILL
		// exist even if it didn't before, as it will have been created (after passing our security checks).
		// We just need to decide whether to pump data out to it with a new thread or not.
 
		IConnection conn = Red5.getConnectionLocal();
 
		String itemName = item.getName();
 
		Object[] logParams = {stream.getName(), itemName, (isLive ? "Yes" : "No")};
		log.info("streamPlaylistItemPlay() called... stream: {} | item: {} | isLive?: {}", logParams);
 
		Hashtable<String, Integer> streamSubCount = ((Hashtable<String, Integer>)appScope.getAttribute("streamSubCount"));
		ReentrantLock streamSubCountLock = ((ReentrantLock)appScope.getAttribute("streamSubCountLock"));
		streamSubCountLock.lock();
		try {
			if (!streamSubCount.containsKey(itemName)) {
				streamSubCount.put(itemName, 0);
			}
 
			int streamCount = streamSubCount.get(itemName);
 
			if (streamCount == 0) {
				log.info("Stream subscriber count is zero; starting a new ReStreamer.  Stream: " + itemName);
 
				// Start pushing a broadcast to this PlayItem; assume we can find a corresponding SDP file in the SDPs
				// dir (even if not, ReStreamer will deal with the missing file by failing gracefully).
				ReStreamer rs = new ReStreamer(this.appScope, itemName, buildBroadcastStreamInfo());
 
				// ReStreamer initialized; start the restreaming
				Thread t = new Thread(rs);
				t.start();
			}
 
			// Increment record of subscribers to this stream
			log.info("Client connecting; incrementing stream count for stream '" + itemName + "' from " + streamCount + " to " + (streamCount+1) + ".");
			streamSubCount.put(itemName, streamCount + 1);
 
			// Record that we've actually started playing the stream for this client
			conn.setAttribute("startedPlayingStream", true);
		}
		finally {
			streamSubCountLock.unlock();
		}
 
		super.streamPlaylistItemPlay(stream, item, isLive);
	}
 
	@SuppressWarnings("unchecked")
	@Override
	public void streamSubscriberClose(ISubscriberStream stream) {
		String streamName;
		int streamCount;
 
		IConnection conn = Red5.getConnectionLocal();
 
		// We can assume that the connection is still alive until this notification method has been called by
		// virtue of its being a TCP connection, which guarantees an alive session and times out if connection
		// is lost.  We record that one less client is subscribing to this stream.
		Hashtable<String, Integer> streamSubCount = ((Hashtable<String, Integer>)appScope.getAttribute("streamSubCount"));
		ReentrantLock streamSubCountLock = ((ReentrantLock)appScope.getAttribute("streamSubCountLock"));
		streamSubCountLock.lock();
		try {
			streamName = ((String)conn.getAttribute("connectedToStream"));
			if (!streamSubCount.containsKey(streamName)) {
				streamSubCount.put(streamName, 0);
			}
			streamCount = streamSubCount.get(streamName);
 
			if (
				conn.hasAttribute("startedPlayingStream") &&
				((Boolean)conn.getAttribute("startedPlayingStream")) == true
			) {
				log.info("Client disconnecting; decrementing stream count for stream '" + streamName + "' from " + streamCount + " to " + (streamCount-1) + ".");
 
				// No longer playing the stream for this client
				conn.removeAttribute("startedPlayingStream");
 
				// Decrement record of subscribers to this stream
				streamSubCount.put(streamName, streamCount - 1);
			}
			else {
				log.info("Client disconnecting; never started playing subscribed stream: " + streamName);
			}
		}
		finally {
			streamSubCountLock.unlock();
		}
 
		// The one stream that we allowed the client to be connected to has been disconnected from... remove our
		// corresponding attribute.
		conn.removeAttribute("connectedToStream");
 
		super.streamSubscriberClose(stream);
	}
 
	/**
	 * Constructs a SimpleMediaFile, which can be used to describe broadcast stream info, with some default
	 * parameters, such as no audio (video-only), width, height, and various other video properties.
	 */
	private SimpleMediaFile buildBroadcastStreamInfo() {
		SimpleMediaFile smf = new SimpleMediaFile();
 
		smf.setHasAudio(false);
		smf.setHasVideo(true);
 
		// Unfortunately the ReStreamer needs to know the width and height you want to output as, even if
		// you don't know yet.
		smf.setVideoWidth(320);
		smf.setVideoHeight(240);
 
		smf.setVideoCodec(ICodec.ID.CODEC_ID_FLV1);
 
		return smf;
	}
 
 
 
 
 
 
 
 
 
 
	// *** RPCs ***
	// Returns the last error string resulting from this client connection being denied playback of a stream;
	// returns an empty string if no error was found.
	public String getLastPlaybackAllowedError() {
		IConnection conn = Red5.getConnectionLocal();
 
		if (conn.hasAttribute("lastPlaybackAllowedError")) {
			return (String)conn.getAttribute("lastPlaybackAllowedError");
		}
		else {
			return "";
		}
	}
 
	// RPC for testing!
	public double add(double a, double b) {
		log.info("add() RPC called.");
 
		return a + b;
	}
}
Personal tools