MPEGTS over HTTP Inbound Module

There are now a significant number of delivery formats available for video: RTMP, HLS, multicast, etc. As content providers move from closed networks to more open ‘Internet’-based delivery, many are providing MPEGTS over HTTP. This type of delivery cannot normally be ingested into Wowza.

The following module provides a simple example (one stream per application) that ingests the stream and re-sends it as MPEGTS over UDP, thus making it available for Wowza to ingest.

The module is broken into 3 separate classes: a logger, a thread and an application module.

The logger class is as below. It is a simple wrapper to the WMSLogger provided by Wowza. This prefixes the package name to any output from this module.

package  guru.thewowza.module.mpegtshttp.input;
import com.wowza.wms.logging.WMSLogger;
/**
 * Thin wrapper around the Wowza-provided WMSLogger that prefixes every
 * message with this module's package name.
 *
 * The wrapped logger is held per instance. It was previously a static field
 * assigned from the constructor, which meant that constructing a second
 * wrapper silently replaced the logger used by every existing wrapper.
 */
public class OurLogOutput 
	{
	private final WMSLogger logger;

	/**
	 * @param Log the Wowza logger that messages are forwarded to
	 */
	public OurLogOutput(WMSLogger Log)
		{
		this.logger = Log;
		}

	/**
	 * Writes Message at INFO level, prefixed with the module package name
	 * and wrapped in single quotes.
	 *
	 * @param Message the text to log
	 */
	public void LogThisMessage(String Message)
		{
		this.logger.info("guru.thewowza.module.mpegtshttp.input: '"+Message+"'");	
		}
	}

The thread class is as below. It is a simple thread which attempts to connect to the URL provided, takes the output and splits it into 188 byte UDP packets. It does NO checking to see if the HTTP connection is valid MPEGTS, but this could be added.

package guru.thewowza.module.mpegtshttp.input;
import java.io.InputStream;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.URL;
import java.net.URLConnection;
import com.wowza.wms.application.*;
/**
 * Worker thread that reads a byte stream from an HTTP URL and re-sends it as
 * fixed-size UDP datagrams to 127.0.0.1 on the configured port.
 *
 * The thread connects, relays until the stream ends or an error occurs, then
 * waits for the configured sleep time and reconnects. It keeps doing so until
 * quit() is called. No validation is done that the HTTP payload really is
 * MPEG-TS; bytes are simply grouped into 188-byte datagrams. A trailing
 * partial packet at end of stream is discarded.
 */
public class MPEGTSIWorker extends Thread
{
	/** Number of bytes placed in each outgoing datagram. */
	private static final int TS_PACKET_SIZE = 188;

	// Both flags are written by one thread and read by another without always
	// holding the monitor (the relay loop polls quit), hence volatile.
	private volatile boolean running = true;
	private volatile boolean quit = false;
	private int sleepTime = 4000;
	private IApplicationInstance appInstance = null;
	private OurLogOutput Log = null;
	private String SourceURL = null;
	private int SourceTimeout = 2000;
	private int OutPort = 0;

	private byte[] buffer = new byte[TS_PACKET_SIZE];

	/**
	 * @param thisins application instance this worker belongs to (used for log messages)
	 * @param log     logger wrapper used for all output of this thread
	 */
	public MPEGTSIWorker( IApplicationInstance thisins, OurLogOutput log )
		{
		this.appInstance = thisins;
		this.Log = log;
		}
	
	
	/** Sets the HTTP URL to read from. Call before start(). */
	public void setSourceURL( String sourceURL )
		{
		this.SourceURL = sourceURL;
		}

	/** Sets the UDP port on 127.0.0.1 that datagrams are sent to. Call before start(). */
	public void setOutPort(int port)
		{
		this.OutPort = port;
		}
	
	/** Sets the connect and read timeout, in milliseconds, applied to the HTTP connection. */
	public void setSourceTimeout ( int sourceTimeout)
		{
		this.SourceTimeout = sourceTimeout;
		}
	
	/** @return false once the thread has left its run loop, true before that */
	public synchronized boolean running()
		{
		return this.running;
		}
	
	/**
	 * Asks the thread to stop. A thread waiting between reconnect attempts is
	 * woken immediately; a thread blocked in a read notices the request when
	 * that read returns or times out.
	 */
	public synchronized void quit()
		{
		this.quit = true;
		notifyAll();
		}
	
	/** Sets the pause, in milliseconds, between the end of one connection and the next attempt. */
	public synchronized void setSleepTime(int sleepTime)
		{
		this.sleepTime = sleepTime;
		}
	
	/** Connect / relay / pause loop; exits once quit() has been requested. */
	public void run()
		{
		this.Log.LogThisMessage("Thread MPEGTS Input Starting - "+
			appInstance.getApplication().getName() + "/"+ appInstance.getName());
		try
			{
			while (!this.quit)
				{
				relayOnce();
				pauseBeforeReconnect();
				}
			}
		finally
			{
			// Cleared on every exit path so running() never reports a dead thread as alive.
			this.running = false;
			}
		}

	/**
	 * Opens one HTTP connection and forwards its content as UDP datagrams until
	 * the stream ends, an error occurs or quit() is requested. The socket and
	 * the HTTP stream are always released, including on failure.
	 */
	private void relayOnce()
		{
		DatagramSocket dsocket = null;
		InputStream is = null;
		try
			{
			URL myUrl = new URL( this.SourceURL );
			URLConnection conn = myUrl.openConnection();
			conn.setConnectTimeout(this.SourceTimeout);
			conn.setReadTimeout(this.SourceTimeout);
			InetAddress address = InetAddress.getByName("127.0.0.1");
			dsocket = new DatagramSocket();
			is = conn.getInputStream();
			// The packet wraps this.buffer directly, so each send() transmits
			// whatever the reads below have most recently placed in it.
			DatagramPacket packet = new DatagramPacket(this.buffer, this.buffer.length, address, this.OutPort);
			int amount;
			int currentRead = 0;
			while ( !this.quit
				&& ( amount = is.read(this.buffer, currentRead, TS_PACKET_SIZE - currentRead)) > -1 )
				{
				currentRead += amount;
				// A read may return fewer bytes than asked for; only send full packets.
				if ( currentRead == TS_PACKET_SIZE )
					{
					dsocket.send(packet);
					currentRead = 0;
					}
				}
			if ( !this.quit )
				{
				this.Log.LogThisMessage("Remote side closed connection - "+appInstance.getApplication().getName() + 
					"/"+ appInstance.getName());
				}
			}
		catch (Exception e)
			{
			this.Log.LogThisMessage("Exception for application "+appInstance.getApplication().getName() + 
				"/"+ appInstance.getName()+"  exception is "+e.toString());
			}
		finally
			{
			if ( is != null )
				{
				try
					{
					is.close();
					}
				catch (Exception ignored)
					{
					// Nothing useful can be done if close fails; the connection is being abandoned anyway.
					}
				}
			if ( dsocket != null )
				{
				dsocket.close();
				}
			}
		}

	/**
	 * Waits up to sleepTime milliseconds before the next connection attempt.
	 * Returns immediately if quit() has already been requested, because in that
	 * case its notification was delivered before this thread started waiting.
	 */
	private synchronized void pauseBeforeReconnect()
		{
		if ( this.quit )
			{
			return;
			}
		try
			{
			// Wait on this object's monitor (the one quit() notifies), which this
			// synchronized method holds regardless of which thread executes it.
			this.wait(this.sleepTime);
			}
		catch (InterruptedException e)
			{
			this.Log.LogThisMessage("Thread MPEGTS Input wait problem: "+e.toString());
			// Treat interruption as a stop request and keep the interrupt status set.
			Thread.currentThread().interrupt();
			this.quit = true;
			}
		catch (Exception e)
			{
			this.Log.LogThisMessage("Thread MPEGTS Input wait problem: "+e.toString());
			}
		}
}

For the two classes to work, an application module class is required to call them. It is shown below.

package guru.thewowza.module.mpegtshttp.input;
import com.wowza.wms.application.*;
import com.wowza.wms.logging.WMSLoggerFactory;
import com.wowza.wms.module.*;
/**
 * Wowza application module that starts an MPEGTSIWorker when the application
 * starts and asks it to quit when the application stops.
 *
 * Configuration is read from the application properties mpegtsHTTPURL,
 * mpegtsOUTPORT and mpegtsReconnectTime.
 */
public class Reader extends ModuleBase 
	{
	private MPEGTSIWorker Worker = null;
	private OurLogOutput Log = null;
	private String URLString = "";
	private int URLPort = 10000;
	private int reconnectTime = 20000;

	/**
	 * Reads the module properties, logs them, and starts the worker thread
	 * unless the URL is left at "NONE" or the port is not positive.
	 *
	 * @param appInstance the application instance being started
	 */
	public void onAppStart(IApplicationInstance appInstance) 
		{	
		this.Log = new OurLogOutput(WMSLoggerFactory.getLogger(null));
		String appLabel = appInstance.getApplication().getName() + "/" + appInstance.getName();
		this.Log.LogThisMessage("onAppStart: " + appLabel);

		this.URLString = appInstance.getProperties().getPropertyStr("mpegtsHTTPURL", "NONE");
		this.URLPort = appInstance.getProperties().getPropertyInt("mpegtsOUTPORT", 10000);
		this.reconnectTime = appInstance.getProperties().getPropertyInt("mpegtsReconnectTime", 20000);

		this.Log.LogThisMessage("URL Source is " + this.URLString);
		this.Log.LogThisMessage("Port To Output is " + this.URLPort);

		// Nothing to start when no source URL is configured or the port is unusable.
		if ( this.URLString.equalsIgnoreCase("NONE") || this.URLPort <= 0 )
			{
			return;
			}

		MPEGTSIWorker worker = new MPEGTSIWorker(appInstance, this.Log);
		this.Worker = worker;
		worker.setSourceURL(this.URLString);
		worker.setOutPort(this.URLPort);
		worker.setDaemon(true);
		worker.setSleepTime(this.reconnectTime);
		worker.start();
		}

	/**
	 * Logs the stop event and, if a worker was started, requests that it quit.
	 *
	 * @param appInstance the application instance being stopped
	 */
	public void onAppStop(IApplicationInstance appInstance) 
		{		
		String appLabel = appInstance.getApplication().getName() + "/" + appInstance.getName();
		this.Log.LogThisMessage("onAppStop: " + appLabel);
		if ( this.Worker == null )
			{
			return;
			}
		this.Worker.quit();
		}
	}

The module code uses three properties, which are detailed below. It sets up the thread class and sets the reconnect time (20 seconds by default) that is used should the URL stop providing data.

For this code to work effectively it needs to be applied into a Wowza Module but also a Property needs to be added to the Application.xml.

To apply this to an Application add the following section as the LAST Module in the section near the end of the Application.xml associated with the application. It should look like the following

	<Module>
		<Name>MPEGTSHTTPInbound</Name>
		<Description>MPEGTS over HTTP Inbound</Description>
		<Class>guru.thewowza.module.mpegtshttp.input.Reader</Class>
	</Module>

The three properties that are required are

<Property>
<Name>mpegtsHTTPURL</Name>
<Value>http://myMPEGTSoverHTTPURL</Value>
<Type>String</Type>
</Property>

<Property>
<Name>mpegtsOUTPORT</Name>
<Value>10000</Value>
<Type>Integer</Type>
</Property>

<Property>
<Name>mpegtsReconnectTime</Name>
<Value>20000</Value>
<Type>Integer</Type>
</Property>

When you have the module correctly configured and running, the MPEGTS over HTTP source is converted to MPEGTS over UDP, and packets are sent to 127.0.0.1 on the port defined in the mpegtsOUTPORT property (defaults to 10000). This data can then be ingested as with any other MPEGTS stream using a .stream file.


Comments are closed.