# NOTE(review): this chunk is a garbled extraction -- statements are split
# across lines, and the leading integers (43, 46, ...) are the ORIGINAL
# file's line numbers; gaps in that numbering mark elided source lines.
# __init__: builds the object's hadoop command-string attributes from the
# given configuration values (original lines 44-45 and 50-56 are missing).
43 def __init__(self, hbasedir, streamingloc, minsplitsize, tasktimeout, libpath):
# Only a strictly positive split size is appended as a -D option.
46 if (int(minsplitsize) > 0):
47 self.
hadoopmr_ +=
" -Dmapred.min.split.size=" + str(minsplitsize)
# >= 0, so a timeout of 0 is also accepted (presumably to disable the
# Hadoop task timeout -- TODO confirm against mapred.task.timeout docs).
48 if (int(tasktimeout) >= 0):
49 self.
hadoopmr_ +=
" -Dmapred.task.timeout=" + str(tasktimeout)
# Command prefix for copying a local file onto HDFS (note trailing space,
# so a file path can be concatenated directly).
57 self.
hadoopmove_ = hbasedir +
"/bin/hadoop fs -moveFromLocal "
# Fragment of CheckInputFile (original lines 104-131; the def line and
# several interior lines were elided by the extraction). It prepares one
# input file for MR processing and returns the accumulated
# " --input <hdfs-path>" argument string.
104 if (inputfile.endswith(
".gz")
and uncompress):
# Strip the .gz suffix when the file is to be uncompressed first.
105 input_filename = os.path.basename(inputfile).replace(
".gz",
"")
# (else branch -- original 106 elided) keep the basename unchanged.
107 input_filename = os.path.basename(inputfile)
# Destination path of this input file on HDFS.
111 hdfsinputfile = hdfsinputdir +
"/" + input_filename
116 if (inputfile.endswith(
".gz")
and uncompress):
# Build a shell command that writes the uncompressed copy locally.
117 new_input = outputdir +
"/" + input_filename
118 unzipcmd =
"gunzip -c " + inputfile +
" > " + new_input
# The same " --input <path>" append appears on three branches (original
# 122 / 125 / 129); the elided context decides which branch executes.
122 input_file_list +=
" --input " + hdfsinputdir +
"/" + input_filename
125 input_file_list +=
" --input " + hdfsinputdir +
"/" + input_filename
129 input_file_list +=
" --input " + hdfsinputdir +
"/" + input_filename
130 pyutil.printDebug(5,
"Found file on HDFS: " + hdfsinputdir +
"/" + input_filename)
131 return input_file_list
# Fragment of a cat/pipe helper: builds a shell pipeline of the form
# "<hadoopcat_> <hdfsfiles> | <pipecmd>". The identical expression appears
# twice (original lines 134 and 139); the branch logic between them
# (original 135-138) was elided from this extraction.
134 catcmd = self.
hadoopcat_ +
" " + hdfsfiles +
" | " + pipecmd
139 catcmd = self.
hadoopcat_ +
" " + hdfsfiles +
" | " + pipecmd
# RunMR: assembles the hadoop-streaming command-line string for one
# MapReduce job. The initialization of mr_str (original 152-153) and the
# if/else scaffolding around the reducer cases (155-156, 158-159) were
# elided by the extraction.
151 def RunMR(self, input_files, outputdir, reduce_tasks, reducer, mapper, mroptions):
# Caller-supplied extra options are appended first, space-separated.
154 mr_str += mroptions +
" "
# Map-only case: no reduce tasks and an explicit "None" reducer.
157 mr_str +=
" -numReduceTasks 0 --reducer None "
# Otherwise honor an explicit reduce-task count; >= 0 means a count of 0
# is also passed through (presumably map-only via task count -- verify).
160 if (int(reduce_tasks) >= 0):
161 mr_str +=
" -numReduceTasks " + str(reduce_tasks)
162 mr_str +=
" --reducer " + reducer
163 mr_str +=
" --output " + outputdir
164 mr_str +=
" --mapper " + mapper
def RunMR
RunMR: Run a MapReduce job.
def CheckHDir
Check if a directory exists on HDFS.
def CheckRemoveHDir
Function to check for the existence of a directory on HDFS.
A simple class interface for running Hadoop commands.
def CheckHDFSFile
Function to check for a file on HDFS.
def CheckInputFile
Check for an input file and prepare it for MR processing.