Spark Streaming Session 2 | Word Count Example
Session 2: Spark Streaming

Topics:
1. Word Count Example

Editor - IntelliJ IDEA Community Version
Download the Scala plugin in IntelliJ.

Code Snippet

    package demo.sparkStreaming

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.dstream.ReceiverInputDStream
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object wordCount {

      // Silence Spark's own logging so the printed word counts are easy to read.
      Logger.getLogger("org").setLevel(Level.OFF)

      // Batch interval, in seconds.
      val BATCH_DURATION = 10

      /***
       * After a context is defined, you have to do the following:
       * 1. Define the input sources by creating input DStreams.
       * 2. Define the streaming computations by applying transformation and
       *    output operations to DStreams.
       * 3. Start receiving data and processing it using streamingContext.start().
       * 4. Wait for the processing to be stopped (manually or due to any error)
       *    using streamingContext.awaitTermination().
       * 5. The processing can be manually stopped using streamingContext.stop().
       *
       * Points to remember:
       * - Once a context has been started, no new streaming computations can be
       *   set up or added to it.
       * - Once a context has been stopped, it cannot be restarted.
       * - Only one StreamingContext can be active in a JVM at the same time.
       * - stop() on StreamingContext also stops the SparkContext. To stop only the
       *   StreamingContext, set the optional parameter of stop() called
       *   stopSparkContext to false.
       * - A SparkContext can be re-used to create multiple StreamingContexts, as
       *   long as the previous StreamingContext is stopped (without stopping the
       *   SparkContext) before the next StreamingContext is created.
       *
       * @return a StreamingContext with a batch interval of BATCH_DURATION seconds
       */
      def getStreamingContext: StreamingContext = {
        val spark = SparkSession
          .builder()
          .master("local[*]")
          .appName("theBigDataShow")
          .getOrCreate()

        val sc: SparkContext = spark.sparkContext
        val ssc: StreamingContext = new StreamingContext(
          sparkContext = sc,
          batchDuration = Seconds(BATCH_DURATION)
        )
        ssc
      }

      /***
       * 1. Discretized Stream, or DStream, is the basic abstraction provided by
       *    Spark Streaming.
       * 2. It represents a continuous stream of data, either the input data stream
       *    received from the source, or the processed data stream generated by
       *    transforming the input stream.
       * 3. Internally, a DStream is represented by a continuous series of RDDs.
       * 4. Each RDD in a DStream contains data from a certain interval.
       * 5. Any operation applied on a DStream translates to operations on the
       *    underlying RDDs. For example, when converting a stream of lines to
       *    words, the flatMap operation is applied on each RDD in the lines
       *    DStream to generate the RDDs of the words DStream. See the picture.
       * 6. These underlying RDD transformations are computed by the Spark engine.
       *    The DStream operations hide most of these details and provide the
       *    developer with a higher-level API for convenience.
       */
      def getInputStream(ssc: StreamingContext): ReceiverInputDStream[String] = {
        // Input DStreams are DStreams representing the stream of input data
        // received from streaming sources. Here, lines is an input DStream. It
        // represents the stream of data received from the netcat server
        // (localhost on port 9999).
        val lines: ReceiverInputDStream[String] = ssc.socketTextStream(
          hostname = "localhost",
          port = 9999
        )
        lines
      }

      def main(args: Array[String]): Unit = {
        val ssc = getStreamingContext

        // Create a DStream that will connect to hostname:port, like localhost:9999
        val lines: ReceiverInputDStream[String] = getInputStream(ssc)

        // Split each line into words
        val words = lines.flatMap(w => w.split(" "))

        // Count each word in each batch
        val wordsFrequencyMap = words.map(w => (w, 1))
        val wordCount = wordsFrequencyMap.reduceByKey(_ + _)

        // Print the first few counts of every batch to the console
        wordCount.print()

        ssc.start()            // Start the computation
        ssc.awaitTermination() // Wait for the computation to terminate
      }
    }
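
Because each RDD in a DStream holds only the data of one interval, the counts printed by the snippet above are per batch and do not accumulate across batches. The sketch below models that behaviour with plain Scala collections, so it runs without Spark or a netcat server. It is only an illustration: the names BatchWordCountSketch, countWords and batches are hypothetical, each inner Seq stands in for the lines received during one 10-second interval, and groupBy plus sum is used as a stand-in for reduceByKey(_ + _).

```scala
// Plain-Scala model of per-batch word counting; no Spark dependency.
// BatchWordCountSketch, countWords and batches are hypothetical names.
object BatchWordCountSketch {

  // Applies the same steps as the streaming job to the lines of ONE batch:
  // flatMap (split into words), map (pair with 1), then sum per word.
  def countWords(batch: Seq[String]): Map[String, Int] =
    batch
      .flatMap(line => line.split(" "))  // split each line into words
      .map(word => (word, 1))            // pair each word with a count of 1
      .groupBy(_._1)                     // group the pairs by word
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    // Two consecutive batches, standing in for two batch intervals.
    val batches = Seq(
      Seq("hello world", "hello spark"),
      Seq("hello again")
    )

    // Each batch is counted independently; nothing carries over.
    batches.zipWithIndex.foreach { case (batch, i) =>
      println(s"Batch $i: ${countWords(batch)}")
    }
  }
}
```

In this model "hello" is counted twice in the first batch and once in the second, rather than three times overall. Keeping a running total across batches would need a stateful operation, which is outside the scope of this word count example.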