go里面几个并发案例

时间:2024-01-22 11:41:42

1、用golang 写一个 消息队列,通过channel 多协程实现,一个写队列多个读队列

type MessageQueue struct {
	mu      sync.Mutex
	queue   chan string
	readers []chan string
}

func NewMessageQueue() *MessageQueue {
	return &MessageQueue{
		queue:   make(chan string, 10), // Buffer size 10, adjust as needed
		readers: make([]chan string, 0),
	}
}

func (mq *MessageQueue) AddReader(readerChan chan string) {
	mq.mu.Lock()
	defer mq.mu.Unlock()
	mq.readers = append(mq.readers, readerChan)
}

func (mq *MessageQueue) Write(message string) {
	mq.queue <- message
}

func (mq *MessageQueue) StartReaders() {
	for _, readerChan := range mq.readers {
		go func(ch chan string) {
			for {
				message, ok := <-ch
				if !ok {
					break
				}
				fmt.Println("Received:", message)
			}
		}(readerChan)
	}
}

func main() {
	messageQueue := NewMessageQueue()

	// 启动多个 reader goroutine
	for i := 0; i < 3; i++ {
		readerChan := make(chan string, 10) // 缓冲区大小10,根据需要调整
		messageQueue.AddReader(readerChan)
	}

	// 启动 reader goroutine
	messageQueue.StartReaders()

	// 将消息写入队列
	for i := 1; i <= 5; i++ {
		messageQueue.Write(fmt.Sprintf("Message %d", i))
		time.Sleep(time.Second)
	}

	// 完成后关闭读者通道
	messageQueue.mu.Lock()
	for _, readerChan := range messageQueue.readers {
		close(readerChan)
	}
	messageQueue.mu.Unlock()

	// 留出时间让 reader goroutine 完成处理
	time.Sleep(time.Second)
}

2、用golang 写一个 消息队列,通过channel 多协程实现,多个写队列多个读队列

package main

import (
	"fmt"
	"sync"
)

type MessageQueue1 struct {
	mu    sync.Mutex
	queue []string
}

func NewMessageQueue1() *MessageQueue1 {
	return &MessageQueue1{
		queue: make([]string, 0),
	}
}

func (mq *MessageQueue1) Write(message string) {
	mq.mu.Lock()
	defer mq.mu.Unlock()
	mq.queue = append(mq.queue, message)
}

func (mq *MessageQueue1) Read() (string, bool) {
	mq.mu.Lock()
	defer mq.mu.Unlock()

	if len(mq.queue) > 0 {
		message := mq.queue[0]
		mq.queue = mq.queue[1:]
		return message, true
	}

	return "", false
}

func main() {
	messageQueue1 := NewMessageQueue1()
	var wg sync.WaitGroup
	done := make(chan struct{})

	// 启动多个写协程
	for i := 1; i <= 8; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for j := 1; j <= 5; j++ {
				message := fmt.Sprintf("-- Writer %d: Message %d", id, j)
				messageQueue1.Write(message)
				//time.Sleep(time.Millisecond * 1)
			}
		}(i)
	}

	// 启动多个读协程
	for i := 0; i <= 7; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				message, ok := messageQueue1.Read()
				if !ok {
					break
				}
				fmt.Printf("Reader %d received: %s\n", id, message)
			}
		}(i)
	}

	// 等待所有协程完成
	go func() {
		wg.Wait()
		close(done)
	}()

	// 等待所有协程完成后结束程序
	<-done
}

3、并发安全的全局计数器编写一个 Go 程序,实现一个并发安全的全局计数器,要求能够支持并发的增加和获取计数。请使用互斥锁或其他并发安全的机制,确保多个 goroutine同时访问时不会出现竞态条件。

type Counter struct {
	Count int
	mx    sync.Mutex
}

func (c Counter) InCream() {
	c.mx.Lock()
	defer c.mx.Unlock()
	c.Count++
}

func (c Counter) CountNumber() int {
	c.mx.Lock() //并发安全
	defer c.mx.Unlock()
	return c.Count
}

func main() {
	count := Counter{}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ { //并发的增加
		wg.Add(1)
		go func() {
			count.InCream()
			wg.Done()
		}()
	}
	wg.Wait()

	for i := 0; i < 20; i++ {
		go func(i int) {
			fmt.Printf("Count %v,%v \n", i, count.CountNumber())
		}(i)
	}
	time.Sleep(1 * time.Second)

}

4、并发安全的缓存
实现一个带有过期时间的并发安全的缓存系统。
缓存应该支持设置键值对、获取键值对和定期清理过期的键值对。使用互斥锁或其他并发安全的机制确保多个 goroutine 同时访问时不会出现竟态条件。

package main

import (
"sync"
"time"
)

// CacheItem 表示缓存中的一个键值对
type CacheItem struct {
	Value      interface{}
	Expiration int64 // 过期时间戳,单位秒
}

// ConcurrentCache 表示并发安全的缓存系统
type ConcurrentCache struct {
	cache map[string]CacheItem
	mutex sync.Mutex
}

// NewConcurrentCache 创建一个新的并发安全的缓存系统
func NewConcurrentCache() *ConcurrentCache {
	return &ConcurrentCache{
		cache: make(map[string]CacheItem),
	}
}

// Set 设置缓存中的键值对,并指定过期时间(秒)
func (c *ConcurrentCache) Set(key string, value interface{}, expirationSeconds int64) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	expirationTime := time.Now().Unix() + expirationSeconds
	c.cache[key] = CacheItem{
		Value:      value,
		Expiration: expirationTime,
	}
}

// Get 获取缓存中指定键的值,如果键不存在或已过期则返回nil
func (c *ConcurrentCache) Get(key string) interface{} {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	item, exists := c.cache[key]
	if !exists || time.Now().Unix() > item.Expiration {
		// 键不存在或已过期
		return nil
	}

	return item.Value
}

// CleanExpired 清理过期的键值对
func (c *ConcurrentCache) CleanExpired() {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	currentTime := time.Now().Unix()
	for key, item := range c.cache {
		if currentTime > item.Expiration {
			delete(c.cache, key)
		}
	}
}

func main() {
	// 示例用法
	cache := NewConcurrentCache()

	// 设置键值对,并指定过期时间为10秒
	cache.Set("key1", "value1", 10)

	// 获取键值对
	result := cache.Get("key1")
	if result != nil {
		println(result.(string))
	} else {
		println("Key not found or expired.")
	}

	// 定期清理过期的键值对
	cache.CleanExpired()
}