连接设备执行指令是业务系统的常见需求之一,正如普通的JavaEE应用在高并发时会出现问题一样,设备连接同样存在并发问题,并发高时设备会出现登录失败或者执行指令慢等情况,此处的并发当然是指单个设备的连接,但在特殊的多个设备使用同一堡垒机跳转登录的场景中这多个设备也会相互影响存在并发问题。

对业务系统来说设备的并发问题可能体现为一台设备大部分时间都能正常登录但偶尔却会登录失败,大部分开发对于这种偶发性的问题都感到不知所措,本文介绍此问题的排查思路和解决方法

1. 排查思路

并发问题的排查需要经过反复的测试,如下是一份测试代码的示例,代码中的17个设备通过同一个堡垒机登录因此可能存在并发问题,通过参数指定信号量的值来控制登录的并发数,不断修改参数观察设备登录情况和指令下发情况。实际的业务系统可以参考此代码编写适用与自身业务场景的测试代码

参考 线上环境调试方法 在Linux环境下执行下面的代码
public class DeviceConcurrentTest {

    private static final Logger logger = LoggerFactory.getLogger(DeviceConcurrentTest.class);

    private static final ConnectionFactory connectionFactory = new TelnetConnectionFactory();

    private final static String[] DEVICES = {"XADS14", "XADS15", "NXADS3", "NXADS4", "NXADS1", "NXADS10", "NXADS11", "NXADS12",
                                        "NXADS2", "NXADS5", "NXADS6", "NXADS7", "NXADS8", "NXADS9", "NXADS13", "XADS3", "XADS4"};

    public static void main(String[] args) {
        int permits = DEVICES.length;
        if (args.length > 0) {
            permits = Integer.parseInt(args[0]);
        }
        logger.info("permits: {}", permits);
        Semaphore semaphore = new Semaphore(permits);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (String device : DEVICES) {
            String regCommand = "REG NE:Name=\"" + device + "\";";         (2)
            ConnectionConfigurer connectionConfigurer = ConnectionConfigurerBuilder.telnet()
             .host("13.46.10.211")           (1)
             .postConnect(CommandConfigurerBuilder.newCommandConfigurer(regCommand)
                    .successFlags("---    END")
                    .build())
             .build();
            executorService.submit(new CommandRunnable(device, connectionConfigurer, semaphore));
        }
        executorService.shutdown();
    }

    static class CommandRunnable implements Runnable {

        private final String device;

        private final ConnectionConfigurer connectionConfigurer;

        private final Semaphore semaphore;

        CommandRunnable(String device, ConnectionConfigurer connectionConfigurer, Semaphore semaphore) {
            this.device = device;
            this.connectionConfigurer = connectionConfigurer;
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String command = "LST CNACLD: QR=LOCAL;";
            Connection connection = connectionFactory.createConnection(connectionConfigurer);
            logger.info("device: {}, send", device);
            LocalDateTime start = LocalDateTime.now();
            CommandResult commandResult = connection.sendCommand(CommandConfigurerBuilder.newCommandConfigurer(command)
                    .successFlags("个报告")
                    .timeoutMilliSeconds(60000)
                    .build());
            LocalDateTime end = LocalDateTime.now();
            logger.info("device: {}, success: {}, length: {}, time: {}", device, commandResult.isSuccess(), commandResult.getResult().length(), Duration.between(start, end).getSeconds());
            connection.close();
            semaphore.release();
        }
    }
}
  1. 所有设备使用同一个堡垒机连接

  2. 具体设备的登录命令

2. 解决方法

通过反复测试找到并发极限之后就可以修改代码保证不超过此极限,下面是一个实现示例,使用装饰模式定义了一个新的 ConnectionFactoryConnection ,创建连接前先获取信号量,关闭连接后释放信号量

SemaphoreConnectionFactory.java
public class SemaphoreConnectionFactory implements ConnectionFactory {

    private final ConcurrentHashMap<String, Semaphore> semaphoreMap = new ConcurrentHashMap<>();

    private final ConnectionFactory connectionFactory;

    public SemaphoreConnectionFactory(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    @Override
    public boolean support(ConnectionConfigurer connectionConfigurer) {
        return connectionFactory.support(connectionConfigurer);
    }

    @Override
    public Connection createConnection(ConnectionConfigurer connectionConfigurer) throws GeneralConnectionException {
        String limit = connectionConfigurer.getExtAttrs().get("limit");
        if (limit != null) {
            String key = connectionConfigurer.getExtAttrs().get("key");
            if (key == null) {
                key = connectionConfigurer.getHost();
            }
            Semaphore semaphore = semaphoreMap.computeIfAbsent(key, k -> new Semaphore(Integer.parseInt(limit)));
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new SemaphoreConnection(connectionFactory.createConnection(connectionConfigurer), semaphore);
        } else {
            return connectionFactory.createConnection(connectionConfigurer);
        }
    }
}
Semaphore.java
public class SemaphoreConnection implements Connection {

    private final Connection delegate;

    private final Semaphore semaphore;

    public SemaphoreConnection(Connection delegate, Semaphore semaphore) {
        this.delegate = delegate;
        this.semaphore = semaphore;
    }

    @Override
    public String getConnectionId() {
        return delegate.getConnectionId();
    }

    @Override
    public long getLastAccessTime() {
        return delegate.getLastAccessTime();
    }

    @Override
    public ConnectionConfigurer getConnectionConfigurer() {
        return delegate.getConnectionConfigurer();
    }

    @Override
    public void sendString(String command) {
        delegate.sendString(command);
    }

    @Override
    public CommandResult sendStringWithResult(String command) {
        return delegate.sendStringWithResult(command);
    }

    @Override
    public CommandResult sendStringWithResult(String command, String waitStr, int maxMilliSeconds) {
        return delegate.sendStringWithResult(command, waitStr, maxMilliSeconds);
    }

    @Override
    public CommandResult sendCommand(CommandConfigurer commandConfigurer) {
        return delegate.sendCommand(commandConfigurer);
    }

    @Override
    public boolean waitForString(String string, int maxMilliSeconds) {
        return delegate.waitForString(string, maxMilliSeconds);
    }

    @Override
    public String getLastScreen() {
        return delegate.getLastScreen();
    }

    @Override
    public void cleanScreen() {
        delegate.cleanScreen();
    }

    @Override
    public String getFullScreen() {
        return delegate.getFullScreen();
    }

    @Override
    public boolean isConnected() {
        return delegate.isConnected();
    }

    @Override
    public void close() {
        semaphore.release();
        delegate.close();
    }
}