摘要:缓冲输入流从被称为缓冲区的存储器区域读出数据仅当缓冲区是空时,本地输入才被调用。同样,缓冲输出流,将数据写入到缓存区,只有当缓冲区已满才调用本机输出。
:https://segmentfault.com/blog...
1.前言前阵子休息天日常在寻找项目里不好的代码,看到了这样的一段代码:
private Result sshSameExec(Session session, String cmd) { if (log.isDebugEnabled()) { log.debug("shell command: {}", cmd); } UserInfo ui = getUserInfo(); session.setUserInfo(ui); int exitStatus = 0; StringBuilder builder = new StringBuilder(); ChannelExec channel; InputStream in; InputStream err; try { session.connect(connectTimeout); channel = (ChannelExec) session.openChannel("exec"); channel.setCommand(cmd); in = channel.getInputStream(); err = channel.getErrStream(); channel.connect(); } catch (Exception e) { throw new CloudRuntimeException(e); } try { long lastRead = Long.MAX_VALUE; byte[] tmp = new byte[1024]; while (true) { while (in.available() > 0 || err.available() > 0) { int i = 0; if (in.available() > 0) { i = in.read(tmp, 0, 1024); } else if (err.available() > 0) { i = err.read(tmp, 0, 1024); } if (i < 0) { break; } lastRead = System.currentTimeMillis(); builder.append(new String(tmp, 0, i)); } if (channel.isClosed()) { if (in.available() > 0) { continue; } exitStatus = channel.getExitStatus(); break; } if (System.currentTimeMillis() - lastRead > exeTimeout) { break; } } } catch (IOException e) { throw new CloudRuntimeException(e); } finally { channel.disconnect(); session.disconnect(); } if (0 != exitStatus) { return Result.createByError(ErrorData.builder() .errorCode(ResultCode.EXECUTE_SSH_FAIL.getCode()) .detail(builder.toString()) .title(ResultCode.EXECUTE_SSH_FAIL.toString()) .build()); } else { return Result.createBySuccess(builder.toString()); } }
简单解释一下这段代码——即通过ssh到一台机器上,然后执行一些命令.对命令输出的东西,开了一个循环,每一次读一定的位置,然后以字节流的形式读回来.
这段代码有点丑,于是我闻到了学习的味道.
首先是对两个Stream的消费,很显然,在多核环境下,我们同时也只能够消费其中一个Stream.其次,这代码太挫了,自己定义一个tmp,然后1024、1024这样的去取出来.
在改良之前,我们先来回顾一下JavaIO的接口定义.
2.JavaIO 接口知识回顾 2.1 低级抽象接口:InputStream 和 OutputStream这里有同学可能问了,为啥叫它低抽象接口呢?因为它离底层太近了,计算机本来就是处理二进制的,而这两个接口正是用来处理二进制数据流的.
先简单看一眼这两个接口:
InputStream
** * This abstract class is the superclass of all classes representing * an input stream of bytes. * *Applications that need to define a subclass of
InputStream
* must always provide a method that returns the next byte of input. * * @author Arthur van Hoff * @see java.io.BufferedInputStream * @see java.io.ByteArrayInputStream * @see java.io.DataInputStream * @see java.io.FilterInputStream * @see java.io.InputStream#read() * @see java.io.OutputStream * @see java.io.PushbackInputStream * @since JDK1.0 */ public abstract class InputStream implements Closeable {.....}
OutputStream
/** * This abstract class is the superclass of all classes representing * an output stream of bytes. An output stream accepts output bytes * and sends them to some sink. ** Applications that need to define a subclass of *
OutputStream
must always provide at least a method * that writes one byte of output. * * @author Arthur van Hoff * @see java.io.BufferedOutputStream * @see java.io.ByteArrayOutputStream * @see java.io.DataOutputStream * @see java.io.FilterOutputStream * @see java.io.InputStream * @see java.io.OutputStream#write(int) * @since JDK1.0 */ public abstract class OutputStream implements Closeable, Flushable {...}
我们可以发现,它们都实现了Closeable的接口.因此大家在使用这些原生类时,要注意在结束时调用Close方法哦.
这两个接口的常用实现类有:
- FileInputStream和FileOutputStream
DataInputStream和DataOutputStream
ObjectInputStream和ObjectOutputStream
2.2 高级抽象接口——Writer和Reader为啥说它是高级抽象接口呢?我们先来看看它们的注释:
Writer
/** * Abstract class for writing to character streams. The only methods that a * subclass must implement are write(char[], int, int), flush(), and close(). * Most subclasses, however, will override some of the methods defined here in * order to provide higher efficiency, additional functionality, or both. * * @see Writer * @see BufferedWriter * @see CharArrayWriter * @see FilterWriter * @see OutputStreamWriter * @see FileWriter * @see PipedWriter * @see PrintWriter * @see StringWriter * @see Reader * * @author Mark Reinhold * @since JDK1.1 */ public abstract class Writer implements Appendable, Closeable, Flushable {
Reader
/** * Abstract class for reading character streams. The only methods that a * subclass must implement are read(char[], int, int) and close(). Most * subclasses, however, will override some of the methods defined here in order * to provide higher efficiency, additional functionality, or both. * * * @see BufferedReader * @see LineNumberReader * @see CharArrayReader * @see InputStreamReader * @see FileReader * @see FilterReader * @see PushbackReader * @see PipedReader * @see StringReader * @see Writer * * @author Mark Reinhold * @since JDK1.1 */ public abstract class Reader implements Readable, Closeable {
我们可以看到,这个抽象类是用来面向character的,也就是字符.字符的抽象等级必然比字节高,因为字符靠近上层,即人类.
2.3 优化输入和输出——Buffered如果我们直接使用上述实现类去打开一个文件(如FileWriter 、FileReader 、FileInputStream 、FileOutputStream ),对其对象调用read、write、readLine等,每个请求都是由基础OS直接处理的,这会使一个程序效率低得多——因为它们都会引发磁盘访问or网络请求等.
为了减少这种开销,Java 平台实现缓冲 I/O 流。缓冲输入流从被称为缓冲区(buffer)的存储器区域读出数据;仅当缓冲区是空时,本地输入 API 才被调用。同样,缓冲输出流,将数据写入到缓存区,只有当缓冲区已满才调用本机输出 API。
用于包装非缓存流的缓冲流类有4个:BufferedInputStream和BufferedOutputStream·用于创建字节缓冲字节流, BufferedReader和BufferedWriter`用于创建字符缓冲字节流.
3. 着手优化之前,我们提到了这段代码写得搓的地方:
首先是对两个Stream的消费,很显然,在多核环境下,我们同时也只能够消费其中一个Stream.
其次,这代码太挫了,自己定义一个tmp,然后1024、1024这样的去取出来.
故此,我们可以考虑对每个Stream都进行包装,支持用线程去消费,其次我们可以用高级抽象分接口去适配Byte,然后去装饰成Buffer.
接下来,我们来看一段ZStack里的工具类ShellUtils,为了节省篇幅,我们仅仅截出它在IDE里的
概览:
run方法的核心:
public ShellResult run() { StopWatch watch = new StopWatch(); watch.start(); try { if (withSudo) { command = String.format("sudo %s", command); } ProcessBuilder pb = new ProcessBuilder(Arrays.asList("/bin/bash", "-c", command)); if (baseDir == null) { baseDir = System.getProperty("user.home"); } pb.directory(new File(baseDir)); process = pb.start(); if (!suppressTraceLog && logger.isTraceEnabled()) { logger.debug(String.format("exec shell command[%s]", command)); } Writer stdout; int stdoutLog = stdoutLogStrategy(); if (stdoutLog == LOG_TO_FILE) { stdout = new BufferedWriter(new FileWriter(stdoutFile)); } else if (stdoutLog == LOG_TO_SCREEN) { stdout = new BufferedWriter(new OutputStreamWriter(System.out)); } else { stdout = new StringWriter(); } Writer stderr; int stderrLog = stderrLogStrategy(); if (stderrLog == LOG_TO_FILE) { stderr = new BufferedWriter(new FileWriter(stderrFile)); } else if (stderrLog == LOG_TO_SCREEN) { stderr = new BufferedWriter(new OutputStreamWriter(System.err)); } else { stderr = new StringWriter(); } StreamConsumer stdoutConsumer = new StreamConsumer(process.getInputStream(), new PrintWriter(stdout, true), stdoutLog != LOG_TO_FILE); StreamConsumer stderrConsumer = new StreamConsumer(process.getErrorStream(), new PrintWriter(stderr, true), stderrLog != LOG_TO_FILE); stderrConsumer.start(); stdoutConsumer.start(); process.waitFor(); stderrConsumer.join(TimeUnit.SECONDS.toMillis(30)); stdoutConsumer.join(TimeUnit.SECONDS.toMillis(30)); ShellResult ret = new ShellResult(); ret.setCommand(command); ret.setRetCode(process.exitValue()); if (stderrLog == LOG_TO_STRING) { ret.setStderr(stderr.toString()); } else if (stderrLog == LOG_TO_FILE) { stderr.close(); } if (stdoutLog == LOG_TO_STRING) { ret.setStdout(stdout.toString()); } else if (stdoutLog == LOG_TO_FILE) { stdout.close(); } return ret; } catch (Exception e) { StringBuilder sb = new StringBuilder(); sb.append("Shell command failed: "); sb.append(command); throw new ShellException(sb.toString(), e); } finally { if (process != null) { process.destroy(); } watch.stop(); if (!suppressTraceLog && logger.isTraceEnabled()) { logger.trace(String.format("shell command[%s] costs %sms to finish", command, watch.getTime())); } } } }
我们可以看到StreamConsumer这个类,我们来看一下它的代码:
private static class StreamConsumer extends Thread { final InputStream in; final PrintWriter out; final boolean flush; StreamConsumer(InputStream in, PrintWriter out, boolean flushEveryWrite) { this.in = in; this.out = out; flush = flushEveryWrite; } @Override public void run() { BufferedReader br = null; try { br = new BufferedReader(new InputStreamReader(in)); String line; while ( (line = br.readLine()) != null) { out.println(line); if (flush) { out.flush(); } } } catch (Exception e) { logger.warn(e.getMessage(), e); } finally { try { if (br != null) { br.close(); } } catch (IOException e) { logger.warn(e.getMessage(), e); } } } }
这段代码已经达到了我们的理想状态:线程消费,高级抽象.
3.1 使用Kotlin 3.1.1 Kotlin IO闲话不多说,先贴代码为敬:
import java.io.InputStream import java.io.InputStreamReader class StreamGobbler(private val inputStream: InputStream, private var result: StringBuilder) : Runnable { override fun run() { InputStreamReader(inputStream).buffered().use { it.lines().forEach { r -> result.append(r) } } } }
还是一样熟悉的配方,我们逐行来解读:
定义一个类,并且要求构造函数必须传入InputStream和一个StringBuilder.且实现了Runnable接口,这意味着它可以被线程消费.
覆写run方法.我们可以看到InputStream被适配成了InputStreamReader,这意味着它可以输出字符流了,然后我们使用了Kotlin的接口将其装饰成了Buffer.
读每一行buffer,并appned到result这个StringBuilder里去.
读完就可以告辞了,close.(use会将其关闭)
3.1.2 Kotlin Coroutine先看一下上面的图,我们都知道内核态线程是由OS调度的,但当一个线程拿到时间片时,却调到了阻塞IO,那么只能等在那边,浪费时间.
而协程则可以解决这个问题,当一个Jobhang住的时候,可以去做别的事情,绕开阻塞.更好的利用时间片.
最后,我们来看一下成品代码:
override fun sshExecWithCoroutine(session: Session, cmd: String): SimpleResult{ val ui = InnerUserInfo() session.userInfo = ui val exitStatus: Int var channel = ChannelExec() val inputBuilder = StringBuilder() val errorBuilder = StringBuilder() try { session.connect(connectTimeout) channel = session.openChannel("exec") as ChannelExec channel.setCommand(cmd) channel.connect() val inputStream = StreamGobbler(channel.inputStream, inputBuilder) val errStream = StreamGobbler(channel.errStream, errorBuilder) val customJob = GlobalScope.launch { customStream(inputStream, errStream) } while (!customJob.isCompleted) { // wait job be done } exitStatus = channel.exitStatus } catch (e: IOException) { throw java.lang.RuntimeException(e) } finally { if (channel.isConnected) { channel.disconnect() } if (session.isConnected) { session.disconnect() } } return if (0 != exitStatus) { return SimpleResult.createByError(ErrorData.Builder() .errorCode(ResultCode.EXECUTE_SSH_FAIL.value) .detail(errorBuilder.toString()) .title(ResultCode.EXECUTE_SSH_FAIL.toString()) .build()) } else { SimpleResult.createBySuccess(inputBuilder.toString()) } } private suspend fun customStream(inputStream: StreamGobbler, errorStream: StreamGobbler) { val inputDeferred = GlobalScope.async { inputStream.run() } val errorDeferred = GlobalScope.async { errorStream.run() } inputDeferred.join() errorDeferred.join() }
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/75095.html
摘要:使用字节流写入文件,如果没有关闭字节流操作,文件依然存在内容,说明字节流是操作文件本身的。字节流比字符流更好,使用更广泛。 Java知识点总结(JavaIO-字符流) @(Java知识点总结)[Java, JavaIO] [toc] 在程序中一个字符等于两个字节,那么 Java 提供了 Reader 和 Writer 两个专门操作字符流的类。 字符输出流:Writer 类定义如下: p...
摘要:知识点总结异常知识点总结异常为什么需要异常机制不是所有的问题都能在编译时被发现,有些问题在程序运行时才会暴露出来异常机制是面向对象的处理程序在运行时发生的状况的手段使用异常机制不会打乱原有业务逻辑的用块把可能出异常的代码保护起来用一个 Java知识点总结(JavaIO-异常) @(Java知识点总结)[Java, Java异常] [toc] 为什么需要异常机制 不是所有的问题都能在编译...
摘要:知识点总结内存操作流知识点总结前面所讲的程序中输入输出都是从文件中来,当然也可以将输出的位置设置在内存上。将内容写入到内存中。 Java知识点总结(JavaIO-内存操作流) @(Java知识点总结)[Java, JavaIO] [toc] showImg(https://segmentfault.com/img/bV82tm?w=753&h=275); 前面所讲的程序中输入、输出都是...
摘要:下面我们使用字节输入输出流来说明这个问题输入流一般是由对象如建立的,当新建一个时,对象建立了一个包含有数据的管道其实就是我们所说的这个流并将对象存储的数据输入到管道中,因此管道里的数据流就是对象内的数据。 流的原理: showImg(/img/bVqa89); 一连串有顺序的数据系列可以看成是一个流。 输入输出流: 数据从IO输入到程序的流是输入流,数据从程序输出到IO的流是输出流。 ...
摘要:知识点总结类知识点总结后提供的输入数据类,此类位于包中,不仅可以完成输入数据的操作,还可以方便地对输入数据进行验证。 Java知识点总结(JavaIO- Scanner类 ) @(Java知识点总结)[Java, JavaIO] showImg(https://segmentfault.com/img/bV9dAj?w=838&h=396); JDK 1.5后提供的输入数据类,此类位于...
阅读 1720·2021-11-24 09:39
阅读 1657·2021-11-22 15:22
阅读 973·2021-09-27 13:36
阅读 3078·2021-09-24 10:34
阅读 3291·2021-07-26 23:38
阅读 2604·2019-08-29 16:44
阅读 947·2019-08-29 16:39
阅读 1066·2019-08-29 16:20