From 1a5bb7f3dfe4ea4bdf05f88dc508638681f36059 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20K=C3=B6rner?= Date: Wed, 24 May 2017 21:45:40 +0200 Subject: [PATCH] rewrite apple motion renderer to use the queing api --- make-apple-motion.py | 171 ++++++++++++++++++++++++++++++------------- 1 file changed, 122 insertions(+), 49 deletions(-) diff --git a/make-apple-motion.py b/make-apple-motion.py index e900c14..917c97f 100755 --- a/make-apple-motion.py +++ b/make-apple-motion.py @@ -8,6 +8,7 @@ import shlex import time import sys import os +import re from xml.sax.saxutils import escape as xmlescape @@ -71,70 +72,142 @@ if args.debug: else: events = list(renderlib.events(args.schedule)) -def run_check(command, **kwargs): +def describe_event(event): + return "#{}: {}".format(event['id'], event['title']) + +def event_print(event, message): + print("{} – {}".format(describe_event(event), message)) + +tempdir = tempfile.TemporaryDirectory() +print('working in '+tempdir.name) + + +def fmt_command(command, **kwargs): args = {} for key, value in kwargs.items(): args[key] = shlex.quote(value) command = command.format(**args) - print(" -> "+command) - subprocess.check_call(shlex.split(command)) + return shlex.split(command) -def render(event): - with tempfile.TemporaryDirectory() as tempdir: - work_doc = os.path.join(tempdir, "work.motn") - intermediate_clip = os.path.join(tempdir, "intermediate.mov") - final_clip = os.path.join(os.path.dirname(args.motn), str(event['id'])+'.ts') +def run(command, **kwargs): + return subprocess.check_call( + fmt_command(command, **kwargs)) - with open(args.motn, 'r') as fp: - xmlstr = fp.read() - - for key, value in event.items(): - xmlstr = xmlstr.replace("$"+str(key), xmlescape(str(value))) - - with open(work_doc, 'w') as fp: - fp.write(xmlstr) - - print(" generated work-document in " + work_doc + ", now starting compressor") - run_check( - '/Applications/Compressor.app/Contents/MacOS/Compressor -jobpath "{jobpath}" -settingpath {home}/Library/Application\ Support/Compressor/Settings/Apple\ ProRes\ 4444.cmprstng -locationpath "{locationpath}"', - jobpath=work_doc, - home=os.getenv('HOME'), - locationpath=intermediate_clip) - - while True: - time.sleep(5) - ps = subprocess.check_output(shlex.split('ps aux')).decode('utf-8') - - pscnt = ps.count('compressord') - filexists = os.path.isfile(intermediate_clip) - if pscnt == 0 and filexists: - break - - print(" "+str(pscnt)+" Compressor.app-processes running, filexists? "+str(filexists)) +def run_output(command, **kwargs): + return subprocess.check_output( + fmt_command(command, **kwargs), + encoding='utf-8', + stderr=subprocess.STDOUT) - print(" generated intermediate-clip in " + intermediate_clip + ", now starting transcoder") - run_check( - 'ffmpeg -y -i "{input}" -ar 48000 -ac 1 -f s16le -i /dev/zero -map 0:0 -c:v mpeg2video -q:v 0 -aspect 16:9 -map 1:0 -map 1:0 -map 1:0 -map 1:0 -shortest -f mpegts "{output}"', - input=intermediate_clip, - output=final_clip) +def enqueue_job(event): + event_id = str(event['id']) + work_doc = os.path.join(tempdir.name, event_id+'.motn') + intermediate_clip = os.path.join(tempdir.name, event_id+'.mov') - print(" transcoded final-clip to " + final_clip) + with open(args.motn, 'r') as fp: + xmlstr = fp.read() + + for key, value in event.items(): + xmlstr = xmlstr.replace("$"+str(key), xmlescape(str(value))) + + with open(work_doc, 'w') as fp: + fp.write(xmlstr) + + compressor_info = run_output( + '/Applications/Compressor.app/Contents/MacOS/Compressor -batchname {batchname} -jobpath {jobpath} -settingpath {home}/Library/Application\ Support/Compressor/Settings/Apple\ ProRes\ 4444.cmprstng -locationpath {locationpath}', + batchname=describe_event(event), + jobpath=work_doc, + home=os.getenv('HOME'), + locationpath=intermediate_clip) + + match = re.search("", compressor_info) + if not match: + event_print(event, "unexpected output from compressor: \n"+compressor_info) + return + + return match.group(1) + +def fetch_job_status(): + compressor_status = run_output('/Applications/Compressor.app/Contents/MacOS/Compressor -monitor') + job_status_matches = re.finditer("", compressor_status) + + status_dict = {} + for match in job_status_matches: + lexer = shlex.shlex(match.group(1), posix=True) + lexer.wordchars += "=" + + job_status = dict(word.split("=", maxsplit=1) for word in lexer) + job_id = job_status['jobid'] + status_dict[job_id] = job_status + + return status_dict -n = len(events) -i = 0 + +def filter_finished_jobs(active_jobs): + job_status = fetch_job_status() + + new_active_jobs = [] + finished_jobs = [] + for job_id, event in active_jobs: + if job_id not in job_status: + status = 'Processing' + else: + status = job_status[job_id]['status'] + + if status == 'Processing': + new_active_jobs.append((job_id, event)) + continue + elif status == 'Successful': + finished_jobs.append((job_id, event)) + else: + event_print(event, "failed with staus="+status+" – removing from postprocessing queue") + + return new_active_jobs, finished_jobs + + +def finalize_job(job_id, event): + event_id = str(event['id']) + intermediate_clip = os.path.join(tempdir.name, event_id+'.mov') + final_clip = os.path.join(os.path.dirname(args.motn), event_id+'.ts') + + run('ffmpeg -y -hide_banner -loglevel error -i "{input}" -ar 48000 -ac 1 -f s16le -i /dev/zero -map 0:0 -c:v mpeg2video -q:v 0 -aspect 16:9 -map 1:0 -map 1:0 -map 1:0 -map 1:0 -shortest -f mpegts "{output}"', + input=intermediate_clip, + output=final_clip) + + event_print(event, "finalized intro to "+final_clip) + + + +active_jobs = [] + +print("enqueuing jobs into compressor") for event in events: - i = i + 1 if args.ids and event['id'] not in args.ids: continue - headline("rendering {i}/{n}: #{id}: {title}".format( - i=i, n=n, - id=event['id'], - title=event['title'])) + job_id = enqueue_job(event) + if not job_id: + event_print(event, "job was not enqueued successfully, skipping postprocessing") + continue - render(event) - print() + event_print(event, "enqueued as "+job_id) + active_jobs.append((job_id, event)) + +print("waiting for rendering to complete") + +while len(active_jobs) > 0: + time.sleep(60) + active_jobs, finished_jobs = filter_finished_jobs(active_jobs) + + print("{} jobs in queue, {} ready to finalize".format(len(active_jobs), len(finished_jobs))) + for job_id, event in finished_jobs: + event_print(event, "finalizing job") + finalize_job(job_id, event) + + +print('all done, cleaning up '+tempdir.name) +tempdir.cleanup()