承接 http://blog.csdn.net/weihongrao/article/details/16826763 ，把上次得出的数据再和 group 维度表进行关联，得到 group 名称。

mapper：joinm.py


#!/usr/bin/python

import sys

class mapper:
    """Hadoop-streaming mapper that re-keys both join inputs on a shared key.

    Two kinds of input line are recognised:

    * comma-separated rows with at least five columns (presumably the group
      dimension CSV -- confirm against the input data): columns 0 and 2 are
      emitted as ``key<TAB>value``;
    * tab-separated rows with at least five fields (the fact rows): field 1
      is moved to the front so it becomes the key, followed by fields 0, 2,
      3 and 4.

    Every other line is skipped.
    """

    def _transform(self, line):
        """Return the re-keyed record for one input line, or None to skip it."""
        ori = line.strip()
        if ori.count(',') >= 4:
            gp = ori.split(',')
            return gp[0] + '\t' + gp[2]
        fact = ori.split('\t')
        # Five fields are needed because fact[4] is emitted; shorter rows used
        # to be dropped only as a side effect of a swallowed IndexError.
        if len(fact) >= 5:
            return '\t'.join([fact[1], fact[0], fact[2], fact[3], fact[4]])
        return None

    def map(self):
        """Read lines from stdin and write the re-keyed records to stdout."""
        for line in sys.stdin:
            record = self._transform(line)
            if record is not None:
                # write() instead of the print statement keeps the script
                # runnable under both Python 2 and Python 3.
                sys.stdout.write(record + '\n')


# Script entry point: stream stdin through the mapper when run directly.
if __name__ == "__main__":
    mapper().map()

reducer：joinr.py

#!/usr/bin/python

import sys
def _emit_group(name, facts, out):
    """Write each buffered fact row of one key with its key replaced by name."""
    for fields in facts:
        out.write('\t'.join([name] + fields[1:]) + '\n')


def join_reduce(lines, out=None):
    """Reduce-side join of fact rows with their dimension (name) row.

    ``lines`` is an iterable of tab-separated records grouped by their first
    field (the join key).  Rows with five or more fields are fact rows and
    are buffered; rows with two or three fields are dimension rows whose
    second field is the name.  When the key changes (and at end of input)
    every buffered fact row is written to ``out`` (default: ``sys.stdout``)
    with the key replaced by the name, or by ``'null'`` when the group had no
    dimension row.  Rows with one or four fields are ignored.

    The dimension row may appear anywhere inside its group, and the name is
    reset for every key so it can never leak into the next group.
    """
    if out is None:
        out = sys.stdout
    current_key = None
    name = 'null'
    facts = []
    for line in lines:
        # Only strip the line terminator: strip() would also drop trailing
        # empty fields and change the field count used for classification.
        fields = line.rstrip('\r\n').split('\t')
        if len(fields) < 2:
            continue
        key = fields[0]
        if key != current_key:
            _emit_group(name, facts, out)
            current_key = key
            name = 'null'
            facts = []
        if len(fields) >= 5:
            facts.append(fields)
        elif len(fields) <= 3:
            name = fields[1].strip()
    # Flush the last group, which has no following key change.
    _emit_group(name, facts, out)


if __name__ == '__main__':
    join_reduce(sys.stdin)
              
        
    




执行:

# Run the join as a Hadoop streaming job: a single reduce task, the first
# map-output field used as the key, two -input paths (the group dimension CSV
# and a part file, presumably the previous job's output), and both scripts
# shipped to the cluster with -file.
hadoop jar /usr/hadoop/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar \
-D mapred.reduce.tasks=1 \
-D stream.num.map.output.key.fields=1 \
-input /rs2/groupaccountdim.csv \
-input  /rs2/output2s/part-00000 \
-output /rs2/outputjoin/ \
-mapper joinm.py \
-reducer joinr.py \
-file joinm.py \
-file joinr.py;



可以利用 Python 的 itertools.groupby 将上面的 reducer 重写成：

#!/usr/bin/python
import sys
from itertools import groupby
from operator import itemgetter


def readMap(file):
    """Yield ``{'key': ..., 'value': ...}`` dicts parsed from reducer input.

    ``file`` is any iterable of text lines.  Each line is split at its first
    tab: the part before it is the key, the rest (which may itself contain
    tabs, or be empty) is the value.  Lines without a tab, including blank
    lines, carry no value and are skipped instead of raising IndexError.
    """
    for line in file:
        # Strip only the line terminator so empty trailing fields survive.
        key, sep, value = line.rstrip('\r\n').partition('\t')
        if not sep:
            continue
        yield dict(key=key, value=value)

def reduce(data=None, out=None):
    """Join fact rows with their group name, one key group at a time.

    ``data`` is an iterable of ``{'key': ..., 'value': ...}`` dicts grouped by
    key; it defaults to ``readMap(sys.stdin)``.  Results are written to
    ``out``, which defaults to ``sys.stdout``.

    Within a group, a value with a single tab-separated field is the
    dimension row and supplies the name (the first such row wins).  Values
    with more than two fields are fact rows and are written as
    ``name<TAB>fields...``.  A group without a dimension row is written with
    the placeholder ``'null'``.
    """
    if data is None:
        data = readMap(sys.stdin)
    if out is None:
        out = sys.stdout
    for _key, group in groupby(data, itemgetter('key')):
        # Materialise the group: it is scanned twice, and a generator would
        # already be partly consumed by the name lookup, losing every fact
        # row that sorts before the dimension row.
        rows = [rec['value'].split('\t') for rec in group]
        name = 'null'
        for fields in rows:
            if len(fields) < 2:
                name = fields[0].strip()
                break
        for fields in rows:
            if len(fields) > 2:
                out.write('\t'.join([name] + fields) + '\n')

        


# Entry point: run the reduce-side join only when executed as a script.
if __name__ == "__main__":
    reduce()


Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐