package storage

import (
	"encoding/json"
	"strconv"
	"strings"
	"time"

	"bitbucket.org/ww/cabinet"
)

const (
	stateMetaKey      = "state"
	stateSaveInterval = 1 // seconds
)

type (
	Storage struct {
		kyoto    *cabinet.KCDB
		counters map[string]*counter
	}
)

func New(path string) (s *Storage, err error) {
	kyoto := cabinet.New()
	if err = kyoto.Open(path, cabinet.KCOWRITER|cabinet.KCOCREATE); err != nil {
		return
	}

	s = &Storage{
		kyoto:    kyoto,
		counters: make(map[string]*counter),
	}

	return
}

func (s *Storage) Get(queue string, done <-chan struct{}) (message []byte, ok bool) {
	if _, exist := s.counters[queue]; !exist {
		return
	}
	if size := s.counters[queue].distance(); size == 0 {
		return
	}

	var index uint
	select {
	case index = <-s.counters[queue].stream:
	case <-done:
		return
	}

	key := makeKey(queue, index)
	message, err := s.kyoto.Get(key)
	if err != nil {
		panic(err)
	}
	if err := s.kyoto.Remove(key); err != nil {
		panic(err)
	}
	ok = true

	return
}

func (s *Storage) Put(queue string, message []byte) (err error) {
	if _, ok := s.counters[queue]; !ok {
		s.counters[queue] = newCounter(0, 0)
	}

	s.counters[queue].tryWrite(func(index uint) bool {
		key := makeKey(queue, index)
		err = s.kyoto.Set(key, message)

		return (err == nil)
	})

	return
}

func (s *Storage) Flush(queue string) (messages [][]byte) {
	done := make(chan struct{})

	for {
		if msg, ok := s.Get(queue, done); ok {
			messages = append(messages, msg)
		} else {
			return
		}
	}

	return
}

func (s *Storage) QueueSizes() map[string]uint {
	info := make(map[string]uint)

	for queue, c := range s.counters {
		info[queue] = c.distance()
	}

	return info
}

func (s *Storage) Info() map[string]interface{} {
	info := make(map[string]interface{})
	status, err := s.kyoto.Status()
	if err != nil {
		panic(err)
	}

	status = status[:len(status)-1] // Removing trailing new line
	tokens := strings.Split(status, "\n")
	for _, t := range tokens {
		tt := strings.Split(t, "\t")
		num, err := strconv.Atoi(tt[1])
		if err != nil {
			info[tt[0]] = tt[1]
		} else {
			info[tt[0]] = num
		}
	}

	return info
}

func (s *Storage) Close() (err error) {
	if err = s.kyoto.Sync(true); err != nil {
		return
	}
	err = s.kyoto.Close()

	return
}

// State

func (s *Storage) saveState() (err error) {
	state := make(map[string]map[string]uint)
	for queue, ctr := range s.counters {
		state[queue] = map[string]uint{
			"wi": ctr.write,
			"ri": ctr.read,
		}
	}

	jsn, _ := json.Marshal(state)
	err = s.kyoto.Set([]byte(stateMetaKey), jsn)

	return
}

func (s *Storage) loadState() (err error) {
	var (
		jsn   []byte
		state = make(map[string]map[string]uint)
	)

	if jsn, err = s.kyoto.Get([]byte(stateMetaKey)); err != nil {
		return
	}
	if err = json.Unmarshal(jsn, &state); err != nil {
		return
	}

	for queue, meta := range state {
		s.counters[queue] = newCounter(meta["wi"], meta["ri"])
	}

	return
}

func (s *Storage) keepStatePersisted() {
	t := time.NewTicker(stateSaveInterval * time.Second)

	for {
		select {
		case <-t.C:
			if err := s.saveState(); err != nil {
				panic("Failed to persist state")
			}
			if err := s.kyoto.Sync(false); err != nil {
				panic("Failed to sync storage")
			}
		}
	}
}

func makeKey(queue string, index uint) []byte {
	// TODO: There should be a faster way
	return []byte(strings.Join([]string{queue, strconv.FormatUint(uint64(index), 10)}, "_"))
}