Reranker Framework (ReFr)
Reranking framework for structure prediction and discriminative language modeling
hadoop-run.py
#!/usr/bin/python
#-----------------------------------------------------------------------
# Copyright 2012, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
#   notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
#   copyright notice, this list of conditions and the following disclaimer
#   in the documentation and/or other materials provided with the
#   distribution.
# * Neither the name of Google Inc. nor the names of its
#   contributors may be used to endorse or promote products derived from
#   this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# -----------------------------------------------------------------------------
## @file hadoop-run.py
# A Python program that trains a reranking model on a Hadoop cluster using
# the Iterative Parameter Mixtures perceptron training algorithm.
#
# You must first have a Hadoop account configured. In order to train, you will
# need the following:
#   - Training data that is locally accessible (i.e., readable by this script).
#   - A HadoopFS (HDFS) directory with enough space to store the input
#     training data, the intermediate models, and the final model.
#
# The program will attempt to locate the Hadoop binary and the
# Hadoop streaming library. If this fails, you can specify these
# via the command-line parameters --hadooproot and --streamingloc.
#
# Usage:
#   hadoop-run.py --input InputData --hdfsinputdir HDFSInDir \
#                 --hdfsoutputdir HDFSOutDir --outputdir OutputDir
#
# InputData  - A comma-separated list of file globs containing the training
#              data. These must be accessible by this script.
# OutputDir  - The local directory where the trained model(s) are written. The
#              default model name is 'model'; you can change it with the
#              --modelname command-line parameter.
# HDFSInDir  - A directory on HDFS to which the input data will be copied.
# HDFSOutDir - A directory on HDFS to which the temporary data and output data
#              will be written.
#
# The final models are copied to the locally-accessible OutputDir.
#
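# For concreteness, a hypothetical invocation might look like the following
# (all paths are placeholders, not taken from the original documentation):
#
#   hadoop-run.py --input '/data/train/*.gz' \
#                 --hdfsinputdir /user/me/refr/input \
#                 --hdfsoutputdir /user/me/refr/output \
#                 --outputdir /home/me/refr/models \
#                 --develdata /data/dev/dev.gz --numreducer 10
#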
# @author kbhall@google.com (Keith Hall)
#

from optparse import OptionParser
import os, sys, re, gzip, glob, signal, atexit, operator, random
import codecs
import pyutil
import hadooputil
import defs

##
# The following arguments are available to hadoop-run.py
#
# @param[in] hadooproot Location of the hadoop installation.
# @param[in] refrbin Location of the Reranker Framework bin directory.
# @param[in] develdata Location of development data.
# @param[in] input Location of input data on the local FS.
# @param[in] hdfsinputdir Location of input data on HDFS.
# @param[in] hdfsoutputdir Output directory (on HDFS) - will be removed before each iteration.
# @param[in] outputdir Output directory.
# @param[in] inputmodel Name of model to start with.
# @param[in] inputmodeliter Iteration number of the input model (training will start with the next iteration).
# @param[in] modelname Name of the model file (new models are written to --outputdir).
# @param[in] maxiter Maximum number of iterations to run.
# @param[in] numreducer Number of reducers.
# @param[in] streamingloc Location, under hadooproot, of the streaming jar file.
# @param[in] libpath LD_LIBRARY_PATH for jobs run on Hadoop.
# @param[in] splitsize Minimum size of each data split.
# @param[in] tasktimeout Amount of time (in seconds) a task may run
#            (e.g., while loading the model) before processing the next input record.
# @param[in] force Force all data processing even if files exist.
# @param[in] forcecompile Force precompilation if applicable.
# @param[in] compilefeatures Compile features before processing.
# @param[in] maxdecline Number of iterations in decline before stopping.
# @param[in] model-config Model configuration file.
# @param[in] train-config Feature extractor configuration file for training.
# @param[in] dev-config Feature extractor configuration file for dev.

optParse = OptionParser()
optParse.add_option("-H", "--hadooproot", dest="hadooproot",
                    help = "Location of hadoop installation. If not set, " +
                           "the script will attempt to find it.",
                    default = "")
optParse.add_option("--refrbin", dest="refrbin",
                    help = "Location of the Reranker Framework (ReFr) bin directory",
                    default = defs.refrbin + "/")
optParse.add_option("-d", "--develdata", dest="develdata",
                    help = "Location of development data")
optParse.add_option("-i", "--input", dest="inputlist",
                    help = "Location of input data on local FS",
                    action = "append")
optParse.add_option("-I", "--hdfsinputdir", dest="hdfsinputdir",
                    help = "Location of input data on HDFS")
optParse.add_option("-O", "--hdfsoutputdir", dest="hdfsoutputdir",
                    help = "Output directory (on HDFS) - will be removed before each iteration")
optParse.add_option("-o", "--outputdir", dest="outputdir", help = "Output directory")
optParse.add_option("-M", "--inputmodel", dest="inputmodel",
                    help = "Name of model to start with")
optParse.add_option("-S", "--inputmodeliter", dest="startiter",
                    help = "Iteration number of input model (will start with next iteration)",
                    default = 0)
optParse.add_option("-m", "--modelname", dest="modelname",
                    help = "Name of model file (new models written to --outputdir)",
                    default = "model")
optParse.add_option("--maxiter", dest="maxiter",
                    help = "Maximum number of iterations to run", default = 100)
optParse.add_option("--numreducer", dest="numreducer",
                    help = "Number of reducers.", default = 1)
optParse.add_option("--streamingloc", dest="streamingloc",
                    help = "Location of the streaming jar file. " +
                           "An empty string will force the script to attempt to find the streaming jar file.",
                    default = "")
# default = "contrib/streaming/hadoop-0.20.2-streaming.jar")
optParse.add_option("--libpath", dest="libpath",
                    help = "Specify the LD_LIBRARY_PATH",
                    default = "/usr/local/lib:")
optParse.add_option("--splitsize", dest="minsplitsize",
                    help = "Min size of each data split",
                    default = 0)
optParse.add_option("--tasktimeout", dest="tasktimeout",
                    help = "Amount of time (seconds) for a task to run (e.g., loading the model) " +
                           "before processing the next input record",
                    default = 0)
optParse.add_option("--force", dest="force",
                    help = "Force all data processing even if files exist",
                    action = "store_true",
                    default = False)
optParse.add_option("--forcecompile", dest="forcecompile",
                    help = "Force precompilation if applicable",
                    action = "store_true",
                    default = False)
optParse.add_option("--compilefeatures", dest="compilefeatures",
                    help = "Compile features before processing",
                    action = "store_true",
                    default = False)
optParse.add_option("--maxdecline", dest="max_num_in_decline",
                    help = "Number of iterations of an increasing loss before we stop training",
                    default = 5)
optParse.add_option("-v", "--verbosity", dest="verbosity",
                    help = "Set the verbosity of the debugging output",
                    default = 0)
optParse.add_option("--no-weighted-loss", dest="weightedloss",
                    help = "Do not use a weighted loss (e.g., when there is no reference)",
                    action = "store_false",
                    default = True)
optParse.add_option("--model-config", dest="modelconfig",
                    help = "Specifies the model configuration file")
optParse.add_option("--train-config", dest="trainconfig",
                    help = "Specifies the feature extractor configuration " +
                           "file for training instances")
optParse.add_option("--dev-config", dest="devconfig",
                    help = "Specifies the feature extractor configuration " +
                           "file for devtest instances")
optParse.add_option("--mapperfiles", dest="mapperfiles",
                    help = "A list of files to be passed to the training mapper",
                    action = "append")

(options, args) = optParse.parse_args()

# Check input command line options.
if (not options.inputlist):
  optParse.error("--input option is required")
if (not options.hdfsinputdir):
  optParse.error("--hdfsinputdir option is required")
if (not options.hdfsoutputdir):
  optParse.error("--hdfsoutputdir option is required")
if (not options.outputdir):
  optParse.error("--outputdir option is required")

pyutil.DEBUG = options.verbosity

# Attempt to find the hadoop installation.
hadooproot = options.hadooproot
if not hadooproot:
  if os.path.isdir("/usr/lib/hadoop"):
    hadooproot = "/usr/lib/hadoop"
  elif os.path.isdir("/usr/local/lib/hadoop"):
    hadooproot = "/usr/local/lib/hadoop"
  elif os.path.isdir("/opt/lib/hadoop"):
    hadooproot = "/opt/lib/hadoop"
  else:
    pyutil.printError(10, "Unable to find the hadoop installation. " +
                      "Please specify with --hadooproot.")

streamingloc = options.streamingloc
if not streamingloc:
  if os.path.exists(hadooproot + "/hadoop-streaming.jar"):
    streamingloc = hadooproot + "/hadoop-streaming.jar"
  else:
    tmppath = hadooproot + "/contrib/streaming"
    if not os.path.isdir(tmppath):
      pyutil.printError(10, hadooproot + "/contrib/streaming does not exist. " +
                        "Please specify location of hadoop streaming jar file with " +
                        "--streamingloc")
    streamingjar = glob.glob(tmppath + "/hadoop-streaming*.jar")
    if len(streamingjar) != 1:
      pyutil.printError(10, "Unable to find streaming jar, please specify with --streamingloc")
    streamingloc = streamingjar[0]

# Sanity check of directories.
if (not os.path.isdir(hadooproot) or
    not os.path.exists(hadooproot + "/bin/hadoop")):
  optParse.error("--hadooproot must be the base directory of the " +
                 "hadoop installation")

if (not os.path.exists(streamingloc)):
  optParse.error("--streamingloc does not specify a valid jar file for the " +
                 "streaming interface (checked: " + streamingloc + ")")

if (not os.path.isdir(options.refrbin) or
    not os.path.exists(options.refrbin + "/run-model")):
  optParse.error("--refrbin directory must be the Reranker Framework bin " +
                 "directory. Checked: " + options.refrbin)

## Collect input filenames.
filenames = []
for inputstring in options.inputlist:
  for tmpfile in inputstring.split():
    filenames += glob.glob(tmpfile)

for input in filenames:
  pyutil.printInfo("Input file: " + input)
  if (not os.path.exists(input)):
    pyutil.printError(130, "Input file not found: " + input)

if (options.develdata and not os.path.exists(options.develdata)):
  pyutil.printError(131, "Specified devel data file not found: " + options.develdata)

## Create output directory if it does not exist.
if (not os.path.isdir(options.outputdir)):
  os.makedirs(options.outputdir)

## @var hdproc
# HadoopInterface object used to process all Hadoop MR utils.
hdproc = hadooputil.HadoopInterface(hadooproot,
                                    streamingloc,
                                    options.minsplitsize,
                                    options.tasktimeout,
                                    options.libpath)

## Configuration for training options.
# @var train_map_options
# Options passed to the mapper binary.
train_map_options = ""
# @var train_files
# String containing '-file filename' for all external files.
train_files = ""
if (options.modelconfig):
  train_map_options += " --model-config ./" + os.path.basename(options.modelconfig)
  train_files += " -file " + options.modelconfig
if (options.trainconfig):
  train_map_options += " --train-config ./" + os.path.basename(options.trainconfig)
  train_files += " -file " + options.trainconfig
train_map = ("'" + options.refrbin + "/run-model" + train_map_options +
             " --train - --mapper -m -")
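# Note: train_map deliberately starts with an unmatched single quote; the
# matching closing quote is appended later via iter_str (which may also add a
# per-iteration "-i <model>" flag), so that the whole mapper command reaches
# Hadoop streaming as a single quoted string.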

if options.mapperfiles:
  for mapperfile in options.mapperfiles:
    train_files += " -file " + mapperfile

## Shortcuts to command-line programs.
extractsym_map = "'" + options.refrbin + "/compile-features -i -'"
compiledata_map = "'" + options.refrbin + "/compile-features -i - --clear-raw --input-symbols "
train_reduce = options.refrbin + "/model-merge-reducer"
train_recomb = options.refrbin + "/model-combine-shards"
symbol_recomb = options.refrbin + "/model-combine-symbols"
pipeeval_options = ""
if (options.devconfig):
  pipeeval_options = " --dev-config " + options.devconfig
pipeeval = options.refrbin + "/piped-model-evaluator" + pipeeval_options

hadoop_inputfiles = ""
for inputfile in filenames:
  hadoop_inputfiles += hdproc.CheckInputFile(inputfile, options.hdfsinputdir,
                                             options.outputdir, options.force,
                                             True)
  #not options.compilefeatures)

precompdevfile = options.develdata

## Precompilation of string features.
# Optional - reduces the size of the models, but takes time to create the initial precompiled data.
#
if (options.compilefeatures):
  pyutil.printInfo("Precompiling feature indices")
  if (options.develdata):
    precompdevfile = options.outputdir + "/"
    precompdevfile += os.path.basename(options.develdata).replace(".gz", "")
    precompdevfile += "." + options.modelname + ".compiled.gz"
  symbol_dir = options.hdfsinputdir + "/Symbols/"
  precomp_dir = options.hdfsinputdir + "/Precompiled/"
  precompdev_dir = options.hdfsinputdir + "/PrecompiledDev/"

  # Extract all features.
  if (hdproc.CheckRemoveHDir(precomp_dir, (options.force or options.forcecompile)) or
      options.forcecompile):
    addl_data = ""
    if (options.develdata):
      addl_data = hdproc.CheckInputFile(options.develdata, options.hdfsinputdir,
                                        options.outputdir, options.force,
                                        True)
      pyutil.printInfo("Dev data file: " + addl_data)
    # Copy data to HDFS.
    symfile_name = options.outputdir + "/" + options.modelname + ".symbols.gz"
    if (not os.path.exists(symfile_name)):
      hdproc.CheckRemoveHDir(symbol_dir, True)
      hdproc.RunMR(hadoop_inputfiles + addl_data, symbol_dir, 100,
                   "'" + train_reduce + " -S'", extractsym_map, "")
      # Concatenate symbols to a local symbol table.
      hdproc.CatPipe(symbol_dir + "/part-*", symbol_recomb + " -o " + symfile_name)

    # Convert the original input data.
    hdproc.RunMR(hadoop_inputfiles, precomp_dir, 0, "",
                 compiledata_map + "./" + os.path.basename(symfile_name) +
                 "' -file " + symfile_name, "")
    if (options.develdata):
      hdproc.CheckRemoveHDir(precompdev_dir, True)
      hdproc.RunMR(addl_data, precompdev_dir, 0, "",
                   compiledata_map + "./" + os.path.basename(symfile_name) +
                   "' -file " + symfile_name, "")
      hdproc.CatPipe(precompdev_dir + "/part-*", " gzip -c > " + precompdevfile)
      hdproc.CheckRemoveHDir(precompdev_dir, True)
  hadoop_inputfiles = " --input " + precomp_dir

#------------
# Run Hadoop Iterative MapReduce
# (Iterative Parameter Mixtures)
#------------
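# How each iteration works (a summary inferred from the commands assembled
# above): the mappers run run-model over their shards of the training data,
# each producing a partial model; model-merge-reducer and model-combine-shards
# then mix those per-shard models into a single model, which is shipped back
# out as the starting point for the next iteration. Training stops after
# --maxiter iterations, or once the devset loss has been in decline for
# --maxdecline consecutive iterations.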
cur_model = options.inputmodel
converged = False

iteration = int(options.startiter)
prev_loss = -9999
loss_history = []
num_in_decline = 0
best_loss_index = 0
if (options.develdata):
  eval_cmd = pipeeval + " -d " + precompdevfile
  if (not options.weightedloss):
    eval_cmd += " --use-weighted-loss false"
  evalio = pyutil.CommandIO(eval_cmd)

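# Note: pyutil.CommandIO appears to keep piped-model-evaluator running as a
# long-lived child process; evalio.sendreceive() below is assumed to write a
# model path to its stdin and read the resulting devset loss back from stdout.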
while (not converged and iteration < int(options.maxiter)):
  iteration += 1
  pyutil.printInfo("Training iteration: " + str(iteration))
  # Make sure the HDFS output directory is removed before running the
  # MapReduce job.
  hdproc.CheckRemoveHDir(options.hdfsoutputdir, True)

  # Create the MR string and run the MR.
  iter_str = "'"
  if (cur_model):
    iter_str = " -i ./" + os.path.basename(cur_model) + "' -file " + cur_model

  hdproc.RunMR(hadoop_inputfiles, options.hdfsoutputdir, options.numreducer,
               train_reduce, train_map + iter_str + train_files, "")

  # Copy data from the MapReduce job to the local file-system.
  model_output = options.outputdir + "/" + options.modelname + "_iter" + str(iteration) + ".gz"
  proc_cmd = train_recomb + " -o " + model_output
  hdproc.CatPipe(options.hdfsoutputdir + "/part-*", proc_cmd)

  devtest_score = 0
  if (options.develdata):
    devtest_score = evalio.sendreceive(model_output)

  loss = 0.0
  if (devtest_score):
    # Get the score returned on STDOUT.
    loss = float(devtest_score)
    if (not loss_history):
      loss_history.append(loss)
      pyutil.printInfo("Loss for iteration " + str(iteration) + ": " + str(loss))
    else:
      diff = loss_history[-1] - loss
      if (loss < loss_history[best_loss_index]):
        # Loss is appended below:
        best_loss_index = len(loss_history)
      if (loss > loss_history[-1]):
        num_in_decline += 1
      else:
        num_in_decline = 0
      # Append loss to the end of the history.
      loss_history.append(loss)
      pyutil.printInfo("Loss for iteration " + str(iteration) + ": " + str(loss) +
                       " loss-delta: " + str(diff))
      if (num_in_decline >= int(options.max_num_in_decline)):
        converged = True
        pyutil.printInfo("Stopping after " + str(num_in_decline) +
                         " iterations in decline")
      else:
        pyutil.printInfo("Continuing to train as the number of epochs in decline is: " +
                         str(num_in_decline) + ", which is less than " +
                         str(options.max_num_in_decline))

  cur_model = model_output

pyutil.printInfo("Best model is from iteration: " + str(best_loss_index + 1) +
                 " with a devset loss of: " + str(loss_history[best_loss_index]))