최근 SSAFY(Samsung Software Academy For Youth)에서 3번째 프로젝트를 시작하였습니다. 이번 프로젝트에서는 실시간으로 여러 데이터를 주고 받아야하는데요. 내용물만 다른 여러 개의 지속적으로 보내야 하고 하나의 요청에서 오류가 나더라도 다음 요청이 계속 진행되어야 합니다. 동시에 이런 작업을 진행하는 중에 유저와의 상호작용이 멈추면 안되죠. 이런 이유로 기존에 사용하던 데이터 처리 방식과는 다른 더 효율적인 방법이 없을까하는 고민에 빠졌습니다. 그러다가 예전에 인터넷에서 보았었던 스트림(Stream)이라는 개념이 떠올랐습니다.

이름을 포함해 처음 스트림 패턴을 접했을 때는 계속 아래로 흐르는 시냇물과 같았습니다. 출처: copilot

스트림(Stream)

스트림은 대용량 데이터 처리나 비동기 I/O(Input/ Output) 작업을 효율적으로 다루기 위한 수단입니다. 말 그대로 모든 형태의 데이터를 흐름("Stream")으로 처리하는 방법입니다. 순차적으로 나열된 데이터인 연속적인 데이터 요소의 시퀸스를 처리하는 개념입니다. 데이터를 추상화하여 순차적 혹은 병렬적으로 데이터를 처리할 수 있는 방법이죠. 스트림은 흐름으로 이어진 데이터 각각의 개별 요소를 하나씩 차례대로 처리합니다. 정리하자면, 스트림이란 데이터의 연속적인 흐름을 효율적으로 처리하는 것에 초점을 맞춘 프로그래밍 패턴입니다.

스트림 패턴은 마치 공장의 컨베이어 벨트와 비슷한 면이 있습니다. 출처: copilot

연속적인 데이터 요소는 네트워크 통신에서 데이터패킷의 정렬, 배열과 리스트와 같은 자료형을 의미합니다. 해당 형식의 데이터에 스트림을 사용한다면 마치 공장의 컨베이어 벨트를 지나면서 가공되는 공산품처럼 복잡한 데이터 처리 로직을 흐름에 따라 간결하게 표현할 수 있습니다. 하나의 공장에 여러 개의 컨베이어 벨트를 둘 수 있는 것처럼 멀티 쓰레드 환경에서 병렬적으로 스트림을 사용하면 한 번에 여러 개의 스트림이 작동하므로 데이터 처리 속도를 향상시킬 수 있습니다.

 

스트림의 장점은 여기서 끝이 아닙니다. 스트림에서 발생하는 오류를 감지하고 적절히 처리할 수 있는 방법이 존재합니다. 스트림을 통해 데이터가 순차적으로 처리되기 때문에 오류가 발생하면 즉시 감지하여 처리할 수 있습니다. 동시에 발생한 오류들이 프로그램의 실행에 미치는 영향을 국소화할 수 있죠. 오류가 발생한 데이터는 데이터 흐름의 다른 부분으로 보내어 정상적으로 작동하는 부분과 다른 경로로 보내버려 전체 프로그램의 실행이 멈추지 않는 방식입니다. 여기에 더해 스트림에서는 오류가 발생한 부분의 데이터를 저장하였다가 나중에 재시도하거나 복구하는 것 또한 가능합니다.

 

스트림 패턴의 예시

사실 스트림은 여러 프로그래밍 언어에서 쉽게 볼 수 있습니다. 배열(스트림) 내의 각 요소에 대해 특정 연산을 적용하고, 그 결과로 새로운 배열을 생성하는 map 메서드나 주어진 조건에 맞는 요소만을 추출하여 새로운 배열을 생성하는 filter 메서드 등이 스트림 패턴에 속합니다. 스트림이 이런 메서드들과 관련이 있는건 스트림이  함수형 프로그래밍 패러다임과 밀접하게 관련되어 있기 때문이기도 하죠. 스트림과 함수형 프로그래밍은 조금 뒤에 알아보기로 하고 예시 코드를 한 번 보겠습니다.

 

다음은 스트림 패턴을 기본적으로 제공하는 Flutter를 사용해 코드를 작성한 것입니다. 해당 위젯을 main.dart에서 불러와 사용해 보세요.

import 'dart:async';
import 'dart:math';

import 'package:flutter/material.dart';

class RepeatedRequestWidget extends StatefulWidget {
  const RepeatedRequestWidget({super.key});

  @override
  RepeatedRequestWidgetState createState() => RepeatedRequestWidgetState();
}

class RepeatedRequestWidgetState extends State<RepeatedRequestWidget> {
  final Random random = Random(); // 랜덤 값을 생성하기 위한 Random 클래스 인스턴스
  final StreamController<void> _requestController =
      StreamController(); // 요청을 관리하기 위한 스트림 컨트롤러
  final StreamController<Map<String, dynamic>> _responseController =
      StreamController(); // 응답을 관리하기 위한 스트림 컨트롤러

  late Map<String, dynamic> lastData = {
    'requestTime': DateTime.now().toIso8601String(),
    'responseTime': DateTime.now().toIso8601String(),
    'delay': 0,
    'count': 0
  }; // 이전 데이터를 저장
  int requestCount = 0; // 요청 횟수를 카운트하는 변수

  // 초기 상태 설정
  @override
  void initState() {
    super.initState();

    // 0.5초마다 요청을 보내는 타이머 설정
    Timer.periodic(const Duration(milliseconds: 500), (timer) {
      _requestController.add(null); // _requestController에 빈 객체 추가하여 요청 전달
    });

    // 요청을 처리하고 응답을 _responseController에 추가하는 리스너 등록
    _requestController.stream.listen((_) {
      // 서버로부터 데이터를 가져오는 비동기 함수 호출
      requestCount++; // 요청 번호 증가
      debugPrint('$requestCount번 요청 보냄');
      fetchDataFromServer(requestCount, DateTime.now().toIso8601String()).then(
        (data) {
          // 최신 응답의 요청 이후에 보낸 요청에 대해서만 실행
          if (DateTime.parse(data['requestTime'])
              .isAfter(DateTime.parse(lastData['requestTime']))) {
            data['responseTime'] = DateTime.now().toIso8601String();
            _responseController.add(data); // 응답 컨트롤러에 데이터 추가
            lastData = data; // 최신 데이터를 lastData에 저장
          }
        },
        onError: (error) {
          _responseController.add(
              {...lastData, 'error': error}); // 오류 발생 시 이전 데이터를 응답 컨트롤러에 추가
        },
      );
    });
  }

  // 서버로부터 데이터를 비동기적으로 가져오는 함수
  Future<Map<String, dynamic>> fetchDataFromServer(
      int requestCount, String requestTime) async {
    int delayMilliseconds = 500 + random.nextInt(5000); // 지연 시간을 랜덤으로 결정
    await Future.delayed(
      Duration(milliseconds: delayMilliseconds),
    ); // 지연 시간만큼 대기

    if (random.nextDouble() < 0.1) {
      throw '에러 발생'; // 10%의 확률로 서버 오류를 발생시킴
    }

    return {
      'count': requestCount, // 요청 횟수
      'requestTime': requestTime, // 요청 시간 기록
      'delay': delayMilliseconds, // 지연 시간 기록
    };
  }

  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      home: Scaffold(
        appBar: AppBar(
          title: const Text('주기적 서버 요청'),
        ),
        body: _buildResponseWidget(), // 응답 위젯을 구성하는 메서드 호출
      ),
    );
  }

  // 응답을 처리하는 위젯을 생성하는 함수
  Widget _buildResponseWidget() {
    return StreamBuilder<Map<String, dynamic>>(
      stream: _responseController.stream, // _responseController의 스트림 사용
      builder: _buildResponseBuilder, // 응답을 처리하는 빌더 함수 호출
    );
  }

  @override
  void dispose() {
    _requestController.close(); // _requestController 스트림을 종료
    _responseController.close(); // _responseController 스트림을 종료
    super.dispose(); // 위젯의 상태를 정리하는 메서드를 호출
  }
}

// 응답 데이터를 처리하는 빌더 함수
Widget _buildResponseBuilder(
    BuildContext context, AsyncSnapshot<Map<String, dynamic>> snapshot) {
  if (snapshot.connectionState == ConnectionState.waiting) {
    return const Center(
      child: CircularProgressIndicator(),
    ); // 데이터 대기 중 로딩 인디케이터 표시
  }
  if (!snapshot.hasData) {
    return const Center(
      child: Text('데이터를 기다리는 중...'),
    ); // 데이터가 아직 없을 경우 대기 메시지를 표시
  }
  Map<String, dynamic> data = snapshot.data!;
  debugPrint('${data['count']}번 응답');
  return Center(
    child: Column(
      mainAxisAlignment: MainAxisAlignment.center,
      children: [
        data['error'] != null
            ? Text(data['error'])
            : const Text(''), // 오류가 있으면 화면에 표시
        Text('요청 시간: ${data['requestTime']}'), // 서버 요청 시간을 표시
        Text('응답 시간: ${data['responseTime']}'), // 서버 응답 시간을 표시
        Text('지연: ${data['delay']} ms'), // 요청과 응답 사이의 지연 시간을 밀리초 단위로 표시
        Text('요청 번호: ${data['count']}'), // 현재까지의 요청 횟수를 표시
      ],
    ),
  );
}

위의 코드는 된 서버에 stream 패턴을 통해 0.5초마다 서버에 요청을 보내고 받은 데이터를 UI에 표시하는 코드입니다. 요청에 대한 응답이 오는 시간은 모두 다르며 최소 0.1초에서 최대 5초의 지연이 발생할 수 있습니다. 그리고 해당 서버는 10% 확율로 오류가 발생합니다. 실제 서버로 요청은 보내지 않고 fetchDataFromServer가 그 역할을 대신합니다.

 

실제로 코드를 실행한다면 항상 최신의 요청에 대한 응답만 UI에 표시할 것입니다. 단 최신 응답을 자동으로 표시하지는 않고 요청을 보낸 시간을 기록하여 현재 받은 응답 중 가장 최신 요청에 대한 응답과 비교하는 과정이 필요합니다. 그리고 오류가 발생하여도 가장 최신 요청에 대한 응답을 responseController에 넣어주어 오류가 나지 않은 것처럼 동작하고 있습니다. 만약 실제 어플리케이션을 만든다면 오류 처리용 스트림을 만들어 따로 관리해줄 수도 있습니다.

 

SnapShot 객체

snapshotStream의 현재 상태를 캡처하는 데 사용됩니다. 이 메소드를 호출하면 스냅샷 객체가 반환되는데, 이 객체에는 스트림의 현재 상태와 값을 포함하는 정보가 담겨 있습니다. 스냅샷 객체는 스트림의 현재 상태를 한 번에 얻는데 사용됩니다. 즉 현재 스트림이 처리하고 있는 데이터와 데이터를 상태를 나타내는 것이죠.

 

스냅샷 객체는 Stream 클래스에 있는 다양한 메소드와 속성을 제공합니다. 주요 메소드 중 하나는 hasError(hasError는 getter), 스트림에서 오류가 발생했는지 여부를 확인할 수 있습니다. 물론 위의 코드에서는  오류가 _requestController에서 처리되고 있으므로 snapShot 객체에서 오류를 확인하지 않습니다. 현재 스트림에서 처리하는 데이터의 현재 값을 알고 싶다면 data 속성에 접근할 수 있습니다. 그리고 data에 접근하기전에 hasData를 통해 스트림이 데이터를 포함하고 있는지 알 수 있습니다.

 

ConnectionState는 스트림의 연결 상태를 나타내는 열거형(Enum)입니다. 스트림이 현재 어떤 작업을 수행하고 있는지를 나타내죠.

  • none: 스트림이 아무 작업도 하지 않는 상태입니다. 초기 상태일 수 있습니다.
  • waiting: 스트림이 이벤트를 기다리는 중인 상태입니다. 이벤트가 발생할 때까지 대기합니다.
  • active: 스트림이 현재 이벤트를 전달하고 있는 상태입니다.
  • done: 스트림이 이벤트 발생을 완료한 상태입니다.

스트림은 데이터를 쌓지 않습니다.

스트림을 사용하려고 공부하던 중 만약 엄청 오랜 시간 스트림을 사용한다면 계속 데이터가 쌓여 메모리가 부족해질 것이라는 생각을 하였습니다. 하지만 결론부터 말하자면 Dart의 Stream은 데이터를 쌓지 않습니다.

 

스트림은 데이터 흐름과 같기 때문에, 처리가 끝난 데이터는 스트림에서 버려집니다.  예를 들어, 스트림을 통해 정수 시퀀스를 생성하는 경우, 스트림은 각 정수를 만들고, 만들어진 정수는 스트림의 파이프 라인 끝에 도달하면 스트림 내에서 사라집니다. 이는 데이터 흐름의 특성입니다. 새로운 데이터가 끊임없이 추가되는 경우, 이전 데이터를 모두 저장하는 것은 비효율적이고 메모리 낭비가 발생합니다. 그래서 스트림은 효율성을 위해 현재 데이터에만 집중하도록 설계되었습니다. 따라서 스트림은 각 데이터를 즉시 처리하는 경우에 사용하며, 저장이 필요하다면 스트림이 끝나기 전에 상태 등 다른 곳으로 저장하여야 합니다. 보통 파일 읽기, 서버로부터 데이터 수신, 사용자 입력, 센서 데이터의 실시간 수집과 같은 곳에 사용하죠.

+ Recent posts