maven
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<scala.binary.version>2.10</scala.binary.version>
<spark.version>1.6.1</spark.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- SPARK -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- SPARK STREAMING -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- JACKSON SCALA -->
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-jaxb-annotations</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.7.4</version>
</dependency>
</dependencies>
kafka
public class DemoProducer {
private final kafka.javaapi.producer.Producer<Integer, String> producer;
private final String topic;
private final Properties props = new Properties();
public DemoProducer(String topic){
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", "localhost:9092");
// Use random partitioner. Don't need the key type. Just set it to Integer.
// The message is of type String.
producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
this.topic = topic;
}
public void start(){
for(int i=0;i<100;i++){
String messageStr = new String("Message_" + RandomUtils.nextInt(1,10));
producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
System.out.println("send:"+messageStr);
}
}
}
kafka-spark
public class JavaKafkaWordCount implements Serializable{
private static final Pattern SPACE = Pattern.compile(" ");
private JavaKafkaWordCount() {
}
public static void main(String[] args) throws Exception {
String zkHost = "localhost:2181";
String topic = "wordTopic";
String group = "spark-demo";
ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
root.setLevel(Level.WARN);
//sh bin/kafka-topics.sh --create --topic wordTopic --replication-factor 1 --partitions 1 --zookeeper localhost:2181
DemoProducer demoProducer = new DemoProducer(topic);
demoProducer.start();
JavaKafkaWordCount javaKafkaWordCount = new JavaKafkaWordCount();
javaKafkaWordCount.start(zkHost,group,topic,1);
}
public void start(String zkHost,String group,String topicList,int numThreads) throws Exception {
SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("JavaKafkaWordCount");
// Create the context with 2 seconds batch size
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
Map<String, Integer> topicMap = new HashMap<>();
String[] topics = topicList.split(",");
for (String topic: topics) {
topicMap.put(topic, numThreads);
}
JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(jssc, zkHost,group,topicMap);
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
System.out.println(tuple2);
return tuple2._2();
}
});
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
return Arrays.asList(SPACE.split(x));
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
wordCounts.print();
jssc.start();
jssc.awaitTermination();
}
}
输出
(Message_2,15)
(Message_8,9)
(Message_4,13)
(Message_6,10)
(Message_9,8)
(Message_3,12)
(Message_1,11)
(Message_7,8)
(Message_5,14)
ui
docs