Integrating Kafka with Spark Streaming in Scala (Spark 2.3, Kafka 0.10)
Published: 2019-06-18


The Maven dependency is as follows:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.3.0</version>
</dependency>
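If you build with sbt instead of Maven, the equivalent dependency would look roughly like the following sketch (assuming Scala 2.11; spark-core and spark-streaming are assumed to already be on the classpath, typically as provided dependencies):

// build.sbt (sketch): %% appends the Scala binary version suffix (_2.11)
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.3.0" % "provided"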

The example code from the official documentation is as follows:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// scalastyle:off println
package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: DirectKafkaWordCount <brokers> <topics>
 *   <brokers> is a list of one or more Kafka brokers
 *   <topics> is a list of one or more kafka topics to consume from
 *
 * Example:
 *   $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port \
 *     topic1,topic2
 */
object DirectKafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(s"""
        |Usage: DirectKafkaWordCount <brokers> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(brokers, topics) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_.value)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
// scalastyle:on println

 

Running this code fails with the following error:

 Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "bootstrap.servers" which has no default value.

As the error indicates, the Kafka connection parameters were never set: the 0.10 integration uses the new consumer API, which requires "bootstrap.servers" and ignores the legacy "metadata.broker.list" key from the 0.8 API.
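For comparison, the official Spark Streaming + Kafka 0.10 integration guide constructs the parameter map along these lines (a sketch; the broker list and group id here are placeholder values to adapt):

import org.apache.kafka.common.serialization.StringDeserializer

// Parameter map in the style of the kafka-0-10 integration guide.
// "localhost:9092" and "group1" are placeholders.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "group1",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)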

The official example, modified to set these parameters:

package cn.xdf.userprofile.stream

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

import scala.collection.mutable

object DirectKafka {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println(s"""
        |Usage: DirectKafkaWordCount <brokers> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    val Array(brokers, topics) = args

    val conf = new SparkConf()
      .setAppName("DirectKafka")
      .setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    val topicsSet = topics.split(",").toSet
    val kafkaParams = mutable.HashMap[String, String]()
    // The following parameters are required; without them the job fails at startup
    kafkaParams.put("bootstrap.servers", brokers)
    kafkaParams.put("group.id", "group1")
    kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_.value)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
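The modified example leaves offset management to the consumer defaults. If you want offsets committed back to Kafka only after each batch has been processed (at-least-once delivery), the kafka-0-10 integration exposes them per RDD; a minimal sketch, placed after createDirectStream:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// Sketch: capture each batch's offset ranges, process the batch, then
// commit the offsets back to Kafka asynchronously.
messages.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch here (e.g. the word count above) ...
  messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}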

A sample run follows.

Start Kafka:

 

   bin/kafka-server-start ./etc/kafka/server.properties &

[2018-10-22 11:24:14,748] INFO [GroupCoordinator 0]: Stabilized group group1 generation 1 (__consumer_offsets-40) (kafka.coordinator.group.GroupCoordinator)
[2018-10-22 11:24:14,761] INFO [GroupCoordinator 0]: Assignment received from leader for group group1 for generation 1 (kafka.coordinator.group.GroupCoordinator)
[2018-10-22 11:24:14,779] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset:-1} for Partition: __consumer_offsets-40. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2018-10-22 11:28:19,010] INFO [GroupCoordinator 0]: Preparing to rebalance group group1 with old generation 1 (__consumer_offsets-40) (kafka.coordinator.group.GroupCoordinator)
[2018-10-22 11:28:19,013] INFO [GroupCoordinator 0]: Group group1 with generation 2 is now empty (__consumer_offsets-40) (kafka.coordinator.group.GroupCoordinator)
[2018-10-22 11:29:29,424] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 11 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2018-10-22 11:39:29,414] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2018-10-22 11:49:29,414] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

 

 

Run the Spark job. (Note that the hard-coded setMaster("local[2]") takes precedence over spark-submit's --master yarn flag, so this run actually executes locally, which is why the logs below show a driver-local executor.)

/usr/local/spark-2.3.0/bin/spark-submit --class cn.xdf.userprofile.stream.DirectKafka --master yarn --driver-memory 2g --num-executors 1 --executor-memory 2g --executor-cores 1 userprofile2.0.jar localhost:9092 test

2018-10-22 11:28:16 INFO  DAGScheduler:54 - Submitting 1 missing tasks from ResultStage 483 (ShuffledRDD[604] at reduceByKey at DirectKafka.scala:46) (first 15 tasks are for partitions Vector(1))
2018-10-22 11:28:16 INFO  TaskSchedulerImpl:54 - Adding task set 483.0 with 1 tasks
2018-10-22 11:28:16 INFO  TaskSetManager:54 - Starting task 0.0 in stage 483.0 (TID 362, localhost, executor driver, partition 1, PROCESS_LOCAL, 7649 bytes)
2018-10-22 11:28:16 INFO  Executor:54 - Running task 0.0 in stage 483.0 (TID 362)
2018-10-22 11:28:16 INFO  ShuffleBlockFetcherIterator:54 - Getting 0 non-empty blocks out of 1 blocks
2018-10-22 11:28:16 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 0 ms
2018-10-22 11:28:16 INFO  Executor:54 - Finished task 0.0 in stage 483.0 (TID 362). 1091 bytes result sent to driver
2018-10-22 11:28:16 INFO  TaskSetManager:54 - Finished task 0.0 in stage 483.0 (TID 362) in 4 ms on localhost (executor driver) (1/1)
2018-10-22 11:28:16 INFO  TaskSchedulerImpl:54 - Removed TaskSet 483.0, whose tasks have all completed, from pool 
2018-10-22 11:28:16 INFO  DAGScheduler:54 - ResultStage 483 (print at DirectKafka.scala:47) finished in 0.008 s
2018-10-22 11:28:16 INFO  DAGScheduler:54 - Job 241 finished: print at DirectKafka.scala:47, took 0.009993 s
-------------------------------------------
Time: 1540178896000 ms
-------------------------------------------

 

Start a console producer:

[root@master kafka_2.11-1.0.0]# bin/kafka-console-producer.sh --topic test --broker-list localhost:9092

 

 

> hello you
> hello me

 

Check the output:

(hello,2)
(me,1)
(you,1)
2018-10-22 11:57:08 INFO  JobScheduler:54 - Finished job streaming job 1540180628000 ms.0 from job set of time 1540180628000 ms
2018-10-22 11:57:08 INFO  JobScheduler:54 - Total delay: 0.119 s for time 1540180628000 ms (execution: 0.072 s)
2018-10-22 11:57:08 INFO  ShuffledRDD:54 - Removing RDD 154 from persistence list
2018-10-22 11:57:08 INFO  MapPartitionsRDD:54 - Removing RDD 153 from persistence list
2018-10-22 11:57:08 INFO  BlockManager:54 - Removing RDD 153
2018-10-22 11:57:08 INFO  BlockManager:54 - Removing RDD 154
2018-10-22 11:57:08 INFO  MapPartitionsRDD:54 - Removing RDD 152 from persistence list
2018-10-22 11:57:08 INFO  BlockManager:54 - Removing RDD 152
2018-10-22 11:57:08 INFO  MapPartitionsRDD:54 - Removing RDD 151 from persistence list
2018-10-22 11:57:08 INFO  BlockManager:54 - Removing RDD 151
2018-10-22 11:57:08 INFO  KafkaRDD:54 - Removing RDD 150 from persistence list
2018-10-22 11:57:08 INFO  BlockManager:54 - Removing RDD 150
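To stop the stream cleanly instead of killing the process, StreamingContext supports a graceful shutdown that lets in-flight batches finish first; a sketch:

// Sketch: stop gracefully, tearing down the underlying SparkContext as well.
ssc.stop(stopSparkContext = true, stopGracefully = true)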

 

Reposted from: http://wuodx.baihongyu.com/
