Basic

  1. python3.5+

  2. pip

  3. java 1.8

  4. maven version >=3.3.0

Pyspark

1. pip install pyspark

2.code

(1)streaming

import sys
from pyspark import SparkContext,SparkConf
from pyspark.streaming import StreamingContext
import os
# os.environ["PYSPARK_PYTHON"] = "/usr/local/bin/python3.7"
def pysparkStreamingWordCount(host,port):
    sc = SparkContext("local[2]", "NetworkWordCount")
    ssc = StreamingContext(sc, 5)
    lines = ssc.socketTextStream(host, port)
    # Split each line into words
    words = lines.flatMap(lambda line: line.split(" "))
    # Count each word in each batch
    pairs = words.map(lambda word: (word, 1))
    wordCounts = pairs.reduceByKey(lambda x, y: x + y)
    wordCounts.pprint()
    ssc.start()  # Start the computation
    ssc.awaitTermination()  # Wait for the computation to terminate

if __name__ == "__main__":
    if len(sys.argv) > 2 :
        host = sys.argv[1]
        port = int(sys.argv[2])
        pysparkStreamingWordCount(host,port)
    else:
        pysparkStreamingWordCount("", "")

(2)batch

import sys
from pyspark import SparkContext,SparkConf
from pyspark.streaming import StreamingContext
import os
#os.environ["PYSPARK_PYTHON"] = "/usr/local/bin/python3.7"
def pysparkBatchWordCount(input,output):
    if(input==""):
        input="/tmp/input"
    if (output == ""):
        output = "/tmp/output"
    conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
    sc=SparkContext.getOrCreate(conf)
    lines = sc.textFile(input)
    words=lines.flatMap(lambda sentence: sentence.split(" "))
    pairs =words.map(lambda word:(word,1))
    wordCounts = pairs.reduceByKey(lambda x, y: x + y)
    wordCounts.coalesce(1).saveAsTextFile(output)
    print(wordCounts.take(1000))

if __name__ == "__main__":
    if len(sys.argv) > 2 :
        input = sys.argv[1]
        output = sys.argv[2]
        pysparkBatchWordCount(input,output)
    else:
        pysparkBatchWordCount("", "")

3.执行脚本

bin/spark-submit --master yarn \

--py-files 项目.zip \ #(项目jar) ..

--jars ./resources/mysql-connector-java-5.1.39.jar \ #(引入的其他python的jar包)

--executor-memory 3g --driver-memory 3g --queue etl --num-executors 10 \

./main/wordcont/pyspark.py \ #**(脚本的项目路径)

/tmp/input /tmp/output

PyFlink 

1.构建python

git clone https://github.com/apache/flink.git

git fetch origin release-1.9 && git checkout -b release-1.9 origin/release-1.9

mvn clean install -DskipTests -Dfast

cd flink/flink-python; python3 setup.py sdist bdist_wheel

pip3 install dist/*.tar.gz //python必须在3.5+

这里可以参考官网:

构建flink-python: Apache Flink 1.10 Documentation: Building Flink from Source

python api: Apache Flink 1.10 Documentation: Python API Tutorial

有俩种方式:

                方式一: 编译后构建环境

                方式二: 另外一种: python -m pip install apache-flink=1.9 (这个调试起来麻烦)

2.flink's 'stanalone ( 其他环境)

wget https://archive.apache.org/dist/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.12.tgz

tar xvf flink-1.9.0-bin-scala_2.12.tgz

cd flink-1.9.0/bin

./start-cluster.sh

3.pyflink.py

准备数据源默认是:

/tmp/input 的(本地磁盘或者hdfs://...)

指定path: /tmp/output

import sys
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem

def wordCount(inputPath,outputPath):
    if(inputPath==""):
        inputPath="/tmp/input"
    if (outputPath == ""):
        outputPath = "/tmp/output"
    print ("intput file path: ",inputPath,"output file path:",outputPath)
    exec_env = ExecutionEnvironment.get_execution_environment()
    # 注意: solt>1
    exec_env.set_parallelism(1)
    t_config = TableConfig()
    t_env = BatchTableEnvironment.create(exec_env, t_config)

    t_env.connect(FileSystem().path(inputPath)) \
        .with_format(OldCsv()
                     .line_delimiter(' ')
                     .field('word', DataTypes.STRING())) \
        .with_schema(Schema()
                     .field('word', DataTypes.STRING())) \
        .register_table_source('mySource')

    t_env.connect(FileSystem().path(outputPath, )) \
        .with_format(OldCsv()
                     .field_delimiter('\t')
                     .field('word', DataTypes.STRING())
                     .field('count', DataTypes.BIGINT())) \
        .with_schema(Schema()
                     .field('word', DataTypes.STRING())
                     .field('count', DataTypes.BIGINT())) \
        .register_table_sink('mySink')

    t_env.scan('mySource') \
        .group_by('word') \
        .select('word, count(1)') \
        .insert_into('mySink')

    t_env.execute("python_job")

if __name__ == "__main__":
    if len(sys.argv) > 2 :
        inputPath = sys.argv[1]
        outputPath = sys.argv[2]
        wordCount(inputPath,outputPath)
    else:
        wordCount("", "")

4. 运行命令

cd flink-1.9.0/bin && ./flink run -py ~/demo/pyflink.py /temp/input /temp/output

5. result:

访问服务器的结果或者到指定的输出文件查找信息

pyMapReduce

参考: python实现Mapreduce的wordcount_飞翔的荷兰人号z的博客-CSDN博客

1.code

mapper.py

#!/usr/bin/env python
import sys
# input comes from STDIN (standard input)
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print(word,"\t", 1)

reducer.py

#!/usr/bin/env python

import sys
current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

        # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print (current_word,"\t", current_count)
        current_count = count
        current_word = word

    # do not forget to output the last word if needed!
if current_word == word:
    print (current_word, "\t",current_count)

test.sh 

(可以简单确认py是否有错)

echo "foo foo quux labs foo bar quux" | ./mapper.py

echo "foo foo quux labs foo bar quux" |./mapper.py | sort |./reducer.py

chmod a+x $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.7.jar (默认是没有权限的)

hdfs dfs -mkdir -p /root/data

hdfs dfs -put input.txt /root /data

run.sh

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.7.jar \

-mapper mapper.py \

-file mapper.py \

-reducer reducer.py \

-file reducer.py \

-input /root/data/input* -output /ooxx/output/

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐