| /* |
| * Copyright (C) 2023 The Android Open Source Project |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #ifndef INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_H_ |
| #define INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_H_ |
| |
| #include <functional> |
| #include <memory> |
| #include <vector> |
| |
| #include "perfetto/base/status.h" |
| #include "perfetto/ext/base/status_or.h" |
| #include "perfetto/ext/base/threading/future.h" |
| #include "perfetto/ext/base/threading/stream_combinators.h" |
| |
| namespace perfetto { |
| namespace base { |
| |
| // Creates a Stream<T> from P, a subclass of StreamPollable<T>. |
| // |
| // This function follows the same pattern of std::make_unique, std::make_shared |
| // etc. |
| template <typename P, typename... Args> |
| Stream<typename P::PollT> MakeStream(Args... args) { |
| return Stream<typename P::PollT>( |
| std::unique_ptr<StreamPollable<typename P::PollT>>( |
| new P(std::forward<Args>(args)...))); |
| } |
| |
| // An asynchronous iterator for values of type T. |
| // |
| // If Future<T> is an asynchronous version of T, Stream<T> is an asynchronous |
| // version of Iterator<T>. Long-running compute/IO operations which return |
| // multiple values can be represented with a Stream<T>. |
| // |
| // Refer to the class documentation for Future<T> as most of the features and |
| // implementation of Future<T> also apply to Stream<T>. |
| template <typename T> |
| class Stream { |
| public: |
| using PollableItem = T; |
| |
| // Creates a Stream from a |StreamPollable<T>|. Prefer using |MakeStream| |
| // instead of this function. |
| explicit Stream(std::unique_ptr<StreamPollable<T>> pollable) |
| : pollable_(std::move(pollable)) {} |
| |
| // Converts a Stream<T> to Stream<U>. This works by applying |map_fn| to each |
| // element in T and then polling the returned Future<U> to completion. |
| template <typename Function /* = Future<U>(T) */> |
| Stream<FutureReturn<Function, T>> MapFuture(Function map_fn) && { |
| return MakeStream<MapFutureStreamImpl<Function, T>>(std::move(*this), |
| std::move(map_fn)); |
| } |
| |
| // Creates a stream which fully polls |this| and then polls |concat| to |
| // completion. |
| Stream<T> Concat(Stream<T> concat) && { |
| return MakeStream<ConcatStreamImpl<T>>(std::move(*this), std::move(concat)); |
| } |
| |
| // Converts a Stream<T> to Future<U> by collecting elements using |collector|. |
| // See documentation on |Collector| for how to implement one. |
| template <typename U> |
| Future<U> Collect(std::unique_ptr<Collector<T, U>> collector) && { |
| return MakeFuture<CollectImpl<T, U>>(std::move(*this), |
| std::move(collector)); |
| } |
| |
| // Checks if the computation backing this Stream<T> has finished. |
| // |
| // Returns a StreamPollResult<T> which is a essentially a |
| // variant<PendingPollResult, DonePollResult T>. If PendingPollResult is |
| // returned, |ctx| will be used to register interest in the various fds which |
| // are "blocking" this future from finishing. If DonePollResult is returned, |
| // Poll *must not* be called again. |
| StreamPollResult<T> PollNext(PollContext* ctx) { |
| return pollable_->PollNext(ctx); |
| } |
| |
| private: |
| std::unique_ptr<StreamPollable<T>> pollable_; |
| }; |
| |
| // Alias to shorten type defintions for Stream<Status> which is common in |
| // the codebase. |
| using StatusStream = Stream<Status>; |
| |
| // Alias to shorten type defintions for Stream<StatusOr<T>> which is common |
| // in the codebase. |
| template <typename T> |
| using StatusOrStream = Stream<StatusOr<T>>; |
| |
| // Creates a Stream<T> which returns the next value inside |vector| every time |
| // Stream<T>::Poll is called. |
| template <typename T> |
| Stream<T> StreamFrom(std::vector<T> vector) { |
| return MakeStream<ImmediateStreamImpl<T>>(std::move(vector)); |
| } |
| |
| // Creates a Stream<T> which immediately returns DonePollResult when polled. |
| template <typename T> |
| Stream<T> EmptyStream() { |
| return StreamFrom(std::vector<T>()); |
| } |
| |
| // Creates a Stream<T> which returns |first| and each of |rest| in sequence when |
| // polled. |
| template <typename T, typename... Ts> |
| Stream<T> StreamOf(T first, Ts... rest) { |
| std::vector<T> values; |
| AddAllToVector(values, std::forward<T>(first), std::forward<Ts>(rest)...); |
| return StreamFrom(std::move(values)); |
| } |
| |
| // Creates a Stream<T> which returns the value of |future| before completing. |
| template <typename T> |
| Stream<T> StreamFromFuture(Future<T> future) { |
| return StreamOf(std::move(future)).MapFuture([](Future<T> value) { return value; }); |
| } |
| |
| // Creates a stream which returns no elements but calls |fn| in the destructor |
| // of the returned stream. |
| // |
| // This function can be used to do resource management for a stream by making |
| // the passed |fn| own the resources used by any "upstream" sources and then |
| // Concat-ing this stream with the upstream. |
| template <typename T, typename Function> |
| Stream<T> OnDestroyStream(Function fn) { |
| return MakeStream<OnDestroyStreamImpl<T, Function>>(std::move(fn)); |
| } |
| |
| // Creates a Stream<T> returning values generated by each stream in |streams| as |
| // soon as they are produced without preserving ordering. |
| // |
| // The returned Stream<T> keeps the amount of Poll calls to the inner |streams|, |
| // to a minimum only calling Poll for the Streams which are marked are ready |
| // in the PollContext. |
| template <typename T> |
| Stream<T> FlattenStreams(std::vector<Stream<T>> streams) { |
| return MakeStream<FlattenImpl<T>>(std::move(streams)); |
| } |
| |
| // Collector for Stream<Status>::Collect() which immediately resolves the |
| // returned Future when an error status is detected. Resolves with |
| // OkStatus once the entire stream finishes after returning all OkStatus(). |
| inline std::unique_ptr<Collector<Status, Status>> AllOkCollector() { |
| return std::make_unique<AllOkCollectorImpl>(); |
| } |
| |
| // Collector for Stream<T>::Collect() which ensures the stream returns *exactly* |
| // one T before completing. Crashes if either a) no values are produced by |
| // the Stream, b) more than one value is produced by the Stream. |
| template <typename T> |
| inline std::unique_ptr<Collector<T, T>> ToFutureCheckedCollector() { |
| return std::make_unique<FutureCheckedCollectorImpl<T>>(); |
| } |
| |
| // Collector for Stream<StatusOr<T>>::Collect() which returns a vector |
| // containing all the successful results from the stream. If any element is an |
| // error, short-circuits the stream with the error. |
| template <typename T> |
| inline std::unique_ptr< |
| Collector<StatusOr<T>, StatusOr<std::vector<T>>>> |
| StatusOrVectorCollector() { |
| return std::make_unique<StatusOrVectorCollectorImpl<T>>(); |
| } |
| |
| } // namespace base |
| } // namespace perfetto |
| |
| #endif // INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_H_ |