rewrite apple motion renderer to use the queueing api
parent 4d92bd486a
commit 1a5bb7f3df
1 changed file with 122 additions and 49 deletions
@@ -8,6 +8,7 @@ import shlex
 import time
 import sys
 import os
+import re

 from xml.sax.saxutils import escape as xmlescape

@@ -71,70 +72,142 @@ if args.debug:
 else:
     events = list(renderlib.events(args.schedule))

-def run_check(command, **kwargs):
+def describe_event(event):
+    return "#{}: {}".format(event['id'], event['title'])
+
+def event_print(event, message):
+    print("{} – {}".format(describe_event(event), message))
+
+tempdir = tempfile.TemporaryDirectory()
+print('working in '+tempdir.name)
+
+
+def fmt_command(command, **kwargs):
     args = {}
     for key, value in kwargs.items():
         args[key] = shlex.quote(value)

     command = command.format(**args)
     print(" -> "+command)
-    subprocess.check_call(shlex.split(command))
+    return shlex.split(command)

-def render(event):
-    with tempfile.TemporaryDirectory() as tempdir:
-        work_doc = os.path.join(tempdir, "work.motn")
-        intermediate_clip = os.path.join(tempdir, "intermediate.mov")
-        final_clip = os.path.join(os.path.dirname(args.motn), str(event['id'])+'.ts')
+def run(command, **kwargs):
+    return subprocess.check_call(
+        fmt_command(command, **kwargs))

-        with open(args.motn, 'r') as fp:
-            xmlstr = fp.read()
-
-        for key, value in event.items():
-            xmlstr = xmlstr.replace("$"+str(key), xmlescape(str(value)))
-
-        with open(work_doc, 'w') as fp:
-            fp.write(xmlstr)
-
-        print(" generated work-document in " + work_doc + ", now starting compressor")
-        run_check(
-            '/Applications/Compressor.app/Contents/MacOS/Compressor -jobpath "{jobpath}" -settingpath {home}/Library/Application\ Support/Compressor/Settings/Apple\ ProRes\ 4444.cmprstng -locationpath "{locationpath}"',
-            jobpath=work_doc,
-            home=os.getenv('HOME'),
-            locationpath=intermediate_clip)
-
-        while True:
-            time.sleep(5)
-            ps = subprocess.check_output(shlex.split('ps aux')).decode('utf-8')
-
-            pscnt = ps.count('compressord')
-            filexists = os.path.isfile(intermediate_clip)
-            if pscnt == 0 and filexists:
-                break
-
-            print(" "+str(pscnt)+" Compressor.app-processes running, filexists? "+str(filexists))
+def run_output(command, **kwargs):
+    return subprocess.check_output(
+        fmt_command(command, **kwargs),
+        encoding='utf-8',
+        stderr=subprocess.STDOUT)


-        print(" generated intermediate-clip in " + intermediate_clip + ", now starting transcoder")
-        run_check(
-            'ffmpeg -y -i "{input}" -ar 48000 -ac 1 -f s16le -i /dev/zero -map 0:0 -c:v mpeg2video -q:v 0 -aspect 16:9 -map 1:0 -map 1:0 -map 1:0 -map 1:0 -shortest -f mpegts "{output}"',
-            input=intermediate_clip,
-            output=final_clip)
+def enqueue_job(event):
+    event_id = str(event['id'])
+    work_doc = os.path.join(tempdir.name, event_id+'.motn')
+    intermediate_clip = os.path.join(tempdir.name, event_id+'.mov')

-        print(" transcoded final-clip to " + final_clip)
+    with open(args.motn, 'r') as fp:
+        xmlstr = fp.read()
+
+    for key, value in event.items():
+        xmlstr = xmlstr.replace("$"+str(key), xmlescape(str(value)))
+
+    with open(work_doc, 'w') as fp:
+        fp.write(xmlstr)
+
+    compressor_info = run_output(
+        '/Applications/Compressor.app/Contents/MacOS/Compressor -batchname {batchname} -jobpath {jobpath} -settingpath {home}/Library/Application\ Support/Compressor/Settings/Apple\ ProRes\ 4444.cmprstng -locationpath {locationpath}',
+        batchname=describe_event(event),
+        jobpath=work_doc,
+        home=os.getenv('HOME'),
+        locationpath=intermediate_clip)
+
+    match = re.search("<jobID ([A-Z0-9\-]+) ?\/>", compressor_info)
+    if not match:
+        event_print(event, "unexpected output from compressor: \n"+compressor_info)
+        return
+
+    return match.group(1)
+
+def fetch_job_status():
+    compressor_status = run_output('/Applications/Compressor.app/Contents/MacOS/Compressor -monitor')
+    job_status_matches = re.finditer("<jobStatus (.*) \/jobStatus>", compressor_status)
+
+    status_dict = {}
+    for match in job_status_matches:
+        lexer = shlex.shlex(match.group(1), posix=True)
+        lexer.wordchars += "="
+
+        job_status = dict(word.split("=", maxsplit=1) for word in lexer)
+        job_id = job_status['jobid']
+        status_dict[job_id] = job_status
+
+    return status_dict
+


 n = len(events)
 i = 0

+def filter_finished_jobs(active_jobs):
+    job_status = fetch_job_status()
+
+    new_active_jobs = []
+    finished_jobs = []
+    for job_id, event in active_jobs:
+        if job_id not in job_status:
+            status = 'Processing'
+        else:
+            status = job_status[job_id]['status']
+
+        if status == 'Processing':
+            new_active_jobs.append((job_id, event))
+            continue
+        elif status == 'Successful':
+            finished_jobs.append((job_id, event))
+        else:
+            event_print(event, "failed with status="+status+" – removing from postprocessing queue")
+
+    return new_active_jobs, finished_jobs
+
+
+def finalize_job(job_id, event):
+    event_id = str(event['id'])
+    intermediate_clip = os.path.join(tempdir.name, event_id+'.mov')
+    final_clip = os.path.join(os.path.dirname(args.motn), event_id+'.ts')
+
+    run('ffmpeg -y -hide_banner -loglevel error -i "{input}" -ar 48000 -ac 1 -f s16le -i /dev/zero -map 0:0 -c:v mpeg2video -q:v 0 -aspect 16:9 -map 1:0 -map 1:0 -map 1:0 -map 1:0 -shortest -f mpegts "{output}"',
+        input=intermediate_clip,
+        output=final_clip)
+
+    event_print(event, "finalized intro to "+final_clip)
+
+
+
+active_jobs = []
+
+print("enqueuing jobs into compressor")
 for event in events:
     i = i + 1
     if args.ids and event['id'] not in args.ids:
         continue

     headline("rendering {i}/{n}: #{id}: {title}".format(
         i=i, n=n,
         id=event['id'],
         title=event['title']))
+    job_id = enqueue_job(event)
+    if not job_id:
+        event_print(event, "job was not enqueued successfully, skipping postprocessing")
+        continue

-    render(event)
-    print()
+    event_print(event, "enqueued as "+job_id)
+    active_jobs.append((job_id, event))
+
+print("waiting for rendering to complete")
+
+while len(active_jobs) > 0:
+    time.sleep(60)
+    active_jobs, finished_jobs = filter_finished_jobs(active_jobs)
+
+    print("{} jobs in queue, {} ready to finalize".format(len(active_jobs), len(finished_jobs)))
+    for job_id, event in finished_jobs:
+        event_print(event, "finalizing job")
+        finalize_job(job_id, event)
+
+
+print('all done, cleaning up '+tempdir.name)
+tempdir.cleanup()
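
The shape of the rewrite: instead of rendering one event at a time and blocking until Compressor exits, the script now enqueues every event up front, polls job status in a single loop, and finalizes clips as they complete. The following is a minimal, runnable sketch of that enqueue/poll/finalize pattern only; the fake_* helpers are hypothetical stand-ins for the real Compressor and ffmpeg calls.

    import time

    # Hypothetical stand-ins for enqueue_job(), fetch_job_status() and finalize_job().
    def fake_enqueue(event):
        return "JOB-{}".format(event['id'])

    def fake_status(poll):
        # Pretend every job reports 'Successful' after two polls.
        return 'Successful' if poll > 1 else 'Processing'

    def fake_finalize(job_id, event):
        print("finalized", job_id, event['title'])

    events = [{'id': 1, 'title': 'Talk A'}, {'id': 2, 'title': 'Talk B'}]
    active = [(fake_enqueue(ev), ev) for ev in events]  # enqueue everything first

    poll = 0
    while active:                    # then poll until the queue drains
        time.sleep(0.1)              # the real script sleeps 60 seconds per poll
        poll += 1
        still_active = []
        for job_id, ev in active:
            status = fake_status(poll)
            if status == 'Processing':
                still_active.append((job_id, ev))
            elif status == 'Successful':
                fake_finalize(job_id, ev)   # ffmpeg transcode in the real script
            # any other status: drop the job, as filter_finished_jobs() does
        active = still_active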
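The new fetch_job_status() pulls the jobStatus elements out of Compressor's -monitor output and feeds each element's attribute list through shlex, so quoted values keep their spaces. Below is a small standalone sketch of that parsing step run against a made-up sample string; only jobid and status are attributes the script actually relies on, the percent field is an assumption added for illustration.

    import re
    import shlex

    # Made-up sample in the shape the script expects from 'Compressor -monitor'.
    sample = '<jobStatus jobid="4C0E-11" status="Successful" percent="100" /jobStatus>'

    for match in re.finditer(r"<jobStatus (.*) \/jobStatus>", sample):
        lexer = shlex.shlex(match.group(1), posix=True)  # splits on whitespace, honors quotes
        lexer.wordchars += "="                           # keep key=value together as one token
        job = dict(word.split("=", maxsplit=1) for word in lexer)
        print(job['jobid'], job['status'])               # -> 4C0E-11 Successful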
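Every shell invocation goes through the same fmt_command() helper: each keyword argument is passed through shlex.quote(), substituted into the template, and the result is split back into an argv list, so paths with spaces survive intact. A condensed sketch of that quoting round-trip, using echo as a harmless stand-in command:

    import shlex
    import subprocess

    def fmt_command(command, **kwargs):
        quoted = {key: shlex.quote(value) for key, value in kwargs.items()}
        return shlex.split(command.format(**quoted))

    # A path with spaces comes out as a single argv element.
    argv = fmt_command('echo {path}', path='/tmp/My Talk.mov')
    print(argv)                   # ['echo', '/tmp/My Talk.mov']
    subprocess.check_call(argv)   # prints: /tmp/My Talk.mov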