Appendix A: Streaming with Streamz and Dask

This book has focused on using Dask to build batch applications, where data is collected from or provided by the user and then used for calculations. Another important group of use cases involves situations that require you to process data as it becomes available.[1] Processing data as it becomes available is called streaming.

Streaming data pipelines and analytics are becoming more popular as people have higher expectations from their data-powered products. Think about how you would feel if a bank transaction took weeks to settle; it would seem archaically slow. Or if you block someone on social media, you expect that block to take effect immediately. While Dask excels at interactive analytics, we believe it does not (currently) excel at interactive responses to user queries.[2]

Streaming jobs differ from batch jobs in a number of important ways. They tend to have faster processing-time requirements, and the jobs themselves often have no defined endpoint (besides when the company or service is shut down). One situation in which small batch jobs may not cut it is dynamic advertising, where responses are needed within tens to hundreds of milliseconds. Many other data problems straddle the line, such as recommendations, which you want to update based on user interactions but for which a delay of a few minutes is probably (mostly) OK.

As discussed in [ch08], Dask’s streaming component appears to be less frequently used than other components. Streaming in Dask is, to an extent, added on after the fact,[3] and there are certain places and times when you may notice this. This is most apparent when loading and writing data—everything must move through the main client program and is then either scattered or collected.

Warning

Streamz is not currently capable of handling more data per batch than can fit on the client computer in memory.

In this appendix, you will learn the basics of how Dask streaming is designed, its limitations, and how it compares to some other streaming systems.

Note

As of this writing, Streamz does not implement ipython_display in many places, which may result in error-like messages in Jupyter. You can ignore these (it falls back to repr).

Getting Started with Streamz on Dask

Streamz is straightforward to install. It’s available from PyPI, and you can use pip to install it, although, as with all libraries, you must make it available on all the workers. Once you have installed Streamz, you just need to create a Dask client (even in local mode) and then import Streamz, as shown in Getting started with Streamz.

Example 1. Getting started with Streamz
link:./examples/dask/Streamz.py[role=include]
Note

When there are multiple clients, Streamz uses the most recent Dask client created.
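
To see the overall shape, the following is a minimal sketch (separate from the book’s linked example) of wiring a Streamz pipeline to a local Dask cluster; the doubling function and the element values are placeholders.

[source,python]
----
from dask.distributed import Client
from streamz import Stream
import streamz.dask  # registers .scatter()/.gather() on Stream in some versions

# Create the Dask client first; Streamz will pick up the most recently
# created client when work is scattered onto the cluster.
client = Client()

source = Stream()
# scatter() ships each element to the Dask workers, map() runs there,
# and gather() brings the results back to the client before the sink.
(source.scatter()
       .map(lambda x: x * 2)
       .gather()
       .sink(print))

for i in range(3):
    source.emit(i)
----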

Streaming Data Sources and Sinks

So far in this book, we’ve loaded data from either local collections or distributed filesystems. While these can certainly serve as sources for streaming data (with some limitations), there are additional data sources that exist in the streaming world. Streaming data sources are distinct in that they do not have a defined end, and they therefore behave a bit more like generators than lists. Streaming sinks are conceptually similar to consumers of generators.

Note

Streamz has limited sink (or write destination) support, meaning in many cases it is up to you to write your data back out in a streaming fashion with your own function.

Some streaming data sources have the ability to replay or look back at messages that have been published (up to a configurable time period), which is especially useful for a re-compute–based approach to fault tolerance. Two of the popular distributed data sources (and sinks) are Apache Kafka and Apache Pulsar, both of which have the ability to look back at previous messages. An example streaming system that lacks this ability is RabbitMQ.

Streamz’s API documentation covers which sources are supported; for simplicity, we will focus here on Apache Kafka and the local iterable source. Streamz does all loading in the head process, and then you must scatter the result. Loading streaming data should look familiar, with loading a local collection shown in Loading a local iterator and loading from Kafka shown in Loading from Kafka.

Example 2. Loading a local iterator
link:./examples/dask/Streamz.py[role=include]
Example 3. Loading from Kafka
link:./examples/dask/Streamz.py[role=include]

In both of these examples, Streamz will start reading from the most recent message. If you want Streamz to go back to the start of the messages stored, you would add ``.
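
As a rough sketch of what these two kinds of sources can look like (separate from the book’s linked examples), the following assumes the from_iterable and from_kafka sources; the topic name, broker address, and consumer group are placeholders.

[source,python]
----
from streamz import Stream

# Local iterable source: behaves like a generator feeding the pipeline.
local_source = Stream.from_iterable(["fish", "tank", "fish"])

# Kafka source: the consumer settings are passed through to confluent-kafka.
kafka_source = Stream.from_kafka(
    ["example-topic"],                        # placeholder topic name
    {"bootstrap.servers": "localhost:9092",   # placeholder broker address
     "group.id": "streamz-example"},
)

local_source.sink(print)
kafka_source.sink(print)

# Sources begin emitting only once they are started.
local_source.start()
kafka_source.start()
----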

Warning

Because Streamz reads exclusively in a single head process, data loading is one place where you may encounter bottlenecks as you scale.

As with the rest of this book, we assume that you are using existing data sources. If that’s not the case, we encourage you to check out the Apache Kafka or Apache Pulsar documentation (along with the Kafka adapter), as well as the cloud offerings from Confluent.

Word Count

No streaming section would be complete without word count, but it’s important to note that our streaming word count in Streaming word count, in addition to the data-loading restriction, could not perform the aggregation in a distributed fashion.

Example 4. Streaming word count
link:./examples/dask/Streamz.py[role=include]

In the preceding example, you can see some of the current limitations of Streamz, as well as some familiar concepts (like map). If you’re interested in learning more, refer to the Streamz API documentation; note, however, that in our experience, some components will randomly not work on non-local streams.
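
For reference, a minimal local-only word count in the spirit of the example above might look like the following sketch; the input lines are placeholders, and the frequencies() step keeps its running counts on the client, which is the non-distributed aggregation noted earlier.

[source,python]
----
from streamz import Stream

source = Stream()
counts = (source
          .map(lambda line: line.split())  # split each line into words
          .flatten()                       # emit one element per word
          .frequencies())                  # running {word: count} mapping
counts.sink(print)

for line in ["the quick brown fox", "the lazy dog"]:
    source.emit(line)
----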

GPU Pipelines on Dask Streaming

If you are working with GPUs, the cuStreamz project simplifies the integration of cuDF with Streamz. cuStreamz uses a number of custom components for performance, like loading data from Kafka directly into the GPU instead of first landing it in a Dask DataFrame and then converting it. cuStreamz also implements a custom version of checkpointing with more flexibility than the default Streamz project. The developers behind the project, who are largely employed by people hoping to sell you more GPUs, claim up to an 11x speed-up.

Limitations, Challenges, and Workarounds

Most streaming systems have some form of state checkpointing, allowing streaming applications to be restarted from the last checkpoint when the main control program fails. Streamz’s checkpointing technique is limited to not losing any unprocessed records, but accumulated state can be lost. It is up to you to build your own state checkpointing/restoration if you are building up state over time. This is especially important since, over a long enough window, the probability of encountering a failure approaches 100%, and streaming applications are often intended to run indefinitely.
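
As an illustration, the sketch below persists an accumulator’s state to a local file so a restarted job can reload it; the checkpoint path is a placeholder, this is not a built-in Streamz feature, and in practice you would checkpoint far less often than on every message.

[source,python]
----
import os
import pickle

from streamz import Stream

CHECKPOINT_PATH = "/tmp/wordcount_state.pkl"  # placeholder location


def load_state():
    # Restore previously accumulated state if a checkpoint exists.
    if os.path.exists(CHECKPOINT_PATH):
        with open(CHECKPOINT_PATH, "rb") as f:
            return pickle.load(f)
    return {}


def count_and_checkpoint(state, word):
    # Update the running counts and write the new state to disk.
    state = dict(state)
    state[word] = state.get(word, 0) + 1
    with open(CHECKPOINT_PATH, "wb") as f:
        pickle.dump(state, f)
    return state, state  # (new state, value emitted downstream)


source = Stream()
source.accumulate(count_and_checkpoint,
                  start=load_state(),
                  returns_state=True).sink(print)

for word in ["fish", "tank", "fish"]:
    source.emit(word)
----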

This indefinite runtime leads to a number of other challenges. Small memory leaks can add up over time, but you can mitigate them by having workers restart periodically.
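
One way to do that is through Dask’s worker lifetime settings, sketched below; the durations are placeholders to tune for your workload.

[source,python]
----
import dask
from dask.distributed import Client, LocalCluster

# Ask each worker to restart itself cleanly after a set lifetime, staggered
# so the whole cluster never restarts at the same moment.
dask.config.set({
    "distributed.worker.lifetime.duration": "4 hours",
    "distributed.worker.lifetime.stagger": "10 minutes",
    "distributed.worker.lifetime.restart": True,
})

cluster = LocalCluster(n_workers=2)
client = Client(cluster)
----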

Streaming programs that perform aggregations often have problems with late-arriving data. This means that, while you can define your window however you want, you may have records that should have been in that window but did not arrive in time to be processed. Streamz has no built-in solution for late-arriving data. Your choices are to manually track the state inside your process (and persist it somewhere), ignore late-arriving data, or use another stream-processing system with support for late-arriving data (such as kSQL, Spark, or Flink).

In some streaming applications it is important that messages be processed exactly once (e.g., updates to a bank account). Dask is generally not well suited to such situations because it re-computes on failure, and the same applies to Streamz on Dask, where the only option is at-least-once execution. You can work around this by using an external system, such as a database, to keep track of which messages have been processed.
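
The following is a hedged sketch of that workaround, using SQLite as a stand-in for whatever database you already run; the message IDs and table layout are placeholders, and true exactly-once behavior would also require the side effect and the bookkeeping insert to share a transaction.

[source,python]
----
import sqlite3

from streamz import Stream

conn = sqlite3.connect("processed.db")  # placeholder path
conn.execute("CREATE TABLE IF NOT EXISTS processed (msg_id TEXT PRIMARY KEY)")


def not_yet_processed(message):
    # Each message is assumed to carry a unique ID alongside its payload.
    msg_id, _payload = message
    seen = conn.execute(
        "SELECT 1 FROM processed WHERE msg_id = ?", (msg_id,)).fetchone()
    return seen is None


def apply_and_record(message):
    msg_id, payload = message
    # ... apply the side effect (e.g., the account update) here ...
    conn.execute("INSERT OR IGNORE INTO processed VALUES (?)", (msg_id,))
    conn.commit()


source = Stream()
source.filter(not_yet_processed).sink(apply_and_record)

source.emit(("txn-1", 100))
source.emit(("txn-1", 100))  # redelivered duplicate: filtered out
----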

Conclusion

In our opinion, Streamz with Dask is off to an interesting start at supporting streaming data inside Dask. Its current limitations make it best suited to situations in which there is a small amount of streaming data coming in. That being said, in many situations the amount of streaming data is much smaller than the amount of batch-oriented data, and being able to stay within one system for both lets you avoid duplicated code or logic. If Streamz does not meet your needs, there are many other Python streaming systems available; some you may want to check out include Ray streaming, Faust, and PySpark. In our experience, Apache Beam’s Python API has even more room to grow than Streamz.


1. Albeit generally with some (hopefully small) delay.
2. While these are both "interactive," the expectations of someone going to your website and placing an order versus those of a data scientist trying to come up with a new advertising campaign are very different.
3. The same is true for Spark streaming, but Dask’s streaming is even less integrated than Spark’s streaming.