Overview of Common Big Data Technologies
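Career paths in big data
Technologies and tools commonly encountered in big data development
Technologies used at each stage of the big data application workflow
Data layering
Overview of common big data technologies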
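Name | Content |
---|---|
Linux | Basic commands: cd, ll, ls, pwd, cp |
shell | Basic commands: cd, ll, ls, pwd, cp |
python | How to connect to a database and perform basic insert, delete, update, and query operations; reading from and writing to files; how to call Python from shell and pass arguments to a Python script |
hadoop | Common Hadoop commands, such as hadoop fs -ls |
hive | How to call Hive from shell and pass arguments to it; common Hive CREATE TABLE statements |
impala | How to call Impala from shell and pass arguments to it |
sqoop | Using the sqoop tool |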
Installing software on the cluster
Software | Version | Download URL | Installation guide URL |
---|---|---|---|
jdk | 1.8 | | |
mysql | 5.7 | | |
hadoop | 2.7 | | |
hive | | | |
sqoop | | | |
spark | | | |
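I. The Shell Language
Day 1
Basic Linux commands
cd: change into a directory
ll: list the files and directories in the current directory, one entry per line (usually an alias for ls -l)
ls: list the files and directories in the current directory, laid out across the line
pwd: print the current directory
cp: copy files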
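The commands above can be tried safely in a scratch directory. This is a minimal sketch; the file names demo.txt and backup.txt are made up for illustration, and ls -l is used instead of ll because ll is normally an alias that may not be defined inside scripts.

```shell
# Work in a throwaway directory so nothing important is touched.
workdir=$(mktemp -d)
cd "$workdir"            # cd: enter a directory
pwd                      # pwd: print the current directory
echo "hello" > demo.txt  # create a small file to work with
cp demo.txt backup.txt   # cp: copy a file
ls                       # ls: file names only
ls -l                    # ls -l: long listing (what ll usually expands to)
```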
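1. The Shell language
Shell is a program written in C that acts as the bridge between the user and Linux.
Shell is both a command language and a programming language.
2. Shell scripts
A shell script is a program written for the shell to run; its file name usually ends in .sh.
When people in the industry say "shell" they usually mean shell scripts, but shell and shell script are two different concepts.
3. The Shell environment
Shell programming is like programming in Java, PHP, or Python: all you need is a text editor to write the code and an interpreter to execute the script.
There are many shell interpreters. Common ones include:
sh - the Bourne Shell. sh is the standard default shell on Unix.
bash - the Bourne Again Shell. bash is the default shell on most Linux distributions.
fish - a smart and user-friendly command-line shell.
xiki - makes the shell console friendlier and more powerful.
zsh - a powerful shell and scripting language.
This tutorial focuses on Bash, the Bourne Again Shell. Because it is easy to use and free, Bash is widely used in day-to-day work, and it is also the default shell on most Linux systems.
In general, people do not distinguish between the Bourne Shell and the Bourne Again Shell, so a line such as #!/bin/sh can usually be changed to #!/bin/bash.
#! tells the system that the program at the path that follows is the shell that interprets this script.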
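4. Specifying the script interpreter
In a shell script, #! tells the system that the program at the path that follows is the interpreter for this script. #! is called the shebang (also known as the hashbang).
So in shell scripts you will see comment-like lines such as the following:
Specify the sh interpreter:
#!/bin/sh
Specify the bash interpreter:
#!/bin/bash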
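5. echo
echo prints a string, similar to System.out.println() in Java or print() in Python. In Bash, the -e option enables backslash escapes such as \n.
echo -e "Hello World!"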
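6. Comments
Comments explain what your code does and why it is written that way.
In shell syntax, comments are special statements that the shell interpreter ignores.
Single-line comment - starts with # and runs to the end of the line.
Multi-line comment - starts with :<<EOF and ends at EOF.
# 'This is a single-line comment'
:<<EOF
'This is a multi-line comment'
EOF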
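One caveat is worth a small sketch: with an unquoted delimiter the shell still expands variables and command substitutions inside the block, so quoting the delimiter is the safer form for comments. The variable name comment_demo exists only for illustration.

```shell
# Single-line comment: ignored by the shell.

# Quoting the delimiter ('EOF') stops the shell from expanding $variables or
# $(commands) inside the block, so the text really is inert.
: <<'EOF'
This text is never executed: $(echo not-run) stays literal.
EOF

comment_demo="the script continues after the block comment"
echo "${comment_demo}"
```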
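7. Your first shell script
echo "Hello World!"
8. Running a shell script
Run it from the current directory:
sh xxx.sh
Run it from another path:
pwd: prints the current path, which you can use to find the script's directory
path="aaa"
sh ${path}/xxx.sh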
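The following sketch ties the shebang and the ways of running a script together. The script name hello.sh and the temporary directory are hypothetical, and the third form assumes /bin/bash exists and the temporary directory allows execution.

```shell
# Create a tiny script in a temporary directory (names are illustrative).
script_dir=$(mktemp -d)
cat > "${script_dir}/hello.sh" <<'EOF'
#!/bin/bash
echo "Hello World!"
EOF

# 1) Run it from its own directory by handing it to an interpreter.
out_local=$(cd "${script_dir}" && sh hello.sh)

# 2) Run it from anywhere by joining directory and file name with "/".
out_path=$(sh "${script_dir}/hello.sh")

# 3) Make it executable; the system then uses the shebang line to pick bash.
chmod +x "${script_dir}/hello.sh"
out_exec=$("${script_dir}/hello.sh")

echo "${out_local} | ${out_path} | ${out_exec}"
```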
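9. Variables
When defining a variable, do not put a dollar sign ($) in front of the name (PHP variables require one).
In shell: name="yuebuqun"
In Java: String name = "renwoxing";
In Python: name = "linghuchong" (the type is inferred automatically)
In JavaScript: var name = "wzx"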
public class VariableTypeDemo {
    public static void main(String[] args) {
        String name = "renwoxing";
        Integer age = 30;
        int age2 = 40;
        // instanceof works only on reference types; "age2 instanceof int"
        // does not compile because int is a primitive type.
        System.out.println("age2: " + age2 + " is a primitive int");
        if (age instanceof Integer) {
            System.out.println("age: is an Integer");
        }
        if (age instanceof Object) {
            System.out.println("age: is an Object");
        }
        if (name instanceof Object) {
            System.out.println("name: is an Object");
        }
        if (name instanceof String) {
            System.out.println("name: is a String");
        }
    }
}
def test_define():
    print("Testing Python's automatic type inference")
    name = "wzx"
    print("Type of name:")
    print(type(name))
    age = 30
    print("Type of age:")
    print(type(age))
test_define()
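Note:
There must be no space between the variable name and the equals sign, which may differ from every programming language you know. Variable names must also follow these rules:
Correct: name="renyingying"
Wrong: name = "yilin"
Names may contain only English letters, digits, and underscores, and must not start with a digit.
No spaces in the middle; use an underscore _ instead.
No punctuation.
No Bash keywords (use the help command to see the reserved words).
Wrong examples: 1name="wzx"
name age="wzx"
,name="wzx"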
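Besides explicit assignment, a variable can also take its value from a statement, for example:
for file in `ls /root` (the statement here is ls /root)
or
for file in $(ls /root)
Both forms loop over the names of the files and directories under /root.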
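Command substitution can be sketched as follows. The example builds its own temporary directory instead of reading /root, and the file names a.txt and b.txt are invented. Note that looping over the output of ls splits names on whitespace, so this form is only suitable for simple file names.

```shell
# Build a small directory so the example does not depend on /root.
demo_dir=$(mktemp -d)
touch "${demo_dir}/a.txt" "${demo_dir}/b.txt"

# Assign the output of a command to a variable with $(...).
# tr removes the padding some wc implementations add.
file_count=$(ls "${demo_dir}" | wc -l | tr -d ' ')
echo "file count: ${file_count}"

# Loop over the file names produced by a command.
names=""
for file in $(ls "${demo_dir}"); do
  names="${names}${file} "
done
echo "files: ${names}"
```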
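Using variables:
To use a variable that has been defined, just put a dollar sign in front of its name:
name="wzx"
echo $name
echo ${name}
Look closely at the two forms above: the first has no braces, the second does.
Normally either form works, except in a case like this:
name="wzx"
echo "my name is ${name}from"
echo "my name is $namefrom"
Without braces, the shell reads $namefrom as a variable called namefrom, which is unset, so the second line prints nothing after "my name is".
Adding braces to all variables is recommended; it is a good programming habit.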
A variable that has already been defined can be redefined. This pattern is used a lot, especially in if/else statements:
name="renwoxing"
echo $name
name="yuebuqun"
echo $name
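A small sketch of redefinition inside if/else, combined with braces to mark where a variable name ends. The variable names env_name, table_suffix, and table_name are hypothetical and only illustrate the pattern.

```shell
# Start with a default value, then redefine it in an if/else branch.
env_name="test"
table_suffix="tmp"
if [ "${env_name}" = "prod" ]; then
  table_suffix="final"
else
  table_suffix="debug"
fi

# Braces mark where the variable name ends, so "_result" is literal text.
table_name="wordcount_${table_suffix}_result"
echo "${table_name}"
```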
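10. Passing parameters
When running a shell script we can pass parameters to it. Inside the script they are read as $n, where n is a number: $0 is the name of the script being run (including its path), $1 is the first parameter, $2 is the second, and so on.
Parameter | Description | Importance |
---|---|---|
$# | Number of parameters passed to the script | Normal |
$* | All parameters passed to the script, shown as a single string. When quoted as "$*", it expands to "$1 $2 … $n". | Normal |
$$ | Process ID of the currently running script | Important |
$! | Process ID of the most recent background process | Good to know |
$@ | Same as $*, but when quoted as "$@" each parameter is returned as a separate quoted string: "$1" "$2" … "$n". | Normal |
$- | The options currently in effect for the shell, as set with the set command. | Normal |
$? | Exit status of the last command. 0 means no error; any other value indicates an error. | Important |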
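The difference between "$*" and "$@" is easiest to see by counting how many arguments each one passes on. This is a minimal sketch; the function names count_args and show_params and the sample values are made up for illustration.

```shell
# Report how many arguments this function received.
count_args() {
  echo "$#"
}

show_params() {
  echo "number of parameters: $#"
  # "$*" joins all parameters into ONE word; "$@" keeps each one separate.
  star_count=$(count_args "$*")
  at_count=$(count_args "$@")
  echo "\"\$*\" passes ${star_count} argument(s)"
  echo "\"\$@\" passes ${at_count} argument(s)"
}

show_params "shandong" "2022-01-01 00:00:00" "2022-01-02"
```

Because the second sample value contains a space, "$@" is the form to use when parameters must be forwarded unchanged to another command.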
Code of the day3.sh script
#!/bin/bash
script=$0
sfdm=$1
start_time=$2
end_time=$3
echo -e "Current script: ${script}"
echo -e "First parameter: ${sfdm}"
echo -e "Second parameter: ${start_time}"
echo -e "Third parameter: ${end_time}"
# Path of the second script
bash_path="/root/shell_script"
sh ${bash_path}/test.sh
result_one=$?
echo -e "Exit status returned by the second script: ${result_one}"
python ${bash_path}/error.py
result_two=$?
echo -e "Exit status returned by the Python script: ${result_two}"
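Code of the error.py script
#-*-coding:utf-8-*-
def test_error_return():
    print("Test: calling Python returns a non-zero exit status")
    # Raises ZeroDivisionError; the uncaught exception makes Python exit with a non-zero status
    result = 1 / 0
    return result
test_error_return()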
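The pattern of reading $? right after a command can be wrapped in a small helper. This is only a sketch: run_checked is a hypothetical function name, and sh -c 'exit 3' stands in for a failing job such as a Python script that raises an exception.

```shell
# Run any command and report its exit status (0 = success, non-zero = failure).
run_checked() {
  status=0
  "$@" || status=$?   # keep the exit status without aborting the script
  if [ "${status}" -eq 0 ]; then
    echo "OK: $*"
  else
    echo "FAILED with exit status ${status}: $*"
  fi
}

run_checked true
ok_status=${status}

# A stand-in for a failing command.
run_checked sh -c 'exit 3'
fail_status=${status}

echo "statuses: ${ok_status} ${fail_status}"
```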
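11. Arrays
An array can hold multiple values. Bash supports only one-dimensional arrays (no multidimensional arrays), and the size does not need to be declared at initialization (similar to PHP).
As in most programming languages, array indexes start at 0.
A shell array is written in parentheses, with elements separated by spaces. The syntax is as follows:
Two ways to initialize an array:
array=('renwoxing' 'linghuchong' 'yuebuqun' 'yucanghai')
array_index[0]='fangzheng'
array_index[1]='chongxu'
Reading array elements:
${array[0]}
${array[1]}
Getting all elements of an array:
echo -e "All elements of array: ${array[*]}"
echo -e "All elements of array_index: ${array_index[@]}"
Getting the length of an array:
echo -e "Number of elements in array: ${#array[*]}"
echo -e "Number of elements in array_index: ${#array_index[@]}"
Test script
#!/bin/bash
array=('renwoxing' 'linghuchong' 'yuebuqun' 'yucanghai')
echo -e "First element of array: ${array[0]}"
array_index[0]='fangzheng'
array_index[1]='chongxu'
echo -e "First element of array_index: ${array_index[0]}"
echo -e "Second element of array_index: ${array_index[1]}"
# Get all elements of the arrays
echo -e "All elements of array: ${array[*]}"
echo -e "All elements of array_index: ${array_index[@]}"
# Get the length of the arrays
echo -e "Number of elements in array: ${#array[*]}"
echo -e "Number of elements in array_index: ${#array_index[@]}"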
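Beyond reading single elements, arrays are usually looped over. The sketch below assumes Bash (arrays are not available in plain sh); the array name words and its values are made up for illustration.

```shell
#!/bin/bash
# Arrays are a Bash feature, so this needs bash rather than plain sh.
words=('hello' 'name' 'word')
words+=('wzx')                      # append an element

# Quote "${words[@]}" so each element stays one item, even if it has spaces.
joined=""
for w in "${words[@]}"; do
  joined="${joined}[${w}]"
done
echo "${joined}"

# "${!words[@]}" expands to the indexes instead of the values.
for i in "${!words[@]}"; do
  echo "index ${i} -> ${words[$i]}"
done

count=${#words[@]}
echo "element count: ${count}"
```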
hadoop
The source file is shown below; upload it to HDFS.
hello,wzx,word,name
wzx,word,hello,hello
name,wzx,wzx,wzx
Command to run a Hadoop jar; the fully qualified name of the main class must be given:
hadoop jar wordcount.jar com.wzx.mapreduce.WordCountDriver
22/01/03 01:53:31 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.15.10:8032
22/01/03 01:53:31 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
22/01/03 01:53:31 INFO input.FileInputFormat: Total input paths to process : 1
22/01/03 01:53:32 INFO mapreduce.JobSubmitter: number of splits:1
22/01/03 01:53:32 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1641144429960_0001
22/01/03 01:53:32 INFO impl.YarnClientImpl: Submitted application application_1641144429960_0001
22/01/03 01:53:32 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1641144429960_0001/
22/01/03 01:53:32 INFO mapreduce.Job: Running job: job_1641144429960_0001
22/01/03 01:53:37 INFO mapreduce.Job: Job job_1641144429960_0001 running in uber mode : false
22/01/03 01:53:37 INFO mapreduce.Job: map 0% reduce 0%
22/01/03 01:53:42 INFO mapreduce.Job: map 100% reduce 0%
22/01/03 01:53:45 INFO mapreduce.Job: map 100% reduce 100%
22/01/03 01:53:45 INFO mapreduce.Job: Job job_1641144429960_0001 completed successfully
22/01/03 01:53:45 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=136
FILE: Number of bytes written=214101
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=171
HDFS: Number of bytes written=28
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=7386
Total time spent by all reduces in occupied slots (ms)=3861
Total time spent by all map tasks (ms)=2462
Total time spent by all reduce tasks (ms)=1287
Total vcore-seconds taken by all map tasks=2462
Total vcore-seconds taken by all reduce tasks=1287
Total megabyte-seconds taken by all map tasks=7563264
Total megabyte-seconds taken by all reduce tasks=3953664
Map-Reduce Framework
Map input records=3
Map output records=12
Map output bytes=106
Map output materialized bytes=136
Input split bytes=113
Combine input records=0
Combine output records=0
Reduce input groups=4
Reduce shuffle bytes=136
Reduce input records=12
Reduce output records=4
Spilled Records=24
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=104
CPU time spent (ms)=1000
Physical memory (bytes) snapshot=445493248
Virtual memory (bytes) snapshot=4218527744
Total committed heap usage (bytes)=279445504
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=58
File Output Format Counters
Bytes Written=28
View the result:
hadoop fs -cat /wordcount/output/part-r-00000
hello 3
name 2
word 2
wzx 5
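The counts above can be cross-checked locally without Hadoop. This is a rough sketch using standard text tools on the same three sample lines; it is not how the MapReduce job works internally, and it separates word and count with a space, whereas the job output may use a different separator.

```shell
# The three sample lines from the source file.
sample=$(printf '%s\n' 'hello,wzx,word,name' 'wzx,word,hello,hello' 'name,wzx,wzx,wzx')

# Split on commas, sort, count duplicates, then print "word count".
local_counts=$(printf '%s\n' "${sample}" | tr ',' '\n' | sort | uniq -c | awk '{print $2, $1}')
echo "${local_counts}"
```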
Write a shell script to test whether the Hadoop command succeeded
Shell script:
#!/bin/bash
hadoop jar wordcount.jar com.wzx.mapreduce.WordCountDriver
result=$?
echo -e "Did the Hadoop job succeed? -- ${result}"
Run the script:
sh test_hadoop_return_status.sh
22/01/03 02:06:38 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.15.10:8032
Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://master:9000/wordcount/output already exists
at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:146)
at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:267)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:140)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1297)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1294)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1294)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1315)
at com.wzx.mapreduce.WordCountDriver.main(WordCountDriver.java:61)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Did the Hadoop job succeed? -- 1
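After deleting the generated output directory, the run looks like the following. A shell script can therefore be used to monitor whether a Hadoop job failed, and the status can be written to a relational database such as MySQL for easier monitoring.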
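The monitoring idea can be sketched with a small wrapper that records each job's exit status. Everything here is an assumption for illustration: the function name run_and_record, the log file, and the record format are invented, and a plain file stands in for the MySQL table. The true and false commands replace the real Hadoop call so the sketch runs on any machine.

```shell
# Append one status line per job to a log file (a stand-in for a MySQL table).
status_log=$(mktemp)

run_and_record() {
  job_name=$1
  shift
  job_status=0
  "$@" || job_status=$?   # capture the exit status of the job command
  echo "$(date '+%Y-%m-%d %H:%M:%S') ${job_name} ${job_status}" >> "${status_log}"
}

# Real use would look like this (requires a Hadoop client on this machine):
#   hadoop fs -rm -r /wordcount/output   # clear the old output directory first
#   run_and_record wordcount hadoop jar wordcount.jar com.wzx.mapreduce.WordCountDriver
# Stand-in commands so the sketch runs anywhere:
run_and_record demo_ok true
run_and_record demo_fail false
tail -n 2 "${status_log}"
```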
sh test_hadoop_return_status.sh
22/01/03 02:09:50 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.15.10:8032
22/01/03 02:09:50 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
22/01/03 02:09:50 INFO input.FileInputFormat: Total input paths to process : 1
22/01/03 02:09:50 INFO mapreduce.JobSubmitter: number of splits:1
22/01/03 02:09:50 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1641144429960_0002
22/01/03 02:09:51 INFO impl.YarnClientImpl: Submitted application application_1641144429960_0002
22/01/03 02:09:51 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1641144429960_0002/
22/01/03 02:09:51 INFO mapreduce.Job: Running job: job_1641144429960_0002
22/01/03 02:09:55 INFO mapreduce.Job: Job job_1641144429960_0002 running in uber mode : false
22/01/03 02:09:55 INFO mapreduce.Job: map 0% reduce 0%
22/01/03 02:09:58 INFO mapreduce.Job: map 100% reduce 0%
22/01/03 02:10:02 INFO mapreduce.Job: map 100% reduce 100%
22/01/03 02:10:02 INFO mapreduce.Job: Job job_1641144429960_0002 completed successfully
22/01/03 02:10:02 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=136
FILE: Number of bytes written=214101
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=171
HDFS: Number of bytes written=28
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=4839
Total time spent by all reduces in occupied slots (ms)=4392
Total time spent by all map tasks (ms)=1613
Total time spent by all reduce tasks (ms)=1464
Total vcore-seconds taken by all map tasks=1613
Total vcore-seconds taken by all reduce tasks=1464
Total megabyte-seconds taken by all map tasks=4955136
Total megabyte-seconds taken by all reduce tasks=4497408
Map-Reduce Framework
Map input records=3
Map output records=12
Map output bytes=106
Map output materialized bytes=136
Input split bytes=113
Combine input records=0
Combine output records=0
Reduce input groups=4
Reduce shuffle bytes=136
Reduce input records=12
Reduce output records=4
Spilled Records=24
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=113
CPU time spent (ms)=990
Physical memory (bytes) snapshot=443863040
Virtual memory (bytes) snapshot=4219289600
Total committed heap usage (bytes)=279969792
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=58
File Output Format Counters
Bytes Written=28
Did the Hadoop job succeed? -- 0
[Figure: the jobs on YARN]
[Figure: the files on HDFS]
Debugging the code locally:
// Without this line, the local configuration files are used by default
conf.set("fs.defaultFS", "hdfs://master:9000");
package com.wzx.localmapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
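/**
*
* @author AllenWoon
*
* This class is the client that specifies the many parameters the wordcount job needs at run time,
*
* for example: which class holds the map-phase business logic and which holds the reduce-phase logic,
* which component reads the input data and which writes the results,
* and the path of the wordcount jar
*
* ....
* as well as any other parameters required
*/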
public class WordCountDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// Without this line, the local configuration files are used by default
conf.set("fs.defaultFS", "hdfs://master:9000");
Job job = Job.getInstance(conf);
// Tell the framework where the jar containing our program is located
job.setJar("/root/wordcount.jar");
job.setJarByClass(WordCountDriver.class);
// Tell the framework which mapper and reducer classes our program uses
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// Tell the framework the data types our program outputs
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
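// Tell the framework which components read the input data and write the results
// TextInputFormat is a built-in MapReduce input component; more precisely, it is the input format for reading text files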
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// Tell the framework which path holds the data files to process
FileInputFormat.setInputPaths(job, new Path("d://wordcount/input"));
// Tell the framework where to write the results
FileOutputFormat.setOutputPath(job, new Path("d://wordcount/output"));
boolean res = job.waitForCompletion(true);
/**
* This is the exit status returned to Linux: 0 means the program ran without problems, 1 means the run failed
*/
System.exit(res?0:1);
}
}
The local configuration files are included in the jar.
The local test fails with the following error:
Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:557)
at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:977)
at org.apache.hadoop.util.DiskChecker.checkAccessByFileMethods(DiskChecker.java:187)
at org.apache.hadoop.util.DiskChecker.checkDirAccess(DiskChecker.java:174)
at org.apache.hadoop.util.DiskChecker.checkDir(DiskChecker.java:108)
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:285)
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:150)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:131)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:115)
at org.apache.hadoop.mapred.LocalDistributedCacheManager.setup(LocalDistributedCacheManager.java:125)
at org.apache.hadoop.mapred.LocalJobRunner$Job.<init>(LocalJobRunner.java:163)
at org.apache.hadoop.mapred.LocalJobRunner.submitJob(LocalJobRunner.java:731)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:241)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1297)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1294)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1294)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1315)
at com.wzx.localmapreduce.WordCountDriver.main(WordCountDriver.java:63)
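Cause: on newer versions of Windows some files are no longer present and certain native functions are unsupported, so the JVM cannot link the native method NativeIO$Windows.access0 (this typically indicates a missing or mismatched hadoop.dll). The workaround is to write our own NativeIO class and place it in the project under the same package as in the code below (org.apache.hadoop.io.nativeio).
The code of the NativeIO class follows; it can be copied and used directly.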
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.nativeio;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SecureIOUtils.AlreadyExistsException;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.PerformanceAdvisory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import sun.misc.Unsafe;
import com.google.common.annotations.VisibleForTesting;
/**
* JNI wrappers for various native IO-related calls not available in Java.
* These functions should generally be used alongside a fallback to another
* more portable mechanism.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class NativeIO {
public static class POSIX {
// Flags for open() call from bits/fcntl.h
public static final int O_RDONLY = 00;
public static final int O_WRONLY = 01;
public static final int O_RDWR = 02;
public static final int O_CREAT = 0100;
public static final int O_EXCL = 0200;
public static final int O_NOCTTY = 0400;
public static final int O_TRUNC = 01000;
public static final int O_APPEND = 02000;
public static final int O_NONBLOCK = 04000;
public static final int O_SYNC = 010000;
public static final int O_ASYNC = 020000;
public static final int O_FSYNC = O_SYNC;
public static final int O_NDELAY = O_NONBLOCK;
// Flags for posix_fadvise() from bits/fcntl.h
/* No further special treatment. */
public static final int POSIX_FADV_NORMAL = 0;
/* Expect random page references. */
public static final int POSIX_FADV_RANDOM = 1;
/* Expect sequential page references. */
public static final int POSIX_FADV_SEQUENTIAL = 2;
/* Will need these pages. */
public static final int POSIX_FADV_WILLNEED = 3;
/* Don't need these pages. */
public static final int POSIX_FADV_DONTNEED = 4;
/* Data will be accessed once. */
public static final int POSIX_FADV_NOREUSE = 5;
/* Wait upon writeout of all pages
in the range before performing the
write. */
public static final int SYNC_FILE_RANGE_WAIT_BEFORE = 1;
/* Initiate writeout of all those
dirty pages in the range which are
not presently under writeback. */
public static final int SYNC_FILE_RANGE_WRITE = 2;
/* Wait upon writeout of all pages in
the range after performing the
write. */
public static final int SYNC_FILE_RANGE_WAIT_AFTER = 4;
private static final Log LOG = LogFactory.getLog(NativeIO.class);
private static boolean nativeLoaded = false;
private static boolean fadvisePossible = true;
private static boolean syncFileRangePossible = true;
static final String WORKAROUND_NON_THREADSAFE_CALLS_KEY =
"hadoop.workaround.non.threadsafe.getpwuid";
static final boolean WORKAROUND_NON_THREADSAFE_CALLS_DEFAULT = true;
private static long cacheTimeout = -1;
private static CacheManipulator cacheManipulator = new CacheManipulator();
public static CacheManipulator getCacheManipulator() {
return cacheManipulator;
}
public static void setCacheManipulator(CacheManipulator cacheManipulator) {
POSIX.cacheManipulator = cacheManipulator;
}
/**
* Used to manipulate the operating system cache.
*/
@VisibleForTesting
public static class CacheManipulator {
public void mlock(String identifier, ByteBuffer buffer,
long len) throws IOException {
POSIX.mlock(buffer, len);
}
public long getMemlockLimit() {
return NativeIO.getMemlockLimit();
}
public long getOperatingSystemPageSize() {
return NativeIO.getOperatingSystemPageSize();
}
public void posixFadviseIfPossible(String identifier,
FileDescriptor fd, long offset, long len, int flags)
throws NativeIOException {
POSIX.posixFadviseIfPossible(identifier, fd, offset,
len, flags);
}
public boolean verifyCanMlock() {
return NativeIO.isAvailable();
}
}
/**
* A CacheManipulator used for testing which does not actually call mlock.
* This allows many tests to be run even when the operating system does not
* allow mlock, or only allows limited mlocking.
*/
@VisibleForTesting
public static class NoMlockCacheManipulator extends CacheManipulator {
public void mlock(String identifier, ByteBuffer buffer,
long len) throws IOException {
LOG.info("mlocking " + identifier);
}
public long getMemlockLimit() {
return 1125899906842624L;
}
public long getOperatingSystemPageSize() {
return 4096;
}
public boolean verifyCanMlock() {
return true;
}
}
static {
if (NativeCodeLoader.isNativeCodeLoaded()) {
try {
Configuration conf = new Configuration();
workaroundNonThreadSafePasswdCalls = conf.getBoolean(
WORKAROUND_NON_THREADSAFE_CALLS_KEY,
WORKAROUND_NON_THREADSAFE_CALLS_DEFAULT);
initNative();
nativeLoaded = true;
cacheTimeout = conf.getLong(
CommonConfigurationKeys.HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_KEY,
CommonConfigurationKeys.HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_DEFAULT) *
1000;
LOG.debug("Initialized cache for IDs to User/Group mapping with a " +
" cache timeout of " + cacheTimeout/1000 + " seconds.");
} catch (Throwable t) {
// This can happen if the user has an older version of libhadoop.so
// installed - in this case we can continue without native IO
// after warning
PerformanceAdvisory.LOG.debug("Unable to initialize NativeIO libraries", t);
}
}
}
/**
* Return true if the JNI-based native IO extensions are available.
*/
public static boolean isAvailable() {
return NativeCodeLoader.isNativeCodeLoaded() && nativeLoaded;
}
private static void assertCodeLoaded() throws IOException {
if (!isAvailable()) {
throw new IOException("NativeIO was not loaded");
}
}
/** Wrapper around open(2) */
public static native FileDescriptor open(String path, int flags, int mode) throws IOException;
/** Wrapper around fstat(2) */
private static native Stat fstat(FileDescriptor fd) throws IOException;
/** Native chmod implementation. On UNIX, it is a wrapper around chmod(2) */
private static native void chmodImpl(String path, int mode) throws IOException;
public static void chmod(String path, int mode) throws IOException {
if (!Shell.WINDOWS) {
chmodImpl(path, mode);
} else {
try {
chmodImpl(path, mode);
} catch (NativeIOException nioe) {
if (nioe.getErrorCode() == 3) {
throw new NativeIOException("No such file or directory",
Errno.ENOENT);
} else {
LOG.warn(String.format("NativeIO.chmod error (%d): %s",
nioe.getErrorCode(), nioe.getMessage()));
throw new NativeIOException("Unknown error", Errno.UNKNOWN);
}
}
}
}
/** Wrapper around posix_fadvise(2) */
static native void posix_fadvise(
FileDescriptor fd, long offset, long len, int flags) throws NativeIOException;
/** Wrapper around sync_file_range(2) */
static native void sync_file_range(
FileDescriptor fd, long offset, long nbytes, int flags) throws NativeIOException;
/**
* Call posix_fadvise on the given file descriptor. See the manpage
* for this syscall for more information. On systems where this
* call is not available, does nothing.
*
* @throws NativeIOException if there is an error with the syscall
*/
static void posixFadviseIfPossible(String identifier,
FileDescriptor fd, long offset, long len, int flags)
throws NativeIOException {
if (nativeLoaded && fadvisePossible) {
try {
posix_fadvise(fd, offset, len, flags);
} catch (UnsupportedOperationException uoe) {
fadvisePossible = false;
} catch (UnsatisfiedLinkError ule) {
fadvisePossible = false;
}
}
}
/**
* Call sync_file_range on the given file descriptor. See the manpage
* for this syscall for more information. On systems where this
* call is not available, does nothing.
*
* @throws NativeIOException if there is an error with the syscall
*/
public static void syncFileRangeIfPossible(
FileDescriptor fd, long offset, long nbytes, int flags)
throws NativeIOException {
if (nativeLoaded && syncFileRangePossible) {
try {
sync_file_range(fd, offset, nbytes, flags);
} catch (UnsupportedOperationException uoe) {
syncFileRangePossible = false;
} catch (UnsatisfiedLinkError ule) {
syncFileRangePossible = false;
}
}
}
static native void mlock_native(
ByteBuffer buffer, long len) throws NativeIOException;
/**
* Locks the provided direct ByteBuffer into memory, preventing it from
* swapping out. After a buffer is locked, future accesses will not incur
* a page fault.
*
* See the mlock(2) man page for more information.
*
* @throws NativeIOException
*/
static void mlock(ByteBuffer buffer, long len)
throws IOException {
assertCodeLoaded();
if (!buffer.isDirect()) {
throw new IOException("Cannot mlock a non-direct ByteBuffer");
}
mlock_native(buffer, len);
}
/**
* Unmaps the block from memory. See munmap(2).
*
* There isn't any portable way to unmap a memory region in Java.
* So we use the sun.nio method here.
* Note that unmapping a memory region could cause crashes if code
* continues to reference the unmapped code. However, if we don't
* manually unmap the memory, we are dependent on the finalizer to
* do it, and we have no idea when the finalizer will run.
*
* @param buffer The buffer to unmap.
*/
public static void munmap(MappedByteBuffer buffer) {
if (buffer instanceof sun.nio.ch.DirectBuffer) {
sun.misc.Cleaner cleaner =
((sun.nio.ch.DirectBuffer)buffer).cleaner();
cleaner.clean();
}
}
/** Linux only methods used for getOwner() implementation */
private static native long getUIDforFDOwnerforOwner(FileDescriptor fd) throws IOException;
private static native String getUserName(long uid) throws IOException;
/**
* Result type of the fstat call
*/
public static class Stat {
private int ownerId, groupId;
private String owner, group;
private int mode;
// Mode constants
public static final int S_IFMT = 0170000; /* type of file */
public static final int S_IFIFO = 0010000; /* named pipe (fifo) */
public static final int S_IFCHR = 0020000; /* character special */
public static final int S_IFDIR = 0040000; /* directory */
public static final int S_IFBLK = 0060000; /* block special */
public static final int S_IFREG = 0100000; /* regular */
public static final int S_IFLNK = 0120000; /* symbolic link */
public static final int S_IFSOCK = 0140000; /* socket */
public static final int S_IFWHT = 0160000; /* whiteout */
public static final int S_ISUID = 0004000; /* set user id on execution */
public static final int S_ISGID = 0002000; /* set group id on execution */
public static final int S_ISVTX = 0001000; /* save swapped text even after use */
public static final int S_IRUSR = 0000400; /* read permission, owner */
public static final int S_IWUSR = 0000200; /* write permission, owner */
public static final int S_IXUSR = 0000100; /* execute/search permission, owner */
Stat(int ownerId, int groupId, int mode) {
this.ownerId = ownerId;
this.groupId = groupId;
this.mode = mode;
}
Stat(String owner, String group, int mode) {
if (!Shell.WINDOWS) {
this.owner = owner;
} else {
this.owner = stripDomain(owner);
}
if (!Shell.WINDOWS) {
this.group = group;
} else {
this.group = stripDomain(group);
}
this.mode = mode;
}
@Override
public String toString() {
return "Stat(owner='" + owner + "', group='" + group + "'" +
", mode=" + mode + ")";
}
public String getOwner() {
return owner;
}
public String getGroup() {
return group;
}
public int getMode() {
return mode;
}
}
/**
* Returns the file stat for a file descriptor.
*
* @param fd file descriptor.
* @return the file descriptor file stat.
* @throws IOException thrown if there was an IO error while obtaining the file stat.
*/
public static Stat getFstat(FileDescriptor fd) throws IOException {
Stat stat = null;
if (!Shell.WINDOWS) {
stat = fstat(fd);
stat.owner = getName(IdCache.USER, stat.ownerId);
stat.group = getName(IdCache.GROUP, stat.groupId);
} else {
try {
stat = fstat(fd);
} catch (NativeIOException nioe) {
if (nioe.getErrorCode() == 6) {
throw new NativeIOException("The handle is invalid.",
Errno.EBADF);
} else {
LOG.warn(String.format("NativeIO.getFstat error (%d): %s",
nioe.getErrorCode(), nioe.getMessage()));
throw new NativeIOException("Unknown error", Errno.UNKNOWN);
}
}
}
return stat;
}
private static String getName(IdCache domain, int id) throws IOException {
Map<Integer, CachedName> idNameCache = (domain == IdCache.USER)
? USER_ID_NAME_CACHE : GROUP_ID_NAME_CACHE;
String name;
CachedName cachedName = idNameCache.get(id);
long now = System.currentTimeMillis();
if (cachedName != null && (cachedName.timestamp + cacheTimeout) > now) {
name = cachedName.name;
} else {
name = (domain == IdCache.USER) ? getUserName(id) : getGroupName(id);
if (LOG.isDebugEnabled()) {
String type = (domain == IdCache.USER) ? "UserName" : "GroupName";
LOG.debug("Got " + type + " " + name + " for ID " + id +
" from the native implementation");
}
cachedName = new CachedName(name, now);
idNameCache.put(id, cachedName);
}
return name;
}
static native String getUserName(int uid) throws IOException;
static native String getGroupName(int uid) throws IOException;
private static class CachedName {
final long timestamp;
final String name;
public CachedName(String name, long timestamp) {
this.name = name;
this.timestamp = timestamp;
}
}
private static final Map<Integer, CachedName> USER_ID_NAME_CACHE =
new ConcurrentHashMap<Integer, CachedName>();
private static final Map<Integer, CachedName> GROUP_ID_NAME_CACHE =
new ConcurrentHashMap<Integer, CachedName>();
private enum IdCache { USER, GROUP }
public final static int MMAP_PROT_READ = 0x1;
public final static int MMAP_PROT_WRITE = 0x2;
public final static int MMAP_PROT_EXEC = 0x4;
public static native long mmap(FileDescriptor fd, int prot,
boolean shared, long length) throws IOException;
public static native void munmap(long addr, long length)
throws IOException;
}
private static boolean workaroundNonThreadSafePasswdCalls = false;
public static class Windows {
// Flags for CreateFile() call on Windows
public static final long GENERIC_READ = 0x80000000L;
public static final long GENERIC_WRITE = 0x40000000L;
public static final long FILE_SHARE_READ = 0x00000001L;
public static final long FILE_SHARE_WRITE = 0x00000002L;
public static final long FILE_SHARE_DELETE = 0x00000004L;
public static final long CREATE_NEW = 1;
public static final long CREATE_ALWAYS = 2;
public static final long OPEN_EXISTING = 3;
public static final long OPEN_ALWAYS = 4;
public static final long TRUNCATE_EXISTING = 5;
public static final long FILE_BEGIN = 0;
public static final long FILE_CURRENT = 1;
public static final long FILE_END = 2;
public static final long FILE_ATTRIBUTE_NORMAL = 0x00000080L;
/**
* Create a directory with permissions set to the specified mode. By setting
* permissions at creation time, we avoid issues related to the user lacking
* WRITE_DAC rights on subsequent chmod calls. One example where this can
* occur is writing to an SMB share where the user does not have Full Control
* rights, and therefore WRITE_DAC is denied.
*
* @param path directory to create
* @param mode permissions of new directory
* @throws IOException if there is an I/O error
*/
public static void createDirectoryWithMode(File path, int mode)
throws IOException {
createDirectoryWithMode0(path.getAbsolutePath(), mode);
}
/** Wrapper around CreateDirectory() on Windows */
private static native void createDirectoryWithMode0(String path, int mode)
throws NativeIOException;
/** Wrapper around CreateFile() on Windows */
public static native FileDescriptor createFile(String path,
long desiredAccess, long shareMode, long creationDisposition)
throws IOException;
/**
* Create a file for write with permissions set to the specified mode. By
* setting permissions at creation time, we avoid issues related to the user
* lacking WRITE_DAC rights on subsequent chmod calls. One example where
* this can occur is writing to an SMB share where the user does not have
* Full Control rights, and therefore WRITE_DAC is denied.
*
* This method mimics the semantics implemented by the JDK in
* {@link FileOutputStream}. The file is opened for truncate or
* append, the sharing mode allows other readers and writers, and paths
* longer than MAX_PATH are supported. (See io_util_md.c in the JDK.)
*
* @param path file to create
* @param append if true, then open file for append
* @param mode permissions of new directory
* @return FileOutputStream of opened file
* @throws IOException if there is an I/O error
*/
public static FileOutputStream createFileOutputStreamWithMode(File path,
boolean append, int mode) throws IOException {
long desiredAccess = GENERIC_WRITE;
long shareMode = FILE_SHARE_READ | FILE_SHARE_WRITE;
long creationDisposition = append ? OPEN_ALWAYS : CREATE_ALWAYS;
return new FileOutputStream(createFileWithMode0(path.getAbsolutePath(),
desiredAccess, shareMode, creationDisposition, mode));
}
/** Wrapper around CreateFile() with security descriptor on Windows */
private static native FileDescriptor createFileWithMode0(String path,
long desiredAccess, long shareMode, long creationDisposition, int mode)
throws NativeIOException;
/** Wrapper around SetFilePointer() on Windows */
public static native long setFilePointer(FileDescriptor fd,
long distanceToMove, long moveMethod) throws IOException;
/** Windows only methods used for getOwner() implementation */
private static native String getOwner(FileDescriptor fd) throws IOException;
/** Supported list of Windows access right flags */
public static enum AccessRight {
ACCESS_READ (0x0001), // FILE_READ_DATA
ACCESS_WRITE (0x0002), // FILE_WRITE_DATA
ACCESS_EXECUTE (0x0020); // FILE_EXECUTE
private final int accessRight;
AccessRight(int access) {
accessRight = access;
}
public int accessRight() {
return accessRight;
}
};
/** Windows only method used to check if the current process has requested
* access rights on the given path. */
private static native boolean access0(String path, int requestedAccess);
/**
* Checks whether the current process has desired access rights on
* the given path.
*
* Longer term this native function can be substituted with JDK7
* function Files#isReadable, isWritable, isExecutable.
*
* @param path input path
* @param desiredAccess ACCESS_READ, ACCESS_WRITE or ACCESS_EXECUTE
* @return true if access is allowed
* @throws IOException I/O exception on error
*/
public static boolean access(String path, AccessRight desiredAccess)
throws IOException {
//return access0(path, desiredAccess.accessRight());
return true;
}
/**
* Extends both the minimum and maximum working set size of the current
* process. This method gets the current minimum and maximum working set
* size, adds the requested amount to each and then sets the minimum and
* maximum working set size to the new values. Controlling the working set
* size of the process also controls the amount of memory it can lock.
*
* @param delta amount to increment minimum and maximum working set size
* @throws IOException for any error
* @see POSIX#mlock(ByteBuffer, long)
*/
public static native void extendWorkingSetSize(long delta) throws IOException;
static {
if (NativeCodeLoader.isNativeCodeLoaded()) {
try {
initNative();
nativeLoaded = true;
} catch (Throwable t) {
// This can happen if the user has an older version of libhadoop.so
// installed - in this case we can continue without native IO
// after warning
PerformanceAdvisory.LOG.debug("Unable to initialize NativeIO libraries", t);
}
}
}
}
private static final Log LOG = LogFactory.getLog(NativeIO.class);
private static boolean nativeLoaded = false;
static {
if (NativeCodeLoader.isNativeCodeLoaded()) {
try {
initNative();
nativeLoaded = true;
} catch (Throwable t) {
// This can happen if the user has an older version of libhadoop.so
// installed - in this case we can continue without native IO
// after warning
PerformanceAdvisory.LOG.debug("Unable to initialize NativeIO libraries", t);
}
}
}
/**
* Return true if the JNI-based native IO extensions are available.
*/
public static boolean isAvailable() {
return NativeCodeLoader.isNativeCodeLoaded() && nativeLoaded;
}
/** Initialize the JNI method ID and class ID cache */
private static native void initNative();
/**
* Get the maximum number of bytes that can be locked into memory at any
* given point.
*
* @return 0 if no bytes can be locked into memory;
* Long.MAX_VALUE if there is no limit;
* The number of bytes that can be locked into memory otherwise.
*/
static long getMemlockLimit() {
return isAvailable() ? getMemlockLimit0() : 0;
}
private static native long getMemlockLimit0();
/**
* @return the operating system's page size.
*/
static long getOperatingSystemPageSize() {
try {
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
Unsafe unsafe = (Unsafe)f.get(null);
return unsafe.pageSize();
} catch (Throwable e) {
LOG.warn("Unable to get operating system page size. Guessing 4096.", e);
return 4096;
}
}
private static class CachedUid {
final long timestamp;
final String username;
public CachedUid(String username, long timestamp) {
this.timestamp = timestamp;
this.username = username;
}
}
private static final Map<Long, CachedUid> uidCache =
new ConcurrentHashMap<Long, CachedUid>();
private static long cacheTimeout;
private static boolean initialized = false;
/**
* The Windows logon name has two parts, NetBIOS domain name and
* user account name, of the format DOMAIN\UserName. This method
* will remove the domain part of the full logon name.
*
* @param name the full principal name containing the domain
* @return name with domain removed
*/
private static String stripDomain(String name) {
int i = name.indexOf('\\');
if (i != -1)
name = name.substring(i + 1);
return name;
}
public static String getOwner(FileDescriptor fd) throws IOException {
ensureInitialized();
if (Shell.WINDOWS) {
String owner = Windows.getOwner(fd);
owner = stripDomain(owner);
return owner;
} else {
long uid = POSIX.getUIDforFDOwnerforOwner(fd);
CachedUid cUid = uidCache.get(uid);
long now = System.currentTimeMillis();
if (cUid != null && (cUid.timestamp + cacheTimeout) > now) {
return cUid.username;
}
String user = POSIX.getUserName(uid);
LOG.info("Got UserName " + user + " for UID " + uid
+ " from the native implementation");
cUid = new CachedUid(user, now);
uidCache.put(uid, cUid);
return user;
}
}
/**
* Create a FileInputStream that shares delete permission on the
* file opened, i.e. other process can delete the file the
* FileInputStream is reading. Only Windows implementation uses
* the native interface.
*/
public static FileInputStream getShareDeleteFileInputStream(File f)
throws IOException {
if (!Shell.WINDOWS) {
// On Linux the default FileInputStream shares delete permission
// on the file opened.
//
return new FileInputStream(f);
} else {
// Use Windows native interface to create a FileInputStream that
// shares delete permission on the file opened.
//
FileDescriptor fd = Windows.createFile(
f.getAbsolutePath(),
Windows.GENERIC_READ,
Windows.FILE_SHARE_READ |
Windows.FILE_SHARE_WRITE |
Windows.FILE_SHARE_DELETE,
Windows.OPEN_EXISTING);
return new FileInputStream(fd);
}
}
/**
* Create a FileInputStream that shares delete permission on the
* file opened at a given offset, i.e. other process can delete
* the file the FileInputStream is reading. Only Windows implementation
* uses the native interface.
*/
public static FileInputStream getShareDeleteFileInputStream(File f, long seekOffset)
throws IOException {
if (!Shell.WINDOWS) {
RandomAccessFile rf = new RandomAccessFile(f, "r");
if (seekOffset > 0) {
rf.seek(seekOffset);
}
return new FileInputStream(rf.getFD());
} else {
// Use Windows native interface to create a FileInputStream that
// shares delete permission on the file opened, and set it to the
// given offset.
//
FileDescriptor fd = Windows.createFile(
f.getAbsolutePath(),
Windows.GENERIC_READ,
Windows.FILE_SHARE_READ |
Windows.FILE_SHARE_WRITE |
Windows.FILE_SHARE_DELETE,
Windows.OPEN_EXISTING);
if (seekOffset > 0)
Windows.setFilePointer(fd, seekOffset, Windows.FILE_BEGIN);
return new FileInputStream(fd);
}
}
/**
* Create the specified File for write access, ensuring that it does not exist.
* @param f the file that we want to create
* @param permissions we want to have on the file (if security is enabled)
*
* @throws AlreadyExistsException if the file already exists
* @throws IOException if any other error occurred
*/
public static FileOutputStream getCreateForWriteFileOutputStream(File f, int permissions)
throws IOException {
if (!Shell.WINDOWS) {
// Use the native wrapper around open(2)
try {
FileDescriptor fd = POSIX.open(f.getAbsolutePath(),
POSIX.O_WRONLY | POSIX.O_CREAT
| POSIX.O_EXCL, permissions);
return new FileOutputStream(fd);
} catch (NativeIOException nioe) {
if (nioe.getErrno() == Errno.EEXIST) {
throw new AlreadyExistsException(nioe);
}
throw nioe;
}
} else {
// Use the Windows native APIs to create equivalent FileOutputStream
try {
FileDescriptor fd = Windows.createFile(f.getCanonicalPath(),
Windows.GENERIC_WRITE,
Windows.FILE_SHARE_DELETE
| Windows.FILE_SHARE_READ
| Windows.FILE_SHARE_WRITE,
Windows.CREATE_NEW);
POSIX.chmod(f.getCanonicalPath(), permissions);
return new FileOutputStream(fd);
} catch (NativeIOException nioe) {
if (nioe.getErrorCode() == 80) {
// ERROR_FILE_EXISTS
// 80 (0x50)
// The file exists
throw new AlreadyExistsException(nioe);
}
throw nioe;
}
}
}
private synchronized static void ensureInitialized() {
if (!initialized) {
cacheTimeout =
new Configuration().getLong("hadoop.security.uid.cache.secs",
4*60*60) * 1000;
LOG.info("Initialized cache for UID to User mapping with a cache" +
" timeout of " + cacheTimeout/1000 + " seconds.");
initialized = true;
}
}
/**
* A version of renameTo that throws a descriptive exception when it fails.
*
* @param src The source path
* @param dst The destination path
*
* @throws NativeIOException On failure.
*/
public static void renameTo(File src, File dst)
throws IOException {
if (!nativeLoaded) {
if (!src.renameTo(dst)) {
throw new IOException("renameTo(src=" + src + ", dst=" +
dst + ") failed.");
}
} else {
renameTo0(src.getAbsolutePath(), dst.getAbsolutePath());
}
}
public static void link(File src, File dst) throws IOException {
if (!nativeLoaded) {
HardLink.createHardLink(src, dst);
} else {
link0(src.getAbsolutePath(), dst.getAbsolutePath());
}
}
/**
* A version of renameTo that throws a descriptive exception when it fails.
*
* @param src The source path
* @param dst The destination path
*
* @throws NativeIOException On failure.
*/
private static native void renameTo0(String src, String dst)
throws NativeIOException;
private static native void link0(String src, String dst)
throws NativeIOException;
/**
* Unbuffered file copy from src to dst without tainting OS buffer cache
*
* In POSIX platform:
* It uses FileChannel#transferTo() which internally attempts
* unbuffered IO on OS with native sendfile64() support and falls back to
* buffered IO otherwise.
*
* It minimizes the number of FileChannel#transferTo calls by passing the
* src file size directly instead of a smaller size as the 3rd parameter.
* This saves the number of sendfile64() system calls when native sendfile64()
* is supported. In the two fallback cases where sendfile is not supported,
* FileChannel#transferTo already has its own batching of size 8 MB and 8 KB,
* respectively.
*
* In Windows Platform:
* It uses its own native wrapper of CopyFileEx with COPY_FILE_NO_BUFFERING
* flag, which is supported on Windows Server 2008 and above.
*
* Ideally, we should use FileChannel#transferTo() across both POSIX and Windows
* platform. Unfortunately, the wrapper(Java_sun_nio_ch_FileChannelImpl_transferTo0)
* used by FileChannel#transferTo for unbuffered IO is not implemented on Windows.
* Based on OpenJDK 6/7/8 source code, Java_sun_nio_ch_FileChannelImpl_transferTo0
* on Windows simply returns IOS_UNSUPPORTED.
*
* Note: This simple native wrapper does minimal parameter checking before copy and
* consistency check (e.g., size) after copy.
* It is recommended to use wrapper function like
* the Storage#nativeCopyFileUnbuffered() function in hadoop-hdfs with pre/post copy
* checks.
*
* @param src The source path
* @param dst The destination path
* @throws IOException
*/
public static void copyFileUnbuffered(File src, File dst) throws IOException {
if (nativeLoaded && Shell.WINDOWS) {
copyFileUnbuffered0(src.getAbsolutePath(), dst.getAbsolutePath());
} else {
FileInputStream fis = null;
FileOutputStream fos = null;
FileChannel input = null;
FileChannel output = null;
try {
fis = new FileInputStream(src);
fos = new FileOutputStream(dst);
input = fis.getChannel();
output = fos.getChannel();
long remaining = input.size();
long position = 0;
long transferred = 0;
while (remaining > 0) {
transferred = input.transferTo(position, remaining, output);
remaining -= transferred;
position += transferred;
}
} finally {
IOUtils.cleanup(LOG, output);
IOUtils.cleanup(LOG, fos);
IOUtils.cleanup(LOG, input);
IOUtils.cleanup(LOG, fis);
}
}
}
private static native void copyFileUnbuffered0(String src, String dst)
throws NativeIOException;
}
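In the NativeIO source pasted above, Windows.access has its native call commented out and returns true unconditionally, so the access check is skipped entirely. The Javadoc on that method notes that the native function could eventually be replaced by the JDK 7 methods Files#isReadable, isWritable and isExecutable. The following is a minimal sketch of that substitution using only the JDK. The class name AccessCheckSketch and its local AccessRight enum are hypothetical stand-ins for illustration and are not part of Hadoop.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical stand-alone sketch; not Hadoop's NativeIO.Windows.access.
final class AccessCheckSketch {

    // Mirrors the three rights named in the pasted source.
    enum AccessRight { ACCESS_READ, ACCESS_WRITE, ACCESS_EXECUTE }

    // Returns true if the current process has the requested right on the path.
    // A path that does not exist yields false for every right.
    static boolean access(String path, AccessRight desiredAccess) {
        Path p = Paths.get(path);
        switch (desiredAccess) {
            case ACCESS_READ:
                return Files.isReadable(p);
            case ACCESS_WRITE:
                return Files.isWritable(p);
            case ACCESS_EXECUTE:
                return Files.isExecutable(p);
            default:
                throw new IllegalArgumentException("Unknown right: " + desiredAccess);
        }
    }

    public static void main(String[] args) {
        String tmpDir = System.getProperty("java.io.tmpdir");
        for (AccessRight right : AccessRight.values()) {
            System.out.println(right + " on " + tmpDir + ": " + access(tmpDir, right));
        }
    }
}
```

Unlike the unconditional return true, a check of this kind still reports missing or unreadable paths, which may make local failures easier to diagnose.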
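Testing against the cluster from the local Windows machine: this runs into a problem. The client connects, but the launch parameters are assembled incorrectly. To fix it, the Hadoop source on the Windows side has to be modified: in the YARNRunner class, the % signs and similar Windows-style syntax all need to be corrected.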
package com.wzx.localmapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
 * @author AllenWoon
 *
 * Client-side driver that specifies the parameters the wordcount job needs at run time.
 *
 * For example: which class implements the map-phase logic and which implements the reduce-phase logic,
 * which component reads the input data and which component writes the results,
 * the path of the jar that contains this wordcount program,
 * and any other parameters the job requires.
 */
public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Without this line, the client falls back to its local configuration files
        conf.set("fs.defaultFS", "hdfs://master:9000");
        conf.set("mapreduce.framework.name","yarn");
        conf.set("yarn.resourcemanager.hostname", "master");
        Job job = Job.getInstance(conf);
        // Tell the framework where the jar containing our program is located
        //job.setJar("/root/wordcount.jar");
        job.setJarByClass(WordCountDriver.class);
        // Tell the framework which mapper and reducer classes the program uses
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // Tell the framework the data types the program outputs
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Tell the framework which components read the input and write the results
        // TextInputFormat is one of MapReduce's built-in input components; more precisely, it reads text files
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // Tell the framework which path holds the data files to process
        FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
        // Tell the framework where to write the results
        FileOutputFormat.setOutputPath(job, new Path("/wordcount/output"));
        boolean res = job.waitForCompletion(true);
        // Exit status returned to Linux: 0 means the job succeeded, 1 means it failed
        System.exit(res?0:1);
    }
}
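The run produces the output below. The cause is that the cluster did not recognize the Windows-style parameters in the assembled launch command.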
WARN - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
INFO - Connecting to ResourceManager at master/192.168.15.10:8032
WARN - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
WARN - No job jar file set. User classes may not be found. See Job or Job#setJar(String).
INFO - Total input paths to process : 1
INFO - number of splits:1
INFO - Submitting tokens for job: job_1641202044651_0001
INFO - Job jar is not present. Not adding any jar to the list of resources.
INFO - Submitted application application_1641202044651_0001
INFO - The url to track the job: http://master:8088/proxy/application_1641202044651_0001/
INFO - Running job: job_1641202044651_0001
INFO - Job job_1641202044651_0001 running in uber mode : false
INFO - map 0% reduce 0%
INFO - Job job_1641202044651_0001 failed with state FAILED due to: Application application_1641202044651_0001 failed 2 times due to AM Container for appattempt_1641202044651_0001_000002 exited with exitCode: 1
For more detailed output, check application tracking page:http://master:8088/proxy/application_1641202044651_0001/Then, click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_1641202044651_0001_02_000001
Exit code: 1
Exception message: /bin/bash: line 0: fg: no job control
Stack trace: ExitCodeException exitCode=1: /bin/bash: line 0: fg: no job control
at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Container exited with a non-zero exit code 1
Failing this attempt. Failing the application.
INFO - Counters: 0
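The "fg: no job control" message above is consistent with a launch command that still contains Windows-style %VAR% references: bash on the Linux NodeManager treats a word that starts with % as a job specification rather than an environment variable. The sketch below illustrates the kind of rewriting the note about the % signs refers to. It uses only the JDK, the class and method names are hypothetical, and it is not the actual YARNRunner code.

```java
import java.util.regex.Pattern;

// Hypothetical illustration of converting Windows-style launch fragments
// to the form a Linux shell expects; not taken from Hadoop.
final class LaunchCommandSketch {

    // Matches Windows environment references such as %JAVA_HOME%.
    private static final Pattern WINDOWS_VAR =
            Pattern.compile("%([A-Za-z_][A-Za-z0-9_]*)%");

    // Rewrites %NAME% as $NAME so that bash expands it as a variable.
    static String toUnixVariables(String fragment) {
        // In the replacement, "\\$" is a literal dollar sign and "$1" is the captured name.
        return WINDOWS_VAR.matcher(fragment).replaceAll("\\$$1");
    }

    // Rewrites a Windows classpath (';' separators) as a Unix one (':' separators).
    static String toUnixClasspath(String classpath) {
        return toUnixVariables(classpath).replace(';', ':');
    }

    public static void main(String[] args) {
        System.out.println(toUnixVariables("%JAVA_HOME%/bin/java"));
        System.out.println(toUnixClasspath("%HADOOP_CONF_DIR%;%HADOOP_COMMON_HOME%/share/hadoop/common/*"));
    }
}
```

On Hadoop 2.4 and later, setting the client property mapreduce.app-submission.cross-platform to true is a commonly cited alternative to patching the source. Whether it applies to the Hadoop version used in these notes is an assumption that should be checked against that version's mapred-default.xml.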
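Test again with the two YARN parameters commented out. The client can now reach the cluster, but nothing shows up in the cluster's YARN UI. With mapreduce.framework.name unset, the job falls back to the default local runner and executes inside the client JVM, using the cluster only for HDFS (the job_local ID in the log further down shows this).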
package com.wzx.localmapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
 * @author AllenWoon
 *
 * Client-side driver that specifies the parameters the wordcount job needs at run time.
 *
 * For example: which class implements the map-phase logic and which implements the reduce-phase logic,
 * which component reads the input data and which component writes the results,
 * the path of the jar that contains this wordcount program,
 * and any other parameters the job requires.
 */
public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Without this line, the client falls back to its local configuration files
        conf.set("fs.defaultFS", "hdfs://master:9000");
        //conf.set("mapreduce.framework.name","yarn");
        //conf.set("yarn.resourcemanager.hostname", "master");
        Job job = Job.getInstance(conf);
        // Tell the framework where the jar containing our program is located
        //job.setJar("/root/wordcount.jar");
        job.setJarByClass(WordCountDriver.class);
        // Tell the framework which mapper and reducer classes the program uses
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // Tell the framework the data types the program outputs
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Tell the framework which components read the input and write the results
        // TextInputFormat is one of MapReduce's built-in input components; more precisely, it reads text files
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // Tell the framework which path holds the data files to process
        FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
        // Tell the framework where to write the results
        FileOutputFormat.setOutputPath(job, new Path("/wordcount/output"));
        boolean res = job.waitForCompletion(true);
        // Exit status returned to Linux: 0 means the job succeeded, 1 means it failed
        System.exit(res?0:1);
    }
}
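Why the job above never appears in YARN can be sketched with a few lines of plain Java. This assumes the default value local for mapreduce.framework.name, as documented in Hadoop's mapred-default.xml. java.util.Properties stands in for Hadoop's Configuration, and the class and method names are hypothetical.

```java
import java.util.Properties;

// Hypothetical illustration; Properties stands in for Hadoop's Configuration.
final class FrameworkModeSketch {

    // Default assumed from Hadoop's mapred-default.xml.
    static final String DEFAULT_FRAMEWORK = "local";

    // Returns the framework the client would use for this configuration.
    static String frameworkName(Properties clientConf) {
        return clientConf.getProperty("mapreduce.framework.name", DEFAULT_FRAMEWORK);
    }

    // Job IDs produced by the local runner start with "job_local",
    // as in job_local918163450_0001 in the log that follows.
    static boolean ranLocally(String jobId) {
        return jobId.startsWith("job_local");
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty("fs.defaultFS", "hdfs://master:9000");
        // The two YARN settings are left out, as in the driver above.
        System.out.println("framework: " + frameworkName(conf));
        System.out.println("local job: " + ranLocally("job_local918163450_0001"));
        System.out.println("local job: " + ranLocally("job_1641202044651_0001"));
    }
}
```

A job that runs in the local runner is never submitted to the ResourceManager, which would explain why the YARN UI stays empty even though HDFS on the cluster is being read and written.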
Startup fails with a permission error. Fix: add …
WARN - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
INFO - session.id is deprecated. Instead, use dfs.metrics.session-id
INFO - Initializing JVM Metrics with processName=JobTracker, sessionId=
WARN - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
WARN - No job jar file set. User classes may not be found. See Job or Job#setJar(String).
INFO - Total input paths to process : 1
INFO - number of splits:1
INFO - Submitting tokens for job: job_local918163450_0001
INFO - The url to track the job: http://localhost:8080/
INFO - Running job: job_local918163450_0001
INFO - OutputCommitter set in config null
INFO - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
WARN - job_local918163450_0001
org.apache.hadoop.security.AccessControlException: Permission denied: user=Administrator, access=WRITE, inode="/wordcount":root:supergroup:drwxr-xr-x
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:257)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:238)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:179)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6545)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6527)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6479)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4290)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4260)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4233)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:853)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:600)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2040)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2036)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2034)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2744)
at org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:2713)
at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:870)
at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:866)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:866)
at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:859)
at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1817)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:291)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:511)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=Administrator, access=WRITE, inode="/wordcount":root:supergroup:drwxr-xr-x
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:257)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:238)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:179)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6545)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6527)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6479)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4290)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4260)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4233)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:853)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:600)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2040)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2036)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2034)
at org.apache.hadoop.ipc.Client.call(Client.java:1469)
at org.apache.hadoop.ipc.Client.call(Client.java:1400)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
at com.sun.proxy.$Proxy9.mkdirs(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:539)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy10.mkdirs(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2742)
... 9 more
INFO - Job job_local918163450_0001 running in uber mode : false
INFO - map 0% reduce 0%
INFO - Job job_local918163450_0001 failed with state FAILED due to: NA
INFO - Counters: 0
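The AccessControlException above can be read directly from its message: the Windows user Administrator asks for WRITE on /wordcount, which is owned by root:supergroup with mode drwxr-xr-x. The following is a minimal sketch of the POSIX-style check this implies, using only the JDK. The class and method names are hypothetical, and the sketch deliberately ignores the HDFS superuser and ACLs.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical illustration of a POSIX-style write check; not the HDFS FSPermissionChecker.
final class HdfsPermissionSketch {

    // mode is the 10-character string from the log, e.g. "drwxr-xr-x".
    static boolean canWrite(String user, Set<String> userGroups,
                            String owner, String group, String mode) {
        if (mode.length() != 10) {
            throw new IllegalArgumentException("Expected a 10-character mode: " + mode);
        }
        int writeIndex;
        if (user.equals(owner)) {
            writeIndex = 2;   // owner triplet: characters 1..3
        } else if (userGroups.contains(group)) {
            writeIndex = 5;   // group triplet: characters 4..6
        } else {
            writeIndex = 8;   // other triplet: characters 7..9
        }
        return mode.charAt(writeIndex) == 'w';
    }

    public static void main(String[] args) {
        Set<String> noGroups = Collections.emptySet();
        System.out.println(canWrite("Administrator", noGroups, "root", "supergroup", "drwxr-xr-x"));
        System.out.println(canWrite("root", noGroups, "root", "supergroup", "drwxr-xr-x"));
    }
}
```

Administrator falls into the "other" triplet, which has no write bit, so creating /wordcount/output is refused. The usual remedies are to run the client as the directory's owner or to relax the directory's permissions, for example with hadoop fs -chmod. Which of these the original note intended is not stated.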