Scalable Stream Processing - Spark Streaming and Flink
... pairs where the values for each key are aggregated using the given reduce function. ▶ countByValue • Returns a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of ...
0 credits | 113 pages | 1.22 MB | 1 year ago
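The two transformations named in this snippet can be illustrated without a Spark cluster. The sketch below is a plain-Python emulation applied to one batch at a time; it is not the Spark Streaming API, and the helper names `count_by_value` and `reduce_by_key` are hypothetical stand-ins chosen for this illustration.

```python
from collections import Counter


def count_by_value(batch):
    """Return (value, frequency) pairs for one batch, sorted by value."""
    return sorted(Counter(batch).items())


def reduce_by_key(pairs, func):
    """Aggregate the values of each key in one batch with the given reduce function."""
    grouped = {}
    for key, value in pairs:
        # Fold the new value into the running aggregate for this key.
        grouped[key] = func(grouped[key], value) if key in grouped else value
    return sorted(grouped.items())


# Each inner list stands in for the RDD of one micro-batch (an assumption of this sketch).
batches = [["a", "b", "a"], ["b", "b"]]
for batch in batches:
    print(count_by_value(batch))
```

The counts are computed per batch, which mirrors the "frequency in each RDD" wording: nothing is carried over from one batch to the next.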
Stream processing fundamentals - CS 591 K1: Data Stream Processing and Analytics Spring 2020
... System • ad-hoc queries, data manipulation tasks • insertions, updates, deletions of single row or groups of rows Data Stream Management System • continuous queries • sequential data access, ... make to obtain r2, r3, ..., etc. • as a replacement sequence where some attribute A denotes a key and an arriving tuple t replaces any existing tuple with the same t(A) value to form a new relation ... else ins_u(P). Insert-Replace: If the stream has a key, the reconstitution function ins_r guarantees that only the most recent item with a given key is included: • ins_r([]) = Ø • ins_r(P:i) = insert(i ...
0 credits | 45 pages | 1.22 MB | 1 year ago
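The insert-replace behaviour described in this snippet (only the most recent item per key survives) can be sketched in a few lines of Python. The snippet's recursive definition is cut off, so this is an iterative reading of the prose, not the slide's formula; the `key` argument is a hypothetical parameter standing in for the key attribute A.

```python
def ins_r(stream, key):
    """Rebuild a relation from a stream, keeping only the latest item per key."""
    relation = {}
    for item in stream:
        # A later item with the same key replaces the earlier one.
        relation[key(item)] = item
    return list(relation.values())


# Tuples of (A, payload); the first field plays the role of the key attribute A.
stream = [("a", 1), ("b", 2), ("a", 3)]
print(ins_r(stream, key=lambda t: t[0]))
```

An empty stream yields an empty relation, which matches the base case ins_r([]) = Ø.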
PyFlink 1.15 Documentation
... pyflink.table import DataTypes table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], DataTypes.ROW([DataTypes.FIELD("id", DataTypes.TINYINT()), DataTypes.FIELD("data", DataTypes.STRING())])) ... StreamTableEnvironment.create(env) ds = env.from_collection([(1, 'Hi'), (2, 'Hello')], type_info=Types.ROW_NAMED(["id", "data"], [Types.BYTE(), Types.STRING()])) table = t_env.from_data_stream(ds, Schema ... contents of the current Table to local client. [12]: list(table.execute().collect()) [12]: [<Row(1, 'Hi')>, <Row(2, 'Hello')>] [13]: table.execute().print() +----+----------------------+--------------------------------+ ...
0 credits | 36 pages | 266.77 KB | 1 year ago
PyFlink 1.16 Documentation
... pyflink.table import DataTypes table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], DataTypes.ROW([DataTypes.FIELD("id", DataTypes.TINYINT()), DataTypes.FIELD("data", DataTypes.STRING())])) ... StreamTableEnvironment.create(env) ds = env.from_collection([(1, 'Hi'), (2, 'Hello')], type_info=Types.ROW_NAMED(["id", "data"], [Types.BYTE(), Types.STRING()])) table = t_env.from_data_stream(ds, Schema ... contents of the current Table to local client. [12]: list(table.execute().collect()) [12]: [<Row(1, 'Hi')>, <Row(2, 'Hello')>] [13]: table.execute().print() +----+----------------------+--------------------------------+ ...
0 credits | 36 pages | 266.80 KB | 1 year ago
Flow control and load shedding - CS 591 K1: Data Stream Processing and Analytics Spring 2020
... contains materialized load shedding plans ordered by how much load shedding they will cause. • Each row contains a plan with • expected cycle savings • locations for drop operations • drop amounts ...
0 credits | 43 pages | 2.42 MB | 1 year ago
State management - CS 591 K1: Data Stream Processing and Analytics Spring 2020
... Keyed state is scoped to a key defined in the operator's input records • Flink maintains one state instance per key value and partitions all records with the same key to the operator task that maintains the state for this key • State access is automatically scoped to the key of the current record so that all records with the same key access the same state ... State management in Apache Flink ... Vasiliki Kalavri | Boston University 2020 ... RocksDB: RocksDB is an LSM-tree storage engine with key/value interface, where keys and values are arbitrary byte streams. https://rocksdb.org/ https://www ...
0 credits | 24 pages | 914.13 KB | 1 year ago
Windows and triggers - CS 591 K1: Data Stream Processing and Analytics Spring 2020
... Tumbling windows: non-overlapping buckets of fixed size [figure: fixed time interval; keys 1 to 3 over a time axis from 12:00 to 12:20] ... Vasiliki Kalavri | Boston University 2020 ... val sensorData: DataStream[SensorReading] ... Sliding windows: overlapping buckets of fixed size [figure: fixed length and slide; keys 1 to 3 over a time axis from 12:00 to 12:20] ... val sensorData: DataStream[SensorReading] ... Session windows: a period of activity followed by a period of inactivity [figure: session gap; keys 1 to 3] ... // event-time session windows ...
0 credits | 35 pages | 444.84 KB | 1 year ago
Fault-tolerance demo & reconfiguration - CS 591 K1: Data Stream Processing and Analytics Spring 2020
... location for each key in the checkpoint, so that tasks locate and read the matching keys only • Avoids reading irrelevant data • Requires a materialized index for all keys, i.e. a key-to-read-offset ... Reconfiguring keyed stateful operators requires preserving the key semantics: • Existing state for a particular key and all future events with this key must be routed to the same parallel instance • Some kind of hashing is typically used • Maintaining routing tables or an index for all key mappings is usually impractical • Skewed load is challenging to handle with hashing ... Vasiliki Kalavri ...
0 credits | 41 pages | 4.09 MB | 1 year ago
Skew mitigation - CS 591 K1: Data Stream Processing and Analytics Spring 2020
... Kalavri | Boston University 2020 ... Key partitioning [figure: workers w1, w2, w3]. Round-robin: • Items are perfectly balanced among workers • No routing table required • Key semantics are not preserved: same key might be routed to different workers. Hash-based: • Workers are responsible for roughly the same amount of keys • No routing table is required • Key semantics preserved: values of the same key are ... partitioning key values. • We can then use a hybrid partitioning function that treats normal keys and popular keys differently. • Keeping exact counts is impractical for large key domains, e.g. ...
0 credits | 31 pages | 1.47 MB | 1 year ago
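The contrast between the two partitioning schemes in this snippet can be sketched as follows. This is an illustrative Python sketch, not code from the slides; `round_robin_partitioner` and `hash_partitioner` are hypothetical names, and CRC32 is used as the hash only because Python's built-in `hash()` for strings varies between processes.

```python
import zlib
from itertools import count


def round_robin_partitioner(num_workers):
    """Assign items to workers in turn, ignoring the key entirely."""
    counter = count()

    def assign(_key):
        return next(counter) % num_workers

    return assign


def hash_partitioner(num_workers):
    """Assign items by a stable hash of the key, so equal keys share a worker."""

    def assign(key):
        return zlib.crc32(str(key).encode("utf-8")) % num_workers

    return assign


rr = round_robin_partitioner(3)
hp = hash_partitioner(3)
keys = ["k1", "k1", "k2", "k1"]
print([rr(k) for k in keys])  # the same key lands on different workers
print([hp(k) for k in keys])  # the same key always lands on the same worker
```

Neither function keeps a routing table. Round-robin balances the item count exactly but scatters a key across workers, while hashing keeps a key on one worker and therefore inherits any skew in the key distribution.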
Streaming in Apache Flink
... // access the state for this key MovingAverage average = averageState.value(); // create a new MovingAverage (with window size 2) if none exists for this key if (average == null) average ... element.getCreationTime(); } } Windows (Not the OS) Global Vs Keyed Windows stream.keyBy(<key selector>).window().reduce|aggregate|process() stream. ... ◦ EventTimeSessionWindows.withGap(Time.minutes(30)) DataStream input = ... input.keyBy("key").window(TumblingEventTimeWindows.of(Time.minutes(1))).process(new MyWastefulMax()); public ...
0 credits | 45 pages | 3.00 MB | 1 year ago
18 results in total













