From 611b5084258371e2edcc3675c97df89033f5d7bd Mon Sep 17 00:00:00 2001 From: Gregory Eremin Date: Sun, 17 Jun 2018 12:57:47 +0200 Subject: [PATCH] Add thread pool implementation --- LICENSE | 19 +++++++++++ README.md | 1 + threadpool/threadpool.go | 63 +++++++++++++++++++++++++++++++++++ threadpool/threadpool_test.go | 49 +++++++++++++++++++++++++++ 4 files changed, 132 insertions(+) create mode 100644 LICENSE create mode 100644 threadpool/threadpool.go create mode 100644 threadpool/threadpool_test.go diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..b23aaeb --- /dev/null +++ b/LICENSE @@ -0,0 +1,19 @@ +Copyright (c) 2018 Gregory Eremin + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md index e69de29..ec638bb 100644 --- a/README.md +++ b/README.md @@ -0,0 +1 @@ +# Go tool belt diff --git a/threadpool/threadpool.go b/threadpool/threadpool.go new file mode 100644 index 0000000..56fd45b --- /dev/null +++ b/threadpool/threadpool.go @@ -0,0 +1,63 @@ +package threadpool + +import ( + "context" + "log" + "sync" +) + +// ThreadPool implements a thread pool model. It allocates a pool of threads +// ready to perform tasks concurrently. +type ThreadPool struct { + Logger interface { + Printf(f string, args ...interface{}) + } + queue chan func() + wg sync.WaitGroup +} + +// New creates a thread pool with a given number of workers. +func New(size int) *ThreadPool { + tp := &ThreadPool{ + Logger: &log.Logger{}, + queue: make(chan func(), size), + } + + tp.wg.Add(size) + for i := 0; i < size; i++ { + go tp.worker() + } + + return tp +} + +// Enqueue adds a task to queue. +func (tp *ThreadPool) Enqueue(ctx context.Context, task func()) { + select { + case tp.queue <- task: + case <-ctx.Done(): + } +} + +// Close waits for all currently accepted tasks to be processed and returns. +// Attempts to enqueue a task after calling Close would result in a panic. +func (tp *ThreadPool) Close() { + close(tp.queue) + tp.wg.Wait() +} + +func (tp *ThreadPool) worker() { + defer tp.wg.Done() + for task := range tp.queue { + tp.perform(task) + } +} + +func (tp *ThreadPool) perform(task func()) { + defer func() { + if err := recover(); err != nil { + tp.Logger.Printf("Thread pool task recovered from panic: %v", err) + } + }() + task() +} diff --git a/threadpool/threadpool_test.go b/threadpool/threadpool_test.go new file mode 100644 index 0000000..413ace1 --- /dev/null +++ b/threadpool/threadpool_test.go @@ -0,0 +1,49 @@ +package threadpool + +import ( + "bytes" + "context" + "fmt" + "sync/atomic" + "testing" +) + +func TestThreadPool(t *testing.T) { + const n = 100 + var s int64 + ctx := context.Background() + + pool := New(n / 10) + for i := 0; i < n; i++ { + pool.Enqueue(ctx, func() { atomic.AddInt64(&s, 1) }) + } + pool.Close() + + if s != n { + t.Errorf("Thread pool result doesn't match: expected %d, got %d", n, s) + } +} + +func TestThreadPoolPanicHandling(t *testing.T) { + logger := &bufLogger{buf: bytes.NewBuffer(nil)} + ctx := context.Background() + + pool := New(1) + pool.Logger = logger + pool.Enqueue(ctx, func() { panic("oh no!") }) + pool.Close() + + out := logger.buf.String() + exp := "Thread pool task recovered from panic: oh no!" + if out != exp { + t.Errorf("Expected logger to receive message %q, got %q", exp, out) + } +} + +type bufLogger struct { + buf *bytes.Buffer +} + +func (b *bufLogger) Printf(f string, args ...interface{}) { + b.buf.WriteString(fmt.Sprintf(f, args...)) +}