Răsfoiți Sursa

8.30 引入雪花算法,生成同步数据给航通时永不重复的 msgId

Mechrevo 1 an în urmă
părinte
comite
1f3c1f7590

+ 70 - 0
sp-core/sp-api/src/main/java/com/pj/api/dto/OrderDto.java

@@ -7,6 +7,7 @@ import java.util.Date;
  * @Author Mechrevo
  * @Date 2023 08 30 09 12
  **/
+
 public class OrderDto implements Serializable {
     private Long id;
 
@@ -891,4 +892,73 @@ public class OrderDto implements Serializable {
 
     public OrderDto() {
     }
+
+    @Override
+    public String toString() {
+        return "OrderDto{" +
+                "id=" + id +
+                ", tradeAreaId=" + tradeAreaId +
+                ", tradeAreaName='" + tradeAreaName + '\'' +
+                ", addressIds='" + addressIds + '\'' +
+                ", saleMainId=" + saleMainId +
+                ", groupId=" + groupId +
+                ", goodsId=" + goodsId +
+                ", tradeNo='" + tradeNo + '\'' +
+                ", buyUserId=" + buyUserId +
+                ", buyUserName='" + buyUserName + '\'' +
+                ", buyUserType='" + buyUserType + '\'' +
+                ", enterpriseId=" + enterpriseId +
+                ", enterpriseName='" + enterpriseName + '\'' +
+                ", totalWeight=" + totalWeight +
+                ", totalPrice=" + totalPrice +
+                ", tradeTime=" + tradeTime +
+                ", tradeStatus=" + tradeStatus +
+                ", cancelPeople=" + cancelPeople +
+                ", payType=" + payType +
+                ", settleTime='" + settleTime + '\'' +
+                ", realPrice=" + realPrice +
+                ", shouldPrice=" + shouldPrice +
+                ", settleUserId=" + settleUserId +
+                ", recordUserId=" + recordUserId +
+                ", recordTime='" + recordTime + '\'' +
+                ", record='" + record + '\'' +
+                ", refundReason='" + refundReason + '\'' +
+                ", refundTime='" + refundTime + '\'' +
+                ", receiveName='" + receiveName + '\'' +
+                ", receivePhone='" + receivePhone + '\'' +
+                ", receiveAddress='" + receiveAddress + '\'' +
+                ", outTime='" + outTime + '\'' +
+                ", goodsNames='" + goodsNames + '\'' +
+                ", apply=" + apply +
+                ", applyTime=" + applyTime +
+                ", applyResult='" + applyResult + '\'' +
+                ", applyFailReason='" + applyFailReason + '\'' +
+                ", distribution='" + distribution + '\'' +
+                ", peopleConfirmStatus=" + peopleConfirmStatus +
+                ", peopleConfirmType=" + peopleConfirmType +
+                ", peopleConfirmTime=" + peopleConfirmTime +
+                ", applyConfirmStatus=" + applyConfirmStatus +
+                ", applyConfirmTime=" + applyConfirmTime +
+                ", enterpriseConfirm=" + enterpriseConfirm +
+                ", pick='" + pick + '\'' +
+                ", pickTime=" + pickTime +
+                ", shopId=" + shopId +
+                ", shopName='" + shopName + '\'' +
+                ", send=" + send +
+                ", sendTime='" + sendTime + '\'' +
+                ", collageOrdersId=" + collageOrdersId +
+                ", createTime=" + createTime +
+                ", createBy=" + createBy +
+                ", createName='" + createName + '\'' +
+                ", updateTime=" + updateTime +
+                ", updateBy=" + updateBy +
+                ", updateName='" + updateName + '\'' +
+                ", deleteStatus=" + deleteStatus +
+                ", finishStatus=" + finishStatus +
+                ", applyNo=" + applyNo +
+                ", resaleStatus=" + resaleStatus +
+                ", callCarStatus=" + callCarStatus +
+                ", goodsUnit='" + goodsUnit + '\'' +
+                '}';
+    }
 }

+ 144 - 0
sp-core/sp-base/src/main/java/com/pj/utils/IdWorker.java

@@ -0,0 +1,144 @@
+package com.pj.utils;
+
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+
+//雪花算法代码实现
+public class IdWorker {
+    // 时间起始标记点,作为基准,一般取系统的最近时间(一旦确定不能变动)
+    private final static long twepoch = 1288834974657L;  //todo: 届时需修改时间起点
+    // 机器标识位数
+    private final static long workerIdBits = 5L;
+    // 数据中心标识位数
+    private final static long datacenterIdBits = 5L;
+    // 机器ID最大值
+    private final static long maxWorkerId = -1L ^ (-1L << workerIdBits);
+    // 数据中心ID最大值
+    private final static long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);
+    // 毫秒内自增位
+    private final static long sequenceBits = 12L;
+    // 机器ID偏左移12位
+    private final static long workerIdShift = sequenceBits;
+    // 数据中心ID左移17位
+    private final static long datacenterIdShift = sequenceBits + workerIdBits;
+    // 时间毫秒左移22位
+    private final static long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;
+
+    private final static long sequenceMask = -1L ^ (-1L << sequenceBits);
+    /* 上次生产id时间戳 */
+    private static long lastTimestamp = -1L;
+    // 0,并发控制
+    private long sequence = 0L;
+
+    private final long workerId;
+    // 数据标识id部分
+    private final long datacenterId;
+
+    public IdWorker(){
+        this.datacenterId = getDatacenterId(maxDatacenterId);
+        this.workerId = getMaxWorkerId(datacenterId, maxWorkerId);
+    }
+    /**
+     * @param workerId
+     *            工作机器ID
+     * @param datacenterId
+     *            序列号
+     */
+    public IdWorker(long workerId, long datacenterId) {
+        if (workerId > maxWorkerId || workerId < 0) {
+            throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
+        }
+        if (datacenterId > maxDatacenterId || datacenterId < 0) {
+            throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
+        }
+        this.workerId = workerId;
+        this.datacenterId = datacenterId;
+    }
+    /**
+     * 获取下一个ID
+     *
+     * @return
+     */
+    public synchronized long nextId() {
+        long timestamp = timeGen();
+        if (timestamp < lastTimestamp) {
+            throw new RuntimeException(String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
+        }
+
+        if (lastTimestamp == timestamp) {
+            // 当前毫秒内,则+1
+            sequence = (sequence + 1) & sequenceMask;
+            if (sequence == 0) {
+                // 当前毫秒内计数满了,则等待下一秒
+                timestamp = tilNextMillis(lastTimestamp);
+            }
+        } else {
+            sequence = 0L;
+        }
+        lastTimestamp = timestamp;
+        // ID偏移组合生成最终的ID,并返回ID
+        long nextId = ((timestamp - twepoch) << timestampLeftShift)
+                | (datacenterId << datacenterIdShift)
+                | (workerId << workerIdShift) | sequence;
+
+        return nextId;
+    }
+
+    private long tilNextMillis(final long lastTimestamp) {
+        long timestamp = this.timeGen();
+        while (timestamp <= lastTimestamp) {
+            timestamp = this.timeGen();
+        }
+        return timestamp;
+    }
+
+    private long timeGen() {
+        return System.currentTimeMillis();
+    }
+
+    /**
+     * <p>
+     * 获取 maxWorkerId
+     * </p>
+     */
+    protected static long getMaxWorkerId(long datacenterId, long maxWorkerId) {
+        StringBuffer mpid = new StringBuffer();
+        mpid.append(datacenterId);
+        String name = ManagementFactory.getRuntimeMXBean().getName();
+        if (!name.isEmpty()) {
+            /*
+             * GET jvmPid
+             */
+            mpid.append(name.split("@")[0]);
+        }
+        /*
+         * MAC + PID 的 hashcode 获取16个低位
+         */
+        return (mpid.toString().hashCode() & 0xffff) % (maxWorkerId + 1);
+    }
+
+    /**
+     * <p>
+     * 数据标识id部分
+     * </p>
+     */
+    protected static long getDatacenterId(long maxDatacenterId) {
+        long id = 0L;
+        try {
+            InetAddress ip = InetAddress.getLocalHost();
+            NetworkInterface network = NetworkInterface.getByInetAddress(ip);
+            if (network == null) {
+                id = 1L;
+            } else {
+                byte[] mac = network.getHardwareAddress();
+                id = ((0x000000FF & (long) mac[mac.length - 1])
+                        | (0x0000FF00 & (((long) mac[mac.length - 2]) << 8))) >> 6;
+                id = id % (maxDatacenterId + 1);
+            }
+        } catch (Exception e) {
+            System.out.println(" getDatacenterId: " + e.getMessage());
+        }
+        return id;
+    }
+}

+ 4 - 2
sp-service/async-server/src/main/java/com/pj/feign/AmqpTemplateFeign.java

@@ -5,6 +5,7 @@ import com.pj.api.dto.OrderDto;
 import com.pj.api.dto.OrdersDto;
 import com.pj.dto.DataDto;
 import com.pj.rabbitmq.RabbitMQ;
+import com.pj.utils.IdWorker;
 import com.pj.utils.MD5;
 import com.pj.utils.ht.AESUtil;
 import org.springframework.amqp.core.AmqpTemplate;
@@ -34,8 +35,9 @@ public class AmqpTemplateFeign {
         //todo: 数据暂时定死
         DataDto dataDto = new DataDto();
         //msgId
-        String msgId = "msgid";
-        dataDto.setMsgId(msgId);
+        //使用雪花算法生成,永远不会重复
+        Long msgId = new IdWorker().nextId();
+        dataDto.setMsgId(msgId + "");
         //appId
         String appId = "shop01";
         dataDto.setAppId(appId);

+ 1 - 4
sp-service/async-server/src/main/java/com/pj/listen/BaseInfoListen.java

@@ -51,12 +51,9 @@ public class BaseInfoListen {
     @RabbitListener(queuesToDeclare = @Queue(RabbitMQ.TEST_SEND_QUEUE))
     public void sendMessage(DataDto dataDto){
         System.out.println("\n测试:接收信息\n");
-//        byte[] body = message.getBody();
-//        String s = new String(body);
-//        DataDto dataDto = JSONObject.parseObject(s, DataDto.class);
         String cbc = AESUtil.decryptCBC(dataDto.getData(), RabbitMQ.ACC_KEY);
         OrderDto orderDto = JSONObject.parseObject(cbc, OrderDto.class);
-        System.out.println("\n" + orderDto + "\n");
+        System.out.println("\n" + orderDto.toString() + "\n");
     }
 
 }

+ 1 - 12
sp-service/async-server/src/main/java/com/pj/rabbitmq/RabbitmqConfig.java

@@ -4,8 +4,6 @@ import org.springframework.amqp.core.Binding;
 import org.springframework.amqp.core.BindingBuilder;
 import org.springframework.amqp.core.DirectExchange;
 import org.springframework.amqp.core.Queue;
-import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
-import org.springframework.amqp.rabbit.connection.ConnectionFactory;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -35,16 +33,7 @@ public class RabbitmqConfig {
 //
 //    @Value("${spring.rabbitmq.virtual-host}")
 //    private String virtualHost;
-//
-//    @Bean
-//    public ConnectionFactory connectionFactory() {
-//        System.out.println(host + port + username + password + virtualHost);
-//        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
-//        connectionFactory.setUsername(username);
-//        connectionFactory.setPassword(password);
-//        connectionFactory.setVirtualHost(virtualHost);
-//        return connectionFactory;
-//    }
+
 
 
     /**

+ 1 - 2
sp-service/async-server/src/main/java/com/pj/task/HtByteTask.java

@@ -47,13 +47,12 @@ public class HtByteTask {
      *      * 006=进境申报单
      *      * 007=进口申报单
      */
-    @Scheduled(cron = "*/7 * * * * ?")  // 测试阶段,每7秒扫描一次
+    @Scheduled(cron = "*/9 * * * * ?")  // 测试阶段,每9秒扫描一次
     private void htByteTask(){
         /*  测试mq */
 //        List<HtByte> htBytes = htByteMapper.selectList(new LambdaQueryWrapper<HtByte>().eq(HtByte::getFinishStatus, 2));
         OrderDto orderDtoById = levelOneServerInterface.getOrderDtoById(8833342005355478419L);
         asyncServerInterface.sendLevelOneOrderDto(orderDtoById, DataType.DATA_TYPE_ONE.getCode());
-        return;
         /*  测试mq */
 //        //扫描表内是否有待处理任务
 //        List<HtByte> byteList = htByteMapper.selectList(new LambdaQueryWrapper<HtByte>().eq(HtByte::getFinishStatus, FinishStatus.FINISH_STATUS_ZERO.getCode()));