mirror of https://github.com/mum4k/termdash.git
232 lines
5.0 KiB
Go
232 lines
5.0 KiB
Go
// Copyright 2018 Google Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
// Package eventqueue provides an unboud FIFO queue of events.
|
|
package eventqueue
|
|
|
|
import (
|
|
"context"
|
|
"reflect"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/mum4k/termdash/internal/terminalapi"
|
|
)
|
|
|
|
// node is a single data item on the queue.
|
|
type node struct {
|
|
prev *node
|
|
next *node
|
|
event terminalapi.Event
|
|
}
|
|
|
|
// Unbound is an unbound FIFO queue of terminal events.
|
|
// Unbound must not be copied, pass it by reference only.
|
|
// This implementation is thread-safe.
|
|
type Unbound struct {
|
|
first *node
|
|
last *node
|
|
// mu protects first and last.
|
|
mu sync.Mutex
|
|
|
|
// cond is used to notify any callers waiting on a call to Pull().
|
|
cond *sync.Cond
|
|
|
|
// condMU protects cond.
|
|
condMU sync.RWMutex
|
|
|
|
// done is closed when the queue isn't needed anymore.
|
|
done chan struct{}
|
|
}
|
|
|
|
// New returns a new Unbound queue of terminal events.
|
|
// Call Close() when done with the queue.
|
|
func New() *Unbound {
|
|
u := &Unbound{
|
|
done: make(chan (struct{})),
|
|
}
|
|
u.cond = sync.NewCond(&u.condMU)
|
|
go u.wake() // Stops when Close() is called.
|
|
return u
|
|
}
|
|
|
|
// wake periodically wakes up all goroutines waiting at Pull() so they can
|
|
// check if the context expired.
|
|
func (u *Unbound) wake() {
|
|
const spinTime = 250 * time.Millisecond
|
|
t := time.NewTicker(spinTime)
|
|
defer t.Stop()
|
|
for {
|
|
select {
|
|
case <-t.C:
|
|
u.cond.Broadcast()
|
|
case <-u.done:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Empty determines if the queue is empty.
|
|
func (u *Unbound) Empty() bool {
|
|
u.mu.Lock()
|
|
defer u.mu.Unlock()
|
|
return u.empty()
|
|
}
|
|
|
|
// empty determines if the queue is empty.
|
|
func (u *Unbound) empty() bool {
|
|
return u.first == nil
|
|
}
|
|
|
|
// Push pushes an event onto the queue.
|
|
func (u *Unbound) Push(e terminalapi.Event) {
|
|
u.mu.Lock()
|
|
defer u.mu.Unlock()
|
|
u.push(e)
|
|
}
|
|
|
|
// push is the implementation of Push.
|
|
// Caller must hold u.mu.
|
|
func (u *Unbound) push(e terminalapi.Event) {
|
|
n := &node{
|
|
event: e,
|
|
}
|
|
if u.empty() {
|
|
u.first = n
|
|
u.last = n
|
|
} else {
|
|
prev := u.last
|
|
u.last.next = n
|
|
u.last = n
|
|
u.last.prev = prev
|
|
}
|
|
u.cond.Signal()
|
|
}
|
|
|
|
// Pop pops an event from the queue. Returns nil if the queue is empty.
|
|
func (u *Unbound) Pop() terminalapi.Event {
|
|
u.mu.Lock()
|
|
defer u.mu.Unlock()
|
|
|
|
if u.empty() {
|
|
return nil
|
|
}
|
|
|
|
n := u.first
|
|
u.first = u.first.next
|
|
|
|
if u.empty() {
|
|
u.last = nil
|
|
}
|
|
return n.event
|
|
}
|
|
|
|
// Pull is like Pop(), but blocks until an item is available or the context
|
|
// expires. Returns a nil event if the context expired.
|
|
func (u *Unbound) Pull(ctx context.Context) terminalapi.Event {
|
|
if e := u.Pop(); e != nil {
|
|
return e
|
|
}
|
|
|
|
u.cond.L.Lock()
|
|
defer u.cond.L.Unlock()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
default:
|
|
}
|
|
|
|
if e := u.Pop(); e != nil {
|
|
return e
|
|
}
|
|
u.cond.Wait()
|
|
}
|
|
}
|
|
|
|
// Close should be called when the queue isn't needed anymore.
|
|
func (u *Unbound) Close() {
|
|
close(u.done)
|
|
}
|
|
|
|
// Throttled is an unbound and throttled FIFO queue of terminal events.
|
|
// Throttled must not be copied, pass it by reference only.
|
|
// This implementation is thread-safe.
|
|
type Throttled struct {
|
|
queue *Unbound
|
|
max int
|
|
}
|
|
|
|
// NewThrottled returns a new Throttled queue of terminal events.
|
|
//
|
|
// This queue scans the queue content on each Push call and won't Push the
|
|
// event if there already is a continuous chain of exactly the same events
|
|
// en queued. The argument maxRep specifies the maximum number of repetitive
|
|
// events.
|
|
//
|
|
// Call Close() when done with the queue.
|
|
func NewThrottled(maxRep int) *Throttled {
|
|
t := &Throttled{
|
|
queue: New(),
|
|
max: maxRep,
|
|
}
|
|
return t
|
|
}
|
|
|
|
// Empty determines if the queue is empty.
|
|
func (t *Throttled) Empty() bool {
|
|
return t.queue.empty()
|
|
}
|
|
|
|
// Push pushes an event onto the queue.
|
|
func (t *Throttled) Push(e terminalapi.Event) {
|
|
t.queue.mu.Lock()
|
|
defer t.queue.mu.Unlock()
|
|
|
|
if t.queue.empty() {
|
|
t.queue.push(e)
|
|
return
|
|
}
|
|
|
|
var same int
|
|
for n := t.queue.last; n != nil; n = n.prev {
|
|
if reflect.DeepEqual(e, n.event) {
|
|
same++
|
|
} else {
|
|
break
|
|
}
|
|
|
|
if same > t.max {
|
|
return // Drop the repetitive event.
|
|
}
|
|
}
|
|
t.queue.push(e)
|
|
}
|
|
|
|
// Pop pops an event from the queue. Returns nil if the queue is empty.
|
|
func (t *Throttled) Pop() terminalapi.Event {
|
|
return t.queue.Pop()
|
|
}
|
|
|
|
// Pull is like Pop(), but blocks until an item is available or the context
|
|
// expires. Returns a nil event if the context expired.
|
|
func (t *Throttled) Pull(ctx context.Context) terminalapi.Event {
|
|
return t.queue.Pull(ctx)
|
|
}
|
|
|
|
// Close should be called when the queue isn't needed anymore.
|
|
func (t *Throttled) Close() {
|
|
close(t.queue.done)
|
|
}
|