摘要:说明这里用到的项目都是基于的项目。但同时,它和其他的分布式文件系统的区别也是很明显的。能提供高吞吐量的数据访问,非常适合大规模数据集上的应用。放宽了一部分约束,来实现流式读取文件系统数据的目的。是项目的一部分。
关键词
Java、PHP、hdfs、mqrocket、excel、poi、报表
需求背景在业务需求方面,每个企业或多或少都会有报表导出的作业,量少则可是使用输出流或者字符串的输出即可完成,只要指定respose的相应Content-Type即可。如果大量的数据需要导出,尤其是订单这类业务逻辑复杂的报表,导出的时候需要加入各种条件和权限,从数据处理方面就已经很费力了,更何况导出的需求不是一天两天,而是半月一月的数据量,小公司的业务,数量级也可能达到了十多万。
function generateExcel($filename, $header, array &$data) { generateDownHeader($filename); $rs = "
".$v." | "; } $rs .= "|
---|---|
".$v." | "; } else { $rs .= "".$v." | "; } } $rs .= "
这十多万的数据,如果使用一般的方法(上面代码所示)或许是不可行的(其他一般方法没有尝试过),php处理中一般使用curl调用接口,nginx服务器和php中的curl请求超时一般都是30s,30s处理1w条数据的导出工作,如果服务器的性能好,并且是多核的,可以使用multi_curl多线程处理,如果服务器的性能不是很好,这种处理方法或许更耗时。
下面是我使用的curl处理接口数据:
function curl($url, $option = null, $method = "POST", $getCode = false, $header = []) { $curl = curl_init (); curl_setopt($curl, CURLOPT_URL, $url); curl_setopt($curl, CURLOPT_TIMEOUT, 30); if (!array_key_exists("Content-Type", $header)) { $header["Content-Type"] = "application/json;charset=UTF-8"; } $headers = []; if ($header) { foreach ($header as $k=>$v) { $headers[] = $k.": ".$v; } } curl_setopt($curl, CURLOPT_HTTPHEADER, $headers); if ($option) { if (is_array($option)) { $option = json_encode($option); } curl_setopt($curl, CURLOPT_POSTFIELDS, $option); } curl_setopt($curl, CURLOPT_RETURNTRANSFER, 1); curl_setopt($curl, CURLOPT_CUSTOMREQUEST, $method); $result = curl_exec($curl); if ($getCode) { $curl_code = curl_getinfo($curl, CURLINFO_HTTP_CODE); $message = self::isJson($result) ? json_decode($result, true) : $result; $result = ["code" => $curl_code]; if (isset($message["exception"]) && count($message) == 1) { $result["exception"] = $message["exception"]; $result["result"] = null; } else { $result["result"] = $message; } } curl_close($curl); return $result; }
因为数据量大,后来改为多线程:
function curlMulti(array $urls, $options = null, $method = "POST", $getCode = false, $header = []) { $mh = curl_multi_init(); // 添加curl批处理会话 $handles = $contents = []; foreach ($urls as $key => $url) { $handles[$key] = curl_init($url); curl_setopt($handles[$key], CURLOPT_RETURNTRANSFER, 1); curl_setopt($handles[$key], CURLOPT_TIMEOUT, 30); curl_setopt($handles[$key], CURLOPT_CUSTOMREQUEST, $method); if (!array_key_exists("Content-Type", $header)) { $header["Content-Type"] = "application/json;charset=utf-8"; } $headers = []; if ($header) { foreach ($header as $k => $val) { $headers[] = $k.": ".$val; } } curl_setopt($handles[$key], CURLOPT_HTTPHEADER, $headers); if ($options) { if (is_array($options)) { $options = json_encode($options); } curl_setopt($handles[$key], CURLOPT_POSTFIELDS, $options); } curl_multi_add_handle($mh, $handles[$key]); } // 执行批处理句柄 /*$active = null; do{ $mrc = curl_multi_exec($mh, $active); } while ($mrc == CURLM_CALL_MULTI_PERFORM); while ($active and $mrc == CURLM_OK) { if (curl_multi_select($mh) === -1) { usleep(100); do { $mrc = curl_multi_exec($mh, $active); }while($mrc == CURLM_CALL_MULTI_PERFORM); } }// 获取批处理内容 $errors = []; foreach ($handles as $k => $ch) { $errors[$k] = curl_error($ch); $content = curl_multi_getcontent($ch); if ($getCode) { $content = curl_errno($ch) == 0 && self::isJson($content)? json_decode($content,true) : []; } $contents = array_merge($contents,$content); } $info = curl_multi_info_read($mh);*/ $output = $errors = $infos = []; do { while (($execrun = curl_multi_exec($mh, $running)) == CURLM_CALL_MULTI_PERFORM); if ($execrun != CURLM_OK) break; while ($done = curl_multi_info_read($mh)) { $info= curl_getinfo($done["handle"]); $infos["http_code"][] = $info["http_code"]; $result["code"] = $info["http_code"]; $infos["url"][] = $info["url"]; $errors[] = curl_error($done["handle"]); $output = self::isJson(curl_multi_getcontent($done["handle"])) ? array_merge($output, json_decode(curl_multi_getcontent($done["handle"]),true)) : $output; if ($running) curl_multi_select($mh, 30); } } while ($running); $result["result"] = $output; $result["exception"] = $errors; $result["info"] = $infos; foreach ($handles as $ch) { curl_multi_remove_handle($mh, $ch); } curl_multi_close($mh); return $result; }
上面的代码中有一段代码是注释掉的,按照道理来说,上面的代码执行的结果应该和下面的一样,事实证明,却是执行的结果是一样,我这里说的结果不是多线程返回的结果,既然是多线程,那么不同的线程竞争到资源也是不一样的,返回结果出现了混乱,导出的excel数据并不是根据某种排序而排序的,也就是你不知道那个线程先返回了结果,这是问题一,其二,在导出的过程中,发现不同程度的丢失数据,加热管每个线程500条数据,结果在验证数据时,发现仅仅返回了300多条数据,数据变动不一致,第三,过多的数据,依然造成nginx服务器超时,错误code 504。
PS: 为什么在php的中没有使用phpexcel第三方包,原因很简单,测试发现,phpexcel太耗内存,机器吃不消,所以就没用。
初步解决方案既然php的多线程方案不能解决问题,只能找其他的办法,最可靠的也是大家都能想到的,就是队列处理,把导出请求放入到队列中,直接返回给客户端,告诉客户业务正在处理,然后具体的导出交由消费端处理,最后把结果反馈到客户端。
我们都知道php的队列有很多,常用的比如Swoole,Workman以及Gearman等。我选择了Gearman,因为方便,而Swoole原来在我们的项目中,后来被踢掉了,不知原由。
Gearman服务端work的代码demo:
addServer(); $worker->addFunction("export", function (GearmanJob $job) { $workload = $job->workload(); if (($data = $this->parseJson($workload)) == false) { return AppHelper::encodeJson(["code" => "-1", "result" => null, "exception" => "参数错误"]); } $user = isset($data["user"]) && !empty($data["user"]) ? $data["user"] : "guest"; $path = dirname(Yii::$app->basePath) . "/backend/downloads/" . sha1($user) . "/" . date("Y-m-d") . "/"; $filename = isset($data["filename"]) && !empty($data["filename"]) ? $data["filename"] : date("Y-m-d") . "-order.xls"; $rs = $this->getData($data["type"]["data"], $data["type"]["count"], $data["api"], $data["params"]); $this->writeExcel($path, $filename, $rs, $data["header"]); return 200; }); //无际循环运行,gearman内部已有处理,不会出现占用过高死掉的情况 while ($worker->work()) { if ($worker->returnCode() !== GEARMAN_SUCCESS) { echo "error" . PHP_EOL; } } } public function parseJson($str) { $data = json_decode($str, true); return (json_last_error() == JSON_ERROR_NONE) ? $data : false; } public function writeExcel($path, $filename, $data, $header) { if ($this->mkDir($path)) { $data = $this->assembleData($data); $rs = $this->generateExcel($header, $data); file_put_contents(rtrim($path, "/") . "/" . $filename, $rs); } else { echo "目录不存在,写文件错误!"; } return; } public function getData($dataApi, $countApi, $api, $params) { $start = microtime(true); $count = AppHelper::getData($api . $countApi . "?" . http_build_query($params)); echo $api . $countApi . "?" . http_build_query($params).PHP_EOL; echo "总条数:" . $count . PHP_EOL; $params["perpage"] = 500; $times = ceil($count / $params["perpage"]); $data = []; if ($count > 0) { for ($i = 0; $i < $times; $i++) { $params["page"] = $i + 1; $rs = AppHelper::getData($api . $dataApi . "?" . http_build_query($params)); $data = array_merge($data, $rs); } } $end = microtime(true); echo "花费时间:" . ($end - $start) . PHP_EOL; return $data; } public function generateExcel($header, array &$data) { $rs = "
" . $v . " | "; } $rs .= "|
---|---|
" . $v . " | "; } else { $rs .= "" . $v . " | "; } } $rs .= "
Gearman的Client端的代码:
addServer("127.0.0.1", 4730); $client->setCompleteCallback(completeCallBack); $result2 = $client->doBackground("export", $str);//异步进行,只返回处理句柄。 // $result1 = $client->do("export", "do");//do是同步进行,进行处理并返回处理结果。 // $result3 = $client->addTask("export", "addTask");//添加任务到队列,同步进行?通过添加task可以设置回调函数。 // $result4 = $client->addTaskBackground("export", "addTaskBackground");//添加后台任务到队列,异步进行? $client->runTasks();//运行队列中的任务,只是do系列不需要runTask() return $result2; } //绑定回调函数,只对addTask有效 function completeCallBack($task) { echo "CompleteCallback!handle result:".$task->data()."
"; }
ps:要运行上面的代码,需要在服务器或者本地安装Gearman服务,并且需要安装php_gearman扩展,安装教程自行搜索。
如果你的业务逻辑不复杂,到此可以导出几万条数据绰绰有余了,然而,我的问题并没有因此而解决,上司说,不想用Gearman队列处理,最好还是java处理。嗯,没关系,我喜欢这种在技术中跳来跳去的解决问题,既然不满足上司的需求,那就另行方案。
MqRocket+HDFS+POI说明:这里用到的java项目都是基于spring+dubbo/dubbox的项目。所用到的配置或者注解均在spring的相关配置和注解范畴,除了mapper的配置和注解。
三个项目:
mq项目:提供rest服务,发送消息(@rxl)
biz项目:提供dubbo、restfull接口,处理业务(@lee)
data项目:处理数据导出
如上,三个项目分别是不同的工程师所写,我们不关心怎么实现的,只需知道我们能使用每个功能即可。
mq提供的restfull接口:@Path("/message") @Produces({ContentType.APPLICATION_JSON_UTF_8}) @Component("sendMessageService") public class SendMessageImpl implements SendMessageService{ @Resource public IProducer producer; @PUT @Path("send") @Consumes({MediaType.APPLICATION_JSON}) @Override public void sendMessage(Message message) { System.out.println("message" + message.getMessage()); producer.send(message.getTopic(),message.getKey(),message.getMessage()); } }
这样我们在php后台通过put方式,调用该接口,将需要处理的数据发送给导出处理服务端。发送put请求可以使用curl强大的request功能。
curl_setopt($curl, CURLOPT_CUSTOMREQUEST, "PUT");
假如mq提供的rest接口是:http://localhost:8018/mq/message/send,我们需要传递一个json字符串,该字符串原型是一个关联数组,数组的key分别为“topic”、“key”和“message”,topic是消息的主题,需要指定的mq主题去消费,key是消息的key,该topic下面会有很多key,因此,我们的消费方即数据导出方需要根据key做判断处理。message里面就是具体的一下参数,比如需要导出哪些字段,比如文件上传服务器地址等等信息。
$message = [ "topic" => "order_export", "key" => "order_tag_" . $orderNo, "message" => [ "params" => [ ... ], "headers" => [ ... ], "options" => [ ... ], ], ];
完整的接口请求:
http://localhost:8018/mq/message/send?{"topic":"order_export","key":"order_tag_","message":{"params":[],"header":[],"options":[]}}
poi工具类封装Java的Excel API很多,唯独Apache POI这款使用最方便最灵活(或许其他的没有使用过)。
HSSF is the POI Project"s pure Java implementation of the Excel "97(-2007) file format. XSSF is the POI Project"s pure Java implementation of the Excel 2007 OOXML (.xlsx) file format.
HSSF and XSSF provides ways to read spreadsheets create, modify, read and write XLS spreadsheets. They provide:
low level structures for those with special needs
an eventmodel api for efficient read-only access
a full usermodel api for creating, reading and modifying XLS files
在gradle引入poi包:
// java excel api compile "org.apache.poi:poi:3.10.1" compile "org.apache.poi:poi-ooxml:3.9"
package cn.test.web.utils; import cn.test.util.Utils; import org.apache.commons.io.FilenameUtils; import org.apache.poi.hssf.record.crypto.Biff8EncryptionKey; import org.apache.poi.hssf.usermodel.HSSFFont; import org.apache.poi.hssf.usermodel.HSSFFooter; import org.apache.poi.hssf.usermodel.HSSFHeader; import org.apache.poi.hssf.usermodel.HSSFWorkbook; import org.apache.poi.openxml4j.exceptions.InvalidFormatException; import org.apache.poi.poifs.filesystem.POIFSFileSystem; import org.apache.poi.ss.usermodel.Cell; import org.apache.poi.ss.usermodel.CellStyle; import org.apache.poi.ss.usermodel.Font; import org.apache.poi.ss.usermodel.Footer; import org.apache.poi.ss.usermodel.Header; import org.apache.poi.ss.usermodel.Row; import org.apache.poi.ss.usermodel.Sheet; import org.apache.poi.ss.usermodel.Workbook; import org.apache.poi.ss.usermodel.WorkbookFactory; import org.apache.poi.xssf.usermodel.XSSFWorkbook; import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.util.List; import java.util.Properties; /** * Created with test-data * User zhoujunwen * Date 16/8/11 * Time 下午5:02 */ public class POIUtils { private static final short HEADER_FONT_SIZE = 16; // 大纲字体 private static final short FONT_HEIGHT_IN_POINTS = 14; // 行首字体 public static Workbook createWorkbook(String file) { String ext = FilenameUtils.getExtension(CommonUtils.getFileName(file)); Workbook wb = null; switch (ext) { case "xls": wb = createHSSFWorkbook(); break; case "xlsx": wb = createXSSFWorkbook(); break; default: wb = createHSSFWorkbook(); } return wb; } public static Workbook createWorkbookByIS(String file, InputStream inputStream) { String ext = FilenameUtils.getExtension(CommonUtils.getFileName(file)); Workbook wb = null; try { switch (ext) { case "xls": wb = new HSSFWorkbook(inputStream); break; case "xlsx": wb = new XSSFWorkbook(inputStream); break; default: wb = new HSSFWorkbook(inputStream); } } catch (IOException e) { e.printStackTrace(); } return wb; } public static Workbook writeFile(Workbook wb, String file) { if (wb == null || Utils.isEmpty(file)) { return null; } FileOutputStream out = null; try { out = new FileOutputStream(file); wb.write(out); } catch (IOException e) { e.printStackTrace(); } finally { if (out != null) { try { out.close(); } catch (IOException e) { e.printStackTrace(); } } } return wb; } public static Workbook createHSSFWorkbook() { //生成Workbook HSSFWorkbook wb = new HSSFWorkbook(); //添加Worksheet(不添加sheet时生成的xls文件打开时会报错) @SuppressWarnings("unused") Sheet sheet = wb.createSheet(); return wb; } public static Workbook createXSSFWorkbook() { XSSFWorkbook wb = new XSSFWorkbook(); @SuppressWarnings("unused") Sheet sheet = wb.createSheet(); return wb; } public static Workbook openWorkbook(String file) { FileInputStream in = null; Workbook wb = null; try { in = new FileInputStream(file); wb = WorkbookFactory.create(in); } catch (InvalidFormatException | IOException e) { e.printStackTrace(); } finally { try { if (in != null) { in.close(); } } catch (IOException e) { e.printStackTrace(); } } return wb; } public static Workbook openEncryptedWorkbook(String file, String password) { FileInputStream input = null; BufferedInputStream binput = null; POIFSFileSystem poifs = null; Workbook wb = null; try { input = new FileInputStream(file); binput = new BufferedInputStream(input); poifs = new POIFSFileSystem(binput); Biff8EncryptionKey.setCurrentUserPassword(password); String ext = FilenameUtils.getExtension(CommonUtils.getFileName(file)); switch (ext) { case "xls": wb = new HSSFWorkbook(poifs); break; case "xlsx": wb = new XSSFWorkbook(input); break; default: wb = new HSSFWorkbook(poifs); } } catch (IOException e) { e.printStackTrace(); } return wb; } /** * 追加一个sheet,如果wb为空且isNew为true,创建一个wb * * @param wb * @param isNew * @param type 创建wb类型,isNew为true时有效 1:xls,2:xlsx * @return */ public static Workbook appendSheet(Workbook wb, boolean isNew, int type) { if (wb != null) { Sheet sheet = wb.createSheet(); } else if (isNew) { if (type == 1) { wb = new HSSFWorkbook(); wb.createSheet(); } else { wb = new XSSFWorkbook(); wb.createSheet(); } } return wb; } public static Workbook setSheetName(Workbook wb, int index, String sheetName) { if (wb != null && wb.getSheetAt(index) != null) { wb.setSheetName(index, sheetName); } return wb; } public static Workbook removeSheet(Workbook wb, int index) { if (wb != null && wb.getSheetAt(index) != null) { wb.removeSheetAt(index); } return wb; } public static Workbook insert(Workbook wb, String sheetName, int row, int start, List> columns) { if (row == 0 || wb == null) return wb; for (int i = start; i < (row + start); i++) { Row rows = wb.getSheet(sheetName).createRow(i); if (columns != null && columns.size() > 0) { for (int j = 0; j < columns.size(); j++) { Cell ceil = rows.createCell(j); ceil.setCellValue(String.valueOf(columns.get(j))); } } } return wb; } /** * 设置excel头部 * * @param wb * @param sheetName * @param columns 比如:["国家","活动类型","年份"] * @return */ public static Workbook setHeader(Workbook wb, String sheetName, List> columns) { if (wb == null) return null; if (sheetName == null) { sheetName = wb.getSheetAt(0).getSheetName(); } return setHeaderStyle(insert(wb, sheetName, 1, 0, columns), sheetName); } /** * 插入数据 * * @param wb Workbook * @param sheetName sheetName,默认为第一个sheet * @param start 开始行数 * @param data 数据,List嵌套List ,比如:[["中国","奥运会",2008],["伦敦","奥运会",2012]] * @return */ public static Workbook setData(Workbook wb, String sheetName, int start, List> data) { if (wb == null) return null; if (sheetName == null) { sheetName = wb.getSheetAt(0).getSheetName(); } if (data != null || data.size() > 0) { if (data instanceof List) { int s = start; for (Object columns : data) { insert(wb, sheetName, data.size() - (s - 1), s, (List>) columns); s++; } } } return wb; } /** * 移除某一行 * * @param wb * @param sheetName sheet name * @param row 行号 * @return */ public static Workbook delRow(Workbook wb, String sheetName, int row) { if (wb == null) return null; if (sheetName == null) { sheetName = wb.getSheetAt(0).getSheetName(); } Row r = wb.getSheet(sheetName).getRow(row); wb.getSheet(sheetName).removeRow(r); return wb; } /** * 移动行 * * @param wb * @param sheetName * @param start 开始行 * @param end 结束行 * @param step 移动到那一行后(前) ,负数表示向前移动 * moveRow(wb,null,2,3,5); 把第2和3行移到第5行之后 * moveRow(wb,null,2,3,-1); 把第3行和第4行往上移动1行 * @return */ public static Workbook moveRow(Workbook wb, String sheetName, int start, int end, int step) { if (wb == null) return null; if (sheetName == null) { sheetName = wb.getSheetAt(0).getSheetName(); } wb.getSheet(sheetName).shiftRows(start, end, step); return wb; } public static Workbook setHeaderStyle(Workbook wb, String sheetName) { Font font = wb.createFont(); CellStyle style = wb.createCellStyle(); font.setBoldweight(HSSFFont.BOLDWEIGHT_BOLD); font.setFontHeightInPoints(FONT_HEIGHT_IN_POINTS); font.setFontName("黑体"); style.setFont(font); if (Utils.isEmpty(sheetName)) { sheetName = wb.getSheetAt(0).getSheetName(); } int row = wb.getSheet(sheetName).getFirstRowNum(); int cell = wb.getSheet(sheetName).getRow(row).getLastCellNum(); for (int i = 0; i < cell; i++) { wb.getSheet(sheetName).getRow(row).getCell(i).setCellStyle(style); } return wb; } public static Workbook setHeaderOutline(Workbook wb, String sheetName, String title) { if (wb == null) return null; if (Utils.isEmpty(sheetName)) { sheetName = wb.getSheetAt(0).getSheetName(); } Header header = wb.getSheet(sheetName).getHeader(); header.setLeft(HSSFHeader.startUnderline() + HSSFHeader.font("宋体", "Italic") + "打鸡血的口号!" + // 比如:爱我中华 HSSFHeader.endUnderline()); header.setCenter(HSSFHeader.fontSize(HEADER_FONT_SIZE) + HSSFHeader.startDoubleUnderline() + HSSFHeader.startBold() + title + HSSFHeader.endBold() + HSSFHeader.endDoubleUnderline()); header.setRight("时间:" + HSSFHeader.date() + " " + HSSFHeader.time()); return wb; } public static Workbook setFooter(Workbook wb, String sheetName, String copyright) { if (wb == null) return null; if (Utils.isEmpty(sheetName)) { sheetName = wb.getSheetAt(0).getSheetName(); } Footer footer = wb.getSheet(sheetName).getFooter(); if (Utils.isEmpty(copyright)) { copyright = "中华人民共和国"; // 版权信息,自己公司的名字或者app的名字 } footer.setLeft("Copyright @ " + copyright); footer.setCenter("Page:" + HSSFFooter.page() + " / " + HSSFFooter.numPages()); footer.setRight("File:" + HSSFFooter.file()); return wb; } public static String create(String sheetNm, String file, List> header, List> data, String title, String copyright) { Workbook wb = createWorkbook(file); if (Utils.isEmpty(sheetNm)) { sheetNm = wb.getSheetAt(0).getSheetName(); } setHeaderOutline(wb, sheetNm, title); setHeader(wb, sheetNm, header); setData(wb, sheetNm, 1, data); setFooter(wb, sheetNm, copyright); writeFile(wb, file); if (wb != null) { return file; } return null; } public static String getSystemFileCharset() { Properties pro = System.getProperties(); return pro.getProperty("file.encoding"); } // TODO 后面增加其他设置 }HDFS工具类封装
Hadoop分布式文件系统(HDFS)被设计成适合运行在通用硬件(commodity hardware)上的分布式文件系统。它和现有的分布式文件系统有很多共同点。但同时,它和其他的分布式文件系统的区别也是很明显的。HDFS是一个高度容错性的系统,适合部署在廉价的机器上。HDFS能提供高吞吐量的数据访问,非常适合大规模数据集上的应用。HDFS放宽了一部分POSIX约束,来实现流式读取文件系统数据的目的。HDFS在最开始是作为Apache Nutch搜索引擎项目的基础架构而开发的。HDFS是Apache Hadoop Core项目的一部分。
HDFS有着高容错性(fault-tolerant)的特点,并且设计用来部署在低廉的(low-cost)硬件上。而且它提供高吞吐量(high throughput)来访问应用程序的数据,适合那些有着超大数据集(large data set)的应用程序。HDFS放宽了(relax)POSIX的要求(requirements)这样可以实现流的形式访问(streaming access)文件系统中的数据。
在gradle中引入hdfs:
// jersey compile "com.sun.jersey:jersey-core:1.19.1" compile "com.sun.jersey:jersey-server:1.19.1" compile "com.sun.jersey:jersey-client:1.19.1" compile "com.sun.jersey:jersey-json:1.19.1" // hadoop compile ("org.apache.hadoop:hadoop-common:2.7.2") { exclude(module: "jersey") exclude(module: "contribs") } compile ("org.apache.hadoop:hadoop-hdfs:2.7.2") { exclude(module: "jersey") exclude(module: "contribs") } compile ("org.apache.hadoop:hadoop-client:2.7.2") { exclude(module: "jersey") exclude(module: "contribs") }`
package cn.test.web.utils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.io.IOUtils; import org.apache.poi.ss.usermodel.Workbook; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.URISyntaxException; /** * Created with test-data * User zhoujunwen * Date 16/8/11 * Time 下午5:41 */ public class HDFSUtils { private static FileSystem fs = null; public static FileSystem getFileSystem(Configuration conf) throws IOException, URISyntaxException { fs = FileSystem.get(conf); //fs = FileSystem.newInstance(conf); return fs; } /** * 判断路径是否存在 * * @param conf * @param path * @return * @throws IOException */ public static boolean exits(Configuration conf, String path) throws IOException, URISyntaxException { FileSystem fs = getFileSystem(conf); return fs.exists(new Path(path)); } /** * 创建文件 * * @param conf * @param filePath * @param contents * @throws IOException */ public static void createFile(Configuration conf, String filePath, byte[] contents) throws IOException, URISyntaxException { FileSystem fs = getFileSystem(conf); Path path = new Path(filePath); FSDataOutputStream outputStream = fs.create(path); outputStream.write(contents, 0, contents.length); outputStream.hflush(); outputStream.close(); fs.close(); } /** * 创建文件 * * @param conf * @param filePath * @param fileContent * @throws IOException */ public static void createFile(Configuration conf, String fileContent, String filePath) throws IOException, URISyntaxException { createFile(conf, filePath, fileContent.getBytes()); } /** * 上传文件 * * @param conf * @param localFilePath * @param remoteFilePath * @throws IOException */ public static void copyFromLocalFile(Configuration conf, String localFilePath, String remoteFilePath) throws IOException, URISyntaxException { FileSystem fs = getFileSystem(conf); Path localPath = new Path(localFilePath); Path remotePath = new Path(remoteFilePath); fs.copyFromLocalFile(true, true, localPath, remotePath); fs.close(); } /** * 删除目录或文件 * * @param conf * @param remoteFilePath * @param recursive * @return * @throws IOException */ public static boolean deleteFile(Configuration conf, String remoteFilePath, boolean recursive) throws IOException, URISyntaxException { FileSystem fs = getFileSystem(conf); boolean result = fs.delete(new Path(remoteFilePath), recursive); fs.close(); return result; } /** * 删除目录或文件(如果有子目录,则级联删除) * * @param conf * @param remoteFilePath * @return * @throws IOException */ public static boolean deleteFile(Configuration conf, String remoteFilePath) throws IOException, URISyntaxException { return deleteFile(conf, remoteFilePath, true); } /** * 文件重命名 * * @param conf * @param oldFileName * @param newFileName * @return * @throws IOException */ public static boolean renameFile(Configuration conf, String oldFileName, String newFileName) throws IOException, URISyntaxException { FileSystem fs = getFileSystem(conf); Path oldPath = new Path(oldFileName); Path newPath = new Path(newFileName); boolean result = fs.rename(oldPath, newPath); fs.close(); return result; } /** * 创建目录 * * @param conf * @param dirName * @return * @throws IOException */ public static boolean createDirectory(Configuration conf, String dirName) throws IOException, URISyntaxException { FileSystem fs = getFileSystem(conf); Path dir = new Path(dirName); boolean result = fs.mkdirs(dir); fs.close(); return result; } /** * 列出指定路径下的所有文件(不包含目录) * * @param fs * @param basePath * @param recursive */ public static RemoteIteratorlistFiles(FileSystem fs, String basePath, boolean recursive) throws IOException { RemoteIterator fileStatusRemoteIterator = fs.listFiles(new Path(basePath), recursive); return fileStatusRemoteIterator; } /** * 列出指定路径下的文件(非递归) * * @param conf * @param basePath * @return * @throws IOException */ public static RemoteIterator listFiles(Configuration conf, String basePath) throws IOException, URISyntaxException { FileSystem fs = getFileSystem(conf); RemoteIterator remoteIterator = fs.listFiles(new Path(basePath), false); fs.close(); return remoteIterator; } /** * 列出指定目录下的文件子目录信息(非递归) * * @param conf * @param dirPath * @return * @throws IOException */ public static FileStatus[] listStatus(Configuration conf, String dirPath) throws IOException, URISyntaxException { FileSystem fs = getFileSystem(conf); FileStatus[] fileStatuses = fs.listStatus(new Path(dirPath)); fs.close(); return fileStatuses; } /** * 读取文件内容并写入outputStream中 * * @param conf 配置 * @param filePath 文件路径 * @param os 输出流 * @return * @throws IOException */ public static void readFile(Configuration conf, String filePath, OutputStream os) throws IOException, URISyntaxException { FileSystem fs = getFileSystem(conf); Path path = new Path(filePath); try (FSDataInputStream inputStream = fs.open(path)) { Workbook wb = POIUtils.createWorkbookByIS(filePath, inputStream); wb.write(os); inputStream.close(); } finally { fs.close(); } } /** * 读取文件内容并返回 * @param conf * @param filePath * @return * @throws IOException * @throws URISyntaxException */ public static String readFile(Configuration conf, String filePath) throws IOException, URISyntaxException { String fileContent = null; FileSystem fs = getFileSystem(conf); Path path = new Path(filePath); InputStream inputStream = null; ByteArrayOutputStream outputStream = null; try { inputStream = fs.open(path); outputStream = new ByteArrayOutputStream(inputStream.available()); IOUtils.copyBytes(inputStream, outputStream, conf); byte[] lens = outputStream.toByteArray(); fileContent = new String(lens, "UTF-8"); } finally { IOUtils.closeStream(inputStream); IOUtils.closeStream(outputStream); fs.close(); } return fileContent; } }
对于hdfs我多带带有谢了两个类,一个是HDFSFileUploader,一个是Configuration。如类名,前者用于文件上传,后者用于hdfs的配置。
HDFSFileUploaderpackage cn.test.web.utils.hadoop; import cn.test.common.log.Log; import cn.test.common.log.LogFactory; import cn.test.common.util.Utils; import cn.test.web.utils.HDFSUtils; import org.apache.commons.lang.NullArgumentException; import java.io.IOException; import java.net.URISyntaxException; import java.util.UUID; /** * Created with test-data * User zhoujunwen * Date 16/8/11 * Time 下午5:42 */ public class HDFSFileUploader { public static final byte FROM_LOCAL_COPY = 1; // 从本地上传文件 public static final byte FROM_CONTENT_WRITE = 2; // 读取字符串或字节,生成文件 private static final Log LOGGER = LogFactory.getLog(HDFSFileUploader.class); private static final String HDFS_SCHEMA = "hdfs://"; private static final String SEPARATOR = "/"; private static final String SUFFIX_PREFIX = "."; private static final int BUFFER_SIZE = 1024; private static final Configuration CONF = new Configuration(); /** * 上传二进制文件,使用默认配置的域名,随机生成文件名 * * @param path * @param suffix * @param contents * @return */ public static String upload(String path, String suffix, byte[] contents) { return upload(null, path, suffix, contents); } /** * 上传二进制文件,随机生成文件名 * * @param domain * @param path * @param suffix * @param contents * @return */ public static String upload(String domain, String path, String suffix, byte[] contents) { return upload(domain, path, null, suffix, contents); } /** * 上传二进制文件,指定文件名,只能通过流上传 * * @param domain * @param path * @param filename * @param suffix * @param content * @return */ public static String upload(String domain, String path, String filename, String suffix, final byte[] content) { return upload(domain, path, filename, suffix, new String(content), FROM_CONTENT_WRITE); } /** * 上传文件,默认域名和随机文件名 * * @param path * @param suffix * @param src * @return */ public static String upload(String path, String suffix, String src, byte fromLocal) { return upload(null, path, suffix, src, fromLocal); } /** * 上传文件到指定域名的指定目录,文件名随机生成 * * @param domain 域名,比如 10.25.126.28:9000 * @param path 文件路径,比如 /usr/local/com.hd.test/2016-08-08/ * @param suffix 文件后缀,比如 .xsl,xsl * @param src 文件内容,字符串 || 本地文件路径 * @return String 完整的文件名 */ public static String upload(String domain, String path, String suffix, String src, byte fromLocal) { return upload(domain, path, null, suffix, src, fromLocal); } /** * 上传文件,指定了域名,路径,文件名,后缀 * * @param domain 域名 * @param path 路径 * @param filename 文件名 * @param suffix 后缀 * @param src 内容 || 本地路径 * @return */ public static String upload(String domain, String path, String filename, String suffix, String src, byte fromLocal) { String filePath = getRealAddr(domain, path, suffix, filename); System.out.println(filePath); try { switch (fromLocal) { case FROM_LOCAL_COPY: HDFSUtils.copyFromLocalFile(CONF, src, filePath); break; case FROM_CONTENT_WRITE: HDFSUtils.createFile(CONF, src, filePath); break; } return filePath; } catch (IOException | URISyntaxException e) { LOGGER.warn("上传文件失败:{}",e.getMessage()); } return null; } /** * 文件完整的路径 * * @param domain 域名 * @param path 目录路径 * @param suffix 后缀 * @param filename 文件名 * @return */ private static String getRealAddr(String domain, String path, String suffix, String filename) { if (!Utils.isEmpty(domain) && !domain.startsWith(HDFS_SCHEMA)) { domain = HDFS_SCHEMA + domain; } else { domain = ""; } path = getPath(path); filename = getFilename(filename, suffix); return String.format("%s%s%s", domain, path, filename); } /** * 文件路径 * * @param path * @return */ private static String getPath(String path) { if (Utils.isEmpty(path)) { throw new NullArgumentException("path id null"); } if (!path.startsWith(SEPARATOR)) { path = SEPARATOR + path; } if (!path.endsWith(SEPARATOR)) { path = path + SEPARATOR; } return path; } /** * 生成文件名 * * @param filename * @param suffix * @return */ private static String getFilename(String filename, String suffix) { if (Utils.isEmpty(filename)) { filename = generateFilename(); } if (!Utils.isEmpty(suffix)) { filename = suffix.equals(SEPARATOR) ? filename : (filename.endsWith(suffix) ? filename : ((filename.endsWith(SUFFIX_PREFIX) || suffix.startsWith(SUFFIX_PREFIX)) ? filename + suffix : filename + SUFFIX_PREFIX + suffix)); } return filename; } /** * 生成文件名 * * @return */ private static String generateFilename() { return getUuid(false); } /** * 生成UUID * * @param isNeedHyphen * @return */ public static String getUuid(boolean isNeedHyphen) { UUID uuid = UUID.randomUUID(); String str = uuid.toString(); if (isNeedHyphen) { str = str.replaceAll("-", ""); } return str; } public static void setConfResource(final Configuration config) { CONF.addResource(config); } }
HDFSFileUploader中的一系列方法,用于上传不同类型的文件,比如二进制文件,字符串等,还有hdfs的copy本地文件以及文件名uuid生成等方法。
Configurationpackage cn.test.web.utils.hadoop; import cn.test.web.utils.CommonUtils; import org.apache.commons.io.FilenameUtils; import org.springframework.core.io.Resource; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; /** * Created with test-data * User zhoujunwen * Date 16/8/9 * Time 上午9:30 * 建议使用方法: ** * 在使用的地方直接注入hadoopConfig: * * @Resource private Configuration hadoopConfig; */ public class Configuration extends org.apache.hadoop.conf.Configuration { private Resource[] resources; public void setResources(List* **
*classpath:/spring/core-site.xml *filenames) throws IOException { List resources = new ArrayList<>(); if (filenames != null && filenames.size() > 0) { for (String filename : filenames) { filename = filename.trim(); String realName = getFileName(filename); String ext = FilenameUtils.getExtension(realName); if (ext.equals("xml")) { PathMatchingResourcePatternResolver pathMatchingResourcePatternResolver = new PathMatchingResourcePatternResolver(); try { Resource[] resourceList = pathMatchingResourcePatternResolver.getResources(filename); Collections.addAll(resources, resourceList); } catch (IOException e) { e.printStackTrace(); } } } } for (Resource resource : resources) { this.addResource(resource.getURL()); } } private String getFileName(String fileName) { return CommonUtils.getFileName(fileName); } }
这个类很简单,其实是集成了hadoop的org.apache.hadoop.conf.Configuration类,目的是为了在spring配置文件中,灵活的指定hadoop的配置文件,所用到的就是org.apache.hadoop.conf.Configuration的addResource(String name)方法,下面是在spring xml中的配置。
导出订单处理(mq消费端)
classpath:META-INF/hadoop/*.xml
package cn.test.web.mq.consumer; ... // 很多依赖包 /** * Created with test-data * User zhoujunwen * Date 16/8/9 * Time 下午2:14 */ public class OrderExportHandler implements IMessageHandler{ private static final Log LOGGER = LogFactory.getLog(OrderExportHandler.class); private static final int MUL_SEC = 1000; private static final Gson GSON = new Gson(); @Value("${image_server}") private String imageServer; @Autowired private DataManager manager; @Override public void handle(final String key, final String message) { System.out.println("message" + message); Pattern p = Pattern.compile("-"); String[] skey = p.split(key); if (skey.length < 3) { return; } int res = insert(skey[1], skey[0], skey[2]); LOGGER.debug("主键:{}", res); if (res > 0) { //插入数据成功,执行导出数据逻辑 Map data = manager.parseData(message); List> header = null; List> content = null; List orders = null; DataExportLog log = new DataExportLog(); log.setDelid(res); log.setUid(Integer.valueOf(skey[2])); if (data.containsKey("params")) { LOGGER.debug("params:{}", data.get("params")); orders = manager.getOrders(data.get("params")); LOGGER.debug("导出数据的条数:{}", orders.size()); } if (orders == null || orders.size() == 0) { log.setStatus((byte) 4); } else if (data.containsKey("header") && (data.get("header") instanceof Map)) { Object obj = data.get("header"); Map map = (obj instanceof Map) ? manager.parseHeader((Map ) obj) : null; if (map != null && map.size() > 0) { if (map.containsKey("header")) { header = getHeader(map.get("header")); } if (map.containsKey("key")) { content = getContent(orders, map.get("key")); } } // 调用hdfs 接口,上传文件 if (!Utils.isEmpty(header) || !Utils.isEmpty(content)) { // 生成excel文件 String fName = getFilename(data); String localFile = manager.writeExecelFile(fName, header, content, null, null); String file = manager.copyFileFromLocal(skey[0], localFile); if (Utils.isEmpty(localFile) || Utils.isEmpty(file)) { log.setStatus((byte) 3); } else { log.setStatus((byte) 1); log.setLink(file); } LOGGER.info("本地临时文件:{}", localFile); LOGGER.info("上传到hadoop服务器中的文件:{}", file); } } update(log); } } // TODO // 处理数据,这里面会调用biz项目的dubbo接口 // 具体的操作不在这里面写 }
订单导出逻辑都在上面的类,以及DataManager中进行处理,期间获取数据等接口则由biz项目的dubbo接口提供,具体业务逻辑在此不涉及。
下面会给出manager.writeExecelFile(fName, header, content, null, null);方法和manager.copyFileFromLocal(skey[0], localFile);方法的code:
public String writeExecelFile(String filename, List> header, List> datas, String title, String copyright) { SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd"); String date = sd.format(new Date()); if (Utils.isEmpty(filename)) { filename = HDFSFileUploader.getUuid(true) + this.ext; } String filePath = this.tmpDir + "/" + date + "/" + filename; filePath = filePath.replaceAll("//", "/"); File f = new File(CommonUtils.getFilePath(filePath)); if (!f.exists() && !f.isDirectory()) { f.mkdir(); } if (Utils.isEmpty(title)) { title = DEFAULT_TITLE; } if (Utils.isEmpty(copyright)) { copyright = this.copyright; } return POIUtils.create(null, filePath, header, datas, title, copyright); }
writeExecelFile方法调用了poi的create方法,此时临时文件已生成。
还有一点需要说一下,比如临时路径,上传到hdfs的路径,版权信息等最好是在配置文件中可配置的,这就依赖予spring的org.springframework.beans.factory.config.PropertyPlaceholderConfigurer类,他可以做到,我们只需要在代码中这么写并且在properties文件中写入相应的配置即可:
@Value("${hdfs_upload_dir}") private String uploadDir; @Value("${file_tmp_dir}") private String tmpDir; @Value("${copyright}") private String copyright; @Value("${default_file_ext}") private String ext;
再看看copyFileFromLocal这个方法:
/** * 写hdfs文件 * * @param type * @param file * @return */ public String copyFileFromLocal(String type, String file) { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd"); String date = format.format(new Date()); String path = this.uploadDir + type + "/" + date + "/"; HDFSFileUploader.setConfResource(hadoopConfig); return HDFSFileUploader.upload(path, this.ext, file, HDFSFileUploader.FROM_LOCAL_COPY); }
这个方法中调用了HDFSFileUploader.upload的方法,即上面展示的一个封装类中的方法。需要注意的是,这地方注入了hadoop的配置文件HDFSFileUploader.setConfResource(hadoopConfig);。而hadoop得Configuration这样引入在DataMananager类中:
@Resource private Configuration hadoopConfig;
到此,我们把生成的excel文件上传到了hdfs的指定文件路径。可以使用hadoop客户端的命令查看:
hadoop fs -ls /cn/test/order/ (这里是上传路径)订单导出(下载)
订单导出,这里由java后端直接提供rest接口,如果使用php的hdfs第三方包phdfs(github),用起来并不那么书顺畅,编译时报错。
好吧,看看这个接口是怎么写的:
package cn.test.web.impl; import cn.test.common.log.Log; import cn.test.common.log.LogFactory; import cn.test.util.Utils; import cn.test.web.manager.DataManager; import cn.test.web.service.DownloadService; import cn.test.web.utils.CommonUtils; import com.alibaba.dubbo.rpc.protocol.rest.support.ContentType; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.servlet.http.HttpServletResponse; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import java.io.IOException; import java.net.URISyntaxException; /** * Created with test-data * User zhoujunwen * Date 16/8/16 * Time 下午5:21 */ @Path("download") @Component("downloads") @Produces({ContentType.APPLICATION_JSON_UTF_8}) public class DownloadServiceImpl implements DownloadService { private static final Log LOGGER = LogFactory.getLog(DownloadServiceImpl.class); @Autowired private DataManager manager; @Override @GET @Path("order") public void down(@Context HttpServletResponse response, @QueryParam("url") String url, @QueryParam("uid") Integer uid) { LOGGER.debug("下载地址:{}", url); if (Utils.isEmpty(url)) { return; } String filename = CommonUtils.getFileName(url); // 设置头部 response.setContentType(MediaType.APPLICATION_OCTET_STREAM); response.setContentType("application/vnd.ms-excel;charset=gb2312"); response.setHeader("Content-Disposition", "attachment;filename=" + filename); try { // 读取并写入下载数据 manager.readFile(url, response.getOutputStream()); response.flushBuffer(); } catch (IOException | URISyntaxException e) { LOGGER.error(e.getMessage()); } } }
PHP页面只需要一个超级链接即可。优化了一下,线上接口全部走内网的,因此,在a标签中不可能直接把该接口的ip暴露出去,因此在nginx服务器做了代理配置,只需要访问一个downloads/order?url=xxx&uid=xxx即可。
location /downloads/ { proxy_pass http://127.0.0.1:8086/presentation/download/; }踩过的坑 多线程获取调用biz接口
public ListgetOrders(Object params) { OrderSearch search = null; if (params != null && (params instanceof Map)) { System.out.println("params:" + params); search = GSON.fromJson(GSON.toJson(params), OrderSearch.class); System.out.println("title:" + search.getTitle()); } else { search = new OrderSearch(); } int count = orderService.searchCount(search); int cycleTimes = (int) Math.ceil(count * 1.0 / TIMES_IN_SIGNEL_PROCESSOR); LOGGER.debug("数据总条数count:{},外部循坏执行次数:times:{}", count, cycleTimes); // 获取所有并发任务的运行结果 List orders = new ArrayList<>(); int page = 0; for (int j = 0; j < cycleTimes; j++) { int signel = (count > TIMES_IN_SIGNEL_PROCESSOR) ? TIMES_IN_SIGNEL_PROCESSOR : count; count = count - signel; int poolNum = (int) Math.ceil(signel * 1.0 / LIMIT); LOGGER.debug("线程池数量:{}", poolNum); // 创建一个线程池 ExecutorService pool = Executors.newFixedThreadPool(poolNum); // 创建多个有返回值的任务 List list = new ArrayList (); for (int i = 0; i < poolNum; i++) { Callable c = new OrderExportCallable(i + "", ++page, LIMIT, orderService, search); // 执行任务并获取Future对象 Future f = pool.submit(c); list.add(f); } // 关闭线程池 pool.shutdown(); try { Thread.sleep(THREAD_SLEEP); } catch (InterruptedException e) { LOGGER.debug("线程休眠时,引起中断异常:{}", e.getMessage()); } for (Future f : list) { // 从Future对象上获取任务的返回值 try { orders.addAll((Collection extends Order>) f.get()); LOGGER.debug(">>>线程:{}返回的数据条数:{}", f.toString(), ((Collection extends Order>) f.get()).size()); } catch (InterruptedException | ExecutionException e) { LOGGER.warn("调用OrderService接口的search方法失败:{}", e.getMessage()); return null; } } } re
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/30362.html
摘要:说明这里用到的项目都是基于的项目。但同时,它和其他的分布式文件系统的区别也是很明显的。能提供高吞吐量的数据访问,非常适合大规模数据集上的应用。放宽了一部分约束,来实现流式读取文件系统数据的目的。是项目的一部分。 关键词 Java、PHP、hdfs、mqrocket、excel、poi、报表 需求背景 在业务需求方面,每个企业或多或少都会有报表导出的作业,量少则可是使用输出流或者字符串的...
摘要:接着上一篇基于的和开源库导出大数据报表一的遗留的问题开始,这篇做优化处理。这个错误造成的直接问题是数据空白,因为只会执行一次,第二次条件就为了。几经波折,终于知道,引起错误的原因是包冲突,和包的冲突。 接着上一篇《基于haddop的HDFS和Excel开源库POI导出大数据报表(一)》的遗留的问题开始,这篇做优化处理。 优化导出流程 在一开始的时候,当我获取到订单的数量,遍历订单,获取...
摘要:的使用及导出报表首先,了解是什么一基本概念是软件基金会的开放源码函式库,提供给程序对格式档案读和写的功能。 POI的使用及导出excel报表 首先,了解poi是什么? 一、基本概念 Apache POI是Apache软件基金会的开放源码函式库,POI提供API给Java程序对Microsoft Office格式档案读和写的功能。 二、基本结构 HSSF - 提供读写...
摘要:积分消费明细对账单其中,有四个参数,分别是,,,。导出读取数据库的信息,转成。 public void detailExport() { String sourceSystem = getPara(source_system); String dataDate = getPara(data_date); Integer pointsType = get...
摘要:阅读原文如何高效导出百万级数据在一个具有统计功能的系统中,导出功能几乎是一定的,如何导出导出的数据有多少如何高效的导出简介什么是就不用介绍了,这里主要说明不同版本下每个下的行列限制。 阅读原文:POI如何高效导出百万级Excel数据? 在一个具有统计功能的系统中,导出excel功能几乎是一定的,如何导出excel?导出的数据有多少?如何高效的导出? Excel简介什么是excel就不用...
阅读 2054·2023-04-26 02:23
阅读 1795·2021-09-03 10:30
阅读 1359·2019-08-30 15:43
阅读 1199·2019-08-29 16:29
阅读 543·2019-08-29 12:28
阅读 2341·2019-08-26 12:13
阅读 2200·2019-08-26 12:01
阅读 2413·2019-08-26 11:56