Spark, Flink, and Hadoop MapReduce on Python in Practice
Basic Environment
- Python 3.5+
- pip
- Java 1.8
- Maven version >= 3.3.0
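Before starting, it is worth confirming the toolchain versions. A minimal sanity check, assuming java and mvn are on the PATH (this helper is not part of the original setup):

# check_env.py -- hypothetical helper for verifying the environment
import subprocess
import sys

# Python must be 3.5 or newer for both PySpark and PyFlink below
assert sys.version_info >= (3, 5), "Python 3.5+ required"
print("python:", sys.version.split()[0])

# java and mvn print their own version banners
subprocess.run(["java", "-version"])
subprocess.run(["mvn", "-version"])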
PySpark
1. pip install pyspark
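A quick way to verify the install (minimal sketch; just prints the installed version):

# verify the pip install worked
import pyspark
print(pyspark.__version__)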
2. Code
(1) Streaming
import sys
import os
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# os.environ["PYSPARK_PYTHON"] = "/usr/local/bin/python3.7"

def pysparkStreamingWordCount(host, port):
    sc = SparkContext("local[2]", "NetworkWordCount")
    ssc = StreamingContext(sc, 5)
    lines = ssc.socketTextStream(host, port)
    # Split each line into words
    words = lines.flatMap(lambda line: line.split(" "))
    # Count each word in each batch
    pairs = words.map(lambda word: (word, 1))
    wordCounts = pairs.reduceByKey(lambda x, y: x + y)
    wordCounts.pprint()
    ssc.start()             # Start the computation
    ssc.awaitTermination()  # Wait for the computation to terminate

if __name__ == "__main__":
    if len(sys.argv) > 2:
        host = sys.argv[1]
        port = int(sys.argv[2])
        pysparkStreamingWordCount(host, port)
    else:
        # fall back to a local test socket when no arguments are given
        pysparkStreamingWordCount("localhost", 9999)
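To exercise the job locally, something must be writing lines to the socket. The usual trick is nc -lk 9999; as a pure-Python alternative, here is a minimal sketch of a one-connection text server (hypothetical helper, using the localhost:9999 defaults above):

# feed_socket.py -- hypothetical test driver, not part of the original post
import socket
import time

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("localhost", 9999))
server.listen(1)
print("waiting for the streaming job to connect ...")
conn, addr = server.accept()
try:
    while True:
        # one line per second; the job splits each line into words
        conn.sendall(b"hello spark hello streaming\n")
        time.sleep(1)
finally:
    conn.close()
    server.close()

Start this script (or nc) first, then launch the streaming job; the word counts print every 5-second batch.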
(2) Batch
import sys
import os
from pyspark import SparkContext, SparkConf

# os.environ["PYSPARK_PYTHON"] = "/usr/local/bin/python3.7"

def pysparkBatchWordCount(input, output):
    if input == "":
        input = "/tmp/input"
    if output == "":
        output = "/tmp/output"
    conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf)
    lines = sc.textFile(input)
    words = lines.flatMap(lambda sentence: sentence.split(" "))
    pairs = words.map(lambda word: (word, 1))
    wordCounts = pairs.reduceByKey(lambda x, y: x + y)
    # merge into a single partition so the output is one file
    wordCounts.coalesce(1).saveAsTextFile(output)
    print(wordCounts.take(1000))

if __name__ == "__main__":
    if len(sys.argv) > 2:
        input = sys.argv[1]
        output = sys.argv[2]
        pysparkBatchWordCount(input, output)
    else:
        pysparkBatchWordCount("", "")
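Two notes for local runs: saveAsTextFile refuses to write into an existing directory, so delete /tmp/output between runs; and you need input data first. A throwaway helper (hypothetical, matching the /tmp/input default):

# make_input.py -- hypothetical helper for local testing
with open("/tmp/input", "w") as f:
    f.write("hello spark hello flink\n")
    f.write("hello hadoop\n")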
3. Submit script
# --py-files : zip of the project's Python code/dependencies
# --jars     : extra jars to ship (e.g. JDBC drivers)
# last lines : path of the entry script in the project, then its arguments
bin/spark-submit --master yarn \
  --py-files project.zip \
  --jars ./resources/mysql-connector-java-5.1.39.jar \
  --executor-memory 3g --driver-memory 3g --queue etl --num-executors 10 \
  ./main/wordcont/pyspark.py \
  /tmp/input /tmp/output
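For a quick test without a YARN cluster, the same entry script can be submitted locally: bin/spark-submit --master local[*] ./main/wordcont/pyspark.py /tmp/input /tmp/output (the queue and executor flags only apply on YARN).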
PyFlink
1. Build the Python package
git clone https://github.com/apache/flink.git
git fetch origin release-1.9 && git checkout -b release-1.9 origin/release-1.9
mvn clean install -DskipTests -Dfast
cd flink/flink-python; python3 setup.py sdist bdist_wheel
pip3 install dist/*.tar.gz   # Python must be 3.5+
See the official docs:
Building flink-python: Apache Flink 1.10 Documentation: Building Flink from Source
Python API: Apache Flink 1.10 Documentation: Python API Tutorial
There are two ways to get PyFlink:
Option 1: build it from source as above.
Option 2: python -m pip install apache-flink==1.9.0 (more awkward to debug against).
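Either way, a quick import check confirms the install is usable (a minimal sketch using only the classes the word-count script below relies on):

# sanity check for the PyFlink install (Flink 1.9-era batch API)
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import BatchTableEnvironment, TableConfig

env = ExecutionEnvironment.get_execution_environment()
t_env = BatchTableEnvironment.create(env, TableConfig())
print("pyflink OK:", type(t_env).__name__)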
2. Flink standalone (or another environment)
wget https://archive.apache.org/dist/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.12.tgz
tar xvf flink-1.9.0-bin-scala_2.12.tgz
cd flink-1.9.0/bin
./start-cluster.sh
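Once the cluster is up, the Flink web UI should be reachable at http://localhost:8081; it is the quickest way to confirm that the JobManager and a TaskManager are running.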
3. pyflink.py
Prepare a data source first. The defaults are:
input: /tmp/input (local disk or hdfs://...)
output: /tmp/output
import sys
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem

def wordCount(inputPath, outputPath):
    if inputPath == "":
        inputPath = "/tmp/input"
    if outputPath == "":
        outputPath = "/tmp/output"
    print("input file path:", inputPath, "output file path:", outputPath)
    exec_env = ExecutionEnvironment.get_execution_environment()
    # Note: parallelism must not exceed the available task slots
    exec_env.set_parallelism(1)
    t_config = TableConfig()
    t_env = BatchTableEnvironment.create(exec_env, t_config)
    t_env.connect(FileSystem().path(inputPath)) \
        .with_format(OldCsv()
                     .line_delimiter(' ')
                     .field('word', DataTypes.STRING())) \
        .with_schema(Schema()
                     .field('word', DataTypes.STRING())) \
        .register_table_source('mySource')
    t_env.connect(FileSystem().path(outputPath)) \
        .with_format(OldCsv()
                     .field_delimiter('\t')
                     .field('word', DataTypes.STRING())
                     .field('count', DataTypes.BIGINT())) \
        .with_schema(Schema()
                     .field('word', DataTypes.STRING())
                     .field('count', DataTypes.BIGINT())) \
        .register_table_sink('mySink')
    t_env.scan('mySource') \
        .group_by('word') \
        .select('word, count(1)') \
        .insert_into('mySink')
    t_env.execute("python_job")

if __name__ == "__main__":
    if len(sys.argv) > 2:
        inputPath = sys.argv[1]
        outputPath = sys.argv[2]
        wordCount(inputPath, outputPath)
    else:
        wordCount("", "")
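Note the source definition: OldCsv with a space as the line delimiter means every space-separated token in the input file becomes one row of the word column, so the group_by/count needs no splitting UDF. This is the same trick the official Python API tutorial referenced above uses.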
4. Run command
cd flink-1.9.0/bin && ./flink run -py ~/demo/pyflink.py /tmp/input /tmp/output
5. Result
Check the job result on the server (web UI), or look for the data in the specified output file.
PyMapReduce
Reference: Python implementation of MapReduce word count (飞翔的荷兰人号z, CSDN blog)
1. Code
mapper.py
#!/usr/bin/env python
import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        # emit "word<TAB>1"; the tab is the key/value separator
        print('%s\t%s' % (word, 1))
reducer.py
#!/usr/bin/env python
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently ignore/discard this line
        continue
    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print('%s\t%s' % (current_word, current_count))
test.sh
(a quick local check that the Python scripts run without errors)
chmod +x mapper.py reducer.py
echo "foo foo quux labs foo bar quux" | ./mapper.py
echo "foo foo quux labs foo bar quux" | ./mapper.py | sort | ./reducer.py
chmod a+x $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.7.jar   # no permissions set by default
hdfs dfs -mkdir -p /root/data
hdfs dfs -put input.txt /root/data
run.sh
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.7.jar \
-mapper mapper.py \
-file mapper.py \
-reducer reducer.py \
-file reducer.py \
-input /root/data/input* -output /ooxx/output/
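As with the Spark batch job, the run fails if the output directory already exists; remove it with hdfs dfs -rm -r /ooxx/output between runs, and inspect the result with hdfs dfs -cat /ooxx/output/part-*.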