资讯专栏INFORMATION COLUMN

dubbo源码解析(十二)远程通信——Telnet

li21 / 1866人阅读

摘要:远程通讯目标介绍的相关实现逻辑介绍中的包内的源码解析。源码分析一处理对应的命令命令该接口上命令处理器接口,是一个可扩展接口。关闭通道五该类实现了接口,封装了命令的实现。下一篇我会讲解基于实现远程通信部分。

远程通讯——Telnet
目标:介绍telnet的相关实现逻辑、介绍dubbo-remoting-api中的telnet包内的源码解析。
前言

从dubbo 2.0.5开始,dubbo开始支持通过 telnet 命令来进行服务治理。本文就是讲解一些公用的telnet命令的实现。下面来看一下telnet实现的类图:

可以看到,实现了TelnetHandler接口的有六个类,除了TelnetHandlerAdapter是以外,其他五个分别对应了clear、exit、help、log、status命令的实现,具体用来干嘛,请看官方文档的介绍。

源码分析 (一)TelnetHandler
@SPI
public interface TelnetHandler {

    /**
     * telnet.
     * 处理对应的telnet命令
     * @param channel
     * @param message telnet命令
     */
    String telnet(Channel channel, String message) throws RemotingException;

}

该接口上telnet命令处理器接口,是一个可扩展接口。它定义了一个方法,就是处理相关的telnet命令。

(二)TelnetHandlerAdapter

该类继承了ChannelHandlerAdapter,实现了TelnetHandler接口,是TelnetHandler的适配器类,负责在接收到HeaderExchangeHandler发来的telnet命令后分发给对应的TelnetHandler实现类去实现,并且返回命令结果。

public class TelnetHandlerAdapter extends ChannelHandlerAdapter implements TelnetHandler {

    /**
     * 扩展加载器
     */
    private final ExtensionLoader extensionLoader = ExtensionLoader.getExtensionLoader(TelnetHandler.class);

    @Override
    public String telnet(Channel channel, String message) throws RemotingException {
        // 获得提示键配置,用于nc获取信息时不显示提示符
        String prompt = channel.getUrl().getParameterAndDecoded(Constants.PROMPT_KEY, Constants.DEFAULT_PROMPT);
        boolean noprompt = message.contains("--no-prompt");
        message = message.replace("--no-prompt", "");
        StringBuilder buf = new StringBuilder();
        // 删除头尾空白符的字符串
        message = message.trim();
        String command;
        // 获得命令
        if (message.length() > 0) {
            int i = message.indexOf(" ");
            if (i > 0) {
                // 获得命令
                command = message.substring(0, i).trim();
                // 获得参数
                message = message.substring(i + 1).trim();
            } else {
                command = message;
                message = "";
            }
        } else {
            command = "";
        }
        if (command.length() > 0) {
            // 如果有该命令的扩展实现类
            if (extensionLoader.hasExtension(command)) {
                try {
                    // 执行相应命令的实现类的telnet
                    String result = extensionLoader.getExtension(command).telnet(channel, message);
                    if (result == null) {
                        return null;
                    }
                    // 返回结果
                    buf.append(result);
                } catch (Throwable t) {
                    buf.append(t.getMessage());
                }
            } else {
                buf.append("Unsupported command: ");
                buf.append(command);
            }
        }
        if (buf.length() > 0) {
            buf.append("
");
        }
        // 添加 telnet 提示语
        if (prompt != null && prompt.length() > 0 && !noprompt) {
            buf.append(prompt);
        }
        return buf.toString();
    }

}

该类只实现了telnet方法,其中的逻辑还是比较清晰,就是根据对应的命令去让对应的实现类产生命令结果。

(三)ClearTelnetHandler

该类实现了TelnetHandler接口,封装了clear命令的实现。

@Activate
@Help(parameter = "[lines]", summary = "Clear screen.", detail = "Clear screen.")
public class ClearTelnetHandler implements TelnetHandler {

    @Override
    public String telnet(Channel channel, String message) {
        // 清除屏幕上的内容行数
        int lines = 100;
        if (message.length() > 0) {
            // 如果不是一个数字
            if (!StringUtils.isInteger(message)) {
                return "Illegal lines " + message + ", must be integer.";
            }
            lines = Integer.parseInt(message);
        }
        StringBuilder buf = new StringBuilder();
        // 一行一行清除
        for (int i = 0; i < lines; i++) {
            buf.append("
");
        }
        return buf.toString();
    }

}
(四)ExitTelnetHandler

该类实现了TelnetHandler接口,封装了exit命令的实现。

@Activate
@Help(parameter = "", summary = "Exit the telnet.", detail = "Exit the telnet.")
public class ExitTelnetHandler implements TelnetHandler {

    @Override
    public String telnet(Channel channel, String message) {
        // 关闭通道
        channel.close();
        return null;
    }

}
(五)HelpTelnetHandler

该类实现了TelnetHandler接口,封装了help命令的实现。

@Activate
@Help(parameter = "[command]", summary = "Show help.", detail = "Show help.")
public class HelpTelnetHandler implements TelnetHandler {

    /**
     * 扩展加载器
     */
    private final ExtensionLoader extensionLoader = ExtensionLoader.getExtensionLoader(TelnetHandler.class);

    @Override
    public String telnet(Channel channel, String message) {
        // 如果需要查看某一个命令的帮助
        if (message.length() > 0) {
            if (!extensionLoader.hasExtension(message)) {
                return "No such command " + message;
            }
            // 获得对应的扩展实现类
            TelnetHandler handler = extensionLoader.getExtension(message);
            Help help = handler.getClass().getAnnotation(Help.class);
            StringBuilder buf = new StringBuilder();
            // 生成命令和帮助信息
            buf.append("Command:
    ");
            buf.append(message + " " + help.parameter().replace("
", " ").replace("
", " "));
            buf.append("
Summary:
    ");
            buf.append(help.summary().replace("
", " ").replace("
", " "));
            buf.append("
Detail:
    ");
            buf.append(help.detail().replace("
", "    
").replace("
", "    
"));
            return buf.toString();
            // 如果查看所有命令的帮助
        } else {
            List> table = new ArrayList>();
            // 获得所有命令的提示信息
            List handlers = extensionLoader.getActivateExtension(channel.getUrl(), "telnet");
            if (handlers != null && !handlers.isEmpty()) {
                for (TelnetHandler handler : handlers) {
                    Help help = handler.getClass().getAnnotation(Help.class);
                    List row = new ArrayList();
                    String parameter = " " + extensionLoader.getExtensionName(handler) + " " + (help != null ? help.parameter().replace("
", " ").replace("
", " ") : "");
                    row.add(parameter.length() > 50 ? parameter.substring(0, 50) + "..." : parameter);
                    String summary = help != null ? help.summary().replace("
", " ").replace("
", " ") : "";
                    row.add(summary.length() > 50 ? summary.substring(0, 50) + "..." : summary);
                    table.add(row);
                }
            }
            return "Please input "help [command]" show detail.
" + TelnetUtils.toList(table);
        }
    }

}

help分为了需要查看某一个命令的帮助还是查看全部命令的帮助。

(六)LogTelnetHandler

该类实现了TelnetHandler接口,封装了log命令的实现。

@Activate
@Help(parameter = "level", summary = "Change log level or show log ", detail = "Change log level or show log")
public class LogTelnetHandler implements TelnetHandler {

    public static final String SERVICE_KEY = "telnet.log";

    @Override
    public String telnet(Channel channel, String message) {
        long size = 0;
        File file = LoggerFactory.getFile();
        StringBuffer buf = new StringBuffer();
        if (message == null || message.trim().length() == 0) {
            buf.append("EXAMPLE: log error / log 100");
        } else {
            String str[] = message.split(" ");
            if (!StringUtils.isInteger(str[0])) {
                // 设置日志级别
                LoggerFactory.setLevel(Level.valueOf(message.toUpperCase()));
            } else {
                // 获得日志长度
                int SHOW_LOG_LENGTH = Integer.parseInt(str[0]);

                if (file != null && file.exists()) {
                    try {
                        FileInputStream fis = new FileInputStream(file);
                        try {
                            FileChannel filechannel = fis.getChannel();
                            try {
                                size = filechannel.size();
                                ByteBuffer bb;
                                if (size <= SHOW_LOG_LENGTH) {
                                    // 分配缓冲区
                                    bb = ByteBuffer.allocate((int) size);
                                    // 读日志数据
                                    filechannel.read(bb, 0);
                                } else {
                                    int pos = (int) (size - SHOW_LOG_LENGTH);
                                    // 分配缓冲区
                                    bb = ByteBuffer.allocate(SHOW_LOG_LENGTH);
                                    // 读取日志数据
                                    filechannel.read(bb, pos);
                                }
                                bb.flip();
                                String content = new String(bb.array()).replace("<", "<")
                                        .replace(">", ">").replace("
", "

"); buf.append(" content:" + content); buf.append(" modified:" + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") .format(new Date(file.lastModified())))); buf.append(" size:" + size + " "); } finally { filechannel.close(); } } finally { fis.close(); } } catch (Exception e) { buf.append(e.getMessage()); } } else { size = 0; buf.append(" MESSAGE: log file not exists or log appender is console ."); } } } buf.append(" CURRENT LOG LEVEL:" + LoggerFactory.getLevel()) .append(" CURRENT LOG APPENDER:" + (file == null ? "console" : file.getAbsolutePath())); return buf.toString(); } }

log命令实现原理就是从日志文件中把日志信息读取出来。

(七)StatusTelnetHandler

该类实现了TelnetHandler接口,封装了status命令的实现。

@Activate
@Help(parameter = "[-l]", summary = "Show status.", detail = "Show status.")
public class StatusTelnetHandler implements TelnetHandler {

    private final ExtensionLoader extensionLoader = ExtensionLoader.getExtensionLoader(StatusChecker.class);

    @Override
    public String telnet(Channel channel, String message) {
        // 显示状态列表
        if (message.equals("-l")) {
            List checkers = extensionLoader.getActivateExtension(channel.getUrl(), "status");
            String[] header = new String[]{"resource", "status", "message"};
            List> table = new ArrayList>();
            Map statuses = new HashMap();
            if (checkers != null && !checkers.isEmpty()) {
                // 遍历各个资源的状态,如果一个当全部 OK 时则显示 OK,只要有一个 ERROR 则显示 ERROR,只要有一个 WARN 则显示 WARN
                for (StatusChecker checker : checkers) {
                    String name = extensionLoader.getExtensionName(checker);
                    Status stat;
                    try {
                        stat = checker.check();
                    } catch (Throwable t) {
                        stat = new Status(Status.Level.ERROR, t.getMessage());
                    }
                    statuses.put(name, stat);
                    if (stat.getLevel() != null && stat.getLevel() != Status.Level.UNKNOWN) {
                        List row = new ArrayList();
                        row.add(name);
                        row.add(String.valueOf(stat.getLevel()));
                        row.add(stat.getMessage() == null ? "" : stat.getMessage());
                        table.add(row);
                    }
                }
            }
            Status stat = StatusUtils.getSummaryStatus(statuses);
            List row = new ArrayList();
            row.add("summary");
            row.add(String.valueOf(stat.getLevel()));
            row.add(stat.getMessage());
            table.add(row);
            return TelnetUtils.toTable(header, table);
        } else if (message.length() > 0) {
            return "Unsupported parameter " + message + " for status.";
        }
        String status = channel.getUrl().getParameter("status");
        Map statuses = new HashMap();
        if (status != null && status.length() > 0) {
            String[] ss = Constants.COMMA_SPLIT_PATTERN.split(status);
            for (String s : ss) {
                StatusChecker handler = extensionLoader.getExtension(s);
                Status stat;
                try {
                    stat = handler.check();
                } catch (Throwable t) {
                    stat = new Status(Status.Level.ERROR, t.getMessage());
                }
                statuses.put(s, stat);
            }
        }
        Status stat = StatusUtils.getSummaryStatus(statuses);
        return String.valueOf(stat.getLevel());
    }

}
(八)Help

该接口是帮助文档接口

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface Help {

    String parameter() default "";

    String summary();

    String detail() default "";

}

可以看上在每个命令的实现类上都加上了@Help注解,为了添加一些帮助文案。

(九)TelnetUtils

该类是Telnet命令的工具类,其中逻辑我就不介绍了。

(十)TelnetCodec

该类继承了TransportCodec,是telnet的编解码类。

1.属性
private static final Logger logger = LoggerFactory.getLogger(TelnetCodec.class);

/**
 * 历史命令列表
 */
private static final String HISTORY_LIST_KEY = "telnet.history.list";

/**
 * 历史命令位置,就是用上下键来找历史命令
 */
private static final String HISTORY_INDEX_KEY = "telnet.history.index";

/**
 * 向上键
 */
private static final byte[] UP = new byte[]{27, 91, 65};

/**
 * 向下键
 */
private static final byte[] DOWN = new byte[]{27, 91, 66};

/**
 * 回车
 */
private static final List ENTER = Arrays.asList(new Object[]{new byte[]{"
", "
"} /* Windows Enter */, new byte[]{"
"} /* Linux Enter */});

/**
 * 退出
 */
private static final List EXIT = Arrays.asList(new Object[]{new byte[]{3} /* Windows Ctrl+C */, new byte[]{-1, -12, -1, -3, 6} /* Linux Ctrl+C */, new byte[]{-1, -19, -1, -3, 6} /* Linux Pause */});
2.getCharset
private static Charset getCharset(Channel channel) {
    if (channel != null) {
        // 获得属性设置
        Object attribute = channel.getAttribute(Constants.CHARSET_KEY);
        // 返回指定字符集的charset对象。
        if (attribute instanceof String) {
            try {
                return Charset.forName((String) attribute);
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        } else if (attribute instanceof Charset) {
            return (Charset) attribute;
        }
        URL url = channel.getUrl();
        if (url != null) {
            String parameter = url.getParameter(Constants.CHARSET_KEY);
            if (parameter != null && parameter.length() > 0) {
                try {
                    return Charset.forName(parameter);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        }
    }
    // 默认的编码是utf-8
    try {
        return Charset.forName(Constants.DEFAULT_CHARSET);
    } catch (Throwable t) {
        logger.warn(t.getMessage(), t);
    }
    return Charset.defaultCharset();
}

该方法是获得通道的字符集,根据url中编码来获得字符集,默认是utf-8。

3.encode
@Override
public void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException {
    // 如果需要编码的是 telnet 命令结果
    if (message instanceof String) {
        //如果为客户端侧的通道message直接返回
        if (isClientSide(channel)) {
            message = message + "
";
        }
        // 获得字节数组
        byte[] msgData = ((String) message).getBytes(getCharset(channel).name());
        // 写入缓冲区
        buffer.writeBytes(msgData);
    } else {
        super.encode(channel, buffer, message);
    }
}

该方法是编码方法。

4.decode
@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    // 获得缓冲区可读的字节
    int readable = buffer.readableBytes();
    byte[] message = new byte[readable];
    // 从缓冲区读数据
    buffer.readBytes(message);
    return decode(channel, buffer, readable, message);
}

@SuppressWarnings("unchecked")
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] message) throws IOException {
    // 如果是客户端侧,直接返回结果
    if (isClientSide(channel)) {
        return toString(message, getCharset(channel));
    }
    // 检验消息长度
    checkPayload(channel, readable);
    if (message == null || message.length == 0) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // 如果回退
    if (message[message.length - 1] == "") { // Windows backspace echo
        try {
            boolean doublechar = message.length >= 3 && message[message.length - 3] < 0; // double byte char
            channel.send(new String(doublechar ? new byte[]{32, 32, 8, 8} : new byte[]{32, 8}, getCharset(channel).name()));
        } catch (RemotingException e) {
            throw new IOException(StringUtils.toString(e));
        }
        return DecodeResult.NEED_MORE_INPUT;
    }

    // 如果命令是退出
    for (Object command : EXIT) {
        if (isEquals(message, (byte[]) command)) {
            if (logger.isInfoEnabled()) {
                logger.info(new Exception("Close channel " + channel + " on exit command: " + Arrays.toString((byte[]) command)));
            }
            // 关闭通道
            channel.close();
            return null;
        }
    }
    boolean up = endsWith(message, UP);
    boolean down = endsWith(message, DOWN);
    // 如果用上下键找历史命令
    if (up || down) {
        LinkedList history = (LinkedList) channel.getAttribute(HISTORY_LIST_KEY);
        if (history == null || history.isEmpty()) {
            return DecodeResult.NEED_MORE_INPUT;
        }
        Integer index = (Integer) channel.getAttribute(HISTORY_INDEX_KEY);
        Integer old = index;
        if (index == null) {
            index = history.size() - 1;
        } else {
            // 向上
            if (up) {
                index = index - 1;
                if (index < 0) {
                    index = history.size() - 1;
                }
            } else {
                // 向下
                index = index + 1;
                if (index > history.size() - 1) {
                    index = 0;
                }
            }
        }
        // 获得历史命令,并发送给客户端
        if (old == null || !old.equals(index)) {
            // 设置当前命令位置
            channel.setAttribute(HISTORY_INDEX_KEY, index);
            // 获得历史命令
            String value = history.get(index);
            // 清除客户端原有命令,用查到的历史命令替代
            if (old != null && old >= 0 && old < history.size()) {
                String ov = history.get(old);
                StringBuilder buf = new StringBuilder();
                for (int i = 0; i < ov.length(); i++) {
                    buf.append("");
                }
                for (int i = 0; i < ov.length(); i++) {
                    buf.append(" ");
                }
                for (int i = 0; i < ov.length(); i++) {
                    buf.append("");
                }
                value = buf.toString() + value;
            }
            try {
                channel.send(value);
            } catch (RemotingException e) {
                throw new IOException(StringUtils.toString(e));
            }
        }
        // 返回,需要更多指令
        return DecodeResult.NEED_MORE_INPUT;
    }
    // 关闭命令
    for (Object command : EXIT) {
        if (isEquals(message, (byte[]) command)) {
            if (logger.isInfoEnabled()) {
                logger.info(new Exception("Close channel " + channel + " on exit command " + command));
            }
            channel.close();
            return null;
        }
    }
    byte[] enter = null;
    // 如果命令是回车
    for (Object command : ENTER) {
        if (endsWith(message, (byte[]) command)) {
            enter = (byte[]) command;
            break;
        }
    }
    if (enter == null) {
        return DecodeResult.NEED_MORE_INPUT;
    }
    LinkedList history = (LinkedList) channel.getAttribute(HISTORY_LIST_KEY);
    Integer index = (Integer) channel.getAttribute(HISTORY_INDEX_KEY);
    // 移除历史命令
    channel.removeAttribute(HISTORY_INDEX_KEY);
    // 将历史命令拼接
    if (history != null && !history.isEmpty() && index != null && index >= 0 && index < history.size()) {
        String value = history.get(index);
        if (value != null) {
            byte[] b1 = value.getBytes();
            byte[] b2 = new byte[b1.length + message.length];
            System.arraycopy(b1, 0, b2, 0, b1.length);
            System.arraycopy(message, 0, b2, b1.length, message.length);
            message = b2;
        }
    }
    // 将命令字节数组,转成具体的一条命令
    String result = toString(message, getCharset(channel));
    if (result.trim().length() > 0) {
        if (history == null) {
            history = new LinkedList();
            channel.setAttribute(HISTORY_LIST_KEY, history);
        }
        if (history.isEmpty()) {
            history.addLast(result);
        } else if (!result.equals(history.getLast())) {
            history.remove(result);
            // 添加当前命令到历史尾部
            history.addLast(result);
            // 超过上限,移除历史的头部
            if (history.size() > 10) {
                history.removeFirst();
            }
        }
    }
    return result;
}

该方法是编码。

后记
该部分相关的源码解析地址:https://github.com/CrazyHZM/i...

该文章讲解了telnet的相关实现逻辑,本文有兴趣的朋友可以看看。下一篇我会讲解基于grizzly实现远程通信部分。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/72810.html

相关文章

  • dubbo源码解析(四十七)服务端处理请求过程

    摘要:而存在的意义就是保证请求或响应对象可在线程池中被解码,解码完成后,就会分发到的。 2.7大揭秘——服务端处理请求过程 目标:从源码的角度分析服务端接收到请求后的一系列操作,最终把客户端需要的值返回。 前言 上一篇讲到了消费端发送请求的过程,该篇就要将服务端处理请求的过程。也就是当服务端收到请求数据包后的一系列处理以及如何返回最终结果。我们也知道消费端在发送请求的时候已经做了编码,所以我...

    yzzz 评论0 收藏0
  • dubbo源码解析(八)远程通信——开篇

    摘要:而编码器是讲应用程序的数据转化为网络格式,解码器则是讲网络格式转化为应用程序,同时具备这两种功能的单一组件就叫编解码器。在中是老的编解码器接口,而是新的编解码器接口,并且已经用把适配成了。 远程通讯——开篇 目标:介绍之后解读远程通讯模块的内容如何编排、介绍dubbo-remoting-api中的包结构设计以及最外层的的源码解析。 前言 服务治理框架中可以大致分为服务通信和服务管理两个...

    Faremax 评论0 收藏0
  • dubbo源码解析(四十六)消费端发送请求过程

    摘要:可以参考源码解析二十四远程调用协议的八。十六的该类也是用了适配器模式,该类主要的作用就是增加了心跳功能,可以参考源码解析十远程通信层的四。二十的可以参考源码解析十七远程通信的一。 2.7大揭秘——消费端发送请求过程 目标:从源码的角度分析一个服务方法调用经历怎么样的磨难以后到达服务端。 前言 前一篇文章讲到的是引用服务的过程,引用服务无非就是创建出一个代理。供消费者调用服务的相关方法。...

    fish 评论0 收藏0
  • dubbo源码解析(四十八)异步化改造

    摘要:大揭秘异步化改造目标从源码的角度分析的新特性中对于异步化的改造原理。看源码解析四十六消费端发送请求过程讲到的十四的,在以前的逻辑会直接在方法中根据配置区分同步异步单向调用。改为关于可以参考源码解析十远程通信层的六。 2.7大揭秘——异步化改造 目标:从源码的角度分析2.7的新特性中对于异步化的改造原理。 前言 dubbo中提供了很多类型的协议,关于协议的系列可以查看下面的文章: du...

    lijinke666 评论0 收藏0
  • dubbo源码解析(十)远程通信——Exchange层

    摘要:和断开,处理措施不一样,会分别做出重连和关闭通道的操作。取消定时器取消大量已排队任务,用于回收空间该方法是停止现有心跳,也就是停止定时器,释放空间。做到异步处理返回结果时能给准确的返回给对应的请求。 远程通讯——Exchange层 目标:介绍Exchange层的相关设计和逻辑、介绍dubbo-remoting-api中的exchange包内的源码解析。 前言 上一篇文章我讲的是dubb...

    cppprimer 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<