
"Stream Processing with Apache Flink: Fundamentals, Implementation, and Operation of Streaming Applications" By Fabian Hueske and Vasiliki Kalavri
Chapter 1. Introduction to Stateful Stream Processing
- Stateful Stream Processing
- "Apache Flink stores the application state locally in memory or in an embedded database. Since Flink is a distributed system, the local state needs to be protected against failures to avoid data loss in case of application or machine failure. Flink guarantees this by periodically writing a consistent checkpoint of the application state to a remote and durable storage."
- Event-Driven Applications
- "Event-driven applications are stateful streaming applications that ingest event streams and process the events with application-specific business logic."
- "Event-driven applications offer several benefits compared to transactional applications or microservices. Local state access provides very good performance compared to reading and writing queries against remote datastores. Scaling and fault tolerance are handled by the stream processor, and by leveraging an event log as the input source the complete input of an application is reliably stored and can be deterministically replayed. Furthermore, Flink can reset the state of an application to a previous savepoint, making it possible to evolve or rescale an application without losing its state."
- "Moreover, exactly-once state consistency and the ability to scale an application are fundamental requirements for event-driven applications. Apache Flink checks all these boxes and is a very good choice to run this class of applications."
- Data Pipelines
- "Ingesting, transforming, and inserting data with low latency is another common use case for stateful stream processing applications. This type of application is called a data pipeline. Data pipelines must be able to process large amounts of data in a short time. A stream processor that operates a data pipeline should also feature many source and sink connectors to read data from and write data to various storage systems. Again, Flink does all of this."
- Streaming Analytics
- "Instead of waiting to be periodically triggered, a streaming analytics application continuously ingests streams of events and updates its result by incorporating the latest events with low latency. This is similar to the maintenance techniques database systems use to update materialized views. Typically, streaming applications store their result in an external data store that supports efficient updates, such as a database or key-value store."
- A Quick Look at Flink
- "Event-time semantics provide consistent and accurate results despite out-of-order events."
- "Exactly-once state consistency guarantees."
- "Flink applications can be scaled to run on thousands of cores."
- "This book covers the DataStream API and process functions, which provide primitives for common stream processing operations, such as windowing and asynchronous operations, and interfaces to precisely control state and time."
- "Connectors to the most commonly used storage systems such as Apache Kafka, Apache Cassandra, Elasticsearch, JDBC, Kinesis, and (distributed) filesystems such as HDFS and S3."
- "Ability to run streaming applications 24/7 with very little downtime due to its highly available setup (no single point of failure), tight integration with Kubernetes, YARN, and Apache Mesos, fast recovery from failures, and the ability to dynamically scale jobs."
- "Ability to update the application code of jobs and migrate jobs to different Flink clusters without losing the state of the application."
- "Detailed and customizable collection of system and application metrics to identify and react to problems ahead of time."
- "Last but not least, Flink is also a full-fledged batch processor."
Chapter 2. Stream Processing Fundamentals
- Dataflow Graphs
- "Dataflow programs are commonly represented as directed graphs, where nodes are called operators and represent computations and edges represent data dependencies. Operators are the basic functional units of a dataflow application. They consume data from inputs, perform a computation on them, and produce data to outputs for further processing. Operators without input ports are called data sources and operators without output ports are called data sinks. A dataflow graph must have at least one data source and one data sink."
- Data Exchange Strategies
- "The forward strategy sends data from a task to a receiving task. If both tasks are located on the same physical machine (which is often ensured by task schedulers), this exchange strategy avoids network communication."
- "The broadcast strategy sends every data item to all parallel tasks of an operator. Because this strategy replicates data and involves network communication, it is fairly expensive."
- "The key-based strategy partitions data by a key attribute and guarantees that data items having the same key will be processed by the same task. In Figure 2-2, the output of the “Extract hashtags” operator is partitioned by the key (the hashtag), so that the count operator tasks can correctly compute the occurrences of each hashtag."
- "The random strategy uniformly distributes data items to operator tasks in order to evenly distribute the load across computing tasks."
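
The key-based and broadcast strategies quoted above can be illustrated with a small sketch. It is not Flink's implementation: the function names are hypothetical, and `zlib.crc32` is only a stand-in for whatever hash function a real stream processor uses.

```python
import zlib

def key_partition(key: str, parallelism: int) -> int:
    """Map a key to a task index; equal keys always map to the same task."""
    # crc32 is an illustrative, deterministic hash (an assumption, not Flink's choice).
    return zlib.crc32(key.encode("utf-8")) % parallelism

def broadcast(item, parallelism: int):
    """Replicate one item to every parallel task of the receiving operator."""
    return [(task, item) for task in range(parallelism)]

# Count hashtags with three parallel "count" tasks, partitioned by hashtag.
hashtags = ["#flink", "#kafka", "#flink", "#streams", "#kafka"]
counts_per_task = [dict() for _ in range(3)]
for tag in hashtags:
    task = key_partition(tag, 3)
    counts_per_task[task][tag] = counts_per_task[task].get(tag, 0) + 1
```

Because every occurrence of a hashtag lands on the same task, each task holds the complete count for the keys it owns, which is the property the key-based strategy is meant to guarantee.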
- LATENCY
- "Latency indicates how long it takes for an event to be processed."
- "Modern stream processors, like Apache Flink, can offer latencies as low as a few milliseconds."
- THROUGHPUT
- "Throughput is a measure of the system’s processing capacity—its rate of processing. That is, throughput tells us how many events the system can process per time unit."
- DATA INGESTION AND DATA EGRESS
- "Data ingestion is the operation of fetching raw data from external sources and converting it into a format suitable for processing. Operators that implement data ingestion logic are called data sources. A data source can ingest data from a TCP socket, a file, a Kafka topic, or a sensor data interface. Data egress is the operation of producing output in a form suitable for consumption by external systems. Operators that perform data egress are called data sinks and examples include files, databases, message queues, and monitoring interfaces."
- ROLLING AGGREGATIONS
- "A rolling aggregation is an aggregation, such as sum, minimum, and maximum, that is continuously updated for each input event."
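
As a rough illustration of the rolling aggregation described above, the following sketch emits one updated aggregate per input event. The helper name `rolling` is hypothetical; it simply wraps the standard library's `itertools.accumulate`.

```python
from itertools import accumulate
import operator

def rolling(events, combine):
    """Return the aggregate as it stands after each input event."""
    return list(accumulate(events, combine))

readings = [3, 1, 4, 1, 5]
rolling_sum = rolling(readings, operator.add)  # [3, 4, 8, 9, 14]
rolling_min = rolling(readings, min)           # [3, 1, 1, 1, 1]
rolling_max = rolling(readings, max)           # [3, 3, 4, 4, 5]
```

Only the latest aggregate has to be kept between events, which is why rolling aggregations need very little state.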
- WINDOW OPERATIONS
- "Window operations continuously create finite sets of events called buckets from an unbounded event stream and let us perform computations on these finite sets."
- "A watermark is a global progress metric that indicates the point in time when we are confident that no more delayed events will arrive."
- "Watermarks are essential for both event-time windows and operators handling out-of-order events."
Chapter 3. The Architecture of Apache Flink
- "The JobManager is the master process that controls the execution of a single application—each application is controlled by a different JobManager. The JobManager receives an application for execution."
- "The JobManager requests the necessary resources (TaskManager slots) to execute the tasks from the ResourceManager."
- "The ResourceManager is responsible for managing TaskManager slots, Flink’s unit of processing resources. When a JobManager requests TaskManager slots, the ResourceManager instructs a TaskManager with idle slots to offer them to the JobManager. If the ResourceManager does not have enough slots to fulfill the JobManager’s request, the ResourceManager can talk to a resource provider to provision containers in which TaskManager processes are started. The ResourceManager also takes care of terminating idle TaskManagers to free compute resources."
- "Each TaskManager provides a certain number of slots. The number of slots limits the number of tasks a TaskManager can execute. After it has been started, a TaskManager registers its slots to the ResourceManager. When instructed by the ResourceManager, the TaskManager offers one or more of its slots to a JobManager. The JobManager can then assign tasks to the slots to execute them."
- "The REST interface enables the dispatcher to serve as an HTTP entry point to clusters that are behind a firewall. The dispatcher also runs a web dashboard to provide information about job executions."
- Framework style deployment: "In this mode, Flink applications are packaged into a JAR file and submitted by a client to a running service."
- Library style deployment: "In this mode, the Flink application is bundled in an application-specific container image, such as a Docker image. The image also includes the code to run a JobManager and ResourceManager. When a container is started from the image, it automatically launches the ResourceManager and JobManager and submits the bundled job for execution."
- "The framework style follows the traditional approach of submitting an application (or query) via a client to a running service. In the library style, there is no Flink service. Instead, Flink is bundled as a library together with the application in a container image. This deployment mode is common for microservices architectures."
- "A streaming application cannot continue processing if the responsible JobManager process disappears. This makes the JobManager a single point of failure for applications in Flink. To overcome this problem, Flink supports a high-availability mode that migrates the responsibility and metadata for a job to another JobManager in case the original JobManager disappears."
- "Flink’s high-availability mode is based on Apache ZooKeeper, a system for distributed services that require coordination and consensus."
- "Flink implements a credit-based flow control mechanism that works as follows. A receiving task grants some credit to a sending task, the number of network buffers that are reserved to receive its data. Once a sender receives a credit notification, it ships as many buffers as it was granted and the size of its backlog—the number of network buffers that are filled and ready to be shipped. The receiver processes the shipped data with the reserved buffers and uses the sender’s backlog size to prioritize the next credit grants for all its connected senders."
- "Credit-based flow control reduces latency because senders can ship data as soon as the receiver has enough resources to accept it."
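
The credit-based exchange quoted above might be sketched as follows. This is a toy model under stated assumptions: `Sender` and `grant_credits` are hypothetical names, and the "larger backlog first" policy is only one plausible way a receiver could use the reported backlog sizes.

```python
from collections import deque

class Sender:
    """Holds filled buffers (the backlog) until the receiver grants credit."""

    def __init__(self, buffers):
        self.backlog = deque(buffers)

    def ship(self, credit):
        """Ship at most `credit` buffers and report the remaining backlog size."""
        count = min(credit, len(self.backlog))
        shipped = [self.backlog.popleft() for _ in range(count)]
        return shipped, len(self.backlog)

def grant_credits(free_buffers, backlogs):
    """Hypothetical policy: hand out free receiver buffers, largest backlog first."""
    grants = {sender: 0 for sender in backlogs}
    remaining = dict(backlogs)
    for _ in range(free_buffers):
        sender = max(remaining, key=remaining.get)
        if remaining[sender] == 0:
            break  # nobody has anything left to send
        grants[sender] += 1
        remaining[sender] -= 1
    return grants
```

A sender never ships more than it was granted, so the receiver cannot be overrun, and the backlog size returned by `ship` gives the receiver the information it needs for the next round of grants.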
- "Flink features an optimization technique called task chaining that reduces the overhead of local communication under certain conditions. In order to satisfy the requirements for task chaining, two or more operators must be configured with the same parallelism and connected by local forward channels."
- "Task chaining can significantly reduce the communication costs between local tasks, but there are also cases when it makes sense to execute a pipeline without chaining. For example, it can make sense to break a long pipeline of chained tasks or break a chain into two tasks to schedule an expensive function to different slots."
- "Watermarks have two basic properties:
- They must be monotonically increasing to ensure the event-time clocks of tasks are progressing and not going backward.
- They are related to record timestamps. A watermark with a timestamp T indicates that all subsequent records should have timestamps > T."
- "As soon as one partition does not advance its watermarks or becomes completely idle and does not ship any records or watermarks, the event-time clock of a task will not advance and the timers of the task will not trigger. This situation can be problematic for time-based operators that rely on an advancing clock to perform computations and clean up their state. Consequently, the processing latencies and state size of time-based operators can significantly increase if a task does not receive new watermarks from all input tasks at regular intervals."
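
One way to picture why a single idle partition stalls a task is to model the task's event-time clock as the minimum of the latest watermark seen on each input partition. The class below is a minimal sketch under that assumption; `EventTimeClock` is a hypothetical name, not a Flink class.

```python
class EventTimeClock:
    """Tracks one watermark per input partition; the clock is their minimum."""

    def __init__(self, num_partitions):
        self.partition_watermarks = [float("-inf")] * num_partitions
        self.current = float("-inf")

    def on_watermark(self, partition, watermark):
        """Update one partition's watermark; return True if the clock advanced."""
        # A partition's watermark never moves backward.
        self.partition_watermarks[partition] = max(
            self.partition_watermarks[partition], watermark
        )
        new_time = min(self.partition_watermarks)
        if new_time > self.current:
            self.current = new_time
            return True
        return False

clock = EventTimeClock(num_partitions=2)
first = clock.on_watermark(0, 10)   # partition 1 is still silent: clock stays put
second = clock.on_watermark(1, 5)   # both partitions reported: clock moves to 5
third = clock.on_watermark(0, 20)   # partition 1 is stuck at 5: no progress
```

In this model, timers registered for any time after 5 cannot fire until partition 1 sends a higher watermark, no matter how far partition 0 advances.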
- "Operator state is scoped to an operator task. This means that all records processed by the same parallel task have access to the same state. Operator state cannot be accessed by another task of the same or a different operator."
- Broadcast state: "Designed for the special case where the state of each task of an operator is identical. This property can be leveraged during checkpoints and when rescaling an operator."
- "Keyed state is maintained and accessed with respect to a key defined in the records of an operator’s input stream. Flink maintains one state instance per key value and partitions all records with the same key to the operator task that maintains the state for this key. When a task processes a record, it automatically scopes the state access to the key of the current record."
- "A TaskManager process (and with it, all tasks running on it) may fail at any point in time. Hence, its storage must be considered volatile. A state backend takes care of checkpointing the state of a task to a remote and persistent storage."
- "In this section, we present Flink’s checkpointing and recovery mechanism to guarantee exactly-once state consistency."
- "1. Pause the ingestion of all input streams. 2. Wait for all in-flight data to be completely processed, meaning all tasks have processed all their input data. 3. Take a checkpoint by copying the state of each task to a remote, persistent storage. The checkpoint is complete when all tasks have finished their copies. 4. Resume the ingestion of all streams."
- "An application is recovered in three steps:
- Restart the whole application.
- Reset the states of all stateful tasks to the latest checkpoint.
- Resume the processing of all tasks."
- "This checkpointing and recovery mechanism can provide exactly-once consistency for application state, given that all operators checkpoint and restore all of their states and that all input streams are reset to the position up to which they were consumed when the checkpoint was taken."
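
The checkpoint and recovery steps above can be simulated in a few lines. This is a deliberately simplified, single-task sketch: the function `run`, its parameters, and the in-memory "checkpoint" are all assumptions made for illustration, with a list index standing in for a resettable source offset.

```python
import copy

def run(events, checkpoint_every, fail_at=None):
    """Sum events, checkpoint (offset, state) periodically, recover once on failure."""
    state = {"sum": 0}
    offset = 0
    checkpoint = (0, copy.deepcopy(state))  # stands in for remote, durable storage
    failed = False
    while offset < len(events):
        if fail_at is not None and offset == fail_at and not failed:
            failed = True
            # Recovery: reset the state AND rewind the source to the checkpoint.
            offset, saved = checkpoint
            state = copy.deepcopy(saved)
            continue
        state["sum"] += events[offset]
        offset += 1
        if offset % checkpoint_every == 0:
            checkpoint = (offset, copy.deepcopy(state))
    return state["sum"]
```

Running `run([1, 2, 3, 4, 5], checkpoint_every=2, fail_at=3)` reprocesses the event at offset 2 after the failure, yet the final sum matches the failure-free run because state and input position are restored together.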
- "Flink implements checkpointing based on the Chandy–Lamport algorithm for distributed snapshots."
- "Flink’s checkpointing algorithm uses a special type of record called a checkpoint barrier. Similar to watermarks, checkpoint barriers are injected by source operators into the regular stream of records and cannot overtake or be passed by other records."
- "When a task receives a barrier for a new checkpoint, it waits for the arrival of barriers from all its input partitions for the checkpoint. While it is waiting, it continues processing records from stream partitions that did not provide a barrier yet."
- "As soon as a task has received barriers from all its input partitions, it initiates a checkpoint at the state backend and broadcasts the checkpoint barrier to all of its downstream connected tasks as shown in Figure 3-23. Once all checkpoint barriers have been emitted, the task starts to process the buffered records."
- "Flink’s checkpointing algorithm produces consistent distributed checkpoints from streaming applications without stopping the whole application. However, it can increase the processing latency of an application."
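
The barrier alignment described above might look roughly like this for a single task that sums its input. It is a sketch, not Flink's implementation: `AligningTask` is a hypothetical class, only one checkpoint is in flight at a time, and the snapshot is kept in a local dictionary instead of a state backend.

```python
class AligningTask:
    """Sums records from several input channels and aligns checkpoint barriers."""

    def __init__(self, num_inputs):
        self.num_inputs = num_inputs
        self.state = 0
        self.blocked = set()   # channels that already delivered the barrier
        self.buffered = []     # records that arrived behind a barrier
        self.snapshots = {}    # checkpoint id -> state at the barrier

    def on_record(self, channel, value):
        if channel in self.blocked:
            self.buffered.append(value)  # belongs to the next checkpoint epoch
        else:
            self.state += value

    def on_barrier(self, channel, checkpoint_id):
        self.blocked.add(channel)
        if len(self.blocked) == self.num_inputs:
            self.snapshots[checkpoint_id] = self.state
            # A real task would forward the barrier downstream at this point.
            for value in self.buffered:
                self.state += value
            self.buffered.clear()
            self.blocked.clear()

task = AligningTask(num_inputs=2)
task.on_record(0, 1)
task.on_barrier(0, checkpoint_id=1)
task.on_record(0, 10)  # buffered: channel 0 is already past the barrier
task.on_record(1, 2)   # processed: channel 1 has not delivered its barrier
task.on_barrier(1, checkpoint_id=1)
```

The snapshot contains exactly the records that preceded the barrier on every channel, and the buffering while waiting is where the added processing latency comes from.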
- "Since the purpose of checkpoints is to ensure an application can be restarted in case of a failure, they are deleted when an application is explicitly canceled."
- "Given an application and a compatible savepoint, you can start the application from the savepoint. This will initialize the state of the application to the state of the savepoint and run the application from the point at which the savepoint was taken."
Chapter 4. Setting Up a Development Environment for Apache Flink
Chapter 5. The DataStream API (v1.7)
- "A common requirement of many applications is to process groups of events that share a certain property together. The DataStream API features the abstraction of a KeyedStream, which is a DataStream that has been logically partitioned into disjoint substreams of events that share the same key."
- "The keyBy transformation converts a DataStream into a KeyedStream by specifying a key. Based on the key, the events of the stream are assigned to partitions, so that all events with the same key are processed by the same task of the subsequent operator."
- "Rolling aggregation transformations are applied on a KeyedStream and produce a DataStream of aggregates, such as sum, minimum, and maximum. A rolling aggregate operator keeps an aggregated value for every observed key. For each incoming event, the operator updates the corresponding aggregate value and emits an event with the updated value."
- "The reduce transformation is a generalization of the rolling aggregation. It applies a ReduceFunction on a KeyedStream, which combines each incoming event with the current reduced value, and produces a DataStream."
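
To make the keyBy-then-reduce behavior concrete, here is a plain Python sketch with no Flink dependency. The function `keyed_reduce` and its parameters are hypothetical; it keeps one reduced value per key and emits the updated value for every incoming event.

```python
def keyed_reduce(events, key_fn, reduce_fn):
    """Emit, for each event, the reduced value of that event's key so far."""
    current = {}  # one reduced value per observed key
    output = []
    for event in events:
        key = key_fn(event)
        if key in current:
            current[key] = reduce_fn(current[key], event)
        else:
            current[key] = event  # the first event of a key is emitted unchanged
        output.append(current[key])
    return output

readings = [("a", 1), ("b", 5), ("a", 2)]
sums = keyed_reduce(
    readings,
    key_fn=lambda e: e[0],
    reduce_fn=lambda acc, e: (acc[0], acc[1] + e[1]),
)
# sums == [("a", 1), ("b", 5), ("a", 3)]
```

A rolling sum, minimum, or maximum is then just a particular choice of `reduce_fn`, which is the sense in which reduce generalizes the rolling aggregations.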
- "The DataStream.union() method merges two or more DataStreams of the same type and produces a new DataStream of the same type."
- "The DataStream API provides the connect transformation to support such use cases. The DataStream.connect() method receives a DataStream and returns a ConnectedStreams object, which represents the two connected streams"
- "By default, connect() does not establish a relationship between the events of both streams so events of both streams are randomly assigned to operator instances. This behavior yields nondeterministic results and is usually undesirable. In order to achieve deterministic transformations on ConnectedStreams, connect() can be combined with keyBy() or broadcast(). We first show the keyBy() case"
- "Split is the inverse transformation to the union transformation. It divides an input stream into two or more output streams of the same type as the input stream. Each incoming event can be routed to zero, one, or more output streams. Hence, split can also be used to filter or replicate events."
- "When building applications with the DataStream API the system automatically chooses data partitioning strategies and routes data to the correct destination depending on the operation semantics and the configured parallelism. Sometimes it is necessary or desirable to control the partitioning strategies at the application level or define custom partitioners."
- "In general, it is a good idea to define the parallelism of your operators relative to the default parallelism of the environment. This allows you to easily scale the application by adjusting its parallelism via the submission client."
Chapter 6. Time-Based and Window Operators
- "ProcessingTime specifies that operators determine the current time of the data stream according to the system clock of the machine where they are being executed."
- "EventTime specifies that operators determine the current time by using information from the data itself."
- "IngestionTime assigns the processing time of the source operator as an event time timestamp to each ingested record and automatically generates watermarks"
- "Timestamps and watermarks are specified in milliseconds since the epoch of 1970-01-01T00:00:00Z."
- "Assigning watermarks periodically means that we instruct the system to emit watermarks and advance the event time in fixed intervals of machine time."
- "The other common case of periodic watermark generation is when you know the maximum lateness that you will encounter in the input stream—the maximum difference between an element’s timestamp and the largest timestamp of all previously ingested elements."
- "If you generate loose watermarks—where the watermarks are far behind the timestamps of the processed records—you increase the latency of the produced results. You may have been able to generate a result earlier but you had to wait for the watermark. Moreover the state size typically increases because the application needs to buffer more data until it can perform a computation. However, you can be quite certain all relevant data is available when you perform a computation."
- "On the other hand, if you generate very tight watermarks—watermarks that might be larger than the timestamps of some later records—time-based computations might be performed before all relevant data has arrived. While this might yield incomplete or inaccurate results, the results are produced in a timely fashion with lower latency."
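
The "known maximum lateness" case and the loose-versus-tight trade-off above can be sketched with a small generator that trails the largest timestamp seen so far by a fixed bound. The class name and its methods are hypothetical and only mirror the idea, not a specific Flink interface.

```python
class BoundedLatenessWatermarks:
    """Watermark = largest timestamp seen so far minus the maximum lateness."""

    def __init__(self, max_lateness_ms):
        self.max_lateness_ms = max_lateness_ms
        self.max_timestamp_ms = None

    def on_event(self, timestamp_ms):
        if self.max_timestamp_ms is None or timestamp_ms > self.max_timestamp_ms:
            self.max_timestamp_ms = timestamp_ms

    def current_watermark(self):
        """Called periodically; returns None until the first event was seen."""
        if self.max_timestamp_ms is None:
            return None
        return self.max_timestamp_ms - self.max_lateness_ms

generator = BoundedLatenessWatermarks(max_lateness_ms=5000)
generator.on_event(10000)
generator.on_event(7000)  # out of order, but within the lateness bound
watermark = generator.current_watermark()  # 5000
```

A larger `max_lateness_ms` gives looser watermarks (later results, more buffered state, fewer late events); a smaller value gives tighter watermarks and earlier but possibly incomplete results.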
- "Window operators provide a way to group events in buckets of finite size and apply computations on the bounded contents of these buckets."
- "Incremental aggregation functions are directly applied when an element is added to a window and hold and update a single value as window state. These functions are typically very space-efficient and eventually emit the aggregated value as a result."
- "Full window functions collect all elements of a window and iterate over the list of all collected elements when they are evaluated. Full window functions usually require more space but allow for more complex logic than incremental aggregation functions."
- "A ReduceFunction accepts two values of the same type and combines them into a single value of the same type."
- "The WindowAssigner determines for each arriving element to which windows it is assigned."
- "Triggers define when a window is evaluated and its results are emitted. A trigger can decide to fire based on progress in time- or data-specific conditions, such as element count or certain observed element values."
- "The Evictor is an optional component in Flink’s windowing mechanism. It can remove elements from a window before or after the window function is evaluated."
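
As an illustration of what a WindowAssigner decides, the sketch below computes the window or windows an element belongs to from its timestamp alone. The function names are hypothetical, windows are represented as `(start, end)` pairs with an exclusive end, and timestamps are assumed to be non-negative milliseconds.

```python
def tumbling_window(timestamp_ms, size_ms):
    """Return the single fixed-size window containing the timestamp."""
    start = timestamp_ms - (timestamp_ms % size_ms)
    return (start, start + size_ms)

def sliding_windows(timestamp_ms, size_ms, slide_ms):
    """Return every overlapping window that contains the timestamp."""
    windows = []
    start = timestamp_ms - (timestamp_ms % slide_ms)  # latest window start
    while start > timestamp_ms - size_ms:
        windows.append((start, start + size_ms))
        start -= slide_ms
    return windows

one = tumbling_window(12500, size_ms=5000)
many = sliding_windows(12500, size_ms=10000, slide_ms=5000)
```

With a tumbling assigner every element lands in exactly one window; with a sliding assigner whose size is twice its slide, each element lands in two.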
- "The interval join joins events from two streams that have a common key and that have timestamps not more than specified intervals apart from each other."
- Window Join: "Elements of both input streams are assigned to common windows and joined (or cogrouped) when a window is complete."
- "A late element is an element that arrives at an operator when a computation to which it would need to contribute has already been performed."
- "The easiest way to handle late events is to simply discard them."
- "Late events can also be redirected into another DataStream using the side-output feature. From there, the late events can be processed or emitted using a regular sink function."
- "Instead of dropping or redirecting late events, another strategy is to recompute an incomplete result and emit an update."
- "When a late element arrives within the allowed lateness period it is handled like an on-time element and handed to the trigger. When the watermark passes the window’s end timestamp plus the lateness interval, the window is finally deleted and all subsequent late elements are discarded."
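
The allowed-lateness rule above amounts to a three-way decision based on the current watermark. The function below is a simplified sketch: `handle_element` is a hypothetical name, and the exact boundary comparisons (inclusive versus exclusive) are an assumption rather than Flink's precise convention.

```python
def handle_element(watermark, window_end, allowed_lateness):
    """Classify an element arriving for a window, given the current watermark."""
    if watermark < window_end:
        return "on-time"             # window has not been evaluated yet
    if watermark < window_end + allowed_lateness:
        return "late-but-accepted"   # window state still exists; result is updated
    return "dropped"                 # window state was deleted

decisions = [
    handle_element(watermark=900, window_end=1000, allowed_lateness=500),
    handle_element(watermark=1200, window_end=1000, allowed_lateness=500),
    handle_element(watermark=1500, window_end=1000, allowed_lateness=500),
]
```

Setting `allowed_lateness` to zero collapses the middle case, which corresponds to the simplest strategy of discarding every late element.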
Chapter 7. Stateful Operators and Applications
- "In order to increase or decrease the parallelism of a function with operator state, the operator state needs to be redistributed to a larger or smaller number of task instances. This requires splitting or merging of state objects. Since the logic for splitting and merging of state is custom for every stateful function, this cannot be automatically done for arbitrary types of state."
- "A shorter checkpointing interval causes higher overhead during regular processing but can enable faster recovery because less data needs to be reprocessed."
- "The state of an application that was running for several weeks can be expensive or even impossible to recompute."
- "In order to prevent increasing resource consumption of an application over time, it is important that the size of the operator state be controlled. Since the handling of state directly affects the semantics of an operator, Flink cannot automatically clean up state and free storage. Instead, all stateful operators must control the size of their state and have to ensure it is not infinitely growing."
- "Apache Flink features queryable state to address use cases that usually would require an external datastore to share data. In Flink, any keyed state can be exposed to external applications as queryable state and act as a read-only key-value store. The stateful streaming application processes events as usual and stores and updates its intermediate or final results in a queryable state. External applications can request the state for a key while the streaming application is running."
Chapter 8. Reading from and Writing to External Systems
- "The combination of Flink’s checkpointing and recovery mechanism and resettable source connectors guarantees that an application will not lose any data. However, the application might emit results twice because all results that have been emitted after the last successful checkpoint (the one to which the application falls back in the case of a recovery) will be emitted again. Therefore, resettable sources and Flink’s recovery mechanism are not sufficient to provide end-to-end exactly-once guarantees even though the application state is exactly-once consistent."
- "There are two techniques that sink connectors can apply in different situations to achieve exactly-once guarantees: idempotent writes and transactional writes."
- "Flink provides two building blocks to implement transactional sink connectors—a generic write-ahead-log (WAL) sink and a two-phase-commit (2PC) sink."
- "For instance, Flink’s Kafka producer implements the TwoPhaseCommitSinkFunction interface. As mentioned before, the connector might lose data if a transaction is rolled back due to a timeout."
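
The idempotent-write technique mentioned above can be illustrated by comparing an append-only sink with a keyed upsert sink when results are replayed after a recovery. Both functions are hypothetical, and a dictionary stands in for an external key-value store.

```python
def write_append(results, log=None):
    """Append-only sink: replayed results show up as duplicates."""
    log = [] if log is None else log
    log.extend(results)
    return log

def write_upsert(results, store=None):
    """Idempotent sink: writing the same (key, value) again changes nothing."""
    store = {} if store is None else store
    for key, value in results:
        store[key] = value
    return store

before_failure = [("a", 1), ("b", 2)]
after_recovery = [("b", 2), ("c", 3)]  # ("b", 2) is emitted a second time

appended = write_append(after_recovery, write_append(before_failure))
upserted = write_upsert(after_recovery, write_upsert(before_failure))
```

The append sink ends up with four entries including the duplicate, while the upsert store converges to the same three keys it would hold without any failure. This only works when results are deterministic and keyed.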
Chapter 9. Setting Up Flink for Streaming Applications
- "YARN is the resource manager component of Apache Hadoop."
- "Flink can run on YARN in two modes: the job mode and the session mode."
- "Apache Flink can be deployed on Kubernetes as well"
- "A Flink HA setup requires a running Apache ZooKeeper cluster and a persistent remote storage, such as HDFS, NFS, or S3."
Chapter 10. Operating Flink and Streaming Applications
- "A common example is invalid or corrupt input data the application is not able to handle. In such a situation, an application would end up in an infinite recovery cycle consuming lots of resources without a chance of ever getting back into regular processing. Flink features three restart strategies to address this problem:
- The fixed-delay restart strategy restarts an application a fixed number of times and waits a configured time before a restart attempt.
- The failure-rate restart strategy restarts an application as long as a configurable failure rate is not exceeded. The failure rate is specified as the maximum number of failures within a time interval. For example, you can configure that an application be restarted as long as it did not fail more than three times in the last ten minutes.
- The no-restart strategy does not restart an application, but fails it immediately."
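
The failure-rate strategy from the list above can be sketched as a sliding window over failure times. The class and method names are hypothetical, times are plain seconds, and this models only the restart decision, not Flink's configuration keys or restart delay.

```python
from collections import deque

class FailureRateRestartStrategy:
    """Allow restarts while failures within the interval stay at or below a maximum."""

    def __init__(self, max_failures, interval_s):
        self.max_failures = max_failures
        self.interval_s = interval_s
        self.failure_times = deque()

    def can_restart(self, now_s):
        """Record a failure at `now_s` and report whether a restart is allowed."""
        self.failure_times.append(now_s)
        # Forget failures that fell out of the time interval.
        while self.failure_times[0] <= now_s - self.interval_s:
            self.failure_times.popleft()
        return len(self.failure_times) <= self.max_failures

# "Not more than three failures in the last ten minutes."
strategy = FailureRateRestartStrategy(max_failures=3, interval_s=600)
decisions = [strategy.can_restart(t) for t in (0, 100, 200, 300)]
```

The fourth failure within ten minutes exceeds the configured rate, so the application would be failed instead of restarted; failures spread further apart would keep it running.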
- "Flink collects several system and application metrics by default. Metrics are gathered per operator, TaskManager, or JobManager. Here we describe some of the most commonly used metrics and refer you to Flink’s documentation for a full list of available metrics."
Chapter 11. Where to Go from Here?
"MAILING LISTS
- user@flink.apache.org: user support and questions
- dev@flink.apache.org: development, release, and community discussions
- community@flink.apache.org: community news and meetups
BLOGS
- https://flink.apache.org/blog
- https://www.ververica.com/blog
MEETUPS AND CONFERENCES
- https://flink-forward.org
- https://www.meetup.com/topics/apache-flink"