I’ve developed a distributed ffmpeg google app engine solution.

The solution uses a combination of publish subscribe and redis queues for distributed communication.
https://github.com/dmzoneill/appengine-ffmpeg

Its composed of 2 services which scale horizontally (default and worker).

Coordinator (default service and human interface)

#!/usr/bin/python
from flask import Flask
import os
import sys
import glob
import string
import random
import redis
import logging
from gcloud import storage, pubsub
from google.cloud import logging

PROJECT_ID = 'transcode-159215'
TOPIC = 'projects/{}/topics/message'.format(PROJECT_ID)

logclient = logging.Client()
logger = logclient.logger( "ffmpeg-pool" )

app = Flask(__name__)
app.config[ "SECRET_KEY" ] = "test"
app.debug = True


def publish( msg ):
    pubsub_client = pubsub.Client( PROJECT_ID )
    topic = pubsub_client.topic( "ffmpeg-pool" )

    if not topic.exists():
        topic.create()

    topic.publish( msg )


@app.route( "/readlog" )
def readLog():
    msg = ""

    try: 
	for entry in logger.list_entries():
            msg = msg + entry.payload + "
" logger.delete() except: msg = "" return msg @app.route( "/cleantopic" ) def cleanTopics(): client = pubsub.Client( PROJECT_ID ) topic = client.topic( "ffmpeg-pool" ) topic.delete() topic.create() return "Cleaned topic" @app.route( "/split" ) def split(): publish( "split" ) return "File queued for spliting" @app.route( "/transcode" ) def transcode(): publish( "transcode" ) return "Job queued for transcoding" @app.route( "/combine" ) def combine(): publish( "combine" ) return "Job queued for combining" @app.route( "/" ) def home(): return "/split | /transcode | /combine | /cleantopic | /readlog" if __name__ == '__main__': app.run(host='127.0.0.1', port=8080, debug=True)

Worker

import os
from gcloud import storage, pubsub, logging
import sys
import socket
import time
import redis
import glob
from google.cloud import logging


logclient = logging.Client()
logger = logclient.logger( "ffmpeg-pool" )

PROJECT_ID = 'transcode-159215'
TOPIC = 'projects/{}/topics/message'.format(PROJECT_ID)
psclient = None
pstopic = None
pssub = None

class RedisQueue(object):
    def __init__( self, name, namespace = 'queue' ):
       self.__db = redis.Redis( host = "redis-11670.c10.us-east-1-4.ec2.cloud.redislabs.com", port=11670 )
       self.key = '%s:%s' %(namespace, name)

    def qsize( self ):
        return self.__db.llen( self.key )

    def empty( self ):
        return self.qsize() == 0

    def put( self, item ):
        self.__db.rpush( self.key, item )

    def get( self, block=True, timeout=None ):
        if block:
            item = self.__db.blpop( self.key, timeout=timeout )
        else:
            item = self.__db.lpop( self.key )

        if item:
            item = item[1]
        return item

    def get_nowait( self ):
        return self.get( False )


def download( rfile ):
    client = storage.Client( PROJECT_ID )
    bucket = client.bucket( PROJECT_ID + ".appspot.com" )
    blob = bucket.blob( rfile )

    with open( "/tmp/" + rfile, 'w' ) as f:
        blob.download_to_file( f )
        logger.log_text( "Worker: Downloaded: /tmp/" + rfile )


def upload( rfile ):
    client = storage.Client( PROJECT_ID )
    bucket = client.bucket( PROJECT_ID + ".appspot.com" )
    blob = bucket.blob( rfile )
    
    blob = bucket.blob( rfile )
    blob.upload_from_file( open( "/tmp/" + rfile ) )

    logger.log_text( "Worker: Uploaded /tmp/" + rfile )


def transcode( rfile ):
    download( rfile )
    
    os.system( "rm /tmp/output*" )
    ret = os.system( "ffmpeg -i /tmp/" + rfile + " -c:v libx265 -preset medium -crf 28 -c:a aac -b:a 128k -strict -2 /tmp/output-" + rfile + ".mkv" )    
    
    if ret:
        logger.log_text( "Worker: convert failed : " + rfile + " - " + str( ret ).encode( 'utf-8' ) )
        return

    upload( "output-" + rfile + ".mkv" ) 


def split():
    rqueue = RedisQueue( "test" )
    download( "sample.mp4" )

    os.system( "rm -f /tmp/chunk*" )
    ret = os.system( "ffmpeg -i /tmp/sample.mp4 -map 0:a -map 0:v -codec copy -f segment -segment_time 10 -segment_format matroska -v error '/tmp/chunk-%03d.orig'" )

    if ret:
        return "Failed"

    for rfile in glob.glob( "/tmp/chunk*" ):
        basename = os.path.basename( rfile )
        upload( basename )
        rqueue.put( basename )


def combine():
    client = storage.Client( PROJECT_ID )
    bucket = client.bucket( PROJECT_ID + ".appspot.com" )
    blobs = bucket.list_blobs()

    os.system( "rm /tmp/*" )
    
    names = []
    
    for blob in blobs:
        if "output" in blob.name:
            names.append( blob.name.encode( 'utf-8' ) )

    names.sort()

    with open( '/tmp/combine.lst', 'w' ) as f1:
        for name in names:
            f1.write( "file '/tmp/" + name + "'\n" )
            download( name )

    logger.log_text( "Worker: created combine list: /tmp/combine.lst" )

    ret = os.system( "ffmpeg -f concat -safe 0 -i  /tmp/combine.lst -c copy /tmp/combined.mkv" )    
    
    if ret:
        logger.log_text( "Worker: combine failed: /tmp/combine.mkv - " + str(ret).encode( 'utf-8' ) )
        return

    upload( "combined.mkv" )


def subscribe():
    global psclient, pstopic, pssub

    psclient = pubsub.Client( PROJECT_ID )
    pstopic = psclient.topic( "ffmpeg-pool" )

    if not pstopic.exists():
        pstopic.create()
    
    pssub = pstopic.subscription( "ffmpeg-worker-" + socket.gethostname() )
    
    if not pssub.exists():
        pssub.create()
    

def handlemessages():
    global psclient, pstopic, pssub
    
    rqueue = RedisQueue( 'test' )
    subscribe()

    while True:
        messages = pssub.pull( return_immediately=False, max_messages=110 )

        for ack_id, message in messages:
            payload = message.data.encode( 'utf-8' ).replace( u"\u2018", "'" ).replace( u"\u2019", "'" )
            logger.log_text( "Worker: Received message: " + payload )
 
            try:
                pssub.acknowledge( [ack_id] )
                if payload == "combine":
                    combine()
                elif payload  == "split":
                    split()
                else:
                    rfile = rqueue.get()
                    basename = os.path.basename( rfile )
                    logger.log_text( "Worker: Redis popped: " + basename )

                    while basename != "None":
                        transcode( basename )
                        rfile = rqueue.get()
                        basename = os.path.basename( rfile )
                        logger.log_text( "Worker: Redis popped: " + rfile )

            except Exception as e:
                logger.log_text( "Worker: Error: " + e.message )
                sys.stderr.write( e.message )

        subscribe()
        time.sleep( 1 )


if __name__ == '__main__':
    handlemessages()