【Redis】实战指南:从入门到精通


问题背景

Redis 作为一个高性能的内存数据库,已经成为现代应用架构中不可或缺的组件。它不仅可以作为缓存层提升系统性能,还能作为消息队列、分布式锁等多种用途。然而,如何在实际项目中正确高效地使用 Redis,需要深入理解其特性并结合具体业务场景。本文将从实战角度出发,介绍 Redis 在各种场景下的应用方法和最佳实践。

1. Redis 实战基础

1.1 环境搭建

在开始 Redis 实战之前,我们需要搭建一个可用的 Redis 环境。以下是几种常见的方式:

Docker 方式(推荐)

# 拉取 Redis 镜像
docker pull redis:latest

# 启动 Redis 容器
docker run --name my-redis -p 6379:6379 -d redis

# 连接到 Redis
docker exec -it my-redis redis-cli

直接安装

# Ubuntu/Debian
sudo apt-get update
sudo apt-get install redis-server

# CentOS/RHEL
sudo yum install redis

# macOS
brew install redis

1.2 连接 Redis

在 Java 应用中,我们通常使用 Jedis、Lettuce 或 Redisson 客户端连接 Redis。以下是使用 Spring Boot 集成 Redis 的示例:

<!-- pom.xml -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
# application.yml
spring:
  redis:
    host: localhost
    port: 6379
    database: 0
    timeout: 10000
    lettuce:
      pool:
        max-active: 8
        max-wait: -1
        max-idle: 8
        min-idle: 0
@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        
        // 设置key的序列化方式
        template.setKeySerializer(new StringRedisSerializer());
        // 设置value的序列化方式
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        
        return template;
    }
}

2. 缓存实战

2.1 缓存设计模式

缓存穿透防护

缓存穿透是指查询一个不存在的数据,导致请求直接落到数据库上。

public User getUserById(Long id) {
    // 从缓存获取
    String key = "user:" + id;
    User user = (User) redisTemplate.opsForValue().get(key);
    
    // 缓存命中,直接返回
    if (user != null) {
        return user;
    }
    
    // 缓存未命中,查询数据库
    user = userMapper.selectById(id);
    
    // 防止缓存穿透:即使数据库中不存在该记录,也缓存空值
    if (user == null) {
        redisTemplate.opsForValue().set(key, new NullValueObject(), 5, TimeUnit.MINUTES);
        return null;
    }
    
    // 缓存查询结果
    redisTemplate.opsForValue().set(key, user, 30, TimeUnit.MINUTES);
    return user;
}

缓存击穿防护

缓存击穿是指热点数据过期时,大量请求同时打到数据库。

public User getUserById(Long id) {
    String key = "user:" + id;
    User user = (User) redisTemplate.opsForValue().get(key);
    
    if (user != null) {
        return user;
    }
    
    // 使用分布式锁防止缓存击穿
    String lockKey = "lock:user:" + id;
    boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
    
    try {
        if (locked) {
            // 双重检查,防止其他线程已经缓存了数据
            user = (User) redisTemplate.opsForValue().get(key);
            if (user != null) {
                return user;
            }
            
            // 查询数据库
            user = userMapper.selectById(id);
            if (user != null) {
                redisTemplate.opsForValue().set(key, user, 30, TimeUnit.MINUTES);
            } else {
                redisTemplate.opsForValue().set(key, new NullValueObject(), 5, TimeUnit.MINUTES);
            }
        } else {
            // 未获取到锁,短暂休眠后重试
            Thread.sleep(50);
            return getUserById(id);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    } finally {
        // 释放锁
        redisTemplate.delete(lockKey);
    }
    
    return user;
}

缓存雪崩防护

缓存雪崩是指大量缓存同时过期,导致请求全部落到数据库。

// 设置随机过期时间,避免同时过期
private void setWithRandomExpire(String key, Object value) {
    // 基础过期时间30分钟
    int baseTime = 30 * 60;
    // 随机增加0~5分钟的过期时间
    int randomTime = new Random().nextInt(5 * 60);
    redisTemplate.opsForValue().set(key, value, baseTime + randomTime, TimeUnit.SECONDS);
}

2.2 缓存预热

系统启动时,提前加载热点数据到缓存中。

@Component
public class CachePreheater implements ApplicationRunner {
    
    @Autowired
    private ProductService productService;
    
    @Override
    public void run(ApplicationArguments args) {
        // 系统启动时预热缓存
        log.info("开始预热商品缓存...");
        List<Product> hotProducts = productService.findHotProducts();
        for (Product product : hotProducts) {
            String key = "product:" + product.getId();
            redisTemplate.opsForValue().set(key, product, 1, TimeUnit.HOURS);
        }
        log.info("商品缓存预热完成,共预热{}个商品", hotProducts.size());
    }
}

3. 分布式锁实战

3.1 基于 Redis 的分布式锁实现

public class RedisDistributedLock {
    
    private RedisTemplate<String, String> redisTemplate;
    private String lockKey;
    private String lockValue;
    private long expireTime;
    
    public RedisDistributedLock(RedisTemplate<String, String> redisTemplate, String lockKey, long expireTime) {
        this.redisTemplate = redisTemplate;
        this.lockKey = lockKey;
        this.lockValue = UUID.randomUUID().toString();
        this.expireTime = expireTime;
    }
    
    public boolean tryLock() {
        return redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, expireTime, TimeUnit.MILLISECONDS);
    }
    
    public boolean unlock() {
        // 使用Lua脚本保证原子性操作
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);
        Long result = redisTemplate.execute(redisScript, Collections.singletonList(lockKey), lockValue);
        return result != null && result == 1;
    }
}

3.2 分布式锁的实际应用

@Service
public class OrderService {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    public boolean createOrder(Long userId, Long productId) {
        // 创建分布式锁
        String lockKey = "lock:product:" + productId;
        RedisDistributedLock lock = new RedisDistributedLock(redisTemplate, lockKey, 10000);
        
        try {
            // 尝试获取锁
            if (lock.tryLock()) {
                // 检查库存
                int stock = getProductStock(productId);
                if (stock <= 0) {
                    return false;
                }
                
                // 扣减库存
                boolean result = reduceStock(productId);
                if (result) {
                    // 创建订单
                    createOrderRecord(userId, productId);
                    return true;
                }
                return false;
            } else {
                // 获取锁失败,提示用户稍后重试
                return false;
            }
        } finally {
            // 释放锁
            lock.unlock();
        }
    }
    
    // 其他方法...
}

4. 消息队列实战

4.1 基于 Redis List 实现简单消息队列

// 生产者
public void sendMessage(String message) {
    redisTemplate.opsForList().rightPush("message:queue", message);
}

// 消费者
@Scheduled(fixedRate = 1000)
public void consumeMessage() {
    String message = redisTemplate.opsForList().leftPop("message:queue");
    if (message != null) {
        // 处理消息
        processMessage(message);
    }
}

4.2 基于 Redis Pub/Sub 实现消息广播

// 配置Redis消息监听器
@Configuration
public class RedisMessageConfig {
    
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }
    
    @Bean
    public MessageListenerAdapter messageListener() {
        return new MessageListenerAdapter(new RedisMessageSubscriber());
    }
    
    @Bean
    public ChannelTopic topic() {
        return new ChannelTopic("messageChannel");
    }
    
    @Bean
    public RedisMessageListenerContainer redisContainer() {
        RedisMessageListenerContainer container = container(redisConnectionFactory());
        container.addMessageListener(messageListener(), topic());
        return container;
    }
}

// 消息订阅者
public class RedisMessageSubscriber implements MessageListener {
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String receivedMessage = new String(message.getBody());
        System.out.println("接收到消息: " + receivedMessage);
        // 处理消息
    }
}

// 消息发布者
@Service
public class RedisMessagePublisher {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private ChannelTopic topic;
    
    public void publish(String message) {
        redisTemplate.convertAndSend(topic.getTopic(), message);
    }
}

5. 排行榜实战

5.1 基于 Sorted Set 实现实时排行榜

@Service
public class LeaderboardService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    private static final String LEADERBOARD_KEY = "leaderboard:scores";
    
    // 更新用户分数
    public void updateScore(String userId, double score) {
        redisTemplate.opsForZSet().add(LEADERBOARD_KEY, userId, score);
    }
    
    // 增加用户分数
    public void incrementScore(String userId, double increment) {
        redisTemplate.opsForZSet().incrementScore(LEADERBOARD_KEY, userId, increment);
    }
    
    // 获取用户排名(从0开始)
    public Long getUserRank(String userId) {
        return redisTemplate.opsForZSet().reverseRank(LEADERBOARD_KEY, userId);
    }
    
    // 获取用户分数
    public Double getUserScore(String userId) {
        return redisTemplate.opsForZSet().score(LEADERBOARD_KEY, userId);
    }
    
    // 获取前N名用户
    public Set<Object> getTopN(int n) {
        return redisTemplate.opsForZSet().reverseRange(LEADERBOARD_KEY, 0, n - 1);
    }
    
    // 获取用户分数和排名
    public Map<String, Object> getUserScoreAndRank(String userId) {
        Map<String, Object> result = new HashMap<>();
        Double score = getUserScore(userId);
        Long rank = getUserRank(userId);
        
        result.put("userId", userId);
        result.put("score", score);
        result.put("rank", rank != null ? rank + 1 : null);
        
        return result;
    }
}

## 6. 限流器实战

### 6.1 基于Redis的限流器实现

```java
@Service
public class RateLimiter {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    /**
     * 简单的计数器限流
     * @param key 限流key
     * @param limit 限制次数
     * @param period 时间窗口(秒)
     * @return 是否允许访问
     */
    public boolean isAllowed(String key, int limit, int period) {
        String countKey = "ratelimit:" + key;
        
        Long count = redisTemplate.opsForValue().increment(countKey, 1);
        
        if (count == 1) {
            // 设置过期时间
            redisTemplate.expire(countKey, period, TimeUnit.SECONDS);
        }
        
        return count <= limit;
    }
    
    /**
     * 滑动窗口限流
     * @param key 限流key
     * @param limit 限制次数
     * @param window 时间窗口(秒)
     * @return 是否允许访问
     */
    public boolean isAllowedByWindow(String key, int limit, int window) {
        String windowKey = "ratelimit:sliding:" + key;
        long now = System.currentTimeMillis();
        
        // 移除时间窗口之前的数据
        redisTemplate.opsForZSet().removeRangeByScore(
            windowKey, 0, now - window * 1000
        );
        
        // 获取当前窗口的请求数
        Long count = redisTemplate.opsForZSet().zCard(windowKey);
        
        if (count < limit) {
            // 添加当前请求
            redisTemplate.opsForZSet().add(windowKey, UUID.randomUUID().toString(), now);
            // 设置过期时间
            redisTemplate.expire(windowKey, window + 1, TimeUnit.SECONDS);
            return true;
        }
        
        return false;
    }
}

6.2 限流器的使用示例

@RestController
public class ApiController {
    
    @Autowired
    private RateLimiter rateLimiter;
    
    @GetMapping("/api/test")
    public String testApi() {
        String key = "api:" + RequestContextHolder.currentRequestAttributes().getSessionId();
        
        if (!rateLimiter.isAllowed(key, 10, 60)) {
            throw new RuntimeException("访问过于频繁,请稍后再试");
        }
        
        // 业务逻辑
        return "success";
    }
}

7. 地理位置服务实战

7.1 基于Redis GEO实现位置服务

@Service
public class LocationService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    private static final String GEO_KEY = "locations";
    
    // 添加位置信息
    public void addLocation(String memberId, double longitude, double latitude) {
        redisTemplate.opsForGeo().add(GEO_KEY, new Point(longitude, latitude), memberId);
    }
    
    // 获取位置信息
    public Point getLocation(String memberId) {
        List<Point> points = redisTemplate.opsForGeo().position(GEO_KEY, memberId);
        return points != null && !points.isEmpty() ? points.get(0) : null;
    }
    
    // 计算两点之间的距离(单位:米)
    public Double getDistance(String member1, String member2) {
        Distance distance = redisTemplate.opsForGeo().distance(GEO_KEY, member1, member2, Metrics.METERS);
        return distance != null ? distance.getValue() : null;
    }
    
    // 查找指定范围内的位置
    public GeoResults<RedisGeoCommands.GeoLocation<Object>> findNearby(
            double longitude,
            double latitude,
            double radius,    // 单位:米
            int limit
    ) {
        Circle circle = new Circle(new Point(longitude, latitude), new Distance(radius, Metrics.METERS));
        RedisGeoCommands.GeoRadiusCommandArgs args = RedisGeoCommands.GeoRadiusCommandArgs
                .newGeoRadiusArgs()
                .includeDistance()
                .includeCoordinates()
                .sortAscending()
                .limit(limit);
        
        return redisTemplate.opsForGeo().radius(GEO_KEY, circle, args);
    }
}

7.2 地理位置服务的使用示例

@RestController
@RequestMapping("/api/location")
public class LocationController {
    
    @Autowired
    private LocationService locationService;
    
    @PostMapping("/add")
    public void addLocation(@RequestBody LocationDTO dto) {
        locationService.addLocation(dto.getMemberId(), dto.getLongitude(), dto.getLatitude());
    }
    
    @GetMapping("/nearby")
    public List<Map<String, Object>> findNearby(
            @RequestParam double longitude,
            @RequestParam double latitude,
            @RequestParam(defaultValue = "1000") double radius
    ) {
        GeoResults<RedisGeoCommands.GeoLocation<Object>> results = 
                locationService.findNearby(longitude, latitude, radius, 10);
        
        List<Map<String, Object>> response = new ArrayList<>();
        for (GeoResult<RedisGeoCommands.GeoLocation<Object>> result : results) {
            Map<String, Object> location = new HashMap<>();
            location.put("memberId", result.getContent().getName());
            location.put("distance", result.getDistance().getValue());
            location.put("coordinates", result.getContent().getPoint());
            response.add(location);
        }
        
        return response;
    }
}

总结

本文介绍了Redis在实际项目中的多种应用场景,包括缓存、分布式锁、消息队列、排行榜、限流器和地理位置服务等。通过这些实战示例,我们可以看到Redis不仅仅是一个简单的缓存系统,而是一个功能强大的数据存储解决方案。在实际使用中,需要根据具体的业务场景选择合适的数据结构和实现方式,同时注意性能优化和数据一致性等问题。

参考资料


希望这篇文章能帮助您更好地理解和应用Redis的各项功能。如果您有任何问题或建议,欢迎在评论区与我交流!


文章作者: lucky845
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 lucky845 !
评论
  目录