Scalable Stream Processing - Spark Streaming and Flink
Operations on DStreams: 1. Input operations 2. Transformations 3. Output operations
▶ Transformations on DStreams are still lazy! New DStreams are computed from existing ones through a given function.
▶ flatMap • Similar to map, but each input item can be mapped to 0 or more output items, e.g. .flatMap(_.split(" "))
▶ filter • Returns a new DStream by selecting only the records of the source DStream on …
113 pages | 1.22 MB | 1 year ago
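A minimal Scala sketch of these three operation groups on a DStream, assuming a local Spark Streaming job reading from a socket source on localhost:9999 (host, port, and app name are illustrative):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamOperations {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("dstream-operations")
    val ssc = new StreamingContext(conf, Seconds(5))

    // input operation: create a DStream from a socket source
    val lines = ssc.socketTextStream("localhost", 9999)

    // transformations (lazy): flatMap maps each line to 0..n words, filter keeps a subset
    val words = lines.flatMap(_.split(" "))
    val longWords = words.filter(_.length > 3)

    // output operation: triggers the actual computation on each micro-batch
    longWords.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Nothing runs until the output operation is registered and the context is started; the flatMap and filter calls only describe the lineage of the derived DStreams.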
Cardinality and frequency estimation - CS 591 K1: Data Stream Processing and Analytics Spring 2020
… to use multiple hash functions and combine their estimates:
• Using many hash functions for a high-rate stream is expensive
• Finding many random and independent hash functions is difficult
LogLog algorithm
Input: stream S, array of m counters, hash function h
Output: cardinality of S
for j = 0 to m-1 do:
    COUNT[j] = 0
for x in S do:
    i = h(x)
    j = getLeftBits(i, p)
    r = rank(getRightBits(i, M-p))
    COUNT[j] = max(COUNT[j], r)
R = average(COUNT)  // average of all m counters
output α * m * 2^R  // α is a constant, α ≈ 0.39701 for m ≥ 64
69 pages | 630.01 KB | 1 year ago
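A runnable Scala sketch of the pseudocode above, assuming a 32-bit MurmurHash3 hash whose leftmost p bits select the counter and whose remaining 32 - p bits supply the rank (position of the first 1-bit):

```scala
import scala.util.hashing.MurmurHash3

class LogLog(p: Int) {
  private val m = 1 << p               // number of counters (buckets)
  private val count = Array.fill(m)(0) // COUNT[j] in the pseudocode
  private val alpha = 0.39701          // bias-correction constant, valid for m >= 64

  def add(x: String): Unit = {
    val i = MurmurHash3.stringHash(x)                   // i = h(x)
    val j = i >>> (32 - p)                              // leftmost p bits pick the bucket
    val rest = i << p                                   // remaining 32 - p bits
    val r = Integer.numberOfLeadingZeros(rest) + 1      // rank of the first 1-bit
    count(j) = math.max(count(j), r)
  }

  def estimate: Double = {
    val bigR = count.sum.toDouble / m                   // R = average(COUNT)
    alpha * m * math.pow(2, bigR)                       // alpha * m * 2^R
  }
}

// usage: estimate the number of distinct words in a stream
// val ll = new LogLog(7); words.foreach(ll.add); println(ll.estimate)
```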
PyFlink 1.15 Documentation
… the word count example outputs the following:
# Use --input to specify file input. Printing result to stdout. Use --output to specify output path.
# +I[To, 1]
# +I[be,, 1]
# +I[or, 1]
# +I[not, 1]
# +I[to, 1]
# +I[be,--that, 1]
To locate the PyFlink installation directory:
"import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__)))"
# It will output a path like the following:
# /path/to/python/site-packages/pyflink
# Check the logging under the …
36 pages | 266.77 KB | 1 year ago
PyFlink 1.16 Documentation
… the word count example outputs the following:
# Use --input to specify file input. Printing result to stdout. Use --output to specify output path.
# +I[To, 1]
# +I[be,, 1]
# +I[or, 1]
# +I[not, 1]
# +I[to, 1]
# +I[be,--that, 1]
To locate the PyFlink installation directory:
"import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__)))"
# It will output a path like the following:
# /path/to/python/site-packages/pyflink
# Check the logging under the …
36 pages | 266.80 KB | 1 year ago
Windows and triggers - CS 591 K1: Data Stream Processing and Analytics Spring 2020
Window functions define the computation that is performed on the elements of a window.
• Incremental aggregation functions are applied as each element is added to a window and emit the aggregated value as the result.
  • ReduceFunction and AggregateFunction
• Full window functions collect all elements of a window and iterate over the list of all collected elements when evaluated:
  • They require more space but support more complex logic.
  • ProcessWindowFunction
val minTempPerWindow: DataStream[(String, Double)] = …
35 pages | 444.84 KB | 1 year ago
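A short Scala sketch of the incremental-aggregation path, assuming a sensorData: DataStream[(String, Double)] of (sensorId, temperature) pairs; the ReduceFunction keeps only the running minimum per window:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object WindowFunctions {
  def minTempPerWindow(sensorData: DataStream[(String, Double)]): DataStream[(String, Double)] =
    sensorData
      .keyBy(_._1)                                                 // key by sensor id
      .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
      // incremental aggregation: applied as each element arrives,
      // so only the current minimum is kept as window state
      .reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))
}
```

A full window function (ProcessWindowFunction) would instead buffer every element of the window; a sketch of that variant appears under the "Streaming in Apache Flink" entry below.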
Skew mitigation - CS 591 K1: Data Stream Processing and Analytics Spring 2020
… δ: user-defined frequency threshold, so that freq(x) ≥ δ*N, δ ∈ (0,1); ε: user-defined error.
Output: all items with frequency greater than or equal to δ*N; no item with frequency less than (δ-ε)*N.
Delete step: iterate over D and remove every element x with f_x + ε_x ≤ w_cur. Output the elements in D with f_x ≥ (δ - ε) * N.
… two workers at random and send the item to the least loaded of those two
• the system uses two hash functions, H1 and H2, and checks the load of the two sampled workers: P(k) = arg min_i (L_i(t) : H1(k)=i ∨ H2(k)=i)
31 pages | 1.47 MB | 1 year ago
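A small Scala sketch of the "power of two choices" rule P(k) described above, assuming two hash functions derived from MurmurHash3 with different (arbitrary) seeds and a local load counter per worker:

```scala
import scala.util.hashing.MurmurHash3

class TwoChoicesPartitioner(numWorkers: Int) {
  private val load = Array.fill(numWorkers)(0L)  // L_i(t): load estimate per worker

  private def h1(key: String): Int = Math.floorMod(MurmurHash3.stringHash(key, 17), numWorkers)
  private def h2(key: String): Int = Math.floorMod(MurmurHash3.stringHash(key, 42), numWorkers)

  // P(k) = arg min_i { L_i(t) : H1(k) = i or H2(k) = i }
  def partition(key: String): Int = {
    val c1 = h1(key)
    val c2 = h2(key)
    val target = if (load(c1) <= load(c2)) c1 else c2
    load(target) += 1                            // update the chosen worker's load
    target
  }
}
```

Restricting each key to the two candidates given by H1 and H2 keeps per-key state mergeable across at most two workers while still balancing load.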
Streaming languages and operator semantics - CS 591 K1: Data Stream Processing and Analytics Spring 2020
Transforming languages define transformations specifying operations that process input streams and produce output streams.
• Declarative languages specify the expected results of the computation rather than …
• elements have to be of the same type
• Difference operators take two streams and output elements present in the first but not in the second
• it is blocking and must be defined over …
… take sequences (streams) as input and return sequences (streams) as output: for each new input tuple in S, G adds zero, one, or several tuples to the output. Let G_j(S) be the cumulative output produced by G up to step j. (S → G → G(S))
53 pages | 532.37 KB | 1 year ago
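To make the G_j(S) notation concrete, here is a tiny Scala sketch (G and the example stream are made up): a per-tuple operator's cumulative output only ever grows by appending, whereas a difference operator is blocking because it must see its entire second input before emitting anything:

```scala
object OperatorSemantics {
  // G maps each input tuple to zero, one, or several output tuples
  def G(x: Int): Seq[Int] = if (x % 2 == 0) Seq(x, x * 10) else Seq.empty

  // G_j(S): cumulative output after processing the first j elements of S
  def cumulativeOutput(s: Seq[Int], j: Int): Seq[Int] = s.take(j).flatMap(G)

  // difference: elements present in the first stream but not in the second
  def difference[T](a: Seq[T], b: Seq[T]): Seq[T] = {
    val seen = b.toSet
    a.filterNot(seen)
  }

  def main(args: Array[String]): Unit = {
    val s = Seq(1, 2, 3, 4)
    (0 to s.length).foreach(j => println(s"G_$j(S) = ${cumulativeOutput(s, j)}"))
    println(difference(Seq(1, 2, 3), Seq(2)))  // List(1, 3)
  }
}
```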
Stream processing fundamentals - CS 591 K1: Data Stream Processing and Analytics Spring 2020
• append-only • what if packets arrive late? • we might need to revise the computed total traffic, i.e. the output stream might contain updates to previously emitted items
… disadvantages of each representation?
Reconstitution functions: Insert (append-only): the reconstitution function ins starts with an empty bag and then inserts …
Dataflow graph • operators are nodes, data channels are edges • channels …
45 pages | 1.22 MB | 1 year ago
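A minimal Scala sketch of the insert-only reconstitution function ins mentioned above: replaying an append-only stream from an empty bag rebuilds the multiset it describes (the element type and example readings are illustrative):

```scala
object Reconstitution {
  // ins: start from an empty bag and insert every stream element in order
  def ins[T](stream: Seq[T]): Map[T, Int] =
    stream.foldLeft(Map.empty[T, Int].withDefaultValue(0)) { (bag, x) =>
      bag.updated(x, bag(x) + 1)
    }

  def main(args: Array[String]): Unit = {
    // per-minute traffic readings arriving as an append-only stream
    val readings = Seq("12:00", "12:01", "12:01", "12:02")
    println(ins(readings))  // Map(12:00 -> 1, 12:01 -> 2, 12:02 -> 1)
  }
}
```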
State management - CS 591 K1: Data Stream Processing and Analytics Spring 2020
… distributed across all parallel tasks of the function's operator. Keyed state can only be used by functions that are applied on a KeyedStream:
• When the processing method of a function with keyed input …
A stateful source: get a lock to make output and state update atomic
final Object lock = ctx.getCheckpointLock();
while (isRunning) {
    // output and state update are atomic
    synchronized (lock) {
        ctx.collect(offset);
        for (Long s : state) offset = s;
    }
}
• Working with State: https://ci…
24 pages | 914.13 KB | 1 year ago
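A minimal Scala sketch of keyed state in a function applied on a KeyedStream (the per-key event counter is illustrative, not the slide's example): a ValueState handle is registered in open() and is automatically scoped to the current key inside processElement():

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// counts events per key; usable only after stream.keyBy(...)
class CountPerKey extends KeyedProcessFunction[String, (String, Double), (String, Long)] {

  private var count: ValueState[Long] = _

  override def open(parameters: Configuration): Unit = {
    // register keyed state with the runtime context
    count = getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("count", createTypeInformation[Long]))
  }

  override def processElement(
      value: (String, Double),
      ctx: KeyedProcessFunction[String, (String, Double), (String, Long)]#Context,
      out: Collector[(String, Long)]): Unit = {
    // value() is null for a key seen for the first time, which unboxes to 0L in Scala
    val updated = count.value() + 1
    count.update(updated)
    out.collect((value._1, updated))
  }
}
```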
Streaming in Apache Flink
… as your cluster grows and shrinks
• queryable: Flink state can be queried via a REST API
Rich Functions
• open(Configuration c)
• close()
• getRuntimeContext()
DataStream<…>
ProcessWindowFunction<
    SensorReading,  // input type
    Tuple3<…>,      // output type
    String,         // key type
    TimeWindow> {   // window type
45 pages | 3.00 MB | 1 year ago
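A Scala counterpart of the ProcessWindowFunction signature shown above (the SensorReading case class and the min/max output are assumptions for illustration); as a full window function it receives all elements of the window when the window fires:

```scala
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class SensorReading(id: String, timestamp: Long, temperature: Double)

// type parameters: input type, output type, key type, window type
class MinMaxTempPerWindow
    extends ProcessWindowFunction[SensorReading, (String, Double, Double), String, TimeWindow] {

  override def process(
      key: String,
      context: Context,
      elements: Iterable[SensorReading],
      out: Collector[(String, Double, Double)]): Unit = {
    val temps = elements.map(_.temperature)
    out.collect((key, temps.min, temps.max))  // one result per window firing
  }
}
```

It would be used as readings.keyBy(_.id).window(...).process(new MinMaxTempPerWindow), at the cost of buffering every element of the window rather than a single aggregate.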
19 results in total (page 1 of 2)