Hjälp utvecklingen av webbplatsen och dela artikeln med vänner!

Introduktion till gnisttransformationer

En transformation är en funktion som returnerar en ny RDD genom att modifiera befintliga RDD(ar). RDD-ingången ändras inte eftersom RDD:er är oföränderliga. Alla transformationer exekveras av Spark på ett lat sätt- Resultaten beräknas inte direkt. Beräkning av transformationerna sker endast när en viss åtgärd utförs på RDD.

Typer of Transformations in Spark

De är brett kategoriserade i två typer:

  • Narrow Transformation: All data som krävs för att beräkna poster i en partition finns i en partition av den överordnade RDD:n. Det inträffar vid följande metoder:

map(), flatMap(), filter(), sample(), union() etc.

  • Wide Transformation: All data som krävs för att beräkna poster i en partition finns i mer än en partition i de överordnade RDD:erna. Det inträffar vid följande metoder:

distinct(), groupByKey(), reduceByKey(), join() , repartition() etc.

Exempel på gnisttransformationer

Här diskuterar vi de exempel som nämns nedan.

1. Smal omvandling

  • map(): Denna funktion tar en funktion som en parameter och tillämpar denna funktion på varje element i RDD.

Kod:

"val conf=new SparkConf().setMaster(local).setAppName(testApp)"
val sc=SparkContext.getOrCreate(conf)
"sc.setLogLevel(ERROR)
"val rdd=sc.parallelize(Array(10,15,50,100))
"println(Bas RDD är:)
">"rdd.foreach(x=print(x+ ))
println()
>val rddNew=rdd.map(x=x+10)
"println(RDD efter tillämpning av MAP-metoden:)
""rddNew.foreach(x=>print(x+ ))

Utdata:

I ovanstående MAP-metoden lägger vi till varje element med 10 och det återspeglas i utdata.

  • FlatMap(): Det liknar map men det kan generera flera utdataobjekt som motsvarar ett indataobjekt. Funktionen måste alltså returnera en sekvens istället för ett enda objekt.

Kod:

"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=new SparkContext(conf)
"sc.setLogLevel(WARN)
""val rdd=sc.parallelize(Array(1:2:3,4:5:6))
">"val rddNew=rdd.flatMap(x=x.split(:))
"rddNew.foreach(x=>print(x+ ))

Utdata:

Denna funktion som skickas som parameter delar upp varje inmatning med ":" och returnerar en array och FlatMap-metoden plattar ut arrayen.

  • filter(): Det tar en funktion som en parameter och returnerar alla element i RDD för vilka funktionen returnerar sant.

Kod:

"val conf=new SparkConf().setMaster(local).setAppName(testApp)"
val sc=SparkContext.getOrCreate(conf)
"sc.setLogLevel(ERROR)
""val rdd=sc.parallelize(Array(com.whatsapp.prod,com.facebook.prod,com.instagram.prod,com.whatsapp.test))
""println(Bas RDD är:)
">"rdd.foreach(x=print(x+ ))
println()
>"val rddNew=rdd.filter (x=!x.contains(test))
"println(RDD efter tillämpning av MAP-metoden:)
""rddNew.foreach(x=>print(x+ ))

Utdata:

I koden ovan tar vi strängar som inte har ordet "test".

  • sample(): Den returnerar en bråkdel av datan, med eller utan ersättning, med hjälp av ett givet slumptalsgeneratorfrö (det här är dock valfritt).

Kod:

"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=new SparkContext(conf)
"sc.setLogLevel(WARN)
"val rdd=sc.parallellize(Array(1,2,3,4,5,6,7,10,11,12,15,20,50))
val rddNew=rdd.sample(false,.5)
"rddNew.foreach(x=>print(x+ ))

Utdata:

I koden ovan får vi slumpmässiga prov utan ersättning.

  • union(): Den returnerar föreningen av käll-RDD och RDD som skickas som parameter.

Kod:

"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=new SparkContext(conf)
"sc.setLogLevel(WARN)
"val rdd=sc.parallelize(Array(1,2,3,4,5))
val rdd2=sc.parallelize(Array(-1,-2,-3,-4,-5))
val rddUnion=rdd.union(rdd2)
"rddUnion.foreach(x=>print(x+ ))

Utdata:

Den resulterande RDD rddUnion innehåller alla element från rdd och rdd2.

2. Vida transformationer

  • distinct(): Denna metod returnerar de distinkta elementen i RDD.

Kod:

"val conf=new SparkConf().setMaster(local).setAppName(testApp)"
val sc=SparkContext.getOrCreate(conf)
"sc.setLogLevel(ERROR)
"val rdd=sc.parallelize(Array(1,1,3,4,5,5,5))
"println(Bas RDD är:)
">"rdd.foreach(x=print(x+ ))
println()
val rddNew=rdd.distinct()
"println(RDD efter tillämpning av MAP-metoden:)
""rddNew.foreach(x=>print(x+ ))

Utdata:

vi får de distinkta elementen 4,1,3,5 i utgången.

  • groupByKey(): Denna funktion är tillämplig på parvisa RDD:er. En parvis RDD är en vars varje element är en tupel där det första elementet är nyckeln och det andra elementet är värdet. Denna funktion grupperar alla värden som motsvarar en nyckel.

Kod:

"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=new SparkContext(conf)
"sc.setLogLevel(WARN)
""val rdd=sc.parallellize(Array((a,1),(b,2),(a,3),(b,10),(a,100)))
"val rddNew=rdd.groupByKey()
"rddNew.foreach(x=>print(x+ ))

Utdata:

Som förväntat är alla värden för tangenterna "a" och "b" grupperade tillsammans.

  • reduceByKey(): Denna operation är även tillämplig på parvisa RDD:er. Den aggregerar värdena för varje nyckel enligt en medföljande reduceringsmetod som måste vara av typen (v,v)=v.

Kod:

"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=new SparkContext(conf)
"sc.setLogLevel(WARN)
""val rdd=sc.parallellize(Array((a,1),(b,2),(a,3),(b,10),(a,100),(c,50)))
">val rddNew=rdd.reduceByKey((x,y)=x+y )
"rddNew.foreach(x=>print(x+ ))

Utdata:

I ovanstående fall summerar vi alla värden för en nyckel.

  • join(): Join-operationen är tillämplig på parvisa RDD:er. Joinmetoden kombinerar två datauppsättningar baserade på nyckeln.

Kod:

"val conf=new SparkConf().setMaster(local).setAppName(testApp)"
val sc=SparkContext.getOrCreate(conf)
"sc.setLogLevel(ERROR)
""val rdd1=sc.parallelize(Array((nyckel1,10),(nyckel2,15),(nyckel3,100)))
""val rdd2=sc.parallelize(Array((nyckel2,11),(nyckel2,20),(nyckel1,75)))
"val rddJoined=rdd1.join(rdd2)
"println(RDD efter join:)
""rddJoined.foreach(x=>print(x+ ))

Utdata:

  • repartition(): Den blandar om data i RDD:n slumpmässigt till antalet partitioner som skickas som parameter. Det kan både öka och minska partitionerna.

Kod:

"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=new SparkContext(conf)
"sc.setLogLevel(WARN)
"val rdd=sc.parallellize(Array(1,2,3,4,5,10,15,18,243,50),10)
"println(Partitioner före: +rdd.getNumPartitions)
"val rddNew=rdd.repartition(15)
"println(Partitioner efter: +rddNew.getNumPartitions)"

Utdata:

I ovanstående fall ökar vi partitionerna från 10 till 15.

Hjälp utvecklingen av webbplatsen och dela artikeln med vänner!