flutter
/

BLoC Streams: Real-Time Data Handling with Bloc & Stream

Last Sync: Today

On this page

10
0%
5 min read
Remaining
5 minleft

Click any section to jump — progress syncs automatically

flutter

BLoC Streams: Real-Time Data Handling with Bloc & Stream

What Are Streams in BLoC?

Streams are a core part of reactive programming in Dart. In the BLoC pattern, streams are used to handle asynchronous events and state updates. While BLoC already uses streams internally (events in, states out), you can also integrate external streams—like WebSockets, Firebase Realtime Database, or sensor data—into your BLoC to build real‑time features.

Why Use Streams with BLoC?

  • Real‑Time Updates – React to data changes instantly (e.g., live scores, chat messages).
  • Event‑Driven Architecture – Handle continuous data flows elegantly.
  • Decoupling – Separate data sources from UI logic.
  • Cancellation Support – Manage subscriptions to avoid memory leaks.
  • Composability – Combine, transform, and filter streams using Dart’s stream operators.

Basic Stream Integration in BLoC

The simplest way to integrate a stream into a BLoC is to listen to it inside the BLoC and emit states in response to stream events. You can do this using emit.forEach (introduced in flutter_bloc 8.0) or by manually managing a StreamSubscription.

DARTRead-only
1
class TimerBloc extends Bloc<TimerEvent, TimerState> {
  TimerBloc() : super(TimerInitial()) {
    on<TimerStarted>(_onTimerStarted);
  }

  Future<void> _onTimerStarted(TimerStarted event, Emitter<TimerState> emit) async {
    await emit.forEach(
      _getTimerStream(event.duration),
      onData: (duration) => TimerRunInProgress(duration),
      onError: (error, stackTrace) => TimerRunError(error.toString()),
    );
  }

  Stream<int> _getTimerStream(int duration) async* {
    for (int i = duration; i >= 0; i--) {
      await Future.delayed(Duration(seconds: 1));
      yield i;
    }
  }
}
DARTRead-only
1
class ChatBloc extends Bloc<ChatEvent, ChatState> {
  late final StreamSubscription _subscription;

  ChatBloc({required Stream<Message> messageStream}) : super(ChatInitial()) {
    _subscription = messageStream.listen(
      (message) => add(MessageReceived(message)),
      onError: (error) => add(ChatError(error.toString())),
    );

    on<MessageReceived>((event, emit) {
      emit(state.copyWith(messages: [...state.messages, event.message]));
    });
  }

  @override
  Future<void> close() {
    _subscription.cancel();
    return super.close();
  }
}

WebSocket Integration with BLoC

WebSockets provide a persistent connection for real‑time two‑way communication. Here’s how to integrate a WebSocket into a BLoC using web_socket_channel.

DARTRead-only
1
import 'package:web_socket_channel/web_socket_channel.dart';

class WebSocketBloc extends Bloc<WebSocketEvent, WebSocketState> {
  final WebSocketChannel _channel;
  late final StreamSubscription _subscription;

  WebSocketBloc({required String url}) : _channel = WebSocketChannel.connect(Uri.parse(url)), super(WebSocketInitial()) {
    _subscription = _channel.stream.listen(
      (data) => add(WebSocketDataReceived(data)),
      onError: (error) => add(WebSocketError(error.toString())),
    );

    on<WebSocketDataReceived>((event, emit) {
      emit(WebSocketDataLoaded(event.data));
    });

    on<WebSocketSendMessage>((event, emit) {
      _channel.sink.add(event.message);
    });
  }

  @override
  Future<void> close() {
    _subscription.cancel();
    _channel.sink.close();
    return super.close();
  }
}

Combining Multiple Streams

Sometimes you need to listen to multiple streams and combine their latest values. You can use Rx.combineLatest from rxdart or StreamZip to merge streams and emit a combined state.

DARTRead-only
1
import 'package:rxdart/rxdart.dart';

class CombinedDataBloc extends Bloc<CombinedEvent, CombinedState> {
  CombinedDataBloc({
    required Stream<int> streamA,
    required Stream<String> streamB,
  }) : super(CombinedInitial()) {
    final combinedStream = Rx.combineLatest2(
      streamA,
      streamB,
      (int a, String b) => CombinedData(a, b),
    );

    await emit.forEach(
      combinedStream,
      onData: (data) => CombinedLoaded(data),
      onError: (error) => CombinedError(error.toString()),
    );
  }
}

Stream Transformations with BLoC Events

You can also use streams to trigger events in your BLoC. For example, a search text field can be debounced and transformed before adding an event.

DARTRead-only
1
class SearchBloc extends Bloc<SearchEvent, SearchState> {
  SearchBloc() : super(SearchInitial()) {
    on<SearchQueryChanged>(
      _onSearchQueryChanged,
      transformer: debounce(const Duration(milliseconds: 500)),
    );
  }

  void _onSearchQueryChanged(SearchQueryChanged event, Emitter<SearchState> emit) async {
    // Perform search
  }

  EventTransformer<SearchQueryChanged> debounce(Duration duration) {
    return (events, mapper) => events.debounceTime(duration).flatMap(mapper);
  }
}

Error Handling and Retry

Streams can fail; you must handle errors gracefully. Use onError in the subscription or emit.forEach’s onError callback. For retry logic, consider using Stream.retry from rxdart or manually re‑subscribing.

DARTRead-only
1
class RetryBloc extends Bloc<RetryEvent, RetryState> {
  RetryBloc() : super(RetryInitial()) {
    on<RetryRequested>((event, emit) async {
      await emit.forEach(
        _getDataStream().retryWhen((errors, stackTrace) => errors.delay(Duration(seconds: 2))),
        onData: (data) => RetrySuccess(data),
        onError: (error) => RetryFailed(error.toString()),
      );
    });
  }
}

Best Practices

  • Always cancel subscriptions – Override close() and cancel any StreamSubscription to prevent memory leaks.
  • Use emit.forEach when possible – It automatically handles subscription management and cancellation.
  • Avoid long‑running operations in BLoC constructors – Move them to event handlers or use on<...> with async actions.
  • Keep streams inside BLoC – External data sources (like WebSockets) should be injected so the BLoC remains testable.
  • Use rxdart for advanced transformations – Debouncing, throttling, combining, and retrying become much easier.
  • Test stream integration – Mock the stream source and verify state emissions.

Common Mistakes

  • ❌ Not canceling subscriptions – Causes memory leaks and unexpected behavior. ✅ Cancel in close().
  • ❌ Using listen without error handling – Unhandled errors crash the app. ✅ Always provide onError callback.
  • ❌ Creating new streams inside BLoC without disposal – If you create a StreamController inside the BLoC, close it in close().
  • ❌ Blocking the BLoC with long‑running tasks – Avoid awaiting inside the main event handler if it blocks other events. Use emit.forEach or isolate work.
  • ❌ Ignoring backpressure – Rapid stream events can overwhelm the UI. Use debounce or throttle.

Conclusion

Integrating streams with BLoC allows you to build highly responsive, real‑time applications. Whether you're working with WebSockets, Firebase, or custom data streams, the BLoC pattern provides a clean way to manage state while ensuring subscriptions are properly handled. By leveraging emit.forEach, manual subscriptions, and rxdart operators, you can create robust, testable, and performant reactive Flutter apps.

Try it yourself

import 'package:flutter/material.dart';
import 'package:flutter_bloc/flutter_bloc.dart';

void main() => runApp(MyApp());

// Events
abstract class TimerEvent {}
class TimerStarted extends TimerEvent {
  final int duration;
  TimerStarted(this.duration);
}

// States
abstract class TimerState {}
class TimerInitial extends TimerState {}
class TimerRunInProgress extends TimerState {
  final int duration;
  TimerRunInProgress(this.duration);
}
class TimerRunComplete extends TimerState {}

// BLoC
class TimerBloc extends Bloc<TimerEvent, TimerState> {
  TimerBloc() : super(TimerInitial()) {
    on<TimerStarted>(_onTimerStarted);
  }

  Future<void> _onTimerStarted(TimerStarted event, Emitter<TimerState> emit) async {
    await emit.forEach(
      _getTimerStream(event.duration),
      onData: (duration) => TimerRunInProgress(duration),
    );
    emit(TimerRunComplete());
  }

  Stream<int> _getTimerStream(int duration) async* {
    for (int i = duration; i >= 0; i--) {
      await Future.delayed(Duration(seconds: 1));
      yield i;
    }
  }
}

// UI
class MyApp extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      home: BlocProvider(
        create: (_) => TimerBloc(),
        child: TimerPage(),
      ),
    );
  }
}

class TimerPage extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: Text('BLoC Stream Timer')),
      body: Center(
        child: Column(
          mainAxisAlignment: MainAxisAlignment.center,
          children: [
            BlocBuilder<TimerBloc, TimerState>(
              builder: (context, state) {
                if (state is TimerInitial) {
                  return ElevatedButton(
                    onPressed: () => context.read<TimerBloc>().add(TimerStarted(10)),
                    child: Text('Start 10s Timer'),
                  );
                } else if (state is TimerRunInProgress) {
                  return Text('Time left: ${state.duration}s', style: TextStyle(fontSize: 24));
                } else if (state is TimerRunComplete) {
                  return Text('Timer finished!', style: TextStyle(fontSize: 24));
                }
                return Container();
              },
            ),
          ],
        ),
      ),
    );
  }
}

Test Your Knowledge

Q1
of 3

Which method in flutter_bloc automatically manages a stream subscription and emits states?

A
emit.onData
B
emit.forEach
C
emit.map
D
emit.fromStream
Q2
of 3

What should you do in a BLoC's `close()` method when you manually created a `StreamSubscription`?

A
Nothing – it auto‑cleans up
B
Call `dispose()` on the BLoC
C
Cancel the subscription
D
Re‑initialize the subscription
Q3
of 3

Which package is commonly used for advanced stream transformations like debouncing and combining?

A
bloc_concurrency
B
rxdart
C
async
D
stream_transform

Frequently Asked Questions

What is the difference between `emit.forEach` and manually listening to a stream?

emit.forEach automatically manages the subscription for you and cancels it when the BLoC is closed. Manual listening gives you more control but requires explicit cancellation in close().

How do I handle a stream that might emit errors?

Use the onError parameter in emit.forEach or provide an onError callback when calling listen.

Can I use `StreamController` inside a BLoC?

Yes, but you must close it in close() to prevent memory leaks. Alternatively, use a Stream from an external source that you don't own.

How do I debounce a stream before triggering an event?

Use an EventTransformer with debounceTime from rxdart. Example: on<SearchEvent>(..., transformer: (events, mapper) => events.debounceTime(Duration(milliseconds: 500)).flatMap(mapper));

Is it safe to use `BlocBuilder` with streams directly?

Yes, but you usually want the BLoC to convert stream data into states. Then BlocBuilder reacts to state changes, not the raw stream.

Previous

bloc session management

Next

bloc stream transform

Related Content

Need help?

Explore our comprehensive docs or start a chat with our tech experts.