Documentation ¶
Index ¶
- func StreamMap[T any, R any](ctx context.Context, concurrency int, name string, startIndex int64, ...) chan *OutputItem[T, R]
- type InputItem
- type MapperFunc
- type OperationType
- type OutputItem
- type ParallelMapImpl
- func (m *ParallelMapImpl[T, R]) Close() error
- func (m *ParallelMapImpl[T, R]) ReadMapperResult(chOutput chan *OutputItem[T, R]) ResultsMap[T, R]
- func (m *ParallelMapImpl[T, R]) Start(inputs []T, mapper MapperFunc[T, R]) chan *OutputItem[T, R]
- func (m *ParallelMapImpl[T, R]) StartWithChan(startIndex int64, chData chan T, mapper MapperFunc[T, R]) chan *OutputItem[T, R]
- type ReducerFunc
- type ResultsMap
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func StreamMap ¶
func StreamMap[T any, R any](ctx context.Context, concurrency int, name string, startIndex int64, chInput chan T, mapper MapperFunc[T, R]) chan *OutputItem[T, R]
StreamMap is an unbounded, streaming map, similar to a goroutine pool: it reads items from chInput, applies mapper to each one, and writes the results to the returned output channel.
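A minimal usage sketch, not taken from this package: the pmap import alias, the hashFile helper, the MapperFunc shape func(context.Context, T) (R, OperationType, error), and the assumption that the output channel is closed once chInput is drained are all illustrative.

ctx := context.Background()
chInput := make(chan string)

// Producer: feed items into the stream and close the channel when done.
go func() {
	defer close(chInput)
	for _, p := range []string{"a.txt", "b.txt", "c.txt"} {
		chInput <- p
	}
}()

// Four workers read from chInput, map each item, and emit OutputItems.
chOut := pmap.StreamMap(ctx, 4, "hash-files", 0, chInput,
	func(ctx context.Context, path string) (string, pmap.OperationType, error) {
		sum, err := hashFile(path) // hashFile is a hypothetical helper
		return sum, pmap.Continue, err
	})

for item := range chOut {
	fmt.Printf("#%d %s -> %s (err=%v)\n", item.Index, item.Item, item.Result, item.Err)
}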
Types ¶
type MapperFunc ¶
MapperFunc maps an input T to an output R. During mapping, whether subsequent items keep being processed is decided by the returned OperationType, not by the error.
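The generated docs above omit the concrete declaration. A plausible shape, inferred by analogy with ReducerFunc below and offered only as an assumption:

// Assumed declaration (not shown in these docs): one input item in, the mapped
// result out, plus an OperationType controlling whether the run continues and
// a per-item error.
type MapperFunc[T any, R any] func(context.Context, T) (R, OperationType, error)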
type OperationType ¶
type OperationType uint8
const (
	Continue OperationType = iota // keep processing subsequent items, even if this one returned an error
	Stop                          // stop processing immediately -- e.g. a file-copy task must stop as soon as the target disk is full
)
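A hedged sketch of a mapper that uses both values, under the assumed MapperFunc shape above (copyToTarget is a hypothetical helper; imports of context, errors, and syscall are assumed):

func copyFile(ctx context.Context, path string) (int64, OperationType, error) {
	n, err := copyToTarget(ctx, path)
	if errors.Is(err, syscall.ENOSPC) {
		// The target disk is full: there is no point processing further items.
		return 0, Stop, err
	}
	// Any other error affects only this item; keep processing the rest.
	return n, Continue, err
}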
func Reduce ¶
func Reduce[T any](ctx context.Context, identity T, inputs []T, accumulator ReducerFunc[T]) (T, OperationType, error)
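A small sketch of Reduce, summing integers and aborting on a negative value; the signature is taken from the docs, only the pmap import alias is assumed:

sum, op, err := pmap.Reduce(context.Background(), 0, []int{1, 2, 3, 4},
	func(ctx context.Context, acc, v int) (int, pmap.OperationType, error) {
		if v < 0 {
			return acc, pmap.Stop, fmt.Errorf("negative value %d", v)
		}
		return acc + v, pmap.Continue, nil
	})
fmt.Println(sum, op, err)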
func (OperationType) String ¶
func (op OperationType) String() string
type OutputItem ¶
type OutputItem[T any, R any] struct {
	Index  int64
	Item   T
	OpType OperationType
	Result R
	Err    error
}
func (*OutputItem[T, R]) String ¶
func (oi *OutputItem[T, R]) String() string
type ParallelMapImpl ¶
func NewParallelMapImpl ¶
func (*ParallelMapImpl[T, R]) Close ¶
func (m *ParallelMapImpl[T, R]) Close() error
func (*ParallelMapImpl[T, R]) ReadMapperResult ¶
func (m *ParallelMapImpl[T, R]) ReadMapperResult(chOutput chan *OutputItem[T, R]) ResultsMap[T, R]
func (*ParallelMapImpl[T, R]) Start ¶
func (m *ParallelMapImpl[T, R]) Start(inputs []T, mapper MapperFunc[T, R]) chan *OutputItem[T, R]
func (*ParallelMapImpl[T, R]) StartWithChan ¶
func (m *ParallelMapImpl[T, R]) StartWithChan(startIndex int64, chData chan T, mapper MapperFunc[T, R]) chan *OutputItem[T, R]
type ReducerFunc ¶
type ReducerFunc[T any] func(context.Context, T, T) (T, OperationType, error)
type ResultsMap ¶
type ResultsMap[T any, R any] map[int64]*OutputItem[T, R]
ResultsMap stores every input item's index mapped to its result item (OutputItem). This is necessary because ParallelMap and StreamMap may return results in a different order than the inputs, and individual items may carry errors.
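For illustration (pmap alias assumed, results being a ResultsMap returned by Map or ParallelMap): because entries are keyed by input index, a specific item's outcome can be looked up directly even when workers finish out of order.

if item, ok := results[2]; ok { // outcome for the third input item
	fmt.Println(item.Index, item.Item, item.Result, item.Err)
}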
func Map ¶
func Map[T any, R any](ctx context.Context, inputs []T, mapper MapperFunc[T, R]) ResultsMap[T, R]
Map is the serial map function.
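A minimal sketch, assuming the pmap alias and the MapperFunc shape described earlier:

// Serial mapping: items are processed one at a time, in input order.
results := pmap.Map(context.Background(), []int{1, 2, 3},
	func(ctx context.Context, n int) (string, pmap.OperationType, error) {
		return strconv.Itoa(n * 10), pmap.Continue, nil
	})
fmt.Println(len(results)) // one OutputItem per input, keyed by index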
func ParallelMap ¶
func ParallelMap[T any, R any](ctx context.Context, inputs []T, concurrency int, name string, mapper MapperFunc[T, R]) ResultsMap[T, R]
ParallelMap is the concurrent map; it starts 2 + concurrency goroutines to handle all items in inputs.
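A hedged sketch with eight workers (the pmap alias and the fetch helper are illustrative):

urls := []string{"https://example.com/a", "https://example.com/b"}
results := pmap.ParallelMap(context.Background(), urls, 8, "fetch",
	func(ctx context.Context, u string) ([]byte, pmap.OperationType, error) {
		body, err := fetch(ctx, u) // fetch is a hypothetical helper
		return body, pmap.Continue, err
	})

Here the name parameter ("fetch") is presumably used to label the run, for example in logs; that use is an assumption, not stated in the docs.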
func (*ResultsMap[T, R]) ConvertResult ¶
func (r *ResultsMap[T, R]) ConvertResult() ([]R, []error, OperationType)
ConvertResult converts the map into slices ordered by the original input order.
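A sketch of reassembling output (pmap alias assumed; whether the returned []error is index-aligned with []R or only collects the non-nil errors is not stated here, so the sketch just counts non-nil entries):

values, errs, op := results.ConvertResult()
failed := 0
for _, e := range errs {
	if e != nil {
		failed++
	}
}
fmt.Printf("%d results, %d failures\n", len(values), failed)
if op == pmap.Stop {
	fmt.Println("mapping stopped early")
}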