-키/값 페어로 작업하기


-기본

키/값을 가지는 RDD에 대한 연산을 제공하고 이런 RDD를 페어 RDD라고 명명함. 

페어 RDD 들은 각 키에 대해 병렬로 처리하거나, 데이터를 다시 그룹핑할 수 있으므로 유용함. 

텍스트 파일의 각 라인의 첫 단어가 키가 된다.


-생성

스칼라와 파이선의 경우는 rdd.map()으로 생성 자바인 경우는 mapRoPair()를 호출


-트랜스포메이션 함수 정리 


페어 RDD에 대한 트랜스포메이션과 페어RDD간의 트랜스 포메이션으로 나뉠 수 있다.


-페어 RDD에 대한 트랜스포메이션 ((Func)은 함수를 인자로 받는 트랜스포메이션)

reduceByKey(Func) : 동일 키에 대한 값들을 합친다. 

groupByKey() : 동일 키에 대한 값들을 그룹화한다.

combineByKey() : 다른 결과 타입을 써서 동일 키의 값들을 합친다.

mapValues (Func) : 키의 변경 없이 페어 RDD의 각 값에 함수를 적용한다.

flatMapValues (Func) : 페어 RDD의 각 값에 대해 반복자를 리턴하는 함수를 적용하고, 리턴받은 값들에 대해 기존키를 써서 키/값 쌍을 만든다. (토큰분리에 종종 이용)

Keys() : 키값을 돌려준다.

values() : 값들을 돌려준다.

sortBykey() : 키로 정렬된 RDD를 돌려준다.


-페어 RDD에 간의 트랜스포메이션

subtractByKey : 다른 쪽 RDD에 있는 키를 써서 RDD의 데이터를 삭제한다.

join : 두 RDD에 대해 inner join을 수행

rightOuterJoin : 첫번째 RDD의 키를 중심으로 조인

leftOuterJoin : 다른 쪽 RDD의 키를 중심으로 조인

cogroup : 동일키에 대해서 양쪽 RDD를 그룹화한다.









'hadoop,yarn, Hive > Spark' 카테고리의 다른 글

Pair RDD기본  (0) 2018.07.10
Spark 기본  (0) 2018.07.10
spark + intellij +maven 환경설정 및 기본예제  (0) 2018.07.10

1. 스파크 컴포넌트 

 - 스파크 코어 : 작업 스케쥴링, 메모리 관리, 장애복구, 저장 장치 연동, 분산데이터세트(RDD) 를 정의하는 API의 기본  

 - 스파크 SQL : 정형데이터를 처리하기 위한 패키지. 하이브 테이블, 파케이, JSON 지원. 

 - 스파크 스트리밍 : 실시간 데이터 스트림을 처리하는 컴포넌트 

 - MLlib: 머신러닝 라이브러리. 분류 회귀 클러스터링 협업필터링 

 - 그래프X : 그래프를 다루기 위한 라이브러리 

 - 클러스터 매니저 : yarn, 메소스의 클러스터 매이저 위에서 동작 가능 


2. 스파크 핵심 개념 

  - 클러스터 위에서 다양한 병렬 연산을 수행하는 드라이버 프로그램으로 구성

  - 드라이버 프로그램은 main 함수를 가지고 있으며, 분산데이터세트를 정의하고 그 데이터 세트에 연산 작업을 수행 

  - SparkContext 객체(연산클러스터에 대한 연결을 담당) 를 만들고 RDD를 생성함 

  - 연산의 실행을 위해 드라이버 프로그램은 익스큐터를 관리 

  

3. RDD(Resilient Distributed Dataset) 기초 

  - 분산된 데이터 요소의 모임 

  - 새로운 RDD를 만들어내거나 존재하는 RDD의 변형 또는 연산 

  - 내부적으로 스파크는 RDD에 있는 데이터들을 클러스터에 분배하고 연산들을 병렬화함

  - RDD는 사용자정의 클래스, python, java, scala 의 어떤 타압의 객체도 가질 수 있음. 

  - 한번 만들어진 RDD는 트랜스포메이션과 액션의 두가지 연산을 지원함. 

  - 트랜스포메이션은 존재하는 RDD에서 새로운 RDD를 만들어내는 것. 

     액션은 값을 계산(first() 등 )하고 드라이버 프로그램에 돌려주거나 HDFS에 저장하는 작업등. 


4.  RDD 생성, 연산 및 저장

  생성 - SparkContext.testFile(), parallelize() 

ex)

val  new SparkConf().setAppName("wordCount").setMaster(args(0))
val sc = new SparkContext(conf)
val filePath =conf = "/spark/README.txt"
val inputRDD = sc.textFile(filePath)

    연산 - filter(), map(), flatMap(), union(), take(), count(), distict(), intersection(), subtract(), cartesian()  등 

    저장 - saveAsTextFile(), saveAsSequenceFile() 



'hadoop,yarn, Hive > Spark' 카테고리의 다른 글

Pair RDD기본  (0) 2018.07.10
Spark 기본  (0) 2018.07.10
spark + intellij +maven 환경설정 및 기본예제  (0) 2018.07.10

1. 환경설정

http://kysepark.blogspot.com/2016/03/intellij-spark.html


2. pom.xml 에 Spark dependency 추가


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cyberx</groupId>
<artifactId>IntellijMavenProject</artifactId>
<version>1.0</version>
<name>${project.artifactId}</name>
<description>My wonderfull scala app</description>
<inceptionYear>2015</inceptionYear>
<licenses>
<license>
<name>My License</name>
<url>http://....</url>
<distribution>repo</distribution>
</license>
</licenses>

<properties>
<maven.compiler.source>1.6</maven.compiler.source>
<maven.compiler.target>1.6</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.5</scala.version>
<scala.compat.version>2.11</scala.compat.version>
</properties>

<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.1</version>
</dependency>
</dependencies>

<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<plugins>
<plugin>
<!-- see http://davidb.github.com/scala-maven-plugin -->
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<args>

</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<!-- If you have classpath issue like NoDefClassError,... -->
<!-- useManifestOnlyJar>false</useManifestOnlyJar -->
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
</plugins>
</build>
</project>


3. scala object 생성 후 Edit configuration - program arguments에 local[*] 추가 

 


4. 기본 예제 작성 (New Scala class - kind 는 object)


import org.apache.spark.{SparkContext, SparkConf}

object wordCount {
def main(args: Array[String]) : Unit = {
val conf = new SparkConf().setAppName("wordCount").setMaster(args(0))
val sc = new SparkContext(conf)
val filePath = "/spark/README.txt"
val inputRDD = sc.textFile(filePath)
val matchTerm = "spark"

val numMatches = inputRDD.filter(line => line.contains(matchTerm)).count()
val lines = inputRDD.count()
val firstLine = inputRDD.first()

println("%s lines in %s contains %s".format(numMatches, filePath, matchTerm))
println("%s lines".format(lines))
println("%s is first line".format(firstLine))
System.exit(0)
}
}


5. 실행 결과 확인



 




'hadoop,yarn, Hive > Spark' 카테고리의 다른 글

Pair RDD기본  (0) 2018.07.10
Spark 기본  (0) 2018.07.10
spark + intellij +maven 환경설정 및 기본예제  (0) 2018.07.10

+ Recent posts