我用 libcurl 下载一个 tar.gz 文件,并希望边下载边解压:每下载到一个数据块(chunk)就立即解压该数据块,而不是等整个文件下载完成后再整体解压。有没有满足这一需求的 C/C++ 库?
我尝试用 libarchive 来提取文件,但在提取文件的第一个数据块时,它报告了“truncated gzip input”(gzip 输入被截断)错误。看起来 libarchive 需要拿到整个文件才能提取。下面是我的代码。我刚接触 libarchive,不确定自己的用法是否正确。
#include <iostream>
#include <vector>
#include <string>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <atomic>
#include <thread>
// libarchive
#include <archive.h>
#include <archive_entry.h>
#include <curl/curl.h>
/// Client data handed to the libarchive callbacks.
///
/// The read callback gives `buffer` to libarchive and reports `*size` bytes;
/// the close callbacks `delete` the struct itself. Both members default to
/// null so a default-constructed instance never carries indeterminate
/// pointers.
struct mydata {
    void *buffer = nullptr;   ///< Start of the chunk to hand to libarchive.
    ssize_t *size = nullptr;  ///< Points at the byte count of that chunk.
};
/// User data handed to the libcurl write callback.
///
/// The write callback copies each received chunk into `buffer`, stores its
/// length in `*size`, and pauses the transfer through `curl`. All members
/// default to null so a default-constructed instance never carries
/// indeterminate pointers.
struct curldata {
    void *buffer = nullptr;   ///< Destination for the received chunk.
    ssize_t *size = nullptr;  ///< Receives the byte count of that chunk.
    CURL *curl = nullptr;     ///< Easy handle of the running transfer.
};
// Handshake flag between the download side and the extraction side:
//   false -> the write callback has stored a chunk that was not read yet,
//   true  -> the read callback has handed the chunk to libarchive (also the
//            initial state, before any chunk arrived).
std::atomic<bool> rd{true};

// libarchive read handle used by readarchive() and main().
struct archive *archive = nullptr;

// Second handle; only the commented-out code in this file refers to it.
struct archive *archivefd = nullptr;

// Not referenced by any other code visible in this file.
std::atomic<bool> start_read{false};
/// libarchive read callback: hands the most recently downloaded chunk to
/// libarchive.
///
/// @param a            Archive handle (unused).
/// @param client_data  The `mydata` registered with archive_read_open().
/// @param block        Receives the address of the chunk buffer.
/// @return The chunk length in bytes, or 0 when no unread chunk is pending.
///
/// NOTE(review): libarchive interprets a 0 return as end of input, so being
/// called before the next chunk has arrived ends the stream early; this is
/// presumably the cause of the "truncated gzip input" error. Fixing it needs
/// an explicit end-of-download signal shared with the producer.
/// NOTE(review): the buffer is released to the producer (`rd = true`) while
/// libarchive is still going to read from `*block`; a second buffer or a
/// queue would be required to make that safe.
la_ssize_t libarchiveRead(struct archive* a, void* client_data, const void** block)
{
    (void)a;
    if (rd) {
        // Nothing new has been stored since the previous call.
        return 0;
    }
    mydata *chunk = static_cast<mydata*>(client_data);
    // Capture the length BEFORE flipping `rd`: once the flag is true the
    // download side may store the next chunk and overwrite `*size`.
    const la_ssize_t length = *(chunk->size);
    std::cout << "calling custom read(), size " << length << std::endl;
    *block = chunk->buffer;
    rd = true;
    return length;
}
int libarchiveClose(struct archive* a, void* client_data)
{
std::cout << "calling custom close() for archive" << std::endl;
mydata *my_data = (mydata*)client_data;
delete my_data;
return (ARCHIVE_OK);
}
int libarchiveClosefd(struct archive* a, void* client_data)
{
std::cout << "calling custom close() for archivefd" << std::endl;
mydata *my_data = (mydata*)client_data;
delete my_data;
return (ARCHIVE_OK);
}
static size_t curlWriteFunction(void *ptr, size_t size, size_t nmemb, void *write_data) {
//size is always 1
curldata *my_data = (curldata*)(write_data);
*(my_data->size) = nmemb * size;
std::cout << "calling curlWriteFunction(), size: " << size << " , nmemb: " << nmemb
<< " , my_data->size: " << *(my_data->size) << std::endl;
memcpy(my_data->buffer, ptr, *(my_data->size));
curl_easy_pause(my_data->curl, CURL_WRITEFUNC_PAUSE);
rd=false;
return (*(my_data->size));
}
static size_t progress(void *clientp, double dltotal, double dlnow, double ultotal, double ulnow) {
CURL *curl = (CURL *)clientp;
(void)ultotal;
(void)ulnow;
if(dltotal == 0) {
return 0;
}
if(rd) {
curl_easy_pause(curl, CURLPAUSE_CONT);
std::cout << "progress: " << dlnow/dltotal * 100 << "%" << std::endl;
}
return 0;
}
void readarchive(void *client_data) {
struct archive_entry *entry;
int flags = ARCHIVE_EXTRACT_TIME;
flags |= ARCHIVE_EXTRACT_PERM;
flags |= ARCHIVE_EXTRACT_ACL;
flags |= ARCHIVE_EXTRACT_FFLAGS;
while(rd);
std::cout << "calling archive_read_open for archive.." << std::endl;
int res = archive_read_open(archive,
client_data,
nullptr,
(archive_read_callback*)libarchiveRead,
(archive_close_callback*)libarchiveClose);
std::cout << "called archive_read_open for archive.." << std::endl;
res = archive_read_next_header(archive, &(entry));
while(res == ARCHIVE_OK ) {
std::cout << "Extracting for archive " << archive_entry_pathname(entry) << "..." << std::endl;
// extract current entry
archive_read_extract(archive, entry, flags);
// read next if available
res = archive_read_next_header(archive, &(entry));
}
std::cout << "archive_read_next_header for archive failed, errcode: " << res << " error: " << archive_error_string(archive) << std::endl;
}
//size_t curlWriteFunction(void *ptr, size_t size, size_t nmemb,FILE* fptr) {
// //size is always 1
// std::cout << "calling curlWriteFunction().." << std::endl;
// return fwrite(ptr, size, nmemb, fptr);
//}
int main(int argc, char** argv) {
if(argc < 3)
{
std::cout << argv[0] << "{-r | -w} file[s]" << std::endl;
return 1;
}
std::vector<std::string> filenames;
filenames.reserve(argc);
while (*++argv != nullptr)
{
filenames.emplace_back(*argv);
}
bool modeRead = (filenames[0] == "-r");
std::cout << filenames[0] << " " << filenames[1] << std::endl;
// archive related variables
char buff_archive[16 * 1024], buff_archivefd[16 * 1024];
if(modeRead)
{
archive = archive_read_new();
archive_read_support_filter_gzip(archive);
archive_read_support_format_tar(archive);
mydata *client_data = new mydata();
int res;
char *buff1 = new char[16 * 1024];
client_data->size = new ssize_t;
*(client_data->size) = 0;
client_data->buffer = buff1;
curldata *curl_data = new curldata();
curl_data->size=client_data->size;
curl_data->buffer=buff1;
CURL *curl = curl_easy_init();
curl_data->curl = curl;
curl_easy_setopt(curl, CURLOPT_URL, filenames[1].c_str());
curl_easy_setopt(curl, CURLOPT_WRITEDATA, curl_data);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curlWriteFunction);
curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0L);
curl_easy_setopt(curl, CURLOPT_PROGRESSDATA, curl);
curl_easy_setopt(curl, CURLOPT_PROGRESSFUNCTION,progress);
std::thread t(readarchive, client_data);
CURLcode result = curl_easy_perform(curl);
if(result != CURLE_OK) {
std::cout << "curl perform failed, errcode; " << result << " err: " << curl_easy_strerror(result) << std::endl;
}
//std::cout << "calling archive_read_open for archivefd.." << std::endl;
//res = archive_read_open(archivefd,
// client_datafd,
// nullptr,
// (archive_read_callback*)libarchiveReadfd,
// (archive_close_callback*)libarchiveClosefd);
//std::cout << "called archive_read_open for archivefd.." << std::endl;
//res = archive_read_next_header(archivefd, &(entry));
//if (res != ARCHIVE_OK) {
// std::cout << "archive_read_next_header for archivefd failed, errcode: " << res << " error: " << archive_error_string(archivefd) << std::endl;
//}
//while(res == ARCHIVE_OK) {
// std::cout << "Extracting for archivefd " << archive_entry_pathname(entry) << "..." << std::endl;
// // extract current entry
// archive_read_extract(archivefd, entry, flags);
// // read next if available
// res = archive_read_next_header(archivefd, &(entry));
//}
t.join();
delete client_data->size;
delete []buff1;
archive_read_close(archive);
archive_read_free(archive);
archive_read_free(archive);
curl_easy_cleanup(curl);
}
return 0;
}
libarchive 会借助外部程序来实现某些压缩算法,gunzip 就是其中之一。您不能只把文件的某一个数据块交给它解压;但可以传给它一个“文件描述符”,由它把数据通过管道送入 gunzip,再经 libarchive 取回,从而以流式方式边解压边提取归档内容。
下面是一个没有任何错误处理的简单示例程序:它打开一个本地的压缩 tar 文件(文档称这里也可以是套接字),取得其文件描述符,并把内容直接提取到磁盘。我用它处理过数 GB 大小的文件,内存占用只有几 KB,因此它应该具备您想要的功能。我把缓冲区大小设为 512,因为 tar 归档由 512 字节的块组成;缓冲区也可以设得更大:
#include <archive.h>
#include <archive_entry.h>
#include <fcntl.h>
#include <unistd.h>
int main() {
const char *input_name = "compressed.tgz";
struct archive *input_file = archive_read_new();
struct archive_entry *entry;
struct archive *output_file = archive_write_disk_new();
int flags = ARCHIVE_EXTRACT_ACL; // Attempt to restore Access Control lists.
flags |= ARCHIVE_EXTRACT_FFLAGS; // Attempt to restore file attributes.
flags |= ARCHIVE_EXTRACT_OWNER; // User and group IDs set on the file.
flags |= ARCHIVE_EXTRACT_PERM; // Full permissions (including SGID, SUID, and sticky bits) are restored.
flags |= ARCHIVE_EXTRACT_TIME; // Timestamps (mtime, ctime, and atime) are restored.
flags |= ARCHIVE_EXTRACT_UNLINK; // Existing files on disk will be unlinked before creating them.
flags |= ARCHIVE_EXTRACT_XATTR; // Attempt to restore extended file attributes.
archive_write_disk_set_options(output_file, flags);
archive_write_disk_set_standard_lookup(output_file);
archive_read_support_filter_all(input_file);
archive_read_support_format_all(input_file);
int fd = open(input_name, O_RDONLY);
archive_read_open_fd(input_file, fd, 512);
char buffer[512];
size_t buffer_size = 512;
ssize_t remaining;
while (archive_read_next_header(input_file, &entry) != ARCHIVE_EOF) {
archive_write_header(output_file, entry);
remaining = archive_read_data(input_file, buffer, buffer_size);
while (remaining > 0) {
archive_write_data(output_file, buffer, buffer_size);
remaining = archive_read_data(input_file, buffer, buffer_size);
}
}
archive_read_free(input_file);
archive_write_free(output_file);
}
用套接字进行测试后,结果表明 libarchive 做不到这一点,尽管它(非常有限的)文档似乎声称可以。它适用于未压缩的 tar;但对于压缩的 tar,libarchive 会用预读(look-ahead)代码来判断压缩格式,这导致调用 archive_read_open_fd 时就需要整个文件。
您可以在客户端完成这一步,然后在发送数据之前先把 input_file 归档结构上传到服务器;但您似乎只负责下载这一侧,所以这行不通。我最终完全没有使用 libarchive。由于我在 Linux 上开发,我改为 fork 出一个 tar 子进程,并把从套接字读取的数据通过管道传给它。下面是一个高度简化的示例,省略了错误处理和必需的套接字代码:
// Streams a download from a socket into a forked `tar --gzip -x` process
// through a pipe, so the archive is unpacked while it is still arriving.
int extract_pipe[2]{-1, -1};
const bool pipe_ok = (pipe(extract_pipe) == 0);
// Only fork when the pipe exists; otherwise take the failure branch below.
pid_t pid = pipe_ok ? fork() : -1;
if (pid < 0) {
    // pipe() or fork() failed: there is no child to feed.
    if (pipe_ok) {
        close(extract_pipe[0]);
        close(extract_pipe[1]);
    }
} else if (pid == 0) {
    // Child process.
    dup2(extract_pipe[0], STDIN_FILENO); // Duplicate read end of pipe to stdin.
    close(extract_pipe[0]);
    close(extract_pipe[1]);
    // Pipe data into tar through stdin.
    execlp("tar", "tar", "--gzip", "-xC", "/output/path", static_cast<char *>(nullptr));
    // Only reached when execlp failed. Use _exit so the child does not run
    // the parent's atexit handlers or flush its duplicated stdio buffers.
    _exit(EXIT_FAILURE);
} else {
    // Parent process.
    // Has been redirected to child process stdin, so fd can now be closed.
    close(extract_pipe[0]);
    int socket_fd; // Assuming this is set to a socket fd with accept().
    size_t download_size; // Assuming this is set by header information read from the socket.
    const size_t kReadSize = 1024; // Size of chunks your file is downloaded in.
    size_t bytes_uploaded = 0;
    bool transfer_ok = true;
    while (bytes_uploaded < download_size) {
        const ssize_t moved = splice(socket_fd, NULL, extract_pipe[1], NULL, kReadSize, 0);
        if (moved <= 0) {
            // -1 is an error and 0 means the peer closed early. Adding either
            // to the counter would loop forever or corrupt it.
            transfer_ok = false;
            break;
        }
        bytes_uploaded += static_cast<size_t>(moved);
    }
    close(extract_pipe[1]); // Close pipe to send EOF flag to tar.
    int process_status = 0;
    // Hang until process finished, use WNOHANG flag if you don't want to do this.
    const bool reaped = (waitpid(pid, &process_status, 0) == pid);
    if (transfer_ok && reaped && WIFEXITED(process_status) && (WEXITSTATUS(process_status) == 0))
    { // Tar succeeded.
    } else {
        // Tar failed, or the download ended before download_size bytes arrived.
    }
}