from optparse import OptionParser
import os, sys, re, gzip, glob, signal, atexit, operator, random

# Project-local helper modules referenced below; assumed to ship with ReFr
# alongside this script (pyutil for logging and piped IO, hadooputil for the
# MapReduce wrapper, defs for installation defaults).
import pyutil
import hadooputil
import defs
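# This script drives distributed training of a reranking model over Hadoop
# streaming: it stages the input data on HDFS, optionally precompiles
# features, then repeatedly maps run-model over the training data, reduces
# and recombines the resulting model shards, and evaluates each iteration's
# model on a development set until the loss stops improving.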
optParse = OptionParser()
optParse.add_option("-H", "--hadooproot", dest="hadooproot",
                    help="Location of hadoop installation. If not set, " +
                         "the script will attempt to find it.")
optParse.add_option("--refrbin", dest="refrbin",
                    help="Location of the Reranker Framework (ReFr) bin directory",
                    default=defs.refrbin + "/")
optParse.add_option("-d", "--develdata", dest="develdata",
                    help="Location of development data")
optParse.add_option("-i", "--input", dest="inputlist", action="append",
                    help="Location of input data on local FS")
optParse.add_option("-I", "--hdfsinputdir", dest="hdfsinputdir",
                    help="Location of input data on HDFS")
optParse.add_option("-O", "--hdfsoutputdir", dest="hdfsoutputdir",
                    help="Output directory (on HDFS) - will be removed " +
                         "before each iteration")
optParse.add_option("-o", "--outputdir", dest="outputdir",
                    help="Output directory")
optParse.add_option("-M", "--inputmodel", dest="inputmodel",
                    help="Name of model to start with")
optParse.add_option("-S", "--inputmodeliter", dest="startiter", default=0,
                    help="Iteration number of input model (will start with " +
                         "next iteration)")
optParse.add_option("-m", "--modelname", dest="modelname",
                    default="model",  # default name assumed
                    help="Name of model file (new models written to --outputdir)")
optParse.add_option("--maxiter", dest="maxiter", default=100,
                    help="Maximum number of iterations to run")
optParse.add_option("--numreducer", dest="numreducer", default=1,
                    help="Number of reducers.")
optParse.add_option("--streamingloc", dest="streamingloc", default="",
                    help="Location of the streaming jar file. An empty string " +
                         "will force the script to attempt to find it.")
optParse.add_option("--libpath", dest="libpath", default="/usr/local/lib:",
                    help="Specify the LD_LIBRARY_PATH")
optParse.add_option("--splitsize", dest="minsplitsize",
                    help="Min size of each data split")
optParse.add_option("--tasktimeout", dest="tasktimeout",
                    help="Amount of time (seconds) a task may run (e.g., " +
                         "loading the model) before processing the next " +
                         "input record")
optParse.add_option("--force", dest="force",
                    action="store_true", default=False,
                    help="Force all data processing even if files exist")
optParse.add_option("--forcecompile", dest="forcecompile",
                    action="store_true", default=False,
                    help="Force precompilation if applicable")
optParse.add_option("--compilefeatures", dest="compilefeatures",
                    action="store_true", default=False,
                    help="Compile features before processing")
optParse.add_option("--maxdecline", dest="max_num_in_decline",
                    type="int", default=5,  # type and default assumed
                    help="Number of consecutive iterations of increasing " +
                         "loss before we stop training")
optParse.add_option("-v", "--verbosity", dest="verbosity", default=0,
                    help="Set the verbosity of the debugging output")
optParse.add_option("--no-weighted-loss", dest="weightedloss",
                    action="store_false", default=True,
                    help="Do not use a weighted loss (e.g., when there is " +
                         "no reference)")
optParse.add_option("--model-config", dest="modelconfig",
                    help="Specifies the model configuration file")
optParse.add_option("--train-config", dest="trainconfig",
                    help="Specifies the feature extractor configuration " +
                         "file for training instances")
optParse.add_option("--dev-config", dest="devconfig",
                    help="Specifies the feature extractor configuration " +
                         "file for devtest instances")
optParse.add_option("--mapperfiles", dest="mapperfiles", action="append",
                    help="A list of files to be passed to the training mapper")

(options, args) = optParse.parse_args()
if (not options.inputlist):
  optParse.error("--input option is required")
if (not options.hdfsinputdir):
  optParse.error("--hdfsinputdir option is required")
if (not options.hdfsoutputdir):
  optParse.error("--hdfsoutputdir option is required")
if (not options.outputdir):
  optParse.error("--outputdir option is required")
pyutil.DEBUG = options.verbosity
# If no hadoop root was given, probe the usual installation locations.
hadooproot = options.hadooproot
if (not hadooproot):
  if os.path.isdir("/usr/lib/hadoop"):
    hadooproot = "/usr/lib/hadoop"
  elif os.path.isdir("/usr/local/lib/hadoop"):
    hadooproot = "/usr/local/lib/hadoop"
  elif os.path.isdir("/opt/lib/hadoop"):
    hadooproot = "/opt/lib/hadoop"
  else:
    pyutil.printError(10,  # error code and first half of message assumed
                      "Unable to find hadoop installation. " +
                      "Please specify with --hadooproot.")
# Locate the Hadoop streaming jar: newer layouts keep it at the top of the
# installation, older ones under contrib/streaming.
streamingloc = options.streamingloc
if (not streamingloc):
  if os.path.exists(hadooproot + "/hadoop-streaming.jar"):
    streamingloc = hadooproot + "/hadoop-streaming.jar"
  else:
    tmppath = hadooproot + "/contrib/streaming"
    if not os.path.isdir(tmppath):
      pyutil.printError(10,  # error code assumed
                        "Please specify location of hadoop streaming jar " +
                        "file with --streamingloc")
    streamingjar = glob.glob(tmppath + "/hadoop-streaming*.jar")
    if len(streamingjar) != 1:
      pyutil.printError(10, "Unable to find streaming jar, please specify " +
                        "with --streamingloc")
    streamingloc = streamingjar[0]
if (not os.path.isdir(hadooproot) or
    not os.path.exists(hadooproot + "/bin/hadoop")):
  optParse.error("--hadooproot must be the base directory of the " +
                 "hadoop installation")

if (not os.path.exists(streamingloc)):
  optParse.error("--streamingloc does not specify a valid jar file for the " +
                 "streaming interface (checked: " + streamingloc + ")")

if (not os.path.isdir(options.refrbin) or
    not os.path.exists(options.refrbin + "/run-model")):
  optParse.error("--refrbin must be the Reranker Framework bin " +
                 "directory. Checked: " + options.refrbin)
# Expand the (possibly glob-containing) --input values into concrete files.
filenames = []
for inputstring in options.inputlist:
  for tmpfile in inputstring.split():
    filenames += glob.glob(tmpfile)

for infile in filenames:
  if (not os.path.exists(infile)):
    pyutil.printError(131, "Input file not found: " + infile)  # message assumed

if (options.develdata and not os.path.exists(options.develdata)):
  pyutil.printError(131, "Specified devel data file not found: " + options.develdata)

if (not os.path.isdir(options.outputdir)):
  os.makedirs(options.outputdir)
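# hdproc wraps the hadoop binary and the streaming jar.  Its methods, as used
# below: CheckInputFile stages a local file on HDFS, returning an
# " --input <path>" fragment for the streaming command (inferred from how the
# result is concatenated below); CheckRemoveHDir clears an HDFS directory;
# RunMR launches a streaming job; CatPipe pipes 'hadoop fs -cat' output
# through a local command.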
# Construct the MapReduce helper; of these constructor arguments, only
# minsplitsize appears in the original listing, the rest are assumed.
hdproc = hadooputil.HadoopInterface(hadooproot, streamingloc,
                                    options.minsplitsize,
                                    options.tasktimeout,
                                    options.libpath)
# Assemble the commands for the streaming pipeline.
train_map_options = ""
train_files = ""  # extra -file arguments shipped with each mapper
if (options.modelconfig):
  train_map_options += " --model-config ./" + os.path.basename(options.modelconfig)
  train_files += " -file " + options.modelconfig
if (options.trainconfig):
  train_map_options += " --train-config ./" + os.path.basename(options.trainconfig)
  train_files += " -file " + options.trainconfig
train_map = ("'" + options.refrbin + "/run-model" + train_map_options +
             " --train - --mapper -m -")

if options.mapperfiles:
  for mapperfile in options.mapperfiles:
    train_files += " -file " + mapperfile

extractsym_map = "'" + options.refrbin + "/compile-features -i -'"
compiledata_map = ("'" + options.refrbin +
                   "/compile-features -i - --clear-raw --input-symbols ")
train_reduce = options.refrbin + "/model-merge-reducer"
train_recomb = options.refrbin + "/model-combine-shards"
symbol_recomb = options.refrbin + "/model-combine-symbols"
pipeeval_options = ""
if (options.devconfig):
  pipeeval_options = " --dev-config " + options.devconfig
pipeeval = options.refrbin + "/piped-model-evaluator" + pipeeval_options
# Stage each local input file on HDFS (unless it is already there).
hadoop_inputfiles = ""
for inputfile in filenames:
  hadoop_inputfiles += hdproc.CheckInputFile(inputfile, options.hdfsinputdir,
                                             options.outputdir, options.force)

precompdevfile = options.develdata
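# Optional precompilation: one MR pass extracts feature-name symbols from the
# training (and dev) data, the shards are combined into a single symbols
# file, and a second pass rewrites each input record with compiled features
# (--clear-raw drops the raw names).  Training then reads the precompiled
# data, so features are not re-extracted on every iteration.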
if (options.compilefeatures):
  if (options.develdata):
    precompdevfile = options.outputdir + "/"
    precompdevfile += os.path.basename(options.develdata).replace(".gz", "")
    precompdevfile += "." + options.modelname + ".compiled.gz"
  symbol_dir = options.hdfsinputdir + "/Symbols/"
  precomp_dir = options.hdfsinputdir + "/Precompiled/"
  precompdev_dir = options.hdfsinputdir + "/PrecompiledDev/"
  addl_data = ""  # dev data staged on HDFS, if any

  if (hdproc.CheckRemoveHDir(precomp_dir, (options.force or options.forcecompile)) or
      options.forcecompile):
    if (options.develdata):
      addl_data = hdproc.CheckInputFile(options.develdata, options.hdfsinputdir,
                                        options.outputdir, options.force)
    # Build the shared symbol table unless one already exists locally.
    symfile_name = options.outputdir + "/" + options.modelname + ".symbols.gz"
    if (not os.path.exists(symfile_name)):
      hdproc.CheckRemoveHDir(symbol_dir, True)
      hdproc.RunMR(hadoop_inputfiles + addl_data, symbol_dir, 100,
                   "'" + train_reduce + " -S'", extractsym_map, "")
      hdproc.CatPipe(symbol_dir + "/part-*", symbol_recomb + " -o " + symfile_name)
    # Precompile the training data (map-only job: zero reducers).
    hdproc.RunMR(hadoop_inputfiles, precomp_dir, 0, "",
                 compiledata_map + "./" + os.path.basename(symfile_name) +
                 "' -file " + symfile_name, "")
    if (options.develdata):
      hdproc.CheckRemoveHDir(precompdev_dir, True)
      hdproc.RunMR(addl_data, precompdev_dir, 0, "",
                   compiledata_map + "./" + os.path.basename(symfile_name) +
                   "' -file " + symfile_name, "")
      hdproc.CatPipe(precompdev_dir + "/part-*", " gzip -c > " + precompdevfile)
      hdproc.CheckRemoveHDir(precompdev_dir, True)
  hadoop_inputfiles = " --input " + precomp_dir
cur_model = options.inputmodel
iteration = int(options.startiter)
converged = False
loss_history = []
best_loss_index = 0
num_in_decline = 0

if (options.develdata):
  # Launch the evaluator once and keep it running across iterations;
  # CommandIO is a hypothetical name for the piped-process helper.
  eval_cmd = pipeeval + " -d " + precompdevfile
  if (not options.weightedloss):
    eval_cmd += " --use-weighted-loss false"
  evalio = pyutil.CommandIO(eval_cmd)
while (not converged and iteration < int(options.maxiter)):
  iteration += 1
  hdproc.CheckRemoveHDir(options.hdfsoutputdir, True)

  # Ship the current model with the mapper and resume training from it;
  # without an input model, just close the quote opened in train_map.
  if (cur_model):
    iter_str = " -i ./" + os.path.basename(cur_model) + "' -file " + cur_model
  else:
    iter_str = "'"
  hdproc.RunMR(hadoop_inputfiles, options.hdfsoutputdir, options.numreducer,
               train_reduce, train_map + iter_str + train_files, "")

  # Recombine the reduced model shards into this iteration's model file.
  model_output = (options.outputdir + "/" + options.modelname + "_iter" +
                  str(iteration) + ".gz")
  proc_cmd = train_recomb + " -o " + model_output
  hdproc.CatPipe(options.hdfsoutputdir + "/part-*", proc_cmd)

  if (options.develdata):
    devtest_score = evalio.sendreceive(model_output)
    loss = float(devtest_score)
    if (not loss_history):
      loss_history.append(loss)
    else:
      diff = loss_history[-1] - loss
      if (loss < loss_history[best_loss_index]):
        best_loss_index = len(loss_history)
      if (loss > loss_history[-1]):
        num_in_decline += 1
      else:
        num_in_decline = 0
      loss_history.append(loss)
      pyutil.printInfo("Loss for iteration " + str(iteration) + ": " + str(loss) +
                       " loss-delta: " + str(diff))
      if (num_in_decline >= options.max_num_in_decline):
        converged = True
        pyutil.printInfo("Stopping training after " + str(num_in_decline) +
                         " iterations in decline")
      else:
        pyutil.printInfo("Iterations in decline: " + str(num_in_decline) +
                         ", which is less than " + str(options.max_num_in_decline))
  cur_model = model_output

if (loss_history):  # only meaningful when a dev set drove early stopping
  pyutil.printInfo("Best model is from iteration: " + str(best_loss_index + 1) +
                   " with a devset loss of: " + str(loss_history[best_loss_index]))
# hadooputil: a simple class interface for running hadoop commands.