JavaRush /Курсы /All lectures for AM purposes /BigData: MapReduce ծրագրերի մշակում: Մեթոդներ և ռազմավարո...

BigData: MapReduce ծրագրերի մշակում: Մեթոդներ և ռազմավարություններ

All lectures for AM purposes
1 уровень , 338 лекция
Открыта

5.1 Map only job

Ժամանակն է նկարագրել տարբեր մեթոդներ, որոնք թույլ են տալիս MapReduce-ը արդյունավետ օգտագործել գործնական խնդիրների լուծման համար, ինչպես նաև ցուցադրել որոշ հետաքրքիր հատկություններ Hadoop-ի, որոնք թույլ են տալիս պարզեցնել մշակումն կամ զգալիորեն արագացնել MapReduce խնդրի կատարման ընթացքը կլաստերում:

Ինչպես մենք հիշում ենք, MapReduce-ը բաղկացած է Map, Shuffle և Reduce փուլերից: Սովորաբար, Shuffle փուլը ամենածանրն է գործնական խնդիրներում, քանի որ այս փուլում տվյալների դասակարգում է տեղի ունենում: Իրականում կան մի շարք խնդիրներ, որոնք հնարավոր է լուծել միայն Map փուլով: Օրինակ, նման խնդիրներից են:

  • Տվյալների ֆիլտրացում (օրինակ, «Գտնել բոլոր գրառումները 123.123.123.123 IP հասցեից» web սերվերի լոգերում);
  • Տվյալների փոխակերպում («Ջնջել սյունակը csv-լոգերում»);
  • Տվյալների ներբեռնում և բեռնաբեռնում արտաքին աղբյուրից («Ավելացնել բոլոր գրառումները լոգից տվյալների բազայում»):

Այդպիսի խնդիրները լուծվում են Map-Only օգնությամբ: Hadoop-ում Map-Only առաջադրանք ստեղծելու համար անհրաժեշտ է նշել զրոյական քանակությամբ reducer’ներ:

Օրինակ, Map-Only առաջադրանքի կոնֆիգուրացիա Hadoop-ում:

Native interface Hadoop Streaming Interface

Ուղարկել զրոյական քանակությամբ reducers job-ի կոնֆիգուրացիայում:

job.setNumReduceTasks(0); 

Ոչ մի reducer չենք նշում և նշում ենք զրոյական քանակությամբ reducers: Օրինակ:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

Map Only jobs-ը իրականում շատ օգտակար կարող է լինել: Օրինակ, Facetz.DCA պլատֆորմում օգտագործվում է հենց մեկ մեծ map-only, որի ամեն mapper-ը ընդունում է տվյալներ օգտագործողից և վերադարձնում նրա բնութագրերը:

5.2 Combine

Ինչպես արդեն նշեցի, սովորաբար ամենածանր պահը՝ Map-Reduce առաջադրանքի կատարման ժամանակ, shuffle փուլն է։ Դա այն պատճառով է, որ միջանկյալ արդյունքները (mapper-ի ելքը) գրվում են սկավառակ, դասակարգվում և փոխանցվում ցանցով: Բայց կան խնդիրներ, որտեղ այդպիսի պահվածքը այնքան էլ խելամիտ չէ։ Օրինակ, թե՛ բառերի հաշվարկում կարելի է նախնական ավելացնել արդյունքները mapper-ներից մի քանի հանգույցներից map-reduce առաջադրանքի մեկ մեքենայի վրա և փոխանցել reducer-ին արդեն ամփոփված արժեքներ յուրաքանչյուր մեքենայի համար:

Hadoop-ում կարելի է սահմանել կոմբինացնող ֆունկցիա, որը կվերամշակի mapper-ների որոշ մասերի ելքը: Կոմբինացնող ֆունկցիան շատ նման է reduce-ին - այն ընդունում է mapper-ներից որոշ մասերի ելքը և վերադարձնում է այդ mapper-ների համար համախմբված արդյունքը, այդ իսկ պատճառով շատ հաճախ reducer-ը օգտագործվում է նաև որպես combiner: Կարևոր տարբերությունը reduce-ից՝ բոլոր արժեքները, որոնք համապատասխանում են միևնույն բանալիին, կոմբինացնող ֆունկցիայի վրա չեն հայտնվում:

Ավելին, Hadoop-ը չի երաշխավորում, որ կոմբինացնող ֆունկցիան ընտրված mapper-ի համար կկատարվի: Այդ իսկ պատճառով կոմբինացնող ֆունկցիան միշտ չէ, որ կիրառվում է, օրինակ, միջին արժեքի որոնման դեպքում բանալիի համար: Սակայն այն դեպքերում, երբ կոմբինացնող ֆունկցիան կիրառելի է, այն օգտագործելը կարող է զգալիորեն արագացնել MapReduce առաջադրանքի կատարման ընթացքը:

Combiner-ի օգտագործումը hadoop-ում:

Native Interface Hadoop streaming

Job-ի կոնֆիգուրացիայի ժամանակ նշել Combiner դասը: Սովորաբար այն նույնը է, ինչ Reducer-ը:

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

Կոմանդային տողի պարամետրերի մեջ նշել -combiner հրամանը: Սովորաբար այս հրամանը նույնն է, ինչ reducer-ի հրամանը: Օրինակ:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 MapReduce առաջադրանքների շղթաներ

Կան իրավիճակներ, երբ մի MapReduce-ով հնարավոր չէ լուծել խնդիրը: Օրինակ, դիտարկենք WordCount խնդրի մի փոքր փոխված տարբերակը: Կա մի շարք տեքստային փաստաթղթեր, և անհրաժեշտ է հաշվարկել, թե քանի բառ հանդիպել է 1-ից 1000 անգամ շարքից, քանի բառ 1001-ից 2000, եւ այլն: Կարևորը 2 MapReduce job է:

  • Փոփոխված wordcount, որը կկատարի յուրաքանչյուր բառի համար՝ թե որ միջակայքի պատկանում է;
  • MapReduce, որը կհաշվի, թե քանիս անգամ յուրաքանչյուր միջակայքը հանդիպել է առաջին MapReduce-ի ելքից:

Լուծումը կեղծ կոդում:

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

Hadoop-ում MapReduce առաջադրանքների հաջորդականություն կատարելու համար բավական է պարզապես երկրորդ խնդրի մուտքային տվյալներ նշված ելքային պանակը առաջինից և հերթով գործարկել դրանք:

Գործնականում MapReduce առաջադրանքների շղթաները կարող են ներկայացնել բավականին բարդ հաջորդականություններ, որոնցում MapReduce առաջադրանքները կարող են լինել ինչպես հաջորդական, այնպես էլ զուգահեռային: Նման լուծումների կառավարման պարզեցման համար կան առանձնահատուկ գործիքներ, օրինակ oozie-ն և luigi-ն:

5.4 Distributed cache

Hadoop-ում կարևոր մեխանիզմ է Distributed Cache-ը: Distributed Cache-ը թույլ է տալիս ավելացնել ֆայլեր (օրինակ՝ տեքստային ֆայլեր, արխիվներ, jar-ֆայլեր) այն միջավայրում, որտեղ կատարվում է MapReduce խնդիր:

Կարելի է ավելացնել ֆայլեր, որոնք պահվում են HDFS-ում, լոկալ ֆայլեր (այն մեքենայի համար լոկալ, որտեղից կատարվում է խնդիրների գործարկումը): Ես արդեն անուղղակիորեն ցույց եմ տվել, թե ինչպես օգտագործել Distributed Cache-ը hadoop streaming-ի հետ՝ ավելացնելով -file տարբերակով mapper.py և reducer.py ֆայլեր: Իրականում հնարավոր է ավելացնել ոչ միայն mapper.py և reducer.py, այլ նաև առան<|disc_score_mid|>6նդ յուրաքանչուր ֆայլ, և չես կարող օգտվել դրանցից այնպես, ինչպես նրանք լինեն լոկալ պանակում:

Distributed Cache օգտագործում:

Native API

//կոնֆիգուրացիա Job-ի 
JobConf job = new JobConf(); 
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job); 
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job); 
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job); 
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job); 
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job); 
 
//օրինակ օգտագործման mapper-ում: 
public static class MapClass extends MapReduceBase   
implements Mapper<K, V, K, V> { 
 
 private Path[] localArchives; 
 private Path[] localFiles; 
  
 public void configure(JobConf job) { 
   // կերող ենք կաշացված տվյալները արխիվներից 
   File f = new File("./map.zip/some/file/in/zip.txt"); 
 } 
  
 public void map(K key, V value,  
             	OutputCollector<K, V> output, Reporter reporter)  
 throws IOException { 
   // տվյալները օգտագործում ենք այստեղ 
   // ... 
   // ... 
   output.collect(k, v); 
 } 
}
Hadoop Streaming

#նշում ենք ֆայլերը, որոնք անհրաժեշտ է ավելացնել distributed cache-ի պարամետր՝ –files մեջ: Պարամետր –files-ը պետք է լինի մյուս պարամետրերից առաջ:

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

օրինակ օգտագործման:

import sys 
# պարզապես կարդում ենք ֆայլը լոկալ պանակից 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#մուտքագրվածը մշակել 
#օգտագործել տվյալները այստեղ

5.5 Reduce Join

Նրանք, ովքեր սովոր են աշխատելու հարաբերական բազաների հետ, հաճախ օգտվում են շատ հարմար Join գործողությունից, որը թույլ է տալիս համատեղ մշակել որոշակի աղյուսակի բովանդակությունը՝ միավորելով դրանք որոշակի բանալիով։ Մեծ տվյալների հետ աշխատանքում նման խնդիր նույնպես երբեմն առաջանում է։ Դիտարկենք այս օրինակը:

Կան երկու web-server-ների լոգեր, յուրաքանչյուրը հետևյալ տեսքով՝:

t\t

Օրինակի լոգ պատճեն՝

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

Անհրաժեշտ է հաշվել, թե IP հասցեներից յուրաքանչյուրն առավել հաճախ որ սերվերում է եղել։ Արդյունքը պետք է ներկայացվի այս տեսքով՝

\t

Բնօրինակի արդյունքի հատված՝

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

Դժբախտաբար, հարաբերական բազաների նման, ընդհանուր դեպքում երկու լոգերի միավորումը բանալիով (մեր համապատասխանում՝ IP հասցեով) բավականին ծանր գործողություն է և լուծվում է 3 MapReduce և Reduce Join պատերնով:

ReduceJoin-ը այսպես է աշխատում։

1) Յուրաքանչյուր մուտքային լոգի համար գործարկվում է բացարձակ առանձին MapReduce (ընդամենը Map), որը մուտքային տվյալները վերածվում է հետևյալ տեսքով:

key -> (type, value

Որտեղ key -ը այն բանալին է, որով պետք է միավորել աղյուսակները, Type -ը աղյուսակա տեսակը (այս դեպքում՝ first կամ second), իսկ Value -ը ցանկացած լրացուցիչ տվյալներ բանալիի հետ կապված:

2) Երկու MapReduce-ի ելքերը փոխանցվում են 3-րդ MapReduce-ին, որն իրականացնում է միավորումը: Այս MapReduce-ն ունի դատարկ Mapper, որը պարզապես պատճենում է մուտքային տվյալները: Shuffle-ը տարածում է տվյալները բանալիների համար և փոխանցում է reducer-ին՝ այս տեսքով:

key -> [(type, value)]

Կարևոր է, որ այս պահին reducer-ի վրա հայտնվում են երկու լոգերի գրառումները, և կարելի է տեսակով որոշել, թե որ լոգից է գնահատվում արժեքը: Այդպիսով բավարար տվյալներ են խնդրի լուծման համար: Մեր դեպքում reducer-ը պարզապես պետք է հաշվի յուրաքանչյուր բանալիի գրառումներից, որը type-ը հանդիպել է ավելի հաճախ և դուրս բերի այդ type-ը:

5.6 MapJoin

ReduceJoin պատերը նկարագրում են ընդհանուր լոգերի միավորման ընդհանուր դեպքը բանալիով: Բայց կա մի հատված, որտեղ խնդիրը կարելի է զգալիորեն պարզեցնել և արագացնել: Դա այն դեպքն է, երբ լոգերից մեկը զգալիորեն փոքր է մյուսից: Դիտարկենք հետևյալ խնդիրը:

Կան երկու լոգեր։ Առաջին լոգը web-server-ի լոգ է (նախորդ խնդրում նման), իսկ երկրորդ ֆայլ (100Kb չափով) URL-> Թեմատիկա համապատասխանությունն է: 2-րդ ֆայլի համազեկությունը՝

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

Յուրաքանչյուր IP հասցեից անհրաժեշտ է հաշվարկել, թե որ կատեգորիայի էջերը նույն IP հասցեից առավել հաճախ բեռնվել են:

Այս դեպքում նույնպես անհրաժեշտ է կատարել 2 լոգերի միավորում URL-ով: Սակայն այս դեպքում անհրաժեշտ չէ գործարկել 3 MapReduce, քանի որ երկրորդ լոգն ամբողջությամբ կտեղավորվի հիշողության մեջ: Այս խնդրի լուծման համար 1 MapReduce-ով պետք է բեռնենք երկրորդ լոգը Distributed Cache-ում, իսկ Mapper-ի սկսման ժամանակ պարզապես պահպանենք այն հիշողության մեջ ՝ տեղադրելով այն՝ -> topic բառարանում:

Այնուհետև խնդիրն այսպես է լուծվում:

Map:

# գտնում ենք առաջին լոգի էջերի թեման 
input_line -> [ip,  topic] 

Reduce:


Ip -> [topics] -> [ip, most_popular_topic]

Reduce-ը ստանում է ip և բոլոր թեմաների ցուցակը, պարզապես հաշվարկում է, թե որ թեման ամենաշատը հանդիպել է։ Այսպիսով խնդիրը լուծված է մեկ MapReduce-ի միջոցով, իսկ սեփականապես Join-ը տեղի է ունենում map-ում (դրա պատճառով եթե անհրաժեշտ չէր լրացուցիչ ագրեգացիա բանալիի հետ՝ հնարավոր էր լուծել MapOnly job-ով).

Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ