/*
* SteamStream - Red5 stream transcoding application
* Copyright (c) 2009 by Jeremy Morton - All rights reserved.
*
* This application is free software; you can redistribute it and/or modify it under the
* terms of the GNU Lesser General Public License as published by the Free Software
* Foundation; either version 2.1 of the License, or (at your option) any later version.
* Text of this license version 2.1 can be found at the following web address:
* http://www.gnu.org/licenses/lgpl-2.1.txt
*/
package steamStream;
import java.util.Hashtable;
import java.util.concurrent.locks.ReentrantLock;
import org.red5.logging.Red5LoggerFactory;
import org.red5.server.adapter.MultiThreadedApplicationAdapter;
import org.red5.server.api.IConnection;
import org.red5.server.api.IScope;
import org.red5.server.api.Red5;
import org.red5.server.api.stream.IPlayItem;
import org.red5.server.api.stream.IPlaylistSubscriberStream;
import org.red5.server.api.stream.ISubscriberStream;
import org.slf4j.Logger;
import com.xuggle.xuggler.ICodec;
import com.xuggle.xuggler.SimpleMediaFile;
/**
 * SteamStream Red5 application.
 *
 * <p>Is able to take incoming RTP video streams in MPEG4 format, demux and transcode them
 * on-the-fly, and output them to a Red5 IBroadcastStream as RTMP data in Sorenson Spark video
 * format for reading by an incoming Flash client.
 *
 * <p>The adapter keeps a per-stream subscriber count in the application scope (attribute
 * {@code "streamSubCount"}), guarded by a {@link ReentrantLock} stored next to it (attribute
 * {@code "streamSubCountLock"}). A new {@code ReStreamer} thread is started whenever a stream
 * is played while its recorded count is zero.
 */
public class Application extends MultiThreadedApplicationAdapter {

    /** Version string reported in the startup log line. */
    private static final String VERSION = "v1.2";

    // Scope attribute names (shared state for all connections of this application).
    private static final String ATTR_STREAM_SUB_COUNT = "streamSubCount";
    private static final String ATTR_STREAM_SUB_COUNT_LOCK = "streamSubCountLock";

    // Connection attribute names (per-client state).
    private static final String ATTR_STARTED_PLAYING = "startedPlayingStream";
    private static final String ATTR_CONNECTED_TO_STREAM = "connectedToStream";
    private static final String ATTR_LAST_PLAYBACK_ERROR = "lastPlaybackAllowedError";

    private static final Logger log = Red5LoggerFactory.getLogger(Application.class, "steamStream");

    /** Application scope recorded in {@link #appStart(IScope)}; holds the shared attributes. */
    private IScope appScope;

    /**
     * Records the application scope, initializes the shared subscriber-count attributes and
     * registers the stream publish/playback security handlers.
     *
     * @param app the application scope being started
     * @return always {@code true}
     */
    @Override
    public boolean appStart(IScope app) {
        log.info("SteamStream {} started.", VERSION);
        // Record current scope
        appScope = app;
        // Init our attributes to be thread-shared
        appScope.setAttribute(ATTR_STREAM_SUB_COUNT, new Hashtable<String, Integer>());
        appScope.setAttribute(ATTR_STREAM_SUB_COUNT_LOCK, new ReentrantLock());
        // Register our stream publish/playback security classes
        registerStreamPublishSecurity(new SsPublishSecurity());
        registerStreamPlaybackSecurity(new SsPlaybackSecurity());
        return true;
    }

    /**
     * Logs the subscriber start and delegates to the superclass.
     *
     * @param stream the subscriber stream that started
     */
    @Override
    public void streamSubscriberStart(ISubscriberStream stream) {
        log.info("streamSubscriberStart() called... stream: {}", stream.getName());
        super.streamSubscriberStart(stream);
    }

    /**
     * Called when a playlist item starts playing for a client. Starts a new {@code ReStreamer}
     * thread if nobody is currently recorded as subscribed to the item, then increments the
     * item's subscriber count and flags the client's connection as having started playback.
     *
     * <p>NOTE: Red5 dynamically creates a BroadcastStream for the appScope if/when a Flash client
     * requests a stream using Netstream.play(). By the time this notification method is called,
     * the stream WILL exist even if it didn't before, as it will have been created (after passing
     * our security checks). We just need to decide whether to pump data out to it with a new
     * thread or not.
     *
     * @param stream the subscriber stream playing the item
     * @param item the playlist item being played; its name is the key of the subscriber count
     * @param isLive whether the item is a live stream (only logged here)
     */
    @Override
    public void streamPlaylistItemPlay(IPlaylistSubscriberStream stream, IPlayItem item, boolean isLive) {
        IConnection conn = connectionFor(stream);
        String itemName = item.getName();
        // Object[] form kept so this also compiles against SLF4J versions without varargs.
        log.info("streamPlaylistItemPlay() called... stream: {} | item: {} | isLive?: {}",
                new Object[] {stream.getName(), itemName, (isLive ? "Yes" : "No")});

        Hashtable<String, Integer> streamSubCount = subscriberCounts();
        ReentrantLock streamSubCountLock = subscriberCountsLock();
        streamSubCountLock.lock();
        try {
            // Make sure an entry exists before the ReStreamer thread is started below.
            if (!streamSubCount.containsKey(itemName)) {
                streamSubCount.put(itemName, 0);
            }
            int streamCount = streamSubCount.get(itemName);
            if (streamCount == 0) {
                log.info("Stream subscriber count is zero; starting a new ReStreamer. Stream: {}", itemName);
                // Start pushing a broadcast to this PlayItem; assume we can find a corresponding SDP
                // file in the SDPs dir (even if not, ReStreamer will deal with the missing file by
                // failing gracefully).
                ReStreamer rs = new ReStreamer(this.appScope, itemName, buildBroadcastStreamInfo());
                // ReStreamer initialized; start the restreaming on a named thread so it is easy to
                // identify in thread dumps.
                Thread t = new Thread(rs, "ReStreamer-" + itemName);
                t.start();
            }
            // Increment record of subscribers to this stream
            log.info("Client connecting; incrementing stream count for stream '{}' from {} to {}.",
                    new Object[] {itemName, streamCount, streamCount + 1});
            streamSubCount.put(itemName, streamCount + 1);
            // Record that we've actually started playing the stream for this client
            if (conn != null) {
                conn.setAttribute(ATTR_STARTED_PLAYING, true);
            }
            else {
                // Without a connection the matching decrement in streamSubscriberClose() cannot be
                // performed; make that visible instead of failing with a NullPointerException.
                log.warn("No connection available to record playback start for stream: {}", itemName);
            }
        }
        finally {
            streamSubCountLock.unlock();
        }
        super.streamPlaylistItemPlay(stream, item, isLive);
    }

    /**
     * Called when a subscriber stream is closed. If the client's connection was flagged as having
     * started playback, the subscriber count of the stream named by the connection's
     * {@code "connectedToStream"} attribute is decremented (never below zero). The per-connection
     * attributes are cleared and the call is always forwarded to the superclass.
     *
     * @param stream the subscriber stream being closed
     */
    @Override
    public void streamSubscriberClose(ISubscriberStream stream) {
        IConnection conn = connectionFor(stream);
        if (conn == null) {
            log.warn("streamSubscriberClose() called without a resolvable connection; stream count left unchanged.");
            super.streamSubscriberClose(stream);
            return;
        }

        Object nameAttr = conn.getAttribute(ATTR_CONNECTED_TO_STREAM);
        if (!(nameAttr instanceof String)) {
            // Hashtable rejects null keys, so there is nothing we could safely look up or update.
            log.warn("Client disconnecting; no connected stream recorded on its connection.");
            super.streamSubscriberClose(stream);
            return;
        }
        String streamName = (String) nameAttr;

        // We record that one less client is subscribing to this stream.
        Hashtable<String, Integer> streamSubCount = subscriberCounts();
        ReentrantLock streamSubCountLock = subscriberCountsLock();
        streamSubCountLock.lock();
        try {
            if (!streamSubCount.containsKey(streamName)) {
                streamSubCount.put(streamName, 0);
            }
            int streamCount = streamSubCount.get(streamName);
            // Boolean.TRUE.equals() is false for a missing or non-Boolean attribute.
            if (Boolean.TRUE.equals(conn.getAttribute(ATTR_STARTED_PLAYING))) {
                int newCount = streamCount - 1;
                if (newCount < 0) {
                    // A negative count would make the next play see a non-zero count and skip
                    // starting a ReStreamer, so never let the record drop below zero.
                    log.warn("Stream count for stream '{}' would become negative; clamping to 0.", streamName);
                    newCount = 0;
                }
                log.info("Client disconnecting; decrementing stream count for stream '{}' from {} to {}.",
                        new Object[] {streamName, streamCount, newCount});
                // No longer playing the stream for this client
                conn.removeAttribute(ATTR_STARTED_PLAYING);
                // Decrement record of subscribers to this stream
                streamSubCount.put(streamName, newCount);
            }
            else {
                log.info("Client disconnecting; never started playing subscribed stream: {}", streamName);
            }
        }
        finally {
            streamSubCountLock.unlock();
        }
        // The one stream that we allowed the client to be connected to has been disconnected
        // from... remove our corresponding attribute.
        conn.removeAttribute(ATTR_CONNECTED_TO_STREAM);
        super.streamSubscriberClose(stream);
    }

    /**
     * Constructs a SimpleMediaFile, which can be used to describe broadcast stream info, with some
     * default parameters: no audio (video-only), 320x240 output and the FLV1 video codec.
     *
     * @return the stream description handed to a new {@code ReStreamer}
     */
    private SimpleMediaFile buildBroadcastStreamInfo() {
        SimpleMediaFile smf = new SimpleMediaFile();
        smf.setHasAudio(false);
        smf.setHasVideo(true);
        // Unfortunately the ReStreamer needs to know the width and height you want to output as,
        // even if you don't know yet.
        smf.setVideoWidth(320);
        smf.setVideoHeight(240);
        smf.setVideoCodec(ICodec.ID.CODEC_ID_FLV1);
        return smf;
    }

    /**
     * Returns the connection to use for a stream notification: the thread-local connection when
     * there is one, otherwise the connection owning the given stream.
     *
     * @param stream the stream the notification is about; may be {@code null}
     * @return the resolved connection, or {@code null} if none could be determined
     */
    private static IConnection connectionFor(ISubscriberStream stream) {
        IConnection conn = Red5.getConnectionLocal();
        if (conn == null && stream != null) {
            conn = stream.getConnection();
        }
        return conn;
    }

    /**
     * Fetches the shared per-stream subscriber count table from the application scope.
     *
     * @return the table stored under {@code "streamSubCount"} by {@link #appStart(IScope)}
     */
    @SuppressWarnings("unchecked") // appStart() stores the attribute with exactly this type
    private Hashtable<String, Integer> subscriberCounts() {
        return (Hashtable<String, Integer>) appScope.getAttribute(ATTR_STREAM_SUB_COUNT);
    }

    /**
     * Fetches the lock guarding the subscriber count table from the application scope.
     *
     * @return the lock stored under {@code "streamSubCountLock"} by {@link #appStart(IScope)}
     */
    private ReentrantLock subscriberCountsLock() {
        return (ReentrantLock) appScope.getAttribute(ATTR_STREAM_SUB_COUNT_LOCK);
    }

    // *** RPCs ***

    /**
     * Returns the last error string resulting from this client connection being denied playback
     * of a stream.
     *
     * @return the recorded error string, or an empty string if no error was found
     */
    public String getLastPlaybackAllowedError() {
        IConnection conn = Red5.getConnectionLocal();
        if (conn == null) {
            return "";
        }
        Object lastError = conn.getAttribute(ATTR_LAST_PLAYBACK_ERROR);
        return (lastError instanceof String) ? (String) lastError : "";
    }

    /**
     * RPC for testing!
     *
     * @param a first operand
     * @param b second operand
     * @return the sum {@code a + b}
     */
    public double add(double a, double b) {
        log.info("add() RPC called.");
        return a + b;
    }
}