Streams and Their Operators in Dart and Flutter in Practice

Hashem Abounajmi
10 min read · Feb 2, 2025

Let’s jump into the interesting topic of Streams. I’ve put together an example app for practicing Streams in Flutter. I won’t dive into the details of Stream and asynchronous programming; instead, I’ll lean on the concept to make complex code simpler.

What’s a Stream?

In simple terms, a Stream is like a pipe that data enters. You perform a chain of operations on it, and the manipulated data exits. Imagine passing a seed into the pipe and receiving a flower at the end.

The power of Streams lies in their operators. You can chain operators to manipulate the data and pass it to listeners.

Operators in Stream

Then, at the other end, listeners subscribe to the emitted data and use it according to their needs.

Stream subscription
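
To make the pipe metaphor concrete, here is a minimal sketch in plain Dart (dart:async only, no Flutter). The function name seedsToFlowers and the sample values are made up for illustration; they are not part of the sample project.

```dart
import 'dart:async';

/// Chains two operators: keep odd seeds, then turn each one into a "flower".
Stream<String> seedsToFlowers(Stream<int> seeds) =>
    seeds.where((seed) => seed.isOdd).map((seed) => 'flower #$seed');

Future<void> main() async {
  final seeds = Stream.fromIterable([1, 2, 3, 4, 5]);

  // Listening is what starts the flow; it returns a StreamSubscription.
  final subscription = seedsToFlowers(seeds).listen(print);

  // Wait until the stream is done before exiting.
  await subscription.asFuture<void>();
}
```

Running it prints one line per odd seed. The operators themselves do nothing until someone listens at the other end of the pipe.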

So what’s the benefit for me as a developer?

  1. You write less code
  2. Your code is predictable
  3. You write asynchronous code with full control over pausing and resuming it

Project

Here we develop an app that helps users create simple collage layouts from their photos:

Collage Photo app

What you practice:

  • Use streams in tandem with system plugins like photos
  • Use operators to drive our app’s logic
  • Handle user events
  • Handle errors

The sample project will help you create collage photos like this by practicing the use of streams. The structure of the project is quite simple: one page for creating collage photos, another page for selecting gallery photos, and a model that handles UI interactions for creating, saving, and loading photos.

Getting Started

Clone the project by opening a terminal and pasting the command below:

git clone https://github.com/haashem/collage-neue

Then, from inside the cloned folder, check out the starter tag:

git checkout starter

Now that you have checked out the starter tag, you already have premade code, so you can focus only on Streams.

Our first exercise is to display a series of bundled images in a collage layout when tapping the ‘+’ button.

Add these two lines in the CollegeNeueModel:

final _subscriptions = CompositeSubscription();
final _images = BehaviorSubject<List<ui.Image>>.seeded([]);

CompositeSubscription is a utility from the RxDart library that helps you dispose of a list of StreamSubscription instances at once. It also allows you to pause and resume subscriptions, though we don't need that functionality right now.

You can use Set<StreamSubscription> as a container to hold all StreamSubscription instances, but I used CompositeSubscription to introduce you to the utilities in the RxDart library.

When we want to use the output of a stream, we need to listen to it. Listening to a stream returns a StreamSubscription, and when we are done with the stream’s output, the subscription must be canceled.
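
As a rough illustration of the subscription lifecycle, the sketch below keeps subscriptions in a plain Set and pauses, resumes, and cancels them by hand. It uses only dart:async; the function name pauseResumeDemo is hypothetical, and this is not how CompositeSubscription is implemented internally.

```dart
import 'dart:async';

/// Returns snapshots of what the listener had received at three points:
/// before pausing, while paused, and after resuming.
Future<List<String>> pauseResumeDemo() async {
  final controller = StreamController<int>();
  final received = <int>[];
  final snapshots = <String>[];

  // A plain Set works as a container for subscriptions.
  final subscriptions = <StreamSubscription<int>>{
    controller.stream.listen(received.add),
  };

  controller.add(1);
  await Future<void>.delayed(Duration.zero); // let the event be delivered
  snapshots.add('$received');

  for (final s in subscriptions) {
    s.pause();
  }
  controller.add(2); // buffered while the subscription is paused
  await Future<void>.delayed(Duration.zero);
  snapshots.add('$received');

  for (final s in subscriptions) {
    s.resume();
  }
  await Future<void>.delayed(Duration.zero);
  snapshots.add('$received');

  // Cancel everything when done, then close the controller.
  for (final s in subscriptions) {
    await s.cancel();
  }
  subscriptions.clear();
  await controller.close();
  return snapshots;
}

Future<void> main() async {
  print(await pauseResumeDemo()); // [[1], [1], [1, 2]]
}
```

The second snapshot still shows only the first event, because the value added during the pause is held back until resume() is called.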

To pass data to a stream, we need a StreamController. BehaviorSubject is a special StreamController from the RxDart library that captures the latest emitted data. Whenever you start listening to its stream, the last emitted value is emitted again. You'll see it in use a bit later.
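
Here is a small sketch of that replay behavior. It assumes the rxdart package is available (the sample project already depends on it); the function name lateListenerDemo is made up for this example.

```dart
import 'package:rxdart/rxdart.dart';

/// Subscribes only after several values were added and returns what it saw.
Future<List<int>> lateListenerDemo() async {
  final subject = BehaviorSubject<int>.seeded(0);
  subject.add(1);
  subject.add(2);

  final seen = <int>[];
  // A late listener starts with the latest value instead of missing it.
  final subscription = subject.stream.listen(seen.add);
  subject.add(3);

  await Future<void>.delayed(Duration.zero); // let pending events arrive
  await subscription.cancel();
  await subject.close();
  return seen;
}

Future<void> main() async {
  print(await lateListenerDemo());
}
```

The first value the late listener sees is 2, the latest one at the time it subscribed. This is why a view that binds to _images after images were already added still gets the current list.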

With that bit of theory out of the way, let’s continue. Add these lines of code to the add() method:

// 1
final byteData = await rootBundle.load('assets/IMG_1907.jpg');
final codec = await ui.instantiateImageCodec(byteData.buffer.asUint8List());
final frameInfo = await codec.getNextFrame();
final newImage = frameInfo.image;

// 2
_images.add(_images.value..add(newImage));

  1. We load the asset image into memory and decode it into a ui.Image.
  2. We add the new image to the _images StreamController, which holds the list of images added by the user. Whenever the user taps the + button, the same IMG_1907.jpg is passed to the stream.

Next, to clear the selected photos, update the clear() method with:

_images.add([]);

Clearing is as simple as emitting an empty list.

Now, let’s bind CreateCollegePhotoPage to the _images stream to display the result by adding the method below:

void bindMainView() {
  // 1
  final subscription = _images.stream
      // 2
      .asyncMap((images) => images.isEmpty
          ? Future.value()
          : ImageUtils.collage(images, canvasSize))
      // 3
      .listen((collegeImage) => previewImage.value = collegeImage);

  // 4
  _subscriptions.add(subscription);
}

  1. We start from the _images stream, which emits the current list of images every time it changes.
  2. asyncMap is an operator that receives a list of images and converts it into a single collage image using the ImageUtils.collage(images, size) method, which is defined in the image_college.dart file. If the images list is empty, we return no image.
  3. By listening to the stream, a StreamSubscription is returned, which we assign to the subscription variable. We then use the output of the stream and assign it to the previewImage value. previewImage is a ValueNotifier object that calls its listeners whenever its value changes.
  4. Add subscription to the _subscriptions container so we can dispose of subscriptions later.
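
If asyncMap is new to you, the following stand-alone sketch may help. It uses strings instead of images, and fakeCollage is a hypothetical stand-in for ImageUtils.collage, so treat it as an analogy rather than the project's code.

```dart
import 'dart:async';

/// Hypothetical stand-in for ImageUtils.collage: joins names after a delay.
Future<String> fakeCollage(List<String> names) async {
  await Future<void>.delayed(const Duration(milliseconds: 10));
  return names.join('+');
}

/// Maps each list to one "collage", or to null when the list is empty.
Stream<String?> collages(Stream<List<String>> lists) => lists.asyncMap(
      (names) => names.isEmpty ? Future<String?>.value() : fakeCollage(names),
    );

Future<void> main() async {
  final input = Stream.fromIterable([
    ['a'],
    ['a', 'b'],
    <String>[],
  ]);
  print(await collages(input).toList()); // [a, a+b, null]
}
```

asyncMap waits for each returned Future to complete before passing its result on, so results come out in the same order the lists went in.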

Find the dispose() method and add:

_subscriptions.dispose();

Now it’s time to display the collage image. Open the create_college_photo_page.dart file and add the line below to the initState() method:

@override
void initState() {
  super.initState();
  model.bindMainView();
}

We also need to dispose of the subscription when the widget’s state is disposed. So, update the _CreateCollegePhotoPageState dispose() method to dispose of any StreamSubscription held by the model.

@override
void dispose() {
  model.dispose();
  super.dispose();
}

Now run the app and tap the + button multiple times:

Display emitted images

We get the list of photos, convert them into a single collage photo, and assign it as a single image within one subscription.

Now, we want to update the page title to display the number of photos in the collage image, enable the save button only when there is an even number of photos, and disable the clear button when the collage photo is empty.

The updateUI(photosCount) method in _CreateCollegePhotoPageState handles this logic. We just need to create another stream that emits the collage photo count whenever the user adds a new photo.

Open CollegeNeueModel and add this line below previewImage:

final photosCount = ValueNotifier<int>(0);

We have exposed a ValueNotifier that emits the number of images in the collage photo, so the view can observe the count and update accordingly.

In bindMainView(), insert this operator before asyncMap:

.doOnData((event) {
  photosCount.value = event.length;
})

The doOnData operator allows you to perform side effects when the stream emits a new event. This operator is called before asyncMap.
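
A minimal sketch of doOnData in isolation, assuming the rxdart package is available. The variable lastCount plays the role of photosCount.value, and doOnDataDemo is a made-up name.

```dart
import 'package:rxdart/rxdart.dart';

/// Runs a side effect on each event, then maps the unchanged event.
Future<String> doOnDataDemo() async {
  var lastCount = 0; // plays the role of photosCount.value

  final labels = await Stream.fromIterable([
    ['a'],
    ['a', 'b'],
  ])
      // Side effect only: the event itself passes through untouched.
      .doOnData((items) => lastCount = items.length)
      .map((items) => items.join('+'))
      .toList();

  return '$labels / $lastCount';
}

Future<void> main() async {
  print(await doOnDataDemo()); // [a, a+b] / 2
}
```

Because doOnData does not change what flows through the stream, the map that follows still receives the original lists.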

You may wonder why we used ValueNotifier. They act similarly to streams, but they don't have the operator methods needed to handle data manipulation. They are much simpler than streams, so we use them to hold a value.

Now, add the line below in the initState() method:

model.photosCount.addListener(() {
  updateUI(model.photosCount.value);
});

As you can see, addListener on a ValueNotifier doesn't return anything like a StreamSubscription; to stop listening, you pass the same callback to removeListener. We also need to dispose of the ValueNotifier when we no longer need it to release memory.

Add the following line in the CollegeNeueModel dispose() method:

photosCount.dispose();

Run the app and see the result:

Show count of displayed images

Initially, both the clear and save buttons are disabled, which is the correct initial state. The buttons will keep changing as you add more photos.

Add photos from Gallery

Now, let’s enable the user to pick a photo from the photo gallery. We want to allow the user to select some photos and pass them back to the previous page.

Open create_college_photo_page.dart and update the + button's on-pressed callback to present the PhotoGalleryPage.

onPressed: () {
  model.add();
  Navigator.of(context).push(
    MaterialPageRoute(
      builder: (context) => PhotoGalleryPage(
        model: model,
      ),
    ),
  );
},

Add the following code in CollegeNeueModel below the // Displaying photos picker comment:

// 1
var selectedPhotosSubject = PublishSubject<ui.Image>();
final _photos = PublishSubject<List<AssetEntity>>();
Stream<List<AssetEntity>> get photos => _photos.stream;

// 2
void bindPhotoPicker() {
  loadPhotos().then((items) => _photos.add(items));
}

// 3
void unbindPhotoPicker() {
  selectedPhotosSubject.close();
}

  1. PublishSubject is a broadcast StreamController from RxDart, meaning it can have multiple listeners. Only _photos needs to be a broadcast StreamController, because each time the user opens the PhotoGalleryPage, the StreamBuilder listens to the same photos stream again. If it were a single-subscription stream, the second listen would throw a "Stream has already been listened to" error.

You can rewrite the above with plain StreamControllers:

var selectedPhotosSubject = StreamController<ui.Image>();
final _photos = StreamController<List<AssetEntity>>.broadcast();

But here, to keep the code consistent, I use PublishSubject.

  2. bindPhotoPicker() is called by PhotoGalleryPage to start loading photos.
  3. unbindPhotoPicker() closes selectedPhotosSubject, so no further events can be added to it once PhotoGalleryPage is dismissed.
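
The difference between single-subscription and broadcast controllers can be checked with a few lines of plain Dart. This is a sketch using dart:async only; canListenTwice is a hypothetical helper, not something from the project.

```dart
import 'dart:async';

/// Tries to attach two listeners to the controller's stream.
/// Returns false if the second listen() throws a StateError.
bool canListenTwice(StreamController<int> controller) {
  final first = controller.stream.listen((_) {});
  try {
    final second = controller.stream.listen((_) {});
    second.cancel();
    return true;
  } on StateError {
    return false;
  } finally {
    first.cancel();
  }
}

void main() {
  print(canListenTwice(StreamController<int>())); // false
  print(canListenTwice(StreamController<int>.broadcast())); // true
}
```

A StreamBuilder that is rebuilt against the same stream is in the same position as the second listener here, which is why the photos stream has to be a broadcast one.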

Open PhotoGalleryPage and call the above methods:

@override
void initState() {
  super.initState();
  model.bindPhotoPicker();
}

@override
void dispose() {
  model.unbindPhotoPicker();
  super.dispose();
}

If you run the app, you'll see that the photos in the photo gallery are now shown:

Photo Gallery

Now, we need to subscribe to selectedPhotosSubject and display the selected photos on the main page. Open CollegeNeueModel and update the add() method:

assert(canvasSize != Size.zero,
    'Canvas size must be set before adding images');
final newPhotosSubscriptions = selectedPhotosSubject.stream
    // 1
    .map((image) => _images.value + [image])
    // 2
    .listen(_images.add);

// 3
_subscriptions.add(newPhotosSubscriptions);

  1. Append the newly selected image to the current list of images.
  2. Add the new list of images to the _images stream.
  3. Add the new stream subscription to the _subscriptions container, which is disposed whenever the model is disposed.

Select images from gallery
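
The same append-and-forward pattern can be tried outside Flutter. In this sketch, strings stand in for images, the variable current stands in for _images.value, and appendDemo is a made-up name; only dart:async is used.

```dart
import 'dart:async';

/// Feeds two selections through map + listen and returns every emitted list.
Future<List<List<String>>> appendDemo() async {
  final selected = StreamController<String>(); // like selectedPhotosSubject
  final emitted = <List<String>>[];
  var current = <String>[]; // like _images.value

  final subscription = selected.stream
      // Build a new list instead of mutating the current one.
      .map((photo) => current + [photo])
      .listen((photos) {
    current = photos;
    emitted.add(photos);
  });

  selected
    ..add('a')
    ..add('b');
  unawaited(selected.close());

  // Completes once the listener has received the "done" event.
  await subscription.asFuture<void>();
  return emitted;
}

Future<void> main() async {
  print(await appendDemo()); // [[a], [a, b]]
}
```

Each selection produces a brand-new list, so every listener downstream sees a distinct value per event rather than one list that keeps changing underneath it.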

As you can see, if you navigate a few times and try to select more photos, the newly selected photos are not shown!

Why is this happening?

The reason is that when you navigate back from the photo picker, we close the selectedPhotosSubject in the unbindPhotoPicker() method, so it won't receive any new events. To restart it, we need to instantiate it again at the start of the add() method, before subscribing to it:

selectedPhotosSubject = PublishSubject<ui.Image>();

Display selected images from gallery
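
To see why a closed subject has to be replaced rather than reused, here is a sketch with a plain broadcast StreamController from dart:async. I am assuming RxDart subjects behave the same way in this respect; addFailsAfterClose is a hypothetical helper.

```dart
import 'dart:async';

/// Returns true if adding to the controller throws because it is closed.
bool addFailsAfterClose(StreamController<String> controller) {
  controller.close(); // not awaited: isClosed flips immediately
  try {
    controller.add('photo');
    return false;
  } on StateError {
    return true;
  }
}

void main() {
  var subject = StreamController<String>.broadcast();
  print(addFailsAfterClose(subject)); // true

  // A closed controller cannot be reopened; create a fresh one instead.
  subject = StreamController<String>.broadcast();
  print(subject.isClosed); // false
}
```

Any listener attached to the old controller stays attached to the old controller, which is why the new instance must exist before you subscribe.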

Save collage photo

Open CollegeNeueModel and update the save() method:

// 1
final _savedPhotoIdSubject = PublishSubject<String>();
Stream<String> get savedPhotoId => _savedPhotoIdSubject.stream;

void save() {
  final collegeImage = previewImage.value;
  // 2
  if (collegeImage == null) {
    return;
  }

  final subscription = PhotoWriter.save(collegeImage)
      // 3
      .asStream()
      .listen(
        (id) {
          // 4
          _savedPhotoIdSubject.add(id);
          clear();
        },
        // 5
        onError: _savedPhotoIdSubject.addError,
      );

  _subscriptions.add(subscription);
}

  1. We created a PublishSubject (StreamController) to add new saved photo IDs to it, and its stream is exposed via savedPhotoId.
  2. If there is no collage image, we return early.
  3. We use the save(Image) method of PhotoWriter to save the photo. save returns a Future, but we convert it to a stream using asStream.
  4. We listen to the stream and pass the emitted saved photo ID to _savedPhotoIdSubject, then clear the canvas.
  5. In case of an error, we add the thrown error to the _savedPhotoIdSubject.
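
The Future-to-Stream step can be sketched on its own with dart:async. Here fakeSave is a hypothetical stand-in for PhotoWriter.save, and the log list stands in for _savedPhotoIdSubject.

```dart
import 'dart:async';

/// Hypothetical stand-in for PhotoWriter.save.
Future<String> fakeSave({required bool fail}) async {
  if (fail) throw StateError('could not save photo');
  return 'photo-123';
}

/// Listens to the save result as a stream and records what arrives.
Future<List<String>> saveAndLog({required bool fail}) async {
  final log = <String>[];
  final done = Completer<void>();

  fakeSave(fail: fail).asStream().listen(
        (id) => log.add('saved $id'),
        onError: (Object error) => log.add('failed'),
        onDone: () => done.complete(),
      );

  await done.future; // the stream closes after its single value or error
  return log;
}

Future<void> main() async {
  print(await saveAndLog(fail: false)); // [saved photo-123]
  print(await saveAndLog(fail: true)); // [failed]
}
```

A stream created with asStream emits exactly one value or one error and then closes, so the data and error callbacks cover both outcomes of the original Future.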

💡 Remember, a Stream can emit both a value and an error.

Then, open PhotoGalleryPage and add the method below:


// 1
StreamSubscription<String>? savedPhotoIdSubscription;

void listenToPhotoSaveResult(BuildContext context, CollegeNeueModel model) {
  // 2
  savedPhotoIdSubscription = model.savedPhotoId.listen((id) {
    if (!context.mounted) {
      return;
    }
    showAlertDialog(context,
        title: 'Success', message: 'Photo saved with ID: $id');
  }, onError: (error) {
    if (!context.mounted) {
      return;
    }
    showAlertDialog(context, title: 'Error', message: error.toString());
  });
}

  1. Create a StreamSubscription variable to hold a reference to the savedPhotoId listener, so it can be canceled when the page is disposed.
  2. We start listening to the savedPhotoId stream and display the appropriate message on success or failure.

Start listening to the photo save result by adding the call below at the end of the initState() method:

listenToPhotoSaveResult(context, model);

Let’s see it in action:

Save collage photo

To see the failure alert, navigate to the PhotoWriter class and throw an error immediately:

static Future<AssetId> save(Image image) async {
  throw CouldNotSavePhoto();
}

Try again and you'll see the error alert:

Save Photo Failure

Operators in Action

Until now, you’ve practiced a good bit of reactive programming. Let’s try another operator that allows us to limit the collage photo to hold a maximum of 6 images.

Add the following line before map in the add() method:

.takeWhile((element) => _images.value.length < 6)

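takeWhile forwards events for as long as the condition holds. The first time the condition fails, the resulting stream completes, and any later events are ignored. That is fine here, because add() sets up a fresh subscription each time the picker is opened. If you only wanted to skip the events that fail the condition while continuing to listen, you would use where instead.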
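
A quick way to see what takeWhile does is to compare it with where on the same input. This sketch uses dart:async only, with made-up numbers in place of images.

```dart
import 'dart:async';

/// Forwards events until the first one that is not below [limit].
Future<List<int>> takeBelow(List<int> events, int limit) =>
    Stream.fromIterable(events).takeWhile((n) => n < limit).toList();

/// Forwards every event below [limit], skipping the others.
Future<List<int>> keepBelow(List<int> events, int limit) =>
    Stream.fromIterable(events).where((n) => n < limit).toList();

Future<void> main() async {
  const events = [1, 2, 9, 3];
  print(await takeBelow(events, 5)); // [1, 2]
  print(await keepBelow(events, 5)); // [1, 2, 3]
}
```

With takeWhile, the 3 never arrives because the stream already ended at 9. With where, only the 9 is skipped.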

Run the app and try to add more than 6 images to the collage. You’ll see that it only accepts 6 images and no more.

Challenge

It’s now up to you. We want to show an error in case access to the photo gallery was not granted. On iOS, navigate to Settings, select the Collage Neue app, and disable photo access.

Disable Photos Gallery Access

To help you, open the PhotoGalleryPage and replace the StreamBuilder with a custom StreamListenableBuilder, which has a listener callback. You can use this callback to be notified when the stream emits an error. For the listener callback, you can use the function below:

void _streamListener(
    BuildContext context, AsyncSnapshot<List<AssetEntity>> snapshot) {
  if (snapshot.hasError && snapshot.error != null) {
    switch (snapshot.error) {
      case CollegeNeueModelError.permissionNotGranted:
        showAlertDialog(context,
            title: 'No access to Camera Roll',
            message: 'You can grant access in Settings app');
      default:
        showAlertDialog(context,
            title: 'Error', message: snapshot.error!.toString());
    }
  }
}

Add this method to check whether photo gallery access has been granted:

Future<bool> isPhotoGalleryPermissionGranted() =>
PhotoManager.requestPermissionExtend().then((ps) => ps.hasAccess);

Also add this custom permission error in the CollegeNeueModel:

enum CollegeNeueModelError { permissionNotGranted }

Now you just need to add the above error to the photos stream in case you don’t have access to the gallery.

Think and try to do it on your own 💭

Doing it on your own helps you grasp the concept more deeply and remember the topic for longer.

You can always find the answer in the repository code.

Conclusion

Streams are simple to use, and this pattern of data processing is available in most modern languages. For example, on iOS you can use the Combine framework to achieve the same result, and ReactiveX introduced this paradigm to abstract asynchronous concerns, such as threading and concurrency, away from developers.

I’ve tried to give you a taste of Streams and their operators in Dart and Flutter, which you can use in any layer of your application. For the UI layer in Flutter, I recommend the BLoC pattern and its popular implementation, the Bloc library, which is built on top of Streams.

As always, you can reach out to me on LinkedIn or X, and don’t hesitate to ask questions 😉.
