MR python joiner
摘要:承接 http://blog.csdn.net/weihongrao/article/details/16826763,把上次得出的数据再和 group 维度表进行关联,得到 group 名称(mapper 为 joinm.py,reducer 为 joinr.py)。
承接上一篇(http://blog.csdn.net/weihongrao/article/details/16826763):把上次得出的数据再和 group 维度表进行关联,得到 group 名称。
mapper(joinm.py):
#!/usr/bin/python
import sys
class mapper:
    """Hadoop Streaming mapper for the group-name join (joinm.py).

    Two kinds of records arrive on the same input stream:

    * rows containing at least 4 commas (the group dimension CSV) are
      emitted as ``<field0>\\t<field2>`` -- presumably group id and group
      name; confirm against groupaccountdim.csv;
    * rows with at least 4 tabs (output of the previous job) are emitted as
      ``<field1>\\t<field0>\\t<field2>\\t<field3>\\t<field4>``, moving field1
      to the front so it becomes the streaming key. Fields past the fifth
      are dropped.

    Every other line is skipped.
    """

    def map(self, stdin=None, stdout=None):
        """Read raw records from *stdin* and write keyed records to *stdout*.

        Both streams default to ``sys.stdin`` / ``sys.stdout``, so the
        no-argument call used when Hadoop runs this script is unchanged.
        Passing file-like objects allows the mapper to be exercised directly.
        """
        src = sys.stdin if stdin is None else stdin
        out = sys.stdout if stdout is None else stdout
        for line in src:
            ori = line.strip()
            if ori.count(',') >= 4:
                gp = ori.split(',')
                out.write(gp[0] + '\t' + gp[2] + '\n')
            elif ori.count('\t') >= 4:
                # Fields 0..4 are read, so at least 4 tabs are required. The
                # previous ">= 2" test let 2-3 tab lines through, and they then
                # died on IndexError inside a bare "except: pass".
                fact = ori.split('\t')
                out.write('\t'.join([fact[1], fact[0], fact[2], fact[3], fact[4]]) + '\n')
if __name__ == "__main__":
    # Script entry point: map stdin to stdout when Hadoop runs joinm.py.
    mapper().map()
reducer(joinr.py):
#!/usr/bin/python
import sys
def _emit_joined(name, facts, out):
    """Write each buffered fact row with its first field replaced by *name*."""
    for fields in facts:
        fields[0] = name
        out.write('\t'.join(fields) + '\n')


def join_reduce(stdin=None, stdout=None):
    """Reducer for the group-name join (joinr.py).

    Input is the key-sorted mapper output. Each line of a key is one of:

    * a name record -- fewer than 3 tabs, e.g. ``<key>\\t<name>``;
    * a fact record -- more than 3 tabs, e.g. ``<key>\\t<f0>\\t<f2>\\t<f3>\\t<f4>``.

    Lines with exactly 3 tabs, blank lines, and name records without a second
    field are ignored. For every key, each fact record is written with its
    first field (the key) replaced by the key's name, or ``'null'`` when the
    key has no name record.

    Hadoop sorts reducer input by key only, so a key's name record may come
    before or after that key's facts. Facts are therefore buffered until the
    key changes.

    Streams default to ``sys.stdin`` / ``sys.stdout``.
    """
    src = sys.stdin if stdin is None else stdin
    out = sys.stdout if stdout is None else stdout
    current_key = None  # None, not '', so an empty key is still a real key
    name = 'null'
    facts = []
    for line in src:
        stripped = line.strip()
        if not stripped:
            continue  # a blank line used to crash on fields[1]
        fields = stripped.split('\t')
        key = fields[0]
        if key != current_key:
            # Flush the previous key BEFORE looking at this line. Reading the
            # new key's name first mislabelled the previous key's facts.
            _emit_joined(name, facts, out)
            current_key = key
            name = 'null'  # do not leak the previous key's name
            facts = []
        tabs = line.count('\t')  # counted on the raw line, as before
        if tabs > 3:
            facts.append(fields)
        elif tabs < 3 and len(fields) > 1:
            name = fields[1].strip()
    _emit_joined(name, facts, out)


if __name__ == '__main__':
    join_reduce()
执行:
# Reduce-side join job: both inputs go through joinm.py, joinr.py joins them.
# Generic -D options must precede the streaming-specific options.
hadoop jar /usr/hadoop/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar \
    -D mapred.reduce.tasks=1 \
    -D stream.num.map.output.key.fields=1 \
    -input /rs2/groupaccountdim.csv \
    -input /rs2/output2s/part-00000 \
    -output /rs2/outputjoin/ \
    -file joinm.py \
    -mapper joinm.py \
    -file joinr.py \
    -reducer joinr.py
也可以利用 Python 的 itertools.groupby 将上面的 reducer 重写成如下形式(Hadoop 会按 key 对 reducer 的输入排序,相同 key 的记录相邻,因此 groupby 能按 key 正确分组):
#!/usr/bin/python
import sys
from itertools import groupby
from operator import itemgetter
def readMap(file):
    """Parse reducer input lines into ``{'key': ..., 'value': ...}`` dicts.

    Each line is stripped and split at its FIRST tab. The part before it is
    the key; the remainder, which may itself contain tabs, is the value.
    A line without a tab yields an empty value instead of raising
    IndexError. Blank lines are skipped.

    :param file: iterable of text lines (``sys.stdin`` in the job). The name
        shadows the Python 2 builtin but is kept for call compatibility.
    """
    for line in file:
        stripped = line.strip()
        if not stripped:
            continue
        key, _, value = stripped.partition('\t')
        yield dict(key=key, value=value)
def reduce(stdin=None, stdout=None):
    """groupby-based version of the join reducer.

    Records from ``readMap`` are grouped by key with ``itertools.groupby``,
    which only merges ADJACENT records. This is correct here because Hadoop
    sorts reducer input by key.

    Within one key, the value is split on tabs:

    * a value with fewer than 2 fields is the name record; the first one
      found is used;
    * a value with more than 2 fields is a fact record and is written as
      ``<name>\\t<value fields...>``;
    * values with exactly 2 fields are ignored.

    Keys without a name record use ``'null'``, the default joinr uses.

    Streams default to ``sys.stdin`` / ``sys.stdout``.
    """
    src = sys.stdin if stdin is None else stdin
    out = sys.stdout if stdout is None else stdout
    for _key, records in groupby(readMap(src), itemgetter('key')):
        # Materialise the group. A single pass over a generator, first
        # searching for the name and then printing facts, silently dropped
        # every fact before the name record, and all facts when there was none.
        rows = [rec['value'].split('\t') for rec in records]
        name = next((row[0].strip() for row in rows if len(row) < 2), 'null')
        for row in rows:
            if len(row) > 2:
                out.write('\t'.join([name] + row) + '\n')
if __name__ == "__main__":
    # Script entry point: run the groupby-based reducer over stdin.
    reduce()
更多推荐
所有评论(0)