# Pytanie 34: Analityka danych strumieniowych ## Pytanie **"ScharakteryzowaΔ‡ rozwiΔ…zania analityczne dziaΕ‚ajΔ…ce na danych o charakterze strumieniowym."** Przedmiot: PSD (Przetwarzanie Strumieniowe Danych) --- ## πŸ“š OdpowiedΕΊ gΕ‚Γ³wna ### 1. Charakterystyka danych strumieniowych ``` β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ DANE STRUMIENIOWE vs BATCH β”‚ β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€ β”‚ β”‚ β”‚ BATCH: β”‚ β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ β”‚ β”‚ Dane statyczne β”‚ β†’ Przetwarzanie β†’ Wynik β”‚ β”‚ β”‚ (caΕ‚y zbiΓ³r) β”‚ (jednorazowe) β”‚ β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ β”‚ β”‚ β”‚ STREAMING: β”‚ β”‚ ─────●────●──●─────●──●────●───────→ (nieskoΕ„czony strumieΕ„) β”‚ β”‚ β”‚ β”‚ β”‚ β”‚ β”‚ β”‚ β”‚ β”‚ ↓ ↓ ↓ ↓ ↓ ↓ β”‚ β”‚ [Przetwarzanie ciΔ…gΕ‚e] β†’ Wyniki w czasie rzeczywistym β”‚ β”‚ β”‚ β”‚ Cechy strumieni: β”‚ β”‚ β€’ Nieograniczone (unbounded) β”‚ β”‚ β€’ CiΔ…gΕ‚e napΕ‚ywanie β”‚ β”‚ β€’ Brak moΕΌliwoΕ›ci przechowania wszystkiego β”‚ β”‚ β€’ Wymagana niska latencja β”‚ β”‚ β€’ Dane mogΔ… byΔ‡ nieuporzΔ…dkowane (out-of-order) β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ ``` --- ### 2. Modele przetwarzania #### Event Time vs Processing Time ``` Event Time: Kiedy zdarzenie faktycznie nastΔ…piΕ‚o Processing Time: Kiedy zdarzenie dotarΕ‚o do systemu Timeline: Event time: ─●─────●───●─────────●───→ E1 E2 E3 E4 Processing: ───●───────●──●──●───────→ E1 E3 E2 E4 (rΓ³ΕΌna kolejnoΕ›Δ‡!) Watermark: znacznik postΔ™pu event time "Wszystkie zdarzenia do czasu T juΕΌ dotarΕ‚y" ``` #### Windowing (okna czasowe) ``` β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ TUMBLING WINDOW (rozΕ‚Δ…czne): β”‚ β”‚ β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€ β”‚ β”‚ β”‚ Window1β”‚β”‚ Window2β”‚β”‚ Window3β”‚β”‚ Window4β”‚ β”‚ β”‚ β”‚ β”‚ SLIDING WINDOW (nakΕ‚adajΔ…ce siΔ™): β”‚ β”‚ β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€ β”‚ β”‚ β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€ β”‚ β”‚ β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€ β”‚ β”‚ β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€ β”‚ β”‚ β”‚ β”‚ SESSION WINDOW (oparte na aktywnoΕ›ci): β”‚ β”‚ β”œβ”€β”€β”€β”€β”€ β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€ β”œβ”€β”€β”€β”€ β”œβ”€β”€β”€β”€β”€β”€ β”‚ β”‚ session1 session2 s3 session4 β”‚ β”‚ gap gap gap β”‚ β”‚ β”‚ β”‚ GLOBAL WINDOW: β”‚ β”‚ β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β†’ β”‚ β”‚ (jedno okno, trigger decyduje kiedy emitowaΔ‡) β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ ``` --- ### 3. Platformy Stream Processing #### Apache Kafka Streams ```java StreamsBuilder builder = new StreamsBuilder(); KStream source = builder.stream("input-topic"); KTable, Long> counts = source .groupByKey() .windowedBy(TimeWindows.of(Duration.ofMinutes(5))) .count(); counts.toStream().to("output-topic"); // Cechy: // β€’ Library (nie cluster) // β€’ Exactly-once semantics // β€’ Stateful processing z RocksDB // β€’ Integracja z Kafka ecosystem ``` #### Apache Flink ```java DataStream stream = env.addSource(new FlinkKafkaConsumer<>(...)); DataStream result = stream .keyBy(event -> event.getKey()) .window(TumblingEventTimeWindows.of(Time.minutes(5))) .aggregate(new MyAggregateFunction()) .process(new MyProcessFunction()); // Cechy: // β€’ True streaming (nie micro-batch) // β€’ Event time processing // β€’ Exactly-once state // β€’ Savepoints & checkpoints // β€’ Complex Event Processing (CEP) ``` #### Apache Spark Structured Streaming ```python df = spark.readStream \ .format("kafka") \ .option("subscribe", "events") \ .load() result = df \ .withWatermark("timestamp", "10 minutes") \ .groupBy( window("timestamp", "5 minutes"), "userId" ) \ .count() query = result.writeStream \ .outputMode("append") \ .format("console") \ .start() # Cechy: # β€’ Micro-batch (domyΕ›lnie) lub Continuous # β€’ Unified batch + streaming API # β€’ Catalyst optimizer # β€’ Integracja z Spark ecosystem ``` --- ### 4. PorΓ³wnanie platform | Cecha | Kafka Streams | Flink | Spark Streaming | |-------|---------------|-------|-----------------| | **Model** | True streaming | True streaming | Micro-batch | | **Deployment** | Library | Cluster | Cluster | | **Latency** | Niska | Bardzo niska | Średnia (~100ms) | | **State** | RocksDB | RocksDB/heap | In-memory/external | | **Exactly-once** | Tak | Tak | Tak | | **SQL** | KSQL | Flink SQL | Spark SQL | | **CEP** | Ograniczone | Tak (FlinkCEP) | Nie natywnie | --- ### 5. Algorytmy strumieniowe #### Approximate counting - HyperLogLog ``` Problem: Zlicz unikalne elementy w strumieniu (bez przechowywania wszystkich) HyperLogLog: β€’ O(1) space (kilka KB) β€’ ~2% error dla 12-bit registers β€’ UΕΌywa hash β†’ trailing zeros PrzykΕ‚ad: Redis PFADD, PFCOUNT ``` #### Frequency estimation - Count-Min Sketch ``` Problem: Estymuj czΔ™stoΕ›Δ‡ elementΓ³w Count-Min Sketch: β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ h1: [3][0][2][5][1][0][4][2] β”‚ β”‚ h2: [1][2][0][3][4][1][0][2] β”‚ β”‚ h3: [2][1][3][0][2][1][3][0] β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ Query(x): min(h1[hash1(x)], h2[hash2(x)], h3[hash3(x)]) β€’ Overestimates (never underestimates) β€’ Tunable accuracy vs space ``` #### Sampling - Reservoir Sampling ``` Problem: RΓ³wnomiernie prΓ³bkuj k elementΓ³w ze strumienia o nieznanej dΕ‚ugoΕ›ci Algorithm: 1. Zachowaj pierwsze k elementΓ³w 2. Dla i-tego elementu (i > k): - Z prawdopodobieΕ„stwem k/i zamieΕ„ losowy element w reservoir Gwarancja: KaΕΌdy element ma szansΔ™ k/n ``` --- ### 6. ObsΕ‚uga opΓ³ΕΊnieΕ„ i Out-of-Order ``` β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ WATERMARKS + LATE DATA β”‚ β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€ β”‚ β”‚ β”‚ Stream: ─●(t=1)──●(t=5)──●(t=3)──●(t=8)──●(t=2)───→ β”‚ β”‚ ↑ ↑ β”‚ β”‚ out-of-order late data β”‚ β”‚ β”‚ β”‚ Watermark: "Prawdopodobnie wszystkie events do t=X dotarΕ‚y" β”‚ β”‚ β”‚ β”‚ Strategie dla late data: β”‚ β”‚ 1. DROP: Ignoruj spΓ³ΕΊnione (najprostsze) β”‚ β”‚ 2. RECOMPUTE: Przelicz okno (kosztowne) β”‚ β”‚ 3. SIDE OUTPUT: Zapisz do osobnego strumienia β”‚ β”‚ 4. ALLOWED LATENESS: Czekaj dodatkowo N czasu β”‚ β”‚ β”‚ β”‚ Flink: β”‚ β”‚ .allowedLateness(Time.minutes(5)) β”‚ β”‚ .sideOutputLateData(lateOutputTag) β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ ``` --- ### 7. Exactly-Once Semantics ``` β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ GWARANCJE PRZETWARZANIA: β”‚ β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€ β”‚ β”‚ β”‚ AT-MOST-ONCE: β”‚ β”‚ Fire-and-forget, moΕΌliwa utrata danych β”‚ β”‚ β”‚ β”‚ AT-LEAST-ONCE: β”‚ β”‚ Retry przy failure, moΕΌliwe duplikaty β”‚ β”‚ β”‚ β”‚ EXACTLY-ONCE: β”‚ β”‚ KaΕΌde zdarzenie przetworzone dokΕ‚adnie raz β”‚ β”‚ β”‚ β”‚ Implementacja (Flink/Kafka): β”‚ β”‚ β€’ Checkpointing (periodic snapshots) β”‚ β”‚ β€’ Transactional sinks (Kafka transactions) β”‚ β”‚ β€’ Barrier alignment β”‚ β”‚ β€’ Idempotent operations β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ ``` --- ### 8. Use Cases | Use Case | Technologia | Opis | |----------|-------------|------| | **Fraud detection** | Flink CEP | Pattern matching w czasie rzeczywistym | | **IoT analytics** | Kafka Streams | Agregacja danych z sensorΓ³w | | **Real-time dashboards** | Spark + Druid | Metryki biznesowe | | **Log analysis** | ELK + Kafka | Centralizacja logΓ³w | | **Recommendations** | Flink | Real-time personalizacja | --- ## 🧠 Mnemoniki ### "TWSS = Tumbling, Window, Sliding, Session": Cztery typy okien czasowych ### "Flink = Fast, Spark = Safe": Flink najszybszy (true streaming), Spark bezpieczny (micro-batch) ### "HLL = Hash, Leading zeros, Log": HyperLogLog do zliczania unikalnych --- ## ❓ Pytania dodatkowe ### Q1: "Kiedy micro-batch a kiedy true streaming?" **OdpowiedΕΊ:** True streaming (Flink): ultra-low latency, CEP, event-time critical. Micro-batch (Spark): wyΕΌsza przepustowoΕ›Δ‡, Ε‚atwiejsza integracja z batch, mniej wraΕΌliwe na anomalie. ### Q2: "Jak obsΕ‚uΕΌyΔ‡ skoki danych (spikes)?" **OdpowiedΕΊ:** Backpressure (Flink automatycznie), buffering, auto-scaling (Kubernetes), rate limiting na ΕΊrΓ³dle, spillage to disk. ### Q3: "Co to jest checkpointing?" **OdpowiedΕΊ:** Periodic snapshots stanu (Flink). Przy failure - restart od ostatniego checkpoointu. Barrier synchronizuje snapshot miΔ™dzy operatorami. Incremental checkpoints dla duΕΌych stanΓ³w. --- ## 🎯 Kluczowe punkty 1. **Streaming:** Unbounded, continuous, low latency 2. **Windowing:** Tumbling, Sliding, Session, Global 3. **Event time vs Processing time:** Watermarks 4. **Platforms:** Flink (true), Spark (micro-batch), Kafka Streams (library) 5. **Exactly-once:** Checkpointing + transactions --- ## πŸ“– ΕΉrΓ³dΕ‚a 1. Kleppmann - "Designing Data-Intensive Applications" 2. Apache Flink Documentation 3. Spark Structured Streaming Guide 4. Kafka Streams Documentation