hadoop学习心得

时间:2019-05-12 11:25:48下载本文作者:会员上传
简介:写写帮文库小编为你整理了多篇相关的《hadoop学习心得》,但愿对你工作学习有帮助,当然你在写写帮文库还可以找到更多《hadoop学习心得》。

第一篇:hadoop学习心得

1.FileInputFormat splits only large files.Here “large” means larger than an HDFS block.The split size is normally the size of an HDFS block, which is appropriate for most applications;however,it is possible to control this value by setting various Hadoop properties.2.So the split size is blockSize.3.Making the minimum split size greater than the block size increases the split size, but at the cost of locality.4.One reason for this is that FileInputFormat generates splits in such a way that each split is all or part of a single file.If the file is very small(“small” means significantly smaller than an HDFS block)and there are a lot of them, then each map task will process very little input, and there will be a lot of them(one per file), each of which imposes extra bookkeeping overhead.hadoop处理大量小数据文件效果不好:

hadoop对数据的处理是分块处理的,默认是64M分为一个数据块,如果存在大量小数据文件(例如:2-3M一个的文件)这样的小数据文件远远不到一个数据块的大小就要按一个数据块来进行处理。

这样处理带来的后果由两个:1.存储大量小文件占据存储空间,致使存储效率不高检索速度也比大文件慢。

2.在进行MapReduce运算的时候这样的小文件消费计算能力,默认是按块来分配Map任务的(这个应该是使用小文件的主要缺点)

那么如何解决这个问题呢?

1.使用Hadoop提供的Har文件,Hadoop命令手册中有可以对小文件进行归档。2.自己对数据进行处理,把若干小文件存储成超过64M的大文件。

FileInputFormat is the base class for all implementations of InputFormat that use files as their data source(see Figure 7-2).It provides two things: a place to define which files are included as the input to a job, and an implementation for generating splits for the input files.The job of dividing splits into records is performed by subclasses.An InputSplit has a length in bytes, and a set of storage locations, which are just hostname strings.Notice that a split doesn’t contain the input data;it is just a reference to the data.As a MapReduce application writer, you don’t need to deal with InputSplits directly, as they are created by an InputFormat.An InputFormat is responsible for creating the input splits, and dividing them into records.Before we see some concrete examples of InputFormat, let’s briefly examine how it is used in MapReduce.Here’s the interface:

public interface InputFormat { InputSplit[] getSplits(JobConf job, int numSplits)throws IOException;RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter)throws IOException;}

The JobClient calls the getSplits()method.On a tasktracker, the map task passes the split to the getRecordReader()method on InputFormat to obtain a RecordReader for that split.A related requirement that sometimes crops up is for mappers to have access to the full contents of a file.Not splitting the file gets you part of the way there, but you also need to have a RecordReader that delivers the file contents as the value of the record.One reason for this is that FileInputFormat generates splits in such a way that each split is all or part of a single file.If the file is very small(“small” means significantly smaller than an HDFS block)and there are a lot of them, then each map task will process very little input, and there will be a lot of them(one per file), each of which imposes extra bookkeeping overhead.Example 7-2.An InputFormat for reading a whole file as a record public class WholeFileInputFormat extends FileInputFormat { @Override protected boolean isSplitable(FileSystem fs, Path filename){ return false;} @Override public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter)throws IOException { return new WholeFileRecordReader((FileSplit)split, job);} } We implement getRecordReader()to return a custom implementation of RecordReader.Example 7-3.The RecordReader used by WholeFileInputFormat for reading a whole file as a record class WholeFileRecordReader implements RecordReader { private FileSplit fileSplit;private Configuration conf;private boolean processed = false;public WholeFileRecordReader(FileSplit fileSplit, Configuration conf)throws IOException { this.fileSplit = fileSplit;this.conf = conf;} @Override public NullWritable createKey(){ return NullWritable.get();} @Override public BytesWritable createValue(){ return new BytesWritable();} @Override public long getPos()throws IOException { return processed ? fileSplit.getLength(): 0;} @Override public float getProgress()throws IOException { return processed ? 1.0f : 0.0f;} @Override public boolean next(NullWritable key, BytesWritable value)throws IOException { if(!processed){ byte[] contents = new byte[(int)fileSplit.getLength()];Path file = fileSplit.getPath();FileSystem fs = file.getFileSystem(conf);FSDataInputStream in = null;try { in = fs.open(file);IOUtils.readFully(in, contents, 0, contents.length);value.set(contents, 0, contents.length);} finally { IOUtils.closeStream(in);} processed = true;return true;} return false;} @Override public void close()throws IOException { // do nothing } }

Input splits are represented by the Java interface, InputSplit(which, like all of the classes mentioned in this section, is in the org.apache.hadoop.mapred package†): public interface InputSplit extends Writable { long getLength()throws IOException;String[] getLocations()throws IOException;}

An InputSplit has a length in bytes, and a set of storage locations, which are just hostname strings.Notice that a split doesn’t contain the input data;it is just a reference to the data.The storage locations are used by the MapReduce system to place map tasks as close to the split’s data as possible, and the size is used to order the splits so that the largest get processed first, in an attempt to minimize the job runtime(this is an instance of a greedy approximation algorithm).As a MapReduce application writer, you don’t need to deal with InputSplits directly, as they are created by an InputFormat.An InputFormat is responsible for creating the input splits, and dividing them into records.Before we see some concrete examples of InputFormat, let’s briefly examine how it is used in MapReduce.Here’s the interface:

public interface InputFormat { InputSplit[] getSplits(JobConf job, int numSplits)throws IOException;RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter)throws IOException;}

Having calculated the splits, the client sends them to the jobtracker, which uses their storage locations to schedule map tasks to process them on the tasktrackers.A path may represent a file, a directory, or, by using a glob, a collection of files and directories.A path representing a directory includes all the files in the directory as input to the job.See “File patterns” on page 60 for more on using globs.It is a common requirement to process sets of files in a single operation.For example, a MapReduce job for log processing might analyze a month worth of files, contained in a number of directories.Rather than having to enumerate each file and directory to specify the input, it is convenient to use wildcard characters to match multiple files with a single expression, an operation that is known as globbing.Hadoop provides two FileSystem methods for processing globs: public FileStatus[] globStatus(Path pathPattern)throws IOException public FileStatus[] globStatus(Path pathPattern, PathFilter filter)throws IOException

第二篇:Hadoop之JobTrack分析

Hadoop之JobTrack分析

1.client端指定Job的各种参数配置之后调用job.waitForCompletion(true)方法提交Job给JobTracker,等待Job 完成。

[java] view plaincopyprint?

1.public void submit()throws IOException, InterruptedException, 2.ClassNotFoundException { 3.ensureState(JobState.DEFINE);//检查JobState状态

4.setUseNewAPI();//检查及设置是否使用新的MapReduce API

5.6.// Connect to the JobTracker and submit the job

7.connect();//链接JobTracker

8.info = jobClient.submitJobInternal(conf);//将job信息提交

9.super.setJobID(info.getID());

10.state = JobState.RUNNING;//更改job状态

11.}

以上代码主要有两步骤,连接JobTracker并提交Job信息。connect方法主要是实例化JobClient对象,包括设置JobConf和init工作:

[java] view plaincopyprint?

1.public void init(JobConf conf)throws IOException {

2.String tracker = conf.get(“mapred.job.tracker”, “local”);//读取配置文件信息用于判断该Job是运行于本地单机模式还是分布式模式

3.tasklogtimeout = conf.getInt(4.TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT);5.this.ugi = UserGroupInformation.getCurrentUser();

6.if(“local”.equals(tracker)){//如果是单机模式,new LocalJobRunner

7.conf.setNumMapTasks(1);

8.this.jobSubmitClient = new LocalJobRunner(conf);9.} else {

10.this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);

11.} 12.}

分布式模式下就会创建一个RPC代理链接:

[java] view plaincopyprint?

1.public static VersionedProtocol getProxy(2.Class protocol,3.long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,4.Configuration conf, SocketFactory factory, int rpcTimeout)throws IOException { 5.6.if(UserGroupInformation.isSecurityEnabled()){ 7.SaslRpcServer.init(conf);8.}

9.VersionedProtocol proxy =

10.(VersionedProtocol)Proxy.newProxyInstance(11.protocol.getClassLoader(), new Class[] { protocol },12.new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));

13.long serverVersion = proxy.getProtocolVersion(protocol.getName(), 14.clientVersion);15.if(serverVersion == clientVersion){ 16.return proxy;17.} else {

18.throw new VersionMismatch(protocol.getName(), clientVersion, 19.serverVersion);20.} 21.}

从上述代码可以看出hadoop实际上使用了Java自带的Proxy API来实现Remote Procedure Call 初始完之后,需要提交job [java] view plaincopyprint?

1.info = jobClient.submitJobInternal(conf);//将job信息提交

submit方法做以下几件事情:

1.将conf中目录名字替换成hdfs代理的名字

2.检查output是否合法:比如路径是否已经存在,是否是明确的3.将数据分成多个split并放到hdfs上面,写入job.xml文件

4.调用JobTracker的submitJob方法

该方法主要新建JobInProgress对象,然后检查访问权限和系统参数是否满足job,最后addJob:

[java] view plaincopyprint?

1.private synchronized JobStatus addJob(JobID jobId, JobInProgress job)2.throws IOException { 3.totalSubmissions++;4.5.synchronized(jobs){

6.synchronized(taskScheduler){

7.jobs.put(job.getProfile().getJobID(), job);

8.for(JobInProgressListener listener : jobInProgressListeners){ 9.listener.jobAdded(job);10.} 11.} 12.}

13.myInstrumentation.submitJob(job.getJobConf(), jobId);14.job.getQueueMetrics().submitJob(job.getJobConf(), jobId);15.16.LOG.info(“Job ” + jobId + “ added successfully for user '”

17.+ job.getJobConf().getUser()+ “' to queue '”

18.+ job.getJobConf().getQueueName()+ “'”);19.AuditLogger.logSuccess(job.getUser(),20.Operation.SUBMIT_JOB.name(), jobId.toString());21.return job.getStatus();22.}

totalSubmissions记录client端提交job到JobTracker的次数。而jobs则是JobTracker所有可以管理的job的映射表

Map jobs = Collections.synchronizedMap(new TreeMap());taskScheduler是用于调度job先后执行策略的,其类图如下所示:

hadoop job调度机制; public enum SchedulingMode { FAIR, FIFO } 1.公平调度FairScheduler 对于每个用户而言,分布式资源是公平分配的,每个用户都有一个job池,假若某个用户目前所占有的资源很多,对于其他用户而言是不公平的,那么调度器就会杀掉占有资源多的用户的一些task,释放资源供他人使用 2.容量调度JobQueueTaskScheduler 在分布式系统上维护多个队列,每个队列都有一定的容量,每个队列中的job按照FIFO的策略进行调度。队列中可以包含队列。

两个Scheduler都要实现TaskScheduler的public synchronized List assignTasks(TaskTracker tracker)方法,该方法通过具体的计算生成可以分配的task

接下来看看JobTracker的工作: 记录更新JobTracker重试的次数:

[java] view plaincopyprint?

1.while(true){ 2.try {

3.recoveryManager.updateRestartCount();4.break;

5.} catch(IOException ioe){

6.LOG.warn(“Failed to initialize recovery manager.”, ioe);7.// wait for some time

8.Thread.sleep(FS_ACCESS_RETRY_PERIOD);9.LOG.warn(“Retrying...”);10.} 11.}

启动Job调度器,默认是FairScheduler: taskScheduler.start();主要是初始化一些管理对象,比如job pool管理池

[java] view plaincopyprint?

1.// Initialize other pieces of the scheduler

2.jobInitializer = new JobInitializer(conf, taskTrackerManager);3.taskTrackerManager.addJobInProgressListener(jobListener);4.poolMgr = new PoolManager(this);5.poolMgr.initialize();

6.loadMgr =(LoadManager)ReflectionUtils.newInstance(7.conf.getClass(“mapred.fairscheduler.loadmanager”, 8.CapBasedLoadManager.class, LoadManager.class), conf);9.loadMgr.setTaskTrackerManager(taskTrackerManager);10.loadMgr.setEventLog(eventLog);11.loadMgr.start();

12.taskSelector =(TaskSelector)ReflectionUtils.newInstance(13.conf.getClass(“mapred.fairscheduler.taskselector”, 14.DefaultTaskSelector.class, TaskSelector.class), conf);15.taskSelector.setTaskTrackerManager(taskTrackerManager);16.taskSelector.start();

[java] view plaincopyprint?

1.JobInitializer有一个确定大小的ExecutorService threadPool,每个thread用于初始化job

[java] view plaincopyprint?

1.try {

2.JobStatus prevStatus =(JobStatus)job.getStatus().clone();3.LOG.info(“Initializing ” + job.getJobID());4.job.initTasks();

5.// Inform the listeners if the job state has changed 6.// Note : that the job will be in PREP state.7.JobStatus newStatus =(JobStatus)job.getStatus().clone();8.if(prevStatus.getRunState()!= newStatus.getRunState()){ 9.JobStatusChangeEvent event =

10.new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus,11.newStatus);

12.synchronized(JobTracker.this){ 13.updateJobInProgressListeners(event);14.} 15.} 16.}

初始化操作主要用于初始化生成tasks然后通知其他的监听者执行其他操作。initTasks主要处理以下工作:

[java] view plaincopyprint?

1.// 记录用户提交的运行的job信息

2.try {

3.userUGI.doAs(new PrivilegedExceptionAction(){ 4.@Override

5.public Object run()throws Exception {

6.JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile, 7.startTimeFinal, hasRestarted());8.return null;9.} 10.});

11.} catch(InterruptedException ie){ 12.throw new IOException(ie);13.} 14.15.// 设置并记录job的优先级

16.setPriority(this.priority);17.18.//

19.//生成每个Task需要的密钥

20.//

21.generateAndStoreTokens();22.然后读取JobTracker split的数据的元信息,元信息包括以下属性信息:

[java] view plaincopyprint?

1.private TaskSplitIndex splitIndex;//洗牌后的索引位置

2.private long inputDataLength;//洗牌后数据长度 3.private String[] locations;//数据存储位置

然后根据元信息的长度来计算numMapTasks并校验数据存储地址是否可以连接 接下来生成map tasks和reducer tasks:

[java] view plaincopyprint?

1.maps = new TaskInProgress[numMapTasks];2.for(int i=0;i < numMapTasks;++i){

3.inputLength += splits[i].getInputDataLength();4.maps[i] = new TaskInProgress(jobId, jobFile, 5.splits[i],6.jobtracker, conf, this, i, numSlotsPerMap);

7.}

[java] view plaincopyprint?

1.this.jobFile = jobFile;2.this.splitInfo = split;3.this.jobtracker = jobtracker;4.this.job = job;5.this.conf = conf;

6.this.partition = partition;

7.this.maxSkipRecords = SkipBadRecords.getMapperMaxSkipRecords(conf);8.this.numSlotsRequired = numSlotsRequired;9.setMaxTaskAttempts();10.init(jobid);

以上除了task对应的jobTracker,split信息和job信息外,还设置了

[java] view plaincopyprint?

1.maxSkipRecords---记录task执行的时候最大可以跳过的错误记录数;

2.

setMaxTaskAttempts--设置task最多可以执行的次数。当一个task执行两次都失败了之后,会以skip mode模式再重新执行一次,记录那些bad record,3.然后第四次再执行的时候,跳过这些bad records 4.

5.新建reducer task的过程也很类似。

6.7.


8.

9.


10.

11.


12.

13.


14.

15.

第三篇:Hadoop常见错误总结

Hadoop常见错误总结 2010-12-30 13:55 错误1:bin/hadoop dfs 不能正常启动,持续提示:

INFO ipc.Client: Retrying connect to server: localhost/127.0.0.1:9000.Already tried 0 time(s).原因:由于 dfs 的部分文件默认保存在tmp文件夹,在系统重启时被删除。解决:修改core-site.xml 的 hadoop.tmp.dir配置文件路径:/home/hadoop/tmp。

错误2:hadoop出现了一些问题。用$ bin/hadoop dfsadmin-report 测试的时候,发现dfs没有加载。显示如下:

Configured Capacity: 0(0 KB)Present Capacity: 0(0 KB)DFS Remaining: 0(0 KB)DFS Used: 0(0 KB)DFS Used%: ?% Under replicated blocks: 0 Blocks with corrupt replicas: 0 Missing blocks: 0 查看日志:

ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: java.io.IOException: Incompatible namespaceIDs in /home/hadoop/data: namenode namespaceID = 2033006627;datanode namespaceID = 1589898341 经分析,是由于namenode namespaceID = 2033006627;和datanode namespaceID = 1589898341 不一致造成原因。

修改了namenode namespaceID = 1589898341 可以使用,但是重启之后,又不可以用了。

最后解决方案:删除hadoop用户下的name文件夹,data文件夹,tmp文件夹,temp文件里的内容,然后重新执行namenode命令。重启电脑之后,正常。

错误3:File /home/hadoop/tmp/mapred/system/jobtracker.info could only be replicated to 0 nodes, instead of 1 出现此错误,一般发生在datanode与namenode还没有进行连接,就开始往hdfs系统上put数据了。稍等待一会,就可以了。

也可以使用:hadoop dfsadmin –report命令查看集群的状态。错误4:

每次启动总有部分datanade不能去全部启动,查看日志文件,显示为: ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: java.net.UnknownHostException: zgchen-ubutun: zgchen-ubutun at java.net.InetAddress.getLocalHost(InetAddress.java:1426)。分析:这是由于datanode 找不到服务host引起的。

解决:通过查找/etc/hostname 找到hostname;比如:ubuntu。然后找到/etc/hosts,添加:127.0.1.1 ubuntu 错误5:

java.lang.OutOfMemoryError: GC overhead limit exceeded 分析:这个是JDK6新添的错误类型。是发生在GC占用大量时间为释放很小空间的时候发生的,是一种保护机制。解决方案是,关闭该功能,可以添加JVM的启动参数来限制使用内存:-XX:-UseGCOverheadLimit 添加位置是:mapred-site.xml 里新增项:mapred.child.java.opts 内容:-XX:-UseGCOverheadLimit java.lang.OutOfMemoryError: Java heap space 出现这种异常,明显是jvm内存不够得原因,要修改所有的datanode的jvm内存大小。

Java-Xms1024m-Xmx4096m 一般jvm的最大内存使用应该为总内存大小的一半,我们使用的8G内存,所以设置为4096m,这一值可能依旧不是最优的值。(其实对于最好设置为真实物理内存大小的0.8)

错误6:Too many fetch-failures Answer: 出现这个问题主要是结点间的连通不够全面。1)检查、/etc/hosts 要求本机ip 对应 服务器名

要求要包含所有的服务器ip + 服务器名 2)检查.ssh/authorized_keys 要求包含所有服务器(包括其自身)的public key 错误7:处理速度特别的慢 出现map很快 但是reduce很慢 而且反复出现 reduce=0% Answer: 结合第二点,然后修改可用内存大小。

conf/hadoop-env.sh 中的export HADOOP_HEAPSIZE=4000 错误8:能够启动datanode,但无法访问,也无法结束的错误

在重新格式化一个新的分布式文件时,需要将你NameNode上所配置的dfs.name.dir这一namenode用来存放NameNode 持久存储名字空间及事务日志的本地文件系统路径删除,同时将各DataNode上的dfs.data.dir的路径 DataNode 存放块数据的本地文件系统路径的目录也删除。如本此配置就是在NameNode上删除/home/hadoop/NameData,在DataNode上删除/home/hadoop/DataNode1和/home/hadoop/DataNode2。这是因为Hadoop在格式化一个新的分布式文件系统时,每个存储的名字空间都对应了建立时间的那个版本(可以查看/home/hadoop /NameData/current目录下的VERSION文件,上面记录了版本信息),在重新格式化新的分布式系统文件时,最好先删除NameData 目录。必须删除各DataNode的dfs.data.dir。这样才可以使namedode和datanode记录的信息版本对应。

注意:删除是个很危险的动作,不能确认的情况下不能删除!做好删除的文件等通通备份!

错误9:java.io.IOException: Could not obtain block: blk_***469_1100 file=/user/hive/warehouse/src_20100924_log/src_20100924_log 出现这种情况大多是结点断了,没有连接上。或者

mapred.tasktracker.map.tasks.maximum 的设置 超过 cpu cores数目,导致出现获取不到文件。

错误10:Task Id : attempt_201010291615_0001_m_000234_0, Status : FAILED Error: java.io.IOException: No space left on device Task Id : attempt_201010291615_0001_m_000240_0, Status : FAILED java.io.IOException: Spill failed 磁盘空间不够,应该分析磁盘空间df-h 检查是否还存在磁盘空间。错误11:Task Id : attempt_201011011336_0007_m_000001_0, Status : FAILED org.apache.hadoop.hbase.client.RegionOfflineException: region offline: lm,1288597709144 网上说,将/hbase删除;重启hbase后,可以正常应用了。但是我找不到/hbase目录,只好自己重新删除掉一些hadoop文件,重新生成文件管理系统。

还有一个可能是,配置错了/hbase/conf/hbase-env.sh的HBASE_CLASSPATH,这个默认是不配置的,所以可以不配置。

错误12:org.apache.hadoop.hbase.TableNotFoundException: org.apache.hadoop.hbase.TableNotFoundException: lm 找不到表,hbase启动了,检查一下是否存在需要的Htable。

第四篇:Hadoop运维工程师岗位职责简洁版

Hadoop运维工程师岗位职责简洁版

1.负责Hadoop相关项目日常运行维护、故障排查工作;

2.负责Hadoop集群的监控和配置调优工作;

3.负责Hadoop平台的用户管理、权限分配、资源分配;

4.负责集群服务器软件的安装、维护、部署、更新;

Hadoop运维工程师岗位职责(二)

1.负责高并发,大存储和实时流的Hadoop/spark大数据平台规划,运维,监控和优化工作;

2.保证Hadoop/spark平台各核心服务运行的稳定、高效;

___对Hadoop/spark平台运维不断优化,提升数据产品的质量和响应速度;

4.开发各种Hadoop大数据自动化运维与监控工具;

___平台大数据环境的部署维护和技术支持;

6.应用故障的处理跟踪及统计汇总分析;

7.应用安全,数据的日常备份和应急恢复。

Hadoop运维工程师岗位职责(三)

1、负责移动大数据中心的建设与维护;

2、负责安徽移动hadoop集群的故障排查、解决;

3、其他省份hadoop项目建设提供技术支持。

Hadoop运维工程师岗位职责(四)

1.负责Hadoop及相关组件的部署和维护,保证其稳定运行;

2.开发和使用Hadoop大数据自动化运维与监控工具;

3.基于大数据应用需求,不断调整和优化Hadoop框架及组件的配置,提升性能;

4.为数据仓库、数据挖掘建模等数据应用项目提供运行环境支持;

Hadoop运维工程师岗位职责(五)

1.hadoop集群运维、监控、开发;

2.hadoop生态圈框架部署、故障处理、异常排查等基本工作;

___对有助于提升集群处理能力、高可用性、扩展性的各种解决方案进行跟踪和落地;

4.解决海量数据不断增长面临的挑战,解决业务需求;

5、对Hadoop集群进行日常管理,保障系统正常运行;

第五篇:在三台虚拟机上部署多节点Hadoop

cd /etc/network vi interfaces 进入vi后,编辑 auto eth0 iface eth0 inet static address IP地址 netmask 掩码 gateway 网关

重启SSH:

serivce ssh restart 重启网卡: /etc/init.d/networking restart 取得root权限:

sudo passwd su 1.硬件环境

共有3台机器,均使用的linux系统,Java使用的是openjdk-6-jdk sudo apt-get update sudo apt-get install openjdk-6-jdk 修改主机名:

su vi /etc/hostname IP配置如下:

hadoop1:192.168.75.132(NameNode)hadoop2:192.168.75.131(DataNode)hadoop3:192.168.75.133(DataNode)这里有一点需要强调的就是,务必要确保每台机器的主机名和IP地址之间能正确解析。一个很简单的测试办法就是ping一下主机名,比如在hadoop1上ping hadoop2,如果能ping通就OK!若不能正确解析,可以修改/etc/hosts文件,如果该台机器作Namenode用,则需要在hosts文件中加上集群中所有机器的IP地址及其对应的主机名;如果该台机器作Datanode用,则只需要在hosts文件中加上本机IP地址和Namenode机器的IP地址。

以本文为例,hadoop1(NameNode)中的/etc/hosts文件看起来应该是这样的:

vi /etc/hosts 127.0.0.1 hadoop1 localhost 192.168.75.132 hadoop1 hadoop1 192.168.75.131 hadoop2 hadoop2 192.168.75.133 hadoop3 hadoop3 hadoop2(DataNode)中的/etc/hosts文件看起来就应该是这样的: 127.0.0.1 hadoop2 localhost 192.168.75.132 hadoop1 hadoop1 192.168.75.131 hadoop2 hadoop2 hadoop3(DataNode)中的/etc/hosts文件看起来就应该是这样的: 127.0.0.1 hadoop3 localhost 192.168.75.132 hadoop1 hadoop1 192.168.75.133 hadoop3 hadoop3 对于Hadoop来说,在HDFS看来,节点分为Namenode和Datanode,其中Namenode只有一个,Datanode可以是很多;在MapReduce看来,节点又分为Jobtracker和Tasktracker,其中Jobtracker只有一个,Tasktracker可以是很多。我是将namenode和jobtracker部署在hadoop1上,hadoop2, hadoop3作为datanode和tasktracker。当然你也可以将namenode,datanode,jobtracker,tasktracker全部部署在一台机器上。

2.目录结构

由于Hadoop要求所有机器上hadoop的部署目录结构要相同,并且都有一个相同的用户名的帐户。

addgroup hadoop adduser--ingroup hadoop hadoop 三台机器上是这样的:都有一个hadoop的帐户,主目录是/home/hadoop。

下载hadoop:

wget http://mirror.bjtu.edu.cn/apache/hadoop/core/stable/hadoop-0.20.203.0rc1.tar.gz 解压:[hadoop@ hadoop1:~]$tar-zxvf hadoop-0.21.0.tar.gz Hadoop部署目录结构如下:/home/hadoop/hadoop0.21.0,所有的hadoop版本放在这个目录中。

将hadoop0.21.0压缩包解压至hadoop0.21.0中,为了方便以后升级,建议建立一个链接指向要使用的hadoop版本,不妨设为hadoop,[hadoop@ hadoop1:~]$ln-s hadoop-0.21.0 hadoop 这样一来,所有的配置文件都在/home/hadoop/hadoop/conf/目录中,所有执行程序都在/home/hadoop/hadoop/bin目录中。但是由于上述目录中hadoop的配置文件和hadoop的安装目录是放在一起的,这样一旦日后升级hadoop版本的时候所有的配置文件都会被覆盖,因此建议将配置文件与安装目录分离,一种比较好的方法就是建立一个存放配置文件的目录,/home/hadoop/hadoop/hadoop-config,然后将/home/hadoop/hadoop/conf/目录中的masters,slaves,hadoop-env.sh三个文件拷贝到hadoop-config目录中,并指定环境变量$HADOOP_CONF_DIR指向该目录。环境变量在/home/hadoop/.bashrc和/etc/profile中设定。

如:

HADOOP_CONF_DIR=/home/hadoop/hadoop/hadoop-config export HADOOP_CONF_DIR 即:

cd /home/hadoop/hadoop mkdir hadoop-config cp conf/masters hadoop-config/ cp conf/slaves hadoop-config/ cp conf/hadoop-env.sh hadoop-config/ sudo vi /etc/profile HADOOP_CONF_DIR=/home/hadoop/hadoop/hadoop-config export HADOOP_CONF_DIR vi /home/hadoop/.bashrc HADOOP_CONF_DIR=/home/hadoop/hadoop/hadoop-config export HADOOP_CONF_DIR 综上所述,为了方便以后升级版本,我们需要做到配置文件与安装目录分离,并通过设定一个指向我们要使用的版本的hadoop的链接,这样可以减少我们对配置文件的维护。

3.SSH设置

在Hadoop启动以后,Namenode是通过SSH(Secure Shell)来启动和停止各个节点上的各种守护进程的,这就需要在节点之间执行指令的时候是不需要输入密码的方式,故我们需要配置SSH使用无密码公钥认证的方式。

首先要保证每台机器上都装了SSH服务器,且都正常启动。实际中我们用的都是OpenSSH,这是SSH协议的一个免费开源实现。

apt-get install ssh 以本文中的3台机器为例,现在hadoop1是主节点,它需要主动发起SSH连接到hadoop2,对于SSH服务来说,hadoop1就是SSH客户端,而hadoop2, hadoop3则是SSH服务端,因此在hadoop2,hadoop3上需要确定sshd服务已经启动。简单的说,在hadoop1上需要生成一个密钥对,即一个私钥,一个公钥。将公钥拷贝到hadoop2上,这样,比如当hadoop1向hadoop2发起ssh连接的时候,hadoop2上就会生成一个随机数并用hadoop1的公钥对这个随机数进行加密,并发送给hadoop1,hadoop1收到这个加密的数以后用私钥进行解密,并将解密后的数发送回hadoop2,hadoop2确认解密的数无误后就允许hadoop1进行连接了。这就完成了一次公钥认证过程。

对于本文中的3台机器,首先在hadoop1上生成密钥对:

[hapoop@hadoop1:~]$ssh-keygen-t dsa –P ‘’ –f ~/.ssh/id_dsa 这个命令将为hadoop1上的用户hadoop生成其密钥对。生成的密钥对id_rsa,id_rsa.pub,在/home/hadoop/.ssh目录下。

[hapoop@hadoop1:.ssh]$cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys 然后将id_rsa.pub的内容复制/home/hadoop/.ssh/authorized_keys文件中。

对于hadoop2:[hapoop @ hadoop2:~]$mkdir.ssh 对于hadoop3:[hapoop @ hadoop3:~]$mkdir.ssh [hapoop @ hapoop1:.ssh]$scp authorized_keys hapoop2:/home/hapoop/.ssh/ [hapoop @ hapoop1:.ssh]$scp authorized_keys hapoop3:/home/hapoop/.ssh/

此处的scp就是通过ssh进行远程copy,此处需要输入远程主机的密码,即hadoop2,hadoop3机器上hadoop帐户的密码,当然,你也可以用其他方法将authorized_keys文件拷贝到其他机器上。

[hadoop@hadoop2:.ssh]$chmod 640 authorized_keys [hadoop@hadoop3:.ssh]$chmod 640 authorized_keys

这一步非常关键,必须保证authorized_keys只对其所有者有读写权限,其他人不允许有写的权限,否则SSH是不会工作的。

[hadoop@ hadoop2:.ssh] $ls-la

drwx------2 hadoop hadoop.drwx------3 hadoop hadoop..-rw-r--r--1 hadoop hadoop authorized_keys

注意每个机器上的.ssh目录的ls-la都应该和上面是一样的

接着,在3台机器上都需要对sshd服务进行配置,修改文件/etc/ssh/sshd_config。

vi /etc/ssh/sshd_config #去除密码认证

PasswordAuthentication no

(?如果这样,putty就不能登陆Linux了,去掉)AuthorizedKeysFile.ssh/authorized_keys 至此各个机器上的SSH配置已经完成,可以测试一下了,比如hadoop1向hadoop2发起ssh连接。

[hadoop@hadoop1:~]$ssh hadoop2 如果ssh配置好了,就会出现以下提示信息 The authenticity of host [hadoop2] can't be established.Key fingerprint is 1024 5f:a0:0b:65:d3:82:df:ab:44:62:6d:98:9c:fe:e9:52.Are you sure you want to continue connecting(yes/no)? OpenSSH告诉你它不知道这台主机,但是你不用担心这个问题,因为你是第一次登录这台主机。键入“yes”。这将把这台主机的“识别标记”加到“~/.ssh/know_hosts”文件中。第二次访问这台主机的时候就不会再显示这条提示信息了。

然后你会发现不需要输入密码就可以建立ssh连接了,恭喜你,配置成功了,不过,别忘了测试本机ssh hadoop1。

(应该是只设置hadoop1对hadoop2发送ssh连接和hadoop1对hadoop3发送ssh连接,下面的需要配置吗?)同样,hadoop1对hadoop3发起ssh连接,再在hadoop3发起对hadoop1的连接。这里不在赘述。

4.Hadoop环境变量

在/home/hadoop/hadoop/hadoop-config目录下的hadoop-env.sh中设置Hadoop需要的环境变量,其中JAVA_HOME是必须设定的变量。HADOOP_HOME变量可以设定也可以不设定,如果不设定,HADOOP_HOME默认的是bin目录的父目录,即本文中的/home/hadoop/hadoop。我的是这样设置的: export HADOOP_HOME=/home/hadoop/hadoop 这一句话export JAVA_HOME=/usr/java/jdk1.6.0_20改成: export JAVA_HOME=/usr/lib/jvm/java-6-openjdk 5.Hadoop配置文件

修改slaves文件

如前所述,在hadoop1(NameNode)的hadoop-config/目录下,打开slaves文件,该文件用来指定所有的DataNode,一行指定一个主机名。即本文中的hadoop2、hadoop3,因此slaves文件看起来应该是这样的:

括号?????? 即hadoop2和192.168.75.131二者写一个就行 hadoop2(192.168.75.131)hadoop3(192.168.75.133)修改masters文件

打开masters文件,该文件用来指定NameNode,内容如下: hadoop1(192.168.75.132)

在hadoop的安装目录分别找到core-default.xml,hdfs-default.xml,mapred-default.xml三个文件,然后copy到hadoop-config/目录下,这三个文件包含Hadoop的所有配置项,然后把这个分别改名为core-site.xml,hdfs-site.xml,mapred-site.xml,我们根据需要修改如下:(注意:下面的配置文件中不要有中文或中文的符号,否则报错)修改core-site.xml core-site.xml的修改项如下:

hadoop.tmp.dir

/home/hadoop/hadoop/tmp

fs.defaultFS

hdfs://(master IP):9000 修改hdfs-site.xml 这里不需要修改此文件。修改mapred-site.xml

mapreduce.jobtracker.address

(master IP):9001

NameNode 注意:不同版本的变量名是不一样的啊~~~~~~~~~~~~ 6.部署Hadoop 前面讲的这么多Hadoop的环境变量和配置文件都是在hadoop1这台机器上的,现在需要将hadoop部署到其他的机器上,保证目录结构一致。

[hadoop@hadoop1:~]$scp-r /home/hadoop/hadoop hadoop2:/home/hadoop/ [hadoop@hadoop1:~]$scp-r /home/hadoop/hadoop hadoop3:/home/hadoop/

至此,可以说,Hadoop已经在各个机器上部署完毕了,下面就让我们开始启动Hadoop吧。

7.启动Hadoop 启动之前,我们先要格式化namenode,先进入~/hadoop/目录,执行下面的命令:

[hadoop@hadoop1 hadoop]$bin/hadoop namenode –format 不出意外,应该会提示格式化成功。如果不成功,就去hadoop/logs/目录下去查看日志文件。

下面就该正式启动hadoop啦,在bin/下面有很多启动脚本,可以根据自己的需要来启动。

* start-all.sh 启动所有的Hadoop守护。包括namenode, datanode, jobtracker, tasktrack * stop-all.sh 停止所有的Hadoop * start-mapred.sh 启动Map/Reduce守护。包括Jobtracker和Tasktrack * stop-mapred.sh 停止Map/Reduce守护

* start-dfs.sh 启动Hadoop DFS守护.Namenode和Datanode * stop-dfs.sh 停止DFS守护

在这里,简单启动所有守护:

[hadoop@hadoop1:hadoop]$bin/start-all.sh 同样,如果要停止hadoop,则

[hadoop@hadoop1:hadoop]$bin/stop-all.sh 8.HDFS操作

运行bin/目录的hadoop命令,可以查看Haoop所有支持的操作及其用法,这里以几个简单的操作为例。建立目录:

[hadoop@hadoop1 hadoop]$bin/hadoop dfs-mkdir testdir 在HDFS中建立一个名为testdir的目录,复制文件:

[hadoop@hadoop1 hadoop]$bin/hadoop dfs-put /home/large.zip testfile.zip 把本地文件large.zip拷贝到HDFS的根目录/user/hadoop/下,文件名为testfile.zip,查看现有文件:

[hadoop@hadoop1 hadoop]$bin/hadoop dfs-ls 1 多机模式部署

一、Jdk安装 命令:

sudo apt-get update sudo apt-get install openjdk-6-jdk 目录:/usr/lib/jvm/java-6-openjdk 配置环境变量:

sudo gedit /etc/environment 在其中添加如下两行:

CLASSPATH=./:/usr/lib/jvm/java-6-openjdk/lib JAVA_HOME=/usr/lib/jvm/java-6-openjdk PATH添加::/usr/lib/jvm/java-6-openjdk

二、新增hadoop用户

命令:sudo addgroup hadoop sudo adduser--ingroup hadoop hadoop sudo gedit etc/sudoers 添加配置:

在root ALL=(ALL)ALL后 hadoop ALL=(ALL)ALL使hadoop用户具有root权限(id:查看用户)

三、ssh配置

用hadoop账户进行登录。

安装openssh-server:sudo apt-get install openssh-server 建立SSH KEY:ssh-keygen-t rsa-P “" 启用SSH KEY:cat $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys sudo /etc/init.d/ssh reload 验证SSH的配置:ssh localhost

四、安装hadoop并更改用户所有权

wget http://mirror.bjtu.edu.cn/apache/hadoop/core/stable/hadoop-0.20.203.0rc1.tar.gz 我们采用的Hadoop版本是最新的Hadoop-0.20.2,可到Apache基金会官方主页下载http://www.xiexiebang.com/dyn/closer.cgi/hadoop/core,然后使用tar或直接解压到/home/hadoop下,解压后得到一个hadoop-0.20.2的一个文件夹。

安装目录:/usr/local/hadoop

更改所有权:chown-R hadoop:hadoop hadoop

五、配置hadoop(1)配置$HADOOP_HOME/conf/hadoop-env.sh 切换到Hadoop的安装路径找到hadoop-0.20.2下的conf/hadoop-env.sh文件 将:# export JAVA_HOME=/usr/lib/j2sdk1.5-sun 改为:export JAVA_HOME=/usr/lib/jvm/java-6-openjdk(2)配置$HADOOP_HOME/conf/core-site.xml 切换到Hadoop的安装路径找到hadoop-0.20.2下的conf/core-site.xml文件

fs.default.name

hdfs://localhost:9000

hadoop.tmp.dir

/home/hadoop/tmp --配置第二名称节点

fs.checkpoint.dir

{}/home/hadoop/secondname --设置回收站保留时间

fs.trash.interval

10080

Number of minutes between trash checkpoints.If zero, the trash feature is disabled

(3)配置$HADOOP_HOME/conf/hdfs-site.xml 切换到Hadoop的安装路径找到hadoop-0.20.2下的conf/hdfs-site.xml文件内容如下:

dfs.name.dir /home/hadoop/name

dfs.data.dir /home/hadoop/data

dfs.replication 2

--NameNode HTTP状态监视地址

dfs.http.address localhost:50070

--SecondaryNameNode HTTP状态监视地址

dfs.secondary.http.address localhost2:50070 (4)配置$HADOOP_HOME/conf/mapred-site.xml 切换到hadoop的安装路径找到hadoop-0.20.2下的conf/mapred-site.xml文件内容如下:

mapred.local.dir /home/hadoop/temp

mapred.job.tracker localhost:9001

--每个job的map任务数

mapred.map.tasks 7

--每一个tasktracker同时运行的map任务数为2

mapred.tasktracker.map.tasks.maximum 2

--每一个tasktracker同时运行的reduce任务数为4

mapred.tasktracker.reduce.tasks.maximum 4 --jvm虚拟机最大内存

mapred.child.java.opts

-XX:-UseGCOverheadLimit

六、格式化namenode 命令:bin/hadoop namenode-format

七、启动及验证

命令:./bin/start-all.sh 验证方法1:jps 验证方法2:bin/hadoop dfsadmin-report

八:运行Wordcount 1.准备工作

准备两个文本文件并拷贝到dfs里,具体命令操作如下: $ echo ”hello hadoop world.“ > /tmp/test_file1.txt $ echo ”hello world hadoop.i'm ceshiuser." > /tmp/test_file2.txt $ bin/hadoop dfs-mkdir test-in $ bin/hadoop dfs-copyFromLocal /tmp/test*.txt test-in--$ bin/hadoop dfs-put /tmp/test*.txt test-in--$ bin/hadoop dfs-copyToLocal test-out /tmp/test*.txt--$ bin/hadoop dfs-get test-out /tmp/test*.txt $ bin/hadoop dfs-ls test-in 2.运行

$ bin/hadoop jar hadoop-0.20.2-examples.jar wordcount test-in test-out--% ln-s /home/hadoop/junyu-hadoop/hadoop-iflytek-tempcount.jar tempcount 3.查看结果

$ bin/hadoop dfs-ls test-out $ bin/hadoop dfs-cat test-out/part-r-00000 分布式部署:

一,修改/etc/hosts文件: 192.168.71.89 node0 192.168.77.213 node1

二,ssh配置

(1)使用scp 命令把生成的公钥传给其它服务器,并保存成不同的名字,如192.168.71.89-> 192.168.77.213,并且保存为89_id_rsa.pub 的时候命令如下:

scp id_rsa.pub hadoop@192.168.77.213:/home/hadoop/.ssh/89_id_rsa.pub(2)登录192.168.77.213,可以远程登录:ssh hadoop@192.168.77.213,进入当前用户默认目录下的.ssh 目录:cd ~/.ssh

(3)把公钥内容添加到登录认证文件中:cat 89_id_rsa.pub >> authorized_keys

(4)登录到192.168.71.89 主机,使用ssh hadoop@192.168.77.213 进行连接213 主机,这时候会出现输入yes/no? 的提示,输入yes。

三,配置hadoop文件。目录:/usr/local/hadoop/conf/ conf/masters :192.168.71.89 conf/slaves :192.168.77.213

修改 core-site.xml 文件

fs.default.name

hdfs://192.168.71.89:9000/

修改 hdfs-site.xml 文件

1:dfs.http.address

192.168.71.89:50070 2:dfs.secondary.http.address

192.168.71.213:50070

修改mapred-site.xml文件

mapred.job.tracker

192.168.71.89:9001

下载hadoop学习心得word格式文档
下载hadoop学习心得.doc
将本文档下载到自己电脑,方便修改和收藏,请勿使用迅雷等下载。
点此处下载文档

文档为doc格式


声明:本文内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:645879355@qq.com 进行举报,并提供相关证据,工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。

相关范文推荐