Reranker Framework (ReFr)
Reranking framework for structure prediction and discriminative language modeling
hadooputil.py
#!/usr/bin/env python
#-----------------------------------------------------------------------
# Copyright 2012, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# -----------------------------------------------------------------------------
## @file hadooputil.py
# A set of utilities to help interface with Hadoop.
#

import os, sys, math
import pyutil

## @class HadoopInterface
# A simple class interface for running hadoop commands.
#
class HadoopInterface:
  def __init__(self, hbasedir, streamingloc, minsplitsize, tasktimeout, libpath):
    self.hadoopmr_ = hbasedir + "/bin/hadoop"
    self.hadoopmr_ += " jar " + streamingloc
    if int(minsplitsize) > 0:
      self.hadoopmr_ += " -Dmapred.min.split.size=" + str(minsplitsize)
    if int(tasktimeout) >= 0:
      self.hadoopmr_ += " -Dmapred.task.timeout=" + str(tasktimeout)
    self.hadooplibpath_ = ""
    if libpath:
      self.hadooplibpath_ = " --cmdenv LD_LIBRARY_PATH=" + libpath + " "
    self.hadoopfs_ = hbasedir + "/bin/hadoop fs "
    self.hadooptest_ = hbasedir + "/bin/hadoop fs -test "
    self.hadoopcat_ = hbasedir + "/bin/hadoop fs -cat "
    self.hadoopput_ = hbasedir + "/bin/hadoop fs -put "
    self.hadoopmove_ = hbasedir + "/bin/hadoop fs -moveFromLocal "
    self.hadoopget_ = hbasedir + "/bin/hadoop fs -get "
    self.hadoopmkdir_ = hbasedir + "/bin/hadoop fs -mkdir "
    self.hadooprmr_ = hbasedir + "/bin/hadoop fs -rmr "

  ## Check if a directory exists on HDFS.
  # @param[in] directory Name of the directory to check.
  # @return True if the directory exists.
  def CheckHDir(self, directory):
    test_str = self.hadooptest_ + "-d " + directory
    return pyutil.runCommand(test_str) == 0

  ## Check for the existence of a directory on HDFS, optionally removing it.
  # @param[in] directory Directory to check.
  # @param[in] remove Remove the directory if it exists.
  # @return True if the directory did not exist or was removed.
  def CheckRemoveHDir(self, directory, remove):
    nodir = False
    if self.CheckHDir(directory):
      if remove:
        rm_str = self.hadooprmr_ + directory
        pyutil.runCommand(rm_str)
        nodir = True
    else:
      nodir = True
    return nodir

  ## Check for a file on HDFS.
  # @param[in] filename The file to check for.
  # @return True if the file exists.
  def CheckHDFSFile(self, filename):
    hdinput_test = self.hadooptest_ + "-e " + filename
    return pyutil.runCommand(hdinput_test) == 0

  ## Check for an input file and stage it for MapReduce processing.
  # If compressed input is to be uncompressed, the data is uncompressed
  # locally and then moved to HDFS; otherwise, it is simply copied to HDFS.
  # @param[in] inputfile Name of the local file to prepare.
  # @param[in] hdfsinputdir HDFS directory for data staging.
  # @param[in] outputdir Local file system directory for the output.
  # @param[in] force Reprocess the data even if the files already exist.
  # @param[in] uncompress Uncompress the data, if compressed, before running.
  # @return An MR input string ("--input ...") naming the staged file.
  def CheckInputFile(self, inputfile, hdfsinputdir, outputdir, force, uncompress):
    input_file_list = ""
    if inputfile.endswith(".gz") and uncompress:
      input_filename = os.path.basename(inputfile).replace(".gz", "")
    else:
      input_filename = os.path.basename(inputfile)
    pyutil.printDebug(1, "Processing input " + input_filename + "\n")
    # Check that the input data exists on HDFS and move it there if necessary.
    hdfsinputfile = hdfsinputdir + "/" + input_filename
    if not self.CheckHDFSFile(hdfsinputfile) or force:
      pyutil.printInfo("Regenerating HDFS input: " + hdfsinputfile)
      if not self.CheckHDir(hdfsinputdir):
        pyutil.runCommand(self.hadoopmkdir_ + hdfsinputdir)
      if inputfile.endswith(".gz") and uncompress:
        # Uncompress locally, then move the uncompressed copy to HDFS.
        new_input = outputdir + "/" + input_filename
        unzipcmd = "gunzip -c " + inputfile + " > " + new_input
        if pyutil.runCommand(unzipcmd) != 0:
          pyutil.printError(12, "Unable to unzip file: " + inputfile)
        pyutil.runCommand(self.hadoopmove_ + new_input + " " + hdfsinputdir)
      else:
        pyutil.runCommand(self.hadoopput_ + inputfile + " " + hdfsinputdir)
      if not self.CheckHDFSFile(hdfsinputfile):
        pyutil.printError(10, "Unable to create input on HDFS: " + hdfsinputfile)
    else:
      pyutil.printDebug(5, "Found file on HDFS: " + hdfsinputfile)
    input_file_list += " --input " + hdfsinputdir + "/" + input_filename
    return input_file_list

  ## Cat one or more HDFS files into a local shell pipeline.
  # @param[in] hdfsfiles HDFS path(s) of the file(s) to cat.
  # @param[in] pipecmd Shell command that consumes the streamed data.
  def CatPipe(self, hdfsfiles, pipecmd):
    catcmd = self.hadoopcat_ + " " + hdfsfiles + " | " + pipecmd
    if pyutil.runCommand(catcmd):
      pyutil.printError(34, "Error: " + catcmd)

  ## Cat one or more HDFS files into a local shell pipeline and read back
  # the pipeline's output.
  # @param[in] hdfsfiles HDFS path(s) of the file(s) to cat.
  # @param[in] pipecmd Shell command that consumes the streamed data.
  # @param[out] retval Output read back from the pipeline.
  def CatPipeRead(self, hdfsfiles, pipecmd, retval):
    catcmd = self.hadoopcat_ + " " + hdfsfiles + " | " + pipecmd
    if not pyutil.readCommand(catcmd, retval):
      pyutil.printError(30, "Error running command " + catcmd)

  ## RunMR
  # Run a streaming MapReduce job.
  # @param[in] input_files HDFS location of the input files.
  # @param[in] outputdir HDFS location of the output.
  # @param[in] reduce_tasks Number of reduce tasks (a negative value uses
  # the cluster default).
  # @param[in] reducer Full string of the streaming reducer command.
  # @param[in] mapper Full string of the streaming mapper command.
  # @param[in] mroptions Additional streaming MR options (usually specified with -D).
  def RunMR(self, input_files, outputdir, reduce_tasks, reducer, mapper, mroptions):
    mr_str = self.hadoopmr_
    if mroptions:
      mr_str += mroptions + " "
    mr_str += self.hadooplibpath_ + input_files
    if not reducer:
      mr_str += " -numReduceTasks 0 --reducer None "
    else:
      if int(reduce_tasks) >= 0:
        mr_str += " -numReduceTasks " + str(reduce_tasks)
      mr_str += " --reducer " + reducer
    mr_str += " --output " + outputdir
    mr_str += " --mapper " + mapper
    pyutil.printInfo("Running MR on: " + input_files)
    if pyutil.runCommand(mr_str) != 0:
      pyutil.printError(33, "Error running MR: " + mr_str)
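Below is a minimal usage sketch, not part of the original file: it stages a gzipped local corpus on HDFS and runs a streaming job through HadoopInterface. The install root, streaming-jar path, HDFS paths, and the cat/wc mapper and reducer pair are illustrative stand-ins, and the split size and timeout values are arbitrary; only the HadoopInterface API itself comes from this module.

import hadooputil

# Hypothetical paths: adjust the install root and streaming jar to your setup.
hadoop = hadooputil.HadoopInterface(
    "/usr/lib/hadoop",                                          # hbasedir
    "/usr/lib/hadoop/contrib/streaming/hadoop-streaming.jar",   # streamingloc
    67108864,   # minsplitsize: 64 MB minimum split
    600000,     # tasktimeout, in milliseconds
    "")         # libpath: no extra LD_LIBRARY_PATH needed

# Clear any stale output directory, then stage the (gzipped) local input,
# uncompressing it into a local staging directory on the way to HDFS.
hadoop.CheckRemoveHDir("/user/me/wc-output", True)
inputs = hadoop.CheckInputFile("corpus.txt.gz", "/user/me/wc-input",
                               "/tmp/wc-staging", False, True)

# Run a single-reducer identity-map word-count-style job, then stream the
# results back through a local pipeline.
hadoop.RunMR(inputs, "/user/me/wc-output", 1, "/usr/bin/wc", "/bin/cat", "")
hadoop.CatPipe("/user/me/wc-output/part-*", "head")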