Streams and Their Operators in Dart and Flutter in Practice
Let’s jump in to interesting topic of Stream. Here I have provided an example app to practice working with Stream in Flutter. I don’t dive into details of Stream
and Asynchronous
programming. but I leverage its concept to write complex codes simpler.
What’s Stream?
In simple terms, a Stream
is like a pipe that data enters. You perform a chain of operations on it, and the manipulated data exits. Imagine passing a seed into the pipe and receiving a flower at the end.
The power of Stream
lies in their operators. You can chain operators to manipulate the data and pass it to listeners.
Then, at the other end, listeners subscribe to the emitted data and use it according to their needs.
So what’s the benefit for me as a developer?
- You write less code
- You code is predictable
- Write asynchronous code with full control of pause-resuming it.
Project
Here we develop an app that helps users to create simple college layouts from their photos:
What you practice:
- Use streams in tandem of system plugins like photos
- User operators to drive our app’s logic
- Handling user events
- Error handling
The sample project will help you create collage photos like this by practicing the use of streams. The structure of the project is quite simple: one page for creating collage photos, another page for selecting gallery photos, and a model that handles UI interactions for creating, saving, and loading photos.
Getting Started
Clone the project by opening terminal and paste below command:
git clone https://github.com/haashem/collage-neue
Then checkout starter tag:
git checkout starter
Now that you have checked out the starter commit, you already have premade code, so you can focus only on Stream
.
Our first exercise is to display a series of bundled images in a collage layout when tapping the ‘+’ button.
Add these two lines in the CollegeNeueModel
:
final _subscriptions = CompositeSubscription();
final _images = BehaviorSubject<List<ui.Image>>.seeded([]);
CompositeSubscription
is a utility from the RxDart library that helps you dispose of a list of StreamSubscription
instances at once. It also allows you to pause and resume subscriptions, though we don't need that functionality right now.
You can use Set<StreamSubscription>
as a container to hold all StreamSubscription
instances, but I used CompositeSubscription
to introduce you to the utilities in the RxDart library.
When we want to use the output of a stream, we need to listen to it. Listening to a stream returns a StreamSubscription
, and when we are done with the stream’s output, the subscription must be canceled.
To pass data to a stream, we need a StreamController
. BehaviorSubject
is a special StreamController
from the RxDart library that captures the latest emitted data. Whenever you start listening to its stream, the last emitted value is emitted again. You'll see its usage a bit further.
Now, a bit of theory out of the way — let’s continue. Add these lines of code to the add()
method:
// 1
final byteData = await rootBundle.load('assets/IMG_1907.jpg');
final codec = await ui.instantiateImageCodec(byteData.buffer.asUint8List());
final frameInfo = await codec.getNextFrame();
final newImage = frameInfo.image;
// 2
_images.add(_images.value..add(newImage));
- We load the asset image into memory.
Then, we add the new image to the_images
StreamController
, which holds a list of images added by the user. - Whenever the user taps the + button, the same
IMG_1907.jpg
is passed to the stream.
Next, to clear the selected photos, update the clear()
method with:
_images.add([]);
Clearing is as simple as emitting an empty array.
Now, let’s bind CreateCollegePhotoPage
to the _images
stream to display the result by adding the method below:
void bindMainView() {
// 1
final subscription = _images.stream
// 2
.asyncMap((images) => images.isEmpty
? Future.value()
: ImageUtils.collage(images, canvasSize))
// 3
.listen((collegeImage) => previewImage.value = collegeImage);
// 4
_subscriptions.add(subscription);
}
asyncMap
is an operator that receives a list of images and converts it into a single collage image using theImageUtils.collage(images, size)
method, which is defined in theimage_college.dart
file. If the images list is empty, we return no image.- By listening to the stream, a
StreamSubscription
is returned, which we assign to thesubscription
variable. We then use the output of the stream and assign it to thepreviewImage
value.previewImage
is aValueNotifier
object that calls its listener whenever its value changes. - Add
subscription
to the_subscriptions
container so we can dispose of subscriptions later.
Find the dispose()
method and add:
_subscriptions.dispose();
Now its time to display college image, open create_college_photo_page.dart
file and below line to the initState()
method:
@override
void initState() {
super.initState();
model.bindMainView();
}
We also need to dispose of the subscription when the widget’s state is disposed. So, update the _CreateCollegePhotoPageState
dispose()
method to dispose of any StreamSubscription
held by the model.
@override
void dispose() {
model.dispose();
super.dispose();
}
Now run the app and tap on + button multiple times:
We get the list of photos, convert them into a single collage photo, and assign it as a single image within one subscription.
Now, we want to update the page title to display the number of photos in the collage image, enable the save button only when there is an even number of photos, and disable the clear button when the collage photo is empty.
The updateUI(photosCount)
method in _CreateCollegePhotoPageState
handles this logic. We just need to create another stream that emits the collage photo count whenever the user adds a new photo.
Open CollegeNeueModel
and add this line below previewImage
:
final photosCount = ValueNotifier<int>(0);
We have exposed a ValueNotifier
that emits the number of images in the collage photo, so the view can observe the count and update accordingly.
In bindMainView()
, insert this operator before asyncMap
:
.doOnData((event) {
photosCount.value = event.length;
})
The doOnData
operator allows you to perform side effects when the stream emits a new event. This operator is called before asyncMap
.
You may wonder why we used ValueNotifier
. They act similarly to streams, but they don't have the operator methods needed to handle data manipulation. They are much simpler than streams, so we use them to hold a value.
Now, add the line below in the initState()
method:
model.photosCount.addListener(() {
updateUI(model.photosCount.value);
});
As you can see, when listening to a ValueNotifier
, it doesn't return a value like a StreamSubscription
. We also need to dispose of the ValueNotifier
when we no longer need it to release memory.
Add the following line in the CollegeNeueModel
dispose()
method:
photosCount.dispose();
run the app and see the result:
Initially, both the clear and save buttons are disabled, which is the correct initial state. The buttons will keep changing as you add more photos.
Add photos from Gallery
Now, let’s enable the user to pick a photo from the photo gallery. We want to allow the user to select some photos and pass them back to the previous page.
Open create_college_photo_page.dart
and update the + button's on-pressed callback to present the PhotoGalleryPage
.
onPressed: () {
model.add();
Navigator.of(context).push(
MaterialPageRoute(
builder: (context) => PhotoGalleryPage(
model: model,
),
),
);
},
Add following code in CollegeNeueModel
below // Displaying photos picker
comment:
// 1
var selectedPhotosSubject = PublishSubject<ui.Image>();
final _photos = PublishSubject<List<AssetEntity>>();
Stream<List<AssetEntity>> get photos => _photos.stream;
// 2
void bindPhotoPicker() {
loadPhotos().then((items) => _photos.add(items));
}
// 3
void unbindPhotoPicker() {
selectedPhotosSubject.close();
}
PublishSubject
is a broadcastStreamController
from RxDart, meaning it can have multiple listeners. Only_photos
needs to be of typebroadcast StreamController
, because each time the user opens thePhotoGalleryPage
, theStreamBuilder
listens to the same photo stream again. If it is not of typeBroadcast
, Flutter will throw an error.
You can rewrite above with:
var selectedPhotosSubject = StreamController<ui.Image>();
final _photos = StreamController<List<AssetEntity>>.broadcast();
But here for simplicity of code convention, I use PublishSubject
.
bindPhotoPicker()
is called byPhotoGalleryPage
to start loading photos.- We close the
selectedPhotosSubject
stream, so no further events can be added to it whenPhotoGalleryPage
dismissed.
Open PhotoGalleryPage
and call above methods:
@override
void initState() {
super.initState();
model.bindPhotoPicker();
}
@override
void dispose() {
model.unbindPhotoPicker();
super.dispose();
}
If you run the app you see the photos in the photo gallery are now shown:
Now, we need to subscribe to selectedPhotosSubject
and display the selected photos on the main page. Open CollegeNeueModel
and update the add()
method:
assert(canvasSize != Size.zero,
'Canvas size must be set before adding images');
final newPhotosSubscriptions = selectedPhotosSubject.stream
// 1
.map((image) => _images.value + [image])
// 2
.listen(_images.add);
// 3
_subscriptions.add(newPhotosSubscriptions);
- Append new selected image to the current list of images.
- Add new array of images to the
_images
stream. - Add new stream subscription to the
_subscriptions
container. The_subscriptions
will be disposed whenever the model is disposed.
As you can see, if you navigate a few times and try to select more photos, the newly selected photos are not shown!
Why is this happening?
The reason is that when you navigate back from the photo picker, we close the selectedPhotosSubject
in the unbindPhotoPicker()
method, so it won't receive any new events. To restart it, we need to instantiate it again in the add()
method:
selectedPhotosSubject = PublishSubject<ui.Image>();
Save college photo
Open CollegeNeueModel
and update save()
method:
// 1
final _savedPhotoIdSubject = PublishSubject<String>();
Stream<String> get savedPhotoId => _savedPhotoIdSubject.stream;
void save() {
final collegeImage = previewImage.value;
// 2
if (collegeImage == null) {
return;
}
final subscription = PhotoWriter.save(collegeImage)
// 3
.asStream()
.listen((id) {
// 4
_savedPhotoIdSubject.add(id);
clear();
},
// 5
onError: _savedPhotoIdSubject.addError);
_subscriptions.add(subscription);
}
- We created a
PublishSubject
(StreamController
) to add new saved photo IDs to it, and its stream is exposed viasavedPhotoId
. - If there is no collage image, we return early.
- We use the
save(Image)
method ofPhotoWriter
to save the photo.save
returns aFuture
, but we convert it to a stream usingasStream
. - We listen to the stream and pass the emitted saved photo ID to
_savedPhotoIdSubject
, then clear the canvas. - In case of an error, we add the thrown error to the
_savedPhotoIdSubject
.
💡 Remember, a Stream can emit both a value and an error.
Then, open PhotoGalleryPage
and add the method below:
// 1
StreamSubscription<String>? savedPhotoIdSubscription;
void listenToPhotoSaveResult(BuildContext context, CollegeNeueModel model) {
// 2
savedPhotoIdSubscription = model.savedPhotoId.listen((id) {
if (!context.mounted) {
return;
}
showAlertDialog(context,
title: 'Success', message: 'Photo saved with ID: $id');
}, onError: (error) {
if (!context.mounted) {
return;
}
showAlertDialog(context, title: 'Error', message: error.toString());
});
}
- Create
StreamSubscription
variable to hold a reference to thesavedPhotoId
listener. - We start listening to the
savedPhotoId
stream and in case of success or failure, display proper message.
Start listening to photo save result by adding below method add the end of initState()
method:
listenToPhotoSaveResult(context, model);
Let’s see it in action:
In order to see the failure alert, navigate to PhotoWriter
class and immediately throw an error:
static Future<AssetId> save(Image image) async {
throw CouldNotSavePhoto();
}
Try again and you see the error alert:
Operators in Action
Until now, you’ve practiced a good bit of reactive programming. Let’s try another operator that allows us to limit the collage photo to hold a maximum of 6 images.
Add the following line before map
in the add()
method:
.takeWhile((element) => _images.value.length < 6)
takeWhile
allows you to check events against a condition. If it passes the condition, it adds the value to the stream; otherwise, it drops it.
Run the app and try to add more than 6 images to the collage. You’ll see that it only accepts 6 images and no more.
Challenge
It’s now up to you. We want to show an error in case access to the photo gallery was not granted. On iOS, navigate to Settings, select the Collage Neue app, and disable photo access.
To help you open the PhotoGalleryPage
, replace the StreamBuilder
with a custom StreamListenableBuilder
, which has a listener callback. You can use this callback to be notified when the stream emits an error. For the listener callback, you can use the function below:
void _streamListener(
BuildContext context, AsyncSnapshot<List<AssetEntity>> snapshot) {
if (snapshot.hasError && snapshot.error != null) {
switch (snapshot.error) {
case CollegeNeueModelError.permissionNotGranted:
showAlertDialog(context,
title: 'No access to Camera Roll',
message: 'You can grant access in Settings app');
default:
showAlertDialog(context,
title: 'Error', message: snapshot.error!.toString());
}
}
}
Add this method to check if there photo gallery access:
Future<bool> isPhotoGalleryPermissionGranted() =>
PhotoManager.requestPermissionExtend().then((ps) => ps.hasAccess);
Also add this custom permission error in the CollegeNeueModel
:
enum CollegeNeueModelError { permissionNotGranted }
Now you just need to add above error in the photos stream in case you don’t have access to the gallery.
Think and try to do it on your own 💭
Doing it on your own helps to grasp the concept deeper and remember the topic for longer time.
You can always find the answer in the repository code.
Conclusion
Streams are simple to use, and this pattern of data processing is available in most modern languages. For example, in iOS, you can use the Combine framework to achieve the same result, or in ReactiveX, they introduced this paradigm to abstract asynchronous issues, like threading and concurrency, from developers.
I’ve tried to give you a taste of Streams and their operators in Dart and Flutter, which you can use in any layer of your application. In Flutter for UI layer, I recommend use BLoC
pattern and its popular framework in Bloc Library which is built on top of Streams
.
As always you can reach out to me on LinkedIn or X and don’t hesitate to to ask questions😉.