Hadoop 是Google MapReduce的一個(gè)Java實(shí)現(xiàn)。MapReduce是一種簡(jiǎn)化的分布式編程模式,讓程序自動(dòng)分布到一個(gè)由普通機(jī)器組成的超大集群上并發(fā)執(zhí)行。就如同java程序員可以不考慮內(nèi)存泄露一樣, MapReduce的run-time系統(tǒng)會(huì)解決輸入數(shù)據(jù)的分布細(xì)節(jié),跨越機(jī)器集群的程序執(zhí)行調(diào)度,處理機(jī)器的失效,并且管理機(jī)器之間的通訊請(qǐng)求。這樣的模式允許程序員可以不需要有什么并發(fā)處理或者分布式系統(tǒng)的經(jīng)驗(yàn),就可以處理超大的分布式系統(tǒng)得資源。
一、概論
作為Hadoop程序員,他要做的事情就是:
1、定義Mapper,處理輸入的Key-Value對(duì),輸出中間結(jié)果。
2、定義Reducer,可選,對(duì)中間結(jié)果進(jìn)行規(guī)約,輸出最終結(jié)果。
3、定義InputFormat 和OutputFormat,可選,InputFormat將每行輸入文件的內(nèi)容轉(zhuǎn)換為Java類(lèi)供Mapper函數(shù)使用,不定義時(shí)默認(rèn)為String。
4、定義main函數(shù),在里面定義一個(gè)Job并運(yùn)行它。
然后的事情就交給系統(tǒng)了。
1.基本概念:Hadoop的HDFS實(shí)現(xiàn)了google的GFS文件系統(tǒng),NameNode作為文件系統(tǒng)的負(fù)責(zé)調(diào)度運(yùn)行在master,DataNode運(yùn)行在每個(gè)機(jī)器上。同時(shí)Hadoop實(shí)現(xiàn)了Google的MapReduce,JobTracker作為MapReduce的總調(diào)度運(yùn)行在master,TaskTracker則運(yùn)行在每個(gè)機(jī)器上執(zhí)行Task。
2.main()函數(shù),創(chuàng)建JobConf,定義Mapper,Reducer,Input/OutputFormat 和輸入輸出文件目錄,最后把Job提交給JobTracker,等待Job結(jié)束。
3.JobTracker,創(chuàng)建一個(gè)InputFormat的實(shí)例,調(diào)用它的getSplits()方法,把輸入目錄的文件拆分成FileSplist作為Mapper task 的輸入,生成Mapper task加入Queue。
4.TaskTracker 向 JobTracker索求下一個(gè)Map/Reduce。
Mapper Task先從InputFormat創(chuàng)建RecordReader,循環(huán)讀入FileSplits的內(nèi)容生成Key與Value,傳給Mapper函數(shù),處理完后中間結(jié)果寫(xiě)成SequenceFile.
Reducer Task 從運(yùn)行Mapper的TaskTracker的Jetty上使用http協(xié)議獲取所需的中間內(nèi)容(33%),Sort/Merge后(66%),執(zhí)行Reducer函數(shù),最后按照OutputFormat寫(xiě)入結(jié)果目錄。
TaskTracker 每10秒向JobTracker報(bào)告一次運(yùn)行情況,每完成一個(gè)Task10秒后,就會(huì)向JobTracker索求下一個(gè)Task。
Nutch項(xiàng)目的全部數(shù)據(jù)處理都構(gòu)建在Hadoop之上,詳見(jiàn)Scalable Computing with Hadoop。
二、程序員編寫(xiě)的代碼
我們做一個(gè)簡(jiǎn)單的分布式的Grep,簡(jiǎn)單對(duì)輸入文件進(jìn)行逐行的正則匹配,如果符合就將該行打印到輸出文件。因?yàn)槭呛?jiǎn)單的全部輸出,所以我們只要寫(xiě)Mapper函數(shù),不用寫(xiě)Reducer函數(shù),也不用定義Input/Output Format。
package demo.hadoop
public class HadoopGrep {
public static class RegMapper extends MapReduceBase implements Mapper {
private Pattern pattern;
public void configure(JobConf job) {
pattern = Pattern.compile(job.get( " mapred.mapper.regex " ));
}
public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter)
throws IOException {
String text = ((Text) value).toString();
Matcher matcher = pattern.matcher(text);
if (matcher.find()) {
output.collect(key, value);
}
}
}
private HadoopGrep () {
} // singleton
public static void main(String[] args) throws Exception {
JobConf grepJob = new JobConf(HadoopGrep. class );
grepJob.setJobName( " grep-search " );
grepJob.set( " mapred.mapper.regex " , args[ 2 ]);
grepJob.setInputPath( new Path(args[ 0 ]));
grepJob.setOutputPath( new Path(args[ 1 ]));
grepJob.setMapperClass(RegMapper. class );
grepJob.setReducerClass(IdentityReducer. class );
JobClient.runJob(grepJob);
}
}
RegMapper類(lèi)的configure()函數(shù)接受由main函數(shù)傳入的查找字符串,map() 函數(shù)進(jìn)行正則匹配,key是行數(shù),value是文件行的內(nèi)容,符合的文件行放入中間結(jié)果。
main()函數(shù)定義由命令行參數(shù)傳入的輸入輸出目錄和匹配字符串,Mapper函數(shù)為RegMapper類(lèi),Reduce函數(shù)是什么都不做,直接把中間結(jié)果輸出到最終結(jié)果的的IdentityReducer類(lèi),運(yùn)行Job。
整個(gè)代碼非常簡(jiǎn)單,絲毫沒(méi)有分布式編程的任何細(xì)節(jié)。
三.運(yùn)行Hadoop程序
Hadoop這方面的文檔寫(xiě)得不全面,綜合參考GettingStartedWithHadoop 與Nutch Hadoop Tutorial 兩篇后,再碰了很多釘子才終于完整的跑起來(lái)了,記錄如下:
3.1 local運(yùn)行模式
完全不進(jìn)行任何分布式計(jì)算,不動(dòng)用任何namenode,datanode的做法,適合一開(kāi)始做調(diào)試代碼。
解壓hadoop,其中conf目錄是配置目錄,hadoop的配置文件在hadoop-default.xml,如果要修改配置,不是直接修改該文件,而是修改hadoop-site.xml,將該屬性在hadoop-site.xml里重新賦值。
hadoop-default.xml的默認(rèn)配置已經(jīng)是local運(yùn)行,不用任何修改,配置目錄里唯一必須修改的是hadoop-env.sh 里JAVA_HOME的位置。
將編譯好的HadoopGrep與RegMapper.class 放入hadoop/build/classes/demo/hadoop/目錄 找一個(gè)比較大的log文件放入一個(gè)目錄,然后運(yùn)行
hadoop / bin / hadoop demo.hadoop.HadoopGrep log文件所在目錄 任意的輸出目錄 grep的字符串
查看輸出目錄的結(jié)果,查看hadoop/logs/里的運(yùn)行日志。
在重新運(yùn)行前,先刪掉輸出目錄。
3.2 單機(jī)集群運(yùn)行模式
現(xiàn)在來(lái)搞一下只有單機(jī)的集群.假設(shè)以完成3.1中的設(shè)置,本機(jī)名為hadoopserver
第1步. 然后修改hadoop-site.xml ,加入如下內(nèi)容:
< property >
< name > fs.default.name </ name >
< value > hadoopserver:9000 </ value >
</ property >
< property >
< name > mapred.job.tracker </ name >
< value > hadoopserver:9001 </ value >
</ property >
< property >
< name > dfs.replication </ name >
< value > 1 </ value >
</ property >
從此就將運(yùn)行從local文件系統(tǒng)轉(zhuǎn)向了hadoop的hdfs系統(tǒng),mapreduce的jobtracker也從local的進(jìn)程內(nèi)操作變成了分布式的任務(wù)系統(tǒng),9000,9001兩個(gè)端口號(hào)是隨便選擇的兩個(gè)空余端口號(hào)。
另外,如果你的/tmp目錄不夠大,可能還要修改hadoop.tmp.dir屬性。
第2步. 增加ssh不輸入密碼即可登陸。
因?yàn)镠adoop需要不用輸入密碼的ssh來(lái)進(jìn)行調(diào)度,在不su的狀態(tài)下,在自己的home目錄運(yùn)行ssh-keygen -t rsa ,然后一路回車(chē)生成密鑰,再進(jìn)入.ssh目錄,cp id_rsa.pub authorized_keys
詳細(xì)可以man 一下ssh, 此時(shí)執(zhí)行ssh hadoopserver,不需要輸入任何密碼就能進(jìn)入了。
3.格式化namenode,執(zhí)行
bin/hadoop namenode -format
4.啟動(dòng)Hadoop
執(zhí)行hadoop/bin/start-all.sh, 在本機(jī)啟動(dòng)namenode,datanode,jobtracker,tasktracker
5.現(xiàn)在將待查找的log文件放入hdfs,。
執(zhí)行hadoop/bin/hadoop dfs 可以看到它所支持的文件操作指令。
執(zhí)行hadoop/bin/hadoop dfs put log文件所在目錄 in ,則log文件目錄已放入hdfs的/user/user-name/in 目錄中
6.現(xiàn)在來(lái)執(zhí)行Grep操作
hadoop/bin/hadoop demo.hadoop.HadoopGrep in out
查看hadoop/logs/里的運(yùn)行日志,重新執(zhí)行前。運(yùn)行hadoop/bin/hadoop dfs rmr out 刪除out目錄。
7.運(yùn)行hadoop/bin/stop-all.sh 結(jié)束
3.3 集群運(yùn)行模式
假設(shè)已執(zhí)行完3.2的配置,假設(shè)第2臺(tái)機(jī)器名是hadoopserver2
1.創(chuàng)建與hadoopserver同樣的執(zhí)行用戶,將hadoop解壓到相同的目錄。
2.同樣的修改haoop-env.sh中的JAVA_HOME 及修改與3.2同樣的hadoop-site.xml
3. 將hadoopserver中的/home/username/.ssh/authorized_keys 復(fù)制到hadoopserver2,保證hadoopserver可以無(wú)需密碼登陸hadoopserver2
scp /home/username/.ssh/authorized_keys username@hadoopserver2:/home/username/.ssh/authorized_keys
4.修改hadoop-server的hadoop/conf/slaves文件, 增加集群的節(jié)點(diǎn),將localhost改為
hadoop-server
hadoop-server2
5.在hadoop-server執(zhí)行hadoop/bin/start-all.sh
將會(huì)在hadoop-server啟動(dòng)namenode,datanode,jobtracker,tasktracker
在hadoop-server2啟動(dòng)datanode 和tasktracker
6.現(xiàn)在來(lái)執(zhí)行Grep操作
hadoop/bin/hadoop demo.hadoop.HadoopGrep in out
重新執(zhí)行前,運(yùn)行hadoop/bin/hadoop dfs rmr out 刪除out目錄
7.運(yùn)行hadoop/bin/stop-all.sh 結(jié)束。
四、效率
經(jīng)測(cè)試,Hadoop并不是萬(wàn)用靈丹,很取決于文件的大小和數(shù)量,處理的復(fù)雜度以及群集機(jī)器的數(shù)量,相連的帶寬,當(dāng)以上四者并不大時(shí),hadoop優(yōu)勢(shì)并不明顯。
比如,不用hadoop用java寫(xiě)的簡(jiǎn)單grep函數(shù)處理100M的log文件只要4秒,用了hadoop local的方式運(yùn)行是14秒,用了hadoop單機(jī)集群的方式是30秒,用雙機(jī)集群10M網(wǎng)口的話更慢,慢到不好意思說(shuō)出來(lái)的地步。