from optparse import OptionParser

import os, sys, re, gzip, glob, signal, atexit, operator, random

# ReFr helper modules: pyutil provides the logging utilities used below and
# defs provides installation defaults.
import pyutil, defs
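# This driver trains a Reranker Framework (ReFr) model on a Hadoop cluster:
# each training iteration runs as a streaming MapReduce job, the model shards
# produced by the reducers are merged into a single model file, and training
# stops once the devset loss has been in decline for --maxdecline iterations.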
 
optParse = OptionParser()

optParse.add_option("-H", "--hadooproot", dest="hadooproot",
                    help = "Location of hadoop installation.  If not set, " +
                           "the script will attempt to find it.")
optParse.add_option("--refrbin", dest="refrbin",
                    help = "Location of the Reranker Framework (ReFr) bin directory",
                    default = defs.refrbin + "/")
optParse.add_option("-d", "--develdata", dest="develdata",
                    help = "Location of development data")
optParse.add_option("-i", "--input", dest="inputlist", action = "append",
                    help = "Location of input data on local FS")
optParse.add_option("-I", "--hdfsinputdir", dest="hdfsinputdir",
                    help = "Location of input data on HDFS")
optParse.add_option("-O", "--hdfsoutputdir", dest="hdfsoutputdir",
                    help = "Output directory (on HDFS) - will be removed " +
                           "before each iteration")
optParse.add_option("-o", "--outputdir", dest="outputdir",
                    help = "Output directory")
optParse.add_option("-M", "--inputmodel", dest="inputmodel",
                    help = "Name of model to start with")
optParse.add_option("-S", "--inputmodeliter", dest="startiter",
                    help = "Iteration number of input model (will start with " +
                           "next iteration)",
                    default = "0")
optParse.add_option("-m", "--modelname", dest="modelname",
                    help = "Name of model file (new models written to --outputdir)")
optParse.add_option("--maxiter", dest="maxiter",
                    help = "Maximum number of iterations to run", default = 100)
optParse.add_option("--numreducer", dest="numreducer",
                    help = "Number of reducers.", default = 1)
optParse.add_option("--streamingloc", dest="streamingloc",
                    help = "Location of the streaming jar file.  " +
                           "An empty string will force the script to attempt " +
                           "to find the streaming jar file.")
optParse.add_option("--libpath", dest="libpath",
                    help = "Specify the LD_LIBRARY_PATH",
                    default = "/usr/local/lib:")
optParse.add_option("--splitsize", dest="minsplitsize",
                    help = "Min size of each data split")
optParse.add_option("--tasktimeout", dest="tasktimeout",
                    help = "Amount of time (seconds) for a task to run " +
                           "(e.g., loading a model) before processing the " +
                           "next input record")
optParse.add_option("--force", dest="force",
                    help = "Force all data processing even if files exist",
                    action = "store_true")
optParse.add_option("--forcecompile", dest="forcecompile",
                    help = "Force precompilation if applicable",
                    action = "store_true")
optParse.add_option("--compilefeatures", dest="compilefeatures",
                    help = "Compile features before processing",
                    action = "store_true")
optParse.add_option("--maxdecline", dest="max_num_in_decline", type = "int",
                    help = "Number of iterations of an increasing loss " +
                           "before we stop training")
optParse.add_option("-v", "--verbosity", dest="verbosity",
                    help = "Set the verbosity of the debugging output")
optParse.add_option("--no-weighted-loss", dest="weightedloss",
                    help = "Do not use a weighted loss (e.g., when there is " +
                           "no reference)",
                    action = "store_false")
optParse.add_option("--model-config", dest="modelconfig",
                    help = "Specifies the model configuration file")
optParse.add_option("--train-config", dest="trainconfig",
                    help = "Specifies the feature extractor configuration " +
                           "file for training instances")
optParse.add_option("--dev-config", dest="devconfig",
                    help = "Specifies the feature extractor configuration " +
                           "file for devtest instances")
optParse.add_option("--mapperfiles", dest="mapperfiles", action = "append",
                    help = "A list of files to be passed to the training mapper")

(options, args) = optParse.parse_args()
 
if (not options.inputlist):
  optParse.error("--input option is required")
if (not options.hdfsinputdir):
  optParse.error("--hdfsinputdir option is required")
if (not options.hdfsoutputdir):
  optParse.error("--hdfsoutputdir option is required")
if (not options.outputdir):
  optParse.error("--outputdir option is required")

pyutil.DEBUG = options.verbosity
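# An illustrative invocation (the script name and all paths are hypothetical):
#
#   python hadoop-run.py -i 'data/train-*.gz' -I /user/me/input \
#       -O /user/me/scratch -o models -m mymodel -d data/dev.gz \
#       --compilefeatures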
 
# If --hadooproot was not given, look in the standard install locations.
hadooproot = options.hadooproot
if (not hadooproot):
  if os.path.isdir("/usr/lib/hadoop"):
    hadooproot = "/usr/lib/hadoop"
  elif os.path.isdir("/usr/local/lib/hadoop"):
    hadooproot = "/usr/local/lib/hadoop"
  elif os.path.isdir("/opt/lib/hadoop"):
    hadooproot = "/opt/lib/hadoop"
  else:
    pyutil.printError(10, "Unable to find a hadoop installation.  " +
                      "Please specify with --hadooproot.")
 
# If --streamingloc was not given, look for the streaming jar under hadooproot.
streamingloc = options.streamingloc
if (not streamingloc):
  if os.path.exists(hadooproot + "/hadoop-streaming.jar"):
    streamingloc = hadooproot + "/hadoop-streaming.jar"
  else:
    tmppath = hadooproot + "/contrib/streaming"
    if not os.path.isdir(tmppath):
      pyutil.printError(10, "Unable to find the streaming directory.  " +
                        "Please specify location of hadoop streaming jar file with " +
                        "--streamingloc")
    streamingjar = glob.glob(tmppath + "/hadoop-streaming*.jar")
    if len(streamingjar) != 1:
      pyutil.printError(10, "Unable to find streaming jar, please specify with " +
                        "--streamingloc")
    streamingloc = streamingjar[0]
 
if (not os.path.isdir(hadooproot) or
    not os.path.exists(hadooproot + "/bin/hadoop")):
  optParse.error("--hadooproot must be the base directory of the " +
                 "hadoop installation")

if (not os.path.exists(streamingloc)):
  optParse.error("--streamingloc does not specify a valid jar file for the " +
                 "streaming interface (checked: " + streamingloc + ")")

if (not os.path.isdir(options.refrbin) or
    not os.path.exists(options.refrbin + "/run-model")):
  optParse.error("--refrbin must be the Reranker Framework bin " +
                 "directory.  Checked: " + options.refrbin)
 
# Expand the (space-separated) input patterns into a list of local files.
filenames = []
for inputstring in options.inputlist:
  for tmpfile in inputstring.split():
    filenames += glob.glob(tmpfile)

for input in filenames:
  if (not os.path.exists(input)):
    pyutil.printError(130, "Input file not found: " + input)

if (options.develdata and not os.path.exists(options.develdata)):
  pyutil.printError(131, "Specified devel data file not found: " + options.develdata)

if (not os.path.isdir(options.outputdir)):
  os.makedirs(options.outputdir)
 
# hdproc, the hadoop interface object used throughout the rest of the script,
# is constructed at this point from the settings gathered above; its
# arguments include options.minsplitsize.
 
# Build the mapper command for the training MapReduce job, shipping any
# configuration files to the workers with "-file".
train_map_options = ""
train_files = ""
if (options.modelconfig):
  train_map_options += " --model-config ./" + os.path.basename(options.modelconfig)
  train_files += " -file " + options.modelconfig
if (options.trainconfig):
  train_map_options += " --train-config ./" + os.path.basename(options.trainconfig)
  train_files += " -file " + options.trainconfig
train_map = ("'" + options.refrbin + "/run-model" + train_map_options +
             " --train - --mapper -m -")

if options.mapperfiles:
  for mapperfile in options.mapperfiles:
    train_files += " -file " + mapperfile

# Commands for the remaining streaming stages and for model merging.
extractsym_map = "'" + options.refrbin + "/compile-features -i -'"
compiledata_map = ("'" + options.refrbin +
                   "/compile-features -i - --clear-raw --input-symbols ")
train_reduce = options.refrbin + "/model-merge-reducer"
train_recomb = options.refrbin + "/model-combine-shards"
symbol_recomb = options.refrbin + "/model-combine-symbols"
pipeeval_options = ""
if (options.devconfig):
  pipeeval_options = " --dev-config " + options.devconfig
pipeeval = options.refrbin + "/piped-model-evaluator" + pipeeval_options
 
# Ensure each input file is available on HDFS, uploading it if necessary.
hadoop_inputfiles = ""
for inputfile in filenames:
  hadoop_inputfiles += hdproc.CheckInputFile(inputfile, options.hdfsinputdir,
                                             options.outputdir, options.force)

precompdevfile = options.develdata
 
# Optionally pre-compile string features into a compact symbolic format:
# one MapReduce collects the symbol table, a second compiles the data.
if (options.compilefeatures):
  if (options.develdata):
    precompdevfile = options.outputdir + "/"
    precompdevfile += os.path.basename(options.develdata).replace(".gz", "")
    precompdevfile += "." + options.modelname + ".compiled.gz"
  symbol_dir = options.hdfsinputdir + "/Symbols/"
  precomp_dir = options.hdfsinputdir + "/Precompiled/"
  precompdev_dir = options.hdfsinputdir + "/PrecompiledDev/"
  if (hdproc.CheckRemoveHDir(precomp_dir, (options.force or options.forcecompile)) or
      options.forcecompile):
    addl_data = ""
    if (options.develdata):
      addl_data = hdproc.CheckInputFile(options.develdata, options.hdfsinputdir,
                                        options.outputdir, options.force)
    # Collect the symbol table unless a previous run already produced it.
    symfile_name = options.outputdir + "/" + options.modelname + ".symbols.gz"
    if (not os.path.exists(symfile_name)):
      hdproc.CheckRemoveHDir(symbol_dir, True)
      hdproc.RunMR(hadoop_inputfiles + addl_data, symbol_dir, 100,
                   "'" + train_reduce + " -S'", extractsym_map, "")
      hdproc.CatPipe(symbol_dir + "/part-*", symbol_recomb + " -o " + symfile_name)
    # Compile the training data against the symbol table.
    hdproc.RunMR(hadoop_inputfiles, precomp_dir, 0, "",
                 compiledata_map + "./" + os.path.basename(symfile_name) +
                 "' -file " + symfile_name, "")
    if (options.develdata):
      hdproc.CheckRemoveHDir(precompdev_dir, True)
      hdproc.RunMR(addl_data, precompdev_dir, 0, "",
                   compiledata_map + "./" + os.path.basename(symfile_name) +
                   "' -file " + symfile_name, "")
      hdproc.CatPipe(precompdev_dir + "/part-*", " gzip -c > " + precompdevfile)
      hdproc.CheckRemoveHDir(precompdev_dir, True)
  hadoop_inputfiles = " --input " + precomp_dir
 
cur_model = options.inputmodel
iteration = int(options.startiter)

# State for tracking the devset loss across iterations.
loss_history = []
best_loss_index = 0
num_in_decline = 0
converged = False

if (options.develdata):
  eval_cmd = pipeeval + " -d " + precompdevfile
  if (not options.weightedloss):
    eval_cmd += " --use-weighted-loss false"
  # evalio, used in the training loop below, wraps eval_cmd in a persistent
  # bidirectional pipe to the evaluator process.
# Main training loop: one MapReduce job per iteration.
while (not converged and iteration < int(options.maxiter)):
  iteration += 1
  hdproc.CheckRemoveHDir(options.hdfsoutputdir, True)

  # Pass the previous model to the mapper, if there is one.
  iter_str = "'"
  if (cur_model):
    iter_str = " -i ./" + os.path.basename(cur_model) + "' -file " + cur_model

  hdproc.RunMR(hadoop_inputfiles, options.hdfsoutputdir, options.numreducer,
               train_reduce, train_map + iter_str + train_files, "")

  # Merge the reducer shards into a single model file for this iteration.
  model_output = (options.outputdir + "/" + options.modelname + "_iter" +
                  str(iteration) + ".gz")
  proc_cmd = train_recomb + " -o " + model_output
  hdproc.CatPipe(options.hdfsoutputdir + "/part-*", proc_cmd)

  # Evaluate the merged model on the development data.
  if (options.develdata):
    devtest_score = evalio.sendreceive(model_output)
    loss = float(devtest_score)
  if (not loss_history):
    loss_history.append(loss)
  else:
    diff = loss_history[-1] - loss
    if (loss < loss_history[best_loss_index]):
      best_loss_index = len(loss_history)
    if (loss > loss_history[-1]):
      num_in_decline += 1
    else:
      num_in_decline = 0
    loss_history.append(loss)
    pyutil.printInfo("Loss for iteration " + str(iteration) + ": " + str(loss) +
                     " loss-delta: " + str(diff))
    if (num_in_decline >= options.max_num_in_decline):
      converged = True
      pyutil.printInfo("Stopping training after " + str(num_in_decline) +
                       " iterations in decline")
    else:
      pyutil.printInfo("Iterations in decline: " +
                       str(num_in_decline) + ", which is less than " +
                       str(options.max_num_in_decline))
  cur_model = model_output

pyutil.printInfo("Best model is from iteration: " + str(best_loss_index + 1) +
                 " with a devset loss of: " + str(loss_history[best_loss_index]))
 
A simple class interface for running hadoop commands.
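The script above calls four methods on that interface: CheckInputFile, CheckRemoveHDir, RunMR, and CatPipe. A minimal sketch of what such a class might look like follows; the class name, constructor signature, and method bodies are assumptions inferred from the call sites, not the actual implementation.

class HadoopRunner:
  """A simple class interface for running hadoop commands (sketch)."""

  def __init__(self, hadooproot, streamingloc, minsplitsize, tasktimeout):
    # Assumed constructor arguments, matching the options gathered above.
    self.hadoopbin = hadooproot + "/bin/hadoop"
    self.streamingloc = streamingloc
    self.minsplitsize = minsplitsize
    self.tasktimeout = tasktimeout

  def CheckInputFile(self, inputfile, hdfsinputdir, outputdir, force):
    # Upload inputfile to hdfsinputdir if it is not already there (or if
    # force is set) and return an " --input <hdfspath>" fragment for RunMR.
    ...

  def CheckRemoveHDir(self, hdfsdir, remove):
    # Remove hdfsdir from HDFS when remove is true; return whether its
    # contents need to be (re)computed.
    ...

  def RunMR(self, inputfiles, outputdir, numreducer, reducer, mapper, mroptions):
    # Build and execute the "hadoop jar <streaming jar> ..." command line
    # from the mapper/reducer command strings and extra streaming options.
    ...

  def CatPipe(self, hdfsglob, pipecmd):
    # Pipe "hadoop fs -cat <hdfsglob>" through pipecmd on the local machine.
    ...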