I recently had to look into Spark for work. Since I am not yet comfortable with Scala, I started by learning how to write Spark programs in Java. Below is the code of my simple test program; the usage of most functions is explained in the comments.
My environment: hadoop 2.2.0
spark-0.9.0
scala-2.10.3
jdk1.7
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
public final class mysparktest {

    public static void main(String[] args) throws Exception {

        // The context, used to read files; similar to sc in the Scala API.
        // Its signature is:
        // JavaSparkContext(master: String, appName: String, sparkHome: String, jars: Array[String], environment: Map[String, String])
        JavaSparkContext ctx = new JavaSparkContext("yarn-standalone", "JavaWordCount",
                System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(mysparktest.class));

        // ctx can also be used to look up environment settings, for example:
        System.out.println("spark home:" + ctx.getSparkHome());

        // Reads one line at a time, as String; there are also hadoopFile, sequenceFile and so on.
        // A plain text file can be read directly with sc.textFile("path").
        JavaRDD<String> lines = ctx.textFile(args[1], 1); // java.lang.String path, int minSplits
        // cache keeps the RDD in memory; generally used for RDDs that may be reused
        // several times, which is said to reduce the running time.
        lines.cache();

        // collect converts an RDD into an ordinary Java collection, as follows:
        List<String> line = lines.collect();
        for (String val : line)
            System.out.println(val);

        // These are also commonly used RDD functions:
        // lines.collect();      List<String>
        // lines.union();        JavaRDD<String>
        // lines.top(1);         List<String>
        // lines.count();        long
        // lines.countByValue();
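        // A quick illustrative sketch (added here; it is not part of the original program,
        // and the variable names lineCount and lineFreq are made up): two of the calls
        // listed above, showing their concrete return types.
        long lineCount = lines.count();                              // total number of lines in the file
        java.util.Map<String, Long> lineFreq = lines.countByValue(); // each distinct line -> number of occurrences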
        /**
         * filter test
         * Define a function that returns a Boolean; when Spark runs the filter it drops
         * every element for which the function returns false.
         * The parameter String s can be thought of as one record of the variable lines
         * (lines can be seen as a collection of String records).
         */
        JavaRDD<String> containsThey = lines.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String s) throws Exception {
                return (s.contains("they"));
            }
        });
        System.out.println("--------------next filter's result------------------");
        line = containsThey.collect();
        for (String val : line)
            System.out.println(val);

        /**
         * sample test
         * sample is easy to use; it draws a random sample of the data.
         * Its parameters are: withReplacement: Boolean, fraction: Double, seed: Int
         */
        JavaRDD<String> sampletest = lines.sample(false, 0.1, 5);
        System.out.println("-------------next sample-------------------");
        line = sampletest.collect();
        for (String val : line)
            System.out.println(val);

        /**
         * In new FlatMapFunction<String, String>, the two Strings are the input and
         * output types. The overridden call method implements the transformation and
         * must return an Iterable.
         *
         * flatMap is one of the most commonly used Spark functions; simply put, it uses
         * the function you define to break one RDD record into several records.
         * For example, at this point every record of the RDD lines is one line of text
         * (a String); to split each line into individual words, we can write:
         */
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String s) {
                String[] words = s.split(" ");
                return Arrays.asList(words);
            }
        });

        /**
         * map to key/value pairs, similar to the map method in MapReduce.
         * PairFunction<T, K, V>: T is the input type; K, V are the output key and value.
         * Override the call method to implement the conversion.
         */
        JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // A two-argument function that takes arguments
        // of type T1 and T2 and returns an R.
        /**
         * reduceByKey, similar to reduce in MapReduce.
         * It requires the input (ones in the example below) to be key/value pairs; records
         * with the same key are grouped and their values are combined pairwise.
         */
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) { // how values that share the same key are combined in the reduce stage
                return i1 + i2;
            }
        });

        // Note: Spark also has a plain reduce method; its input is just an RDD, no
        // key/value pairs required. reduce combines all incoming elements pairwise.
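        // A minimal sketch of that plain reduce (added for illustration; it is not in the
        // original program, and the variable name totalWords is made up): map every word
        // to 1, then fold the 1s together pairwise to get the total number of words.
        int totalWords = words.map(new Function<String, Integer>() {
            @Override
            public Integer call(String s) {
                return 1;
            }
        }).reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer a, Integer b) {
                return a + b;
            }
        });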
        /**
         * sortByKey, as the name suggests, sorts the pairs by key.
         */
        JavaPairRDD<String, Integer> sort = counts.sortByKey();
        System.out.println("----------next sort----------------------");

        /**
         * collect has already appeared several times above; it converts a Spark RDD
         * into the familiar ordinary Java types.
         */
        List<Tuple2<String, Integer>> output = sort.collect();
        for (Tuple2<?, ?> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }

        /**
         * Save functions for writing the results out; Spark provides many output interfaces.
         */
        sort.saveAsTextFile("/tmp/spark-tmp/test");
        // sort.saveAsNewAPIHadoopFile();
        // sort.saveAsHadoopFile();

        System.exit(0);
    }
}
Once the code is written, package it into a jar, upload it to the Linux machine, and write a launch script for the Spark program:
#! /bin/bash
export YARN_CONF_DIR=/usr/lib/cloud/hadoop/hadoop-2.2.0/etc/hadoop
export SPARK_JAR=/usr/lib/cloud/spark/spark-0.9.0-incubating-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop2.2.0.jar
/usr/lib/cloud/spark/spark-0.9.0-incubating-bin-hadoop2/bin/spark-class org.apache.spark.deploy.yarn.Client \
--jar mysparktest.jar \
--class mysparktest \
--args yarn-standalone \
--args /user/zhangdeyang/testspark \
--num-workers 3 \
--master-memory 485m \
--worker-memory 485m \
--worker-cores 2
The input data is stored in /user/zhangdeyang/testspark; the test data is as follows:
Look! at the window there leans an old maid. She plucks the
withered leaf from the balsam, and looks at the grass-covered rampart,
on which many children are playing. What is the old maid thinking
of? A whole life drama is unfolding itself before her inward gaze.
"The poor little children, how happy they are- how merrily they
play and romp together! What red cheeks and what angels' eyes! but
they have no shoes nor stockings. They dance on the green rampart,
just on the place where, according to the old story, the ground always
sank in, and where a sportive, frolicsome child had been lured by
means of flowers, toys and sweetmeats into an open grave ready dug for
it, and which was afterwards closed over the child; and from that
moment, the old story says, the ground gave way no longer, the mound
remained firm and fast, and was quickly covered with the green turf.
The little people who now play on that spot know nothing of the old
tale, else would they fancy they heard a child crying deep below the
earth, and the dewdrops on each blade of grass would be to them
tears of woe. Nor do they know anything of the Danish King who here,
in the face of the coming foe, took an oath before all his trembling
courtiers that he would hold out with the citizens of his capital, and
die here in his nest; they know nothing of the men who have fought
here, or of the women who from here have drenched with boiling water
the enemy, clad in white, and 'biding in the snow to surprise the
city.
Running the script we wrote produces the following output:
spark home:Optional.of(/usr/lib/cloud/spark/spark-0.9.0-incubating-bin-hadoop2)
Look! at the window there leans an old maid. She plucks the
withered leaf from the balsam, and looks at the grass-covered rampart,
on which many children are playing. What is the old maid thinking
of? A whole life drama is unfolding itself before her inward gaze.
"The poor little children, how happy they are- how merrily they
play and romp together! What red cheeks and what angels' eyes! but
they have no shoes nor stockings. They dance on the green rampart,
just on the place where, according to the old story, the ground always
sank in, and where a sportive, frolicsome child had been lured by
means of flowers, toys and sweetmeats into an open grave ready dug for
it, and which was afterwards closed over the child; and from that
moment, the old story says, the ground gave way no longer, the mound
remained firm and fast, and was quickly covered with the green turf.
The little people who now play on that spot know nothing of the old
tale, else would they fancy they heard a child crying deep below the
earth, and the dewdrops on each blade of grass would be to them
tears of woe. Nor do they know anything of the Danish King who here,
in the face of the coming foe, took an oath before all his trembling
courtiers that he would hold out with the citizens of his capital, and
die here in his nest; they know nothing of the men who have fought
here, or of the women who from here have drenched with boiling water
the enemy, clad in white, and 'biding in the snow to surprise the
city.
--------------next filter's result------------------
"The poor little children, how happy they are- how merrily they
they have no shoes nor stockings. They dance on the green rampart,
tale, else would they fancy they heard a child crying deep below the
tears of woe. Nor do they know anything of the Danish King who here,
die here in his nest; they know nothing of the men who have fought
-------------next sample-------------------
"The poor little children, how happy they are- how merrily they
it, and which was afterwards closed over the child; and from that
in the face of the coming foe, took an oath before all his trembling
----------next sort----------------------
: 27
"The: 1
'biding: 1
A: 1
Danish: 1
King: 1
Look!: 1
Nor: 1
She: 1
The: 1
They: 1
What: 2
a: 2
according: 1
afterwards: 1
all: 1
always: 1
an: 3
and: 12
angels': 1
anything: 1
are: 1
are-: 1
at: 2
balsam,: 1
be: 1
been: 1
before: 2
below: 1
blade: 1
boiling: 1
but: 1
by: 1
capital,: 1
cheeks: 1
child: 2
child;: 1
children: 1
children,: 1
citizens: 1
city.: 1
clad: 1
closed: 1
coming: 1
courtiers: 1
covered: 1
crying: 1
dance: 1
deep: 1
dewdrops: 1
die: 1
do: 1
drama: 1
drenched: 1
dug: 1
each: 1
earth,: 1
else: 1
enemy,: 1
eyes!: 1
face: 1
fancy: 1
fast,: 1
firm: 1
flowers,: 1
foe,: 1
for: 1
fought: 1
frolicsome: 1
from: 3
gave: 1
gaze.: 1
grass: 1
grass-covered: 1
grave: 1
green: 2
ground: 2
had: 1
happy: 1
have: 3
he: 1
heard: 1
her: 1
here: 2
here,: 2
his: 3
hold: 1
how: 2
in: 4
in,: 1
into: 1
inward: 1
is: 2
it,: 1
itself: 1
just: 1
know: 3
leaf: 1
leans: 1
life: 1
little: 2
longer,: 1
looks: 1
lured: 1
maid: 1
maid.: 1
many: 1
means: 1
men: 1
merrily: 1
moment,: 1
mound: 1
nest;: 1
no: 2
nor: 1
nothing: 2
now: 1
oath: 1
of: 9
of?: 1
old: 5
on: 5
open: 1
or: 1
out: 1
over: 1
people: 1
place: 1
play: 2
playing.: 1
plucks: 1
poor: 1
quickly: 1
rampart,: 2
ready: 1
red: 1
remained: 1
romp: 1
sank: 1
says,: 1
shoes: 1
snow: 1
sportive,: 1
spot: 1
stockings.: 1
story: 1
story,: 1
surprise: 1
sweetmeats: 1
tale,: 1
tears: 1
that: 3
the: 26
them: 1
there: 1
they: 7
thinking: 1
to: 3
together!: 1
took: 1
toys: 1
trembling: 1
turf.: 1
unfolding: 1
was: 2
water: 1
way: 1
what: 1
where: 1
where,: 1
which: 2
white,: 1
who: 4
whole: 1
window: 1
with: 3
withered: 1
woe.: 1
women: 1
would: 3