MQ使用总结(四)如何保证消息不重复消费
幂等性 幂等(idempotent、idempotence)是一个数学与计算机学概念,常见于抽象代数中。在编程中一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。幂等函数,或幂等方法,是指可以使用相同参数重复执行,并能获得相同结果的函数。这些函数不会影响系统状态,也不用担心重复执行会对系统造成改变。例如,“setTrue()”函数就是一个幂等函数,无论多次执行,其结果都是一样的.更复杂的操作幂等保证是利用唯一交易号(流水号)实现. 简单来说,幂等性就是一个数据或者一个请求,给你重复来了多次,你得确保对应的数据是不会改变的,不能出错。 出现重复消费场景 首先,比如rabbitmq、rocketmq、kafka,都有可能会出现消息重复消费的问题。因为这个问题通常不是由mq来保证的,而是消费方自己来保证的。 举例kafka来说明重复消费问题:kafka有一个叫做offset的概念,就是每个消息写进去,都有一个offset代表他的序号,然后consumer消费了数据之后,每隔一段时间,会把自己消费过的消息的offset提交一下,代表我已经消费过了,下次就算重启,k...
MQ使用总结(三)如何保证消息不丢
mq原则数据不能多,也不能少,不能多是说消息不能重复消费。如果mq传递的是非常核心的消息,支撑核心的业务,那么这种场景是一定不能丢失数据的。 丢失数据场景丢数据一般分为两种,一种是mq把消息丢了,一种就是消费时将消息丢了。下面从rabbitmq和kafka分别说一下,丢失数据的场景。 rabbitmq生产者弄丢了数据生产者将数据发送到rabbitmq的时候,可能在传输过程中因为网络等问题而将数据弄丢了。 rabbitmq自己丢了数据如果没有开启rabbitmq的持久化,那么rabbitmq一旦重启,那么数据就丢了。所依必须开启持久化将消息持久化到磁盘,这样就算rabbitmq挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢失。除非极其罕见的情况,rabbitmq还没来得及持久化自己就挂了,这样可能导致一部分数据丢失。 消费端弄丢了数据主要是因为消费者消费时,刚消费到,还没有处理,结果消费者就挂了,这样你重启之后,rabbitmq就认为你已经消费过了,然后就丢了数据。 kafka生产者弄丢了数据生产者没有设置相应的策略,发送过程中丢失数据。 kafka弄丢了数据比较常见的...
MQ使用总结(二)消息积压在消息队列里怎么办
大量消息在mq里积压了几个小时了还没解决场景:几千万条数据在MQ里积压了七八个小时,从下午4点多,积压到了晚上很晚,10点多,11点多。线上故障了,这个时候要不然就是修复consumer的问题,让他恢复消费速度,然后傻傻的等待几个小时消费完毕。这个肯定不行。一个消费者一秒是1000条,一秒3个消费者是3000条,一分钟是18万条,1000多万条。所以如果你积压了几百万到上千万的数据,即使消费者恢复了,也需要大概1小时的时间才能恢复过来。 解决方案:这种时候只能操作临时扩容,以更快的速度去消费数据了。具体操作步骤和思路如下: 先修复consumer的问题,确保其恢复消费速度,然后将现有consumer都停掉。 临时建立好原先10倍或者20倍的queue数量(新建一个topic,partition是原来的10倍)。 然后写一个临时分发消息的consumer程序,这个程序部署上去消费积压的消息,消费之后不做耗时处理,直接均匀轮询写入临时建好分10数量的queue里面。 紧接着征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的消息。 这种做...
MQ使用总结(一)如何保证消息按顺序执行
为什么要保证顺序消息队列中的若干消息如果是对同一个数据进行操作,这些操作具有前后的关系,必须要按前后的顺序执行,否则就会造成数据异常。举例:比如通过mysql binlog进行两个数据库的数据同步,由于对数据库的数据操作是具有顺序性的,如果操作顺序搞反,就会造成不可估量的错误。比如数据库对一条数据依次进行了 插入->更新->删除操作,这个顺序必须是这样,如果在同步过程中,消息的顺序变成了 删除->插入->更新,那么原本应该被删除的数据,就没有被删除,造成数据的不一致问题。 出现顺序错乱的场景rabbitmq 一个queue,有多个consumer去消费,这样就会造成顺序的错误,consumer从MQ里面读取数据是有序的,但是每个consumer的执行时间是不固定的,无法保证先读到消息的consumer一定先完成操作,这样就会出现消息并没有按照顺序执行,造成数据顺序错误。 一个queue对应一个consumer,但是consumer里面进行了多线程消费,这样也会造成消息消费顺序错误。 kafka kafka一个topic,一个partition...
深入学习jvm与垃圾回收
一、 Java内存区域常见面试题 介绍下 Java 内存区域(运行时数据区) Java 对象的创建过程(五步,建议能默写出来并且要知道每一步虚拟机做了什么) 对象的访问定位的两种方式(句柄和直接指针两种方式) String 类和常量池 8 种基本类型的包装类和常量池 1. 运行时数据区域 Java 虚拟机在执行 Java 程序的过程中会把它管理的内存划分成若干个不同的数据区域。JDK. 1.8 和之前的版本略有不同。 JVM 内存区域主要分为线程私有区域【程序计数器、虚拟机栈、本地方法区】、线程共享区域【JAVA 堆、方法区】、直接内存。线程私有数据区域生命周期与线程相同, 依赖用户线程的 启动/结束 而 创建/销毁 (在 Hotspot VM 内, 每个线程都与操作系统的本地线程直接映射, 因此这部分内存区域的存/否跟随本地线程的生/死对应)。 1.1程序计数器(线程私有)一块较小的内存空间, 是当前线程所执行的字节码的行号指示器,每条线程都要有一个独立的程序计数器,这类内存也称为“线程私有”的内存。正在执行 java 方...
mvcc浅析
在讲Mvcc前,我觉得有必要先了解一下InnoDB的行锁。mysql从5.5.5开始使用InnoDB作为默认存储引擎,lock的对象是事务,用来锁定的是数据库中的对象,如表、页、行。并且一般lock的对象仅在事务commit或rollback后进行释放(不同事务隔离级别释放的时间可能不同)。InnoDB存储引擎实现了如下两种标准的行锁: 共享锁(S Lock)允许事务读一行数据 排他锁(X Lock)允许事务删除或更新一行数据 说明: 读锁、S锁,若事务T对数据对象A加上S锁,则事务T可以读A但不能修改A,其他只能再对A加S锁,而不能加X锁,直到T释放A上的S锁。这保证了其他事务可以读A,但在T释放A上S锁之前不能对A做任何修改。 写锁、X锁,若事务T对数据对象A加上X锁,事务T可以读A也可以修改A,其他事务不能再对A加任何锁,直到T释放A上的锁,这保证了其他事务在T释放A上的锁之前不能再读取和修改A。 一致性非锁定读一致性的非锁定读(consistent nonlocking read )是指InnoDB通过行多版本控制(multi versioning)的方式来读...
布隆过滤器原理及使用
什么是布隆过滤器?我们来看这么一个场景:目标网站有上千万个URL,如何判断某个URL是否已经访问过?使用DB存储的话,就是把每个URL存入DB,然后每次访问URL前执行 1slect id from table where url = 'xxxx' 但随着URL数据量增多,每次请求前都要访问DB一次,效率非常低。当然,也可以用redis的set结构存储URL,优于DB存储,但也同样存在一个问题:耗费内存过多。如何解决?布隆过滤器! 布隆过滤器从本质上讲是一个位数组,位数组就是数组的每个元素都只占用1bit。每个元素只能是0或者1。这样申请一个10000个元素的位数组只占用 10000/8 = 1250B 的空间。布隆过滤器除了一个位数组,还有K个哈希函数。当一个元素加入布隆过滤器的时候,会进行如下操作: 使用K个哈希函数对元素值进行K次计算,得到K个哈希值。 根据得到的哈希值,在数组中把对应下标的值设置为1。 如上图:url1通过f1,f2,f3 三个哈希函数计算得到3个值,把数组中的三个位置设置为1。 要判断一个值是否存在,则只需对...
生产环境缓存失效解决方案
Redis的持久化机制Redis的数据都存放在内存中,如果没有配置持久化,redis重启后数据就全丢失了,于是需要开启redis的持久化功能,将数据保存到磁盘上,当redis重启后,可以从磁盘中恢复数据。 持久化方式 RDB 持久化 RDB 持久化方式能够在指定的时间间隔对你的数据进行快照存储 AOF(append only file)持久化 AOF 持久化方式记录每次对服务器写的操作,当服务器重启的时候会重新执行这些命令来恢复原始的数据 RDB 方式 客户端直接通过命令BGSAVE或者SAVE来创建一个内存快照 BGSAVE 调用fork来创建一个子进程,子进程负责将快照写入磁盘,而父进程仍然继续处理命令。 SAVE 执行SAVE命令过程中,不再响应其他命令。 在redis.conf中调整save配置选项,当在规定的时间内,Redis发生了写操作的个数满足条件会触发发生 BGSAVE命令 123456# 900秒之内至少一次写操作save 900 1 # 300秒之内至少发生10次写操作save 300 10# 60秒之内发生至少10000次save 60 10...
高可用Nginx集群安装搭建手册
LVS搭建Nginx集群准备工作环境说明共需要三台linux centos服务器,一台LVS,两台RealServer,端口号必须保持一致,设为80,所以需要3台服务器。 设定IP环境如下 服务名 IP 端口 作用 LVS-Director VIP 192.168.120.200 RIP 192.168.120.58 80 运行LVS均衡调度,对外提供虚拟IP访问 RealServer-Nginx1 192.168.120.83 80 运行nginx及tomcat服务,作为真实服务 1 RealServer-Nginx2 192.168.120.58 80 运行nginx及tomcat服务,作为真实服务 2 LVS-Director负责将80端口的请求,负载均衡到Nginx1、Nginx2两台真实服务器上去,接下来我们将进行配置LVS-Director的路由方式-DR,直接路由。准备IP地址信息打印程序balancer-1.0.0.jar,http://hostname:port/server/ip,打印服务器的IP端口号信息。 启动RealServe...
ElasticSearch Centos7 安装
准备工作开放端口关闭防火墙 12systemctl stop firewalldsystemctl disable firewalld 或者开放对应的端口号,比如ElasticSearch的9300、9200,Kibana的5601 1234sudo firewall-cmd --zone=public --add-port=9300/tcp --permanentsudo firewall-cmd --zone=public --add-port=9200/tcp --permanentsudo firewall-cmd --zone=public --add-port=5601/tcp --permanentsudo firewall-cmd --reload 新建用户1234# 添加用户备用useradd elastic# 设置密码passwd elastic 创建安装目录1mkdir /usr/local/elastic 下载安装包下载安装包 elasticsearch、kibana、IK Analysis,这里下载的是7.5.1版本,大家选择自己需要...




