diff --git a/dlib/matlab/subprocess_stream.cpp b/dlib/matlab/subprocess_stream.cpp new file mode 100644 index 000000000..d4647ba10 --- /dev/null +++ b/dlib/matlab/subprocess_stream.cpp @@ -0,0 +1,343 @@ +// Copyright (C) 2016 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. + +#include "subprocess_stream.h" + +#include +#include +#include +#include +#include + +// ---------------------------------------------------------------------------------------- + +namespace dlib +{ + + class filestreambuf : public std::streambuf + { + /*! + INITIAL VALUE + - fd_in == the file descriptor we read from. + - fd_out == the file descriptor we write to. + - in_buffer == an array of in_buffer_size bytes + - out_buffer == an array of out_buffer_size bytes + + CONVENTION + - in_buffer == the input buffer used by this streambuf + - out_buffer == the output buffer used by this streambuf + - max_putback == the maximum number of chars to have in the put back buffer. + !*/ + + public: + + filestreambuf ( + int fd_in_, + int fd_out_ + ) : + fd_in(fd_in_), + fd_out(fd_out_), + out_buffer(0), + in_buffer(0) + { + init(); + } + + virtual ~filestreambuf ( + ) + { + sync(); + delete [] out_buffer; + delete [] in_buffer; + } + + int sync ( + ) + { + if (flush_out_buffer() == EOF) + { + // an error occurred + return -1; + } + return 0; + } + protected: + + void init ( + ) + { + try + { + out_buffer = new char[out_buffer_size]; + in_buffer = new char[in_buffer_size]; + } + catch (...) + { + if (out_buffer) delete [] out_buffer; + throw; + } + setp(out_buffer, out_buffer + (out_buffer_size-1)); + setg(in_buffer+max_putback, + in_buffer+max_putback, + in_buffer+max_putback); + } + + int flush_out_buffer ( + ) + { + int num = static_cast(pptr()-pbase()); + if (write(fd_out,out_buffer,num) != num) + { + // the write was not successful so return EOF + return EOF; + } + pbump(-num); + return num; + } + + // output functions + int_type overflow ( + int_type c + ) + { + if (c != EOF) + { + *pptr() = c; + pbump(1); + } + if (flush_out_buffer() == EOF) + { + // an error occurred + return EOF; + } + return c; + } + + + std::streamsize xsputn ( + const char* s, + std::streamsize num + ) + { + // Add a sanity check here + DLIB_ASSERT(num >= 0, + "\tstd::streamsize filestreambuf::xsputn" + << "\n\tThe number of bytes to write can't be negative" + << "\n\tnum: " << num + << "\n\tthis: " << this + ); + + std::streamsize space_left = static_cast(epptr()-pptr()); + if (num <= space_left) + { + std::memcpy(pptr(),s,static_cast(num)); + pbump(static_cast(num)); + return num; + } + else + { + std::memcpy(pptr(),s,static_cast(space_left)); + s += space_left; + pbump(space_left); + std::streamsize num_left = num - space_left; + + if (flush_out_buffer() == EOF) + { + // the write was not successful so return that 0 bytes were written + return 0; + } + + if (num_left < out_buffer_size) + { + std::memcpy(pptr(),s,static_cast(num_left)); + pbump(num_left); + return num; + } + else + { + if (write(fd_out,s,num_left) != num_left) + { + // the write was not successful so return that 0 bytes were written + return 0; + } + return num; + } + } + } + + // input functions + int_type underflow( + ) + { + if (gptr() < egptr()) + { + return static_cast(*gptr()); + } + + int num_put_back = static_cast(gptr() - eback()); + if (num_put_back > max_putback) + { + num_put_back = max_putback; + } + + // copy the putback characters into the putback end of the in_buffer + std::memmove(in_buffer+(max_putback-num_put_back), gptr()-num_put_back, num_put_back); + + + int num = read(fd_in,in_buffer+max_putback, in_buffer_size-max_putback); + if (num <= 0) + { + // an error occurred or the connection is over which is EOF + return EOF; + } + + // reset in_buffer pointers + setg (in_buffer+(max_putback-num_put_back), + in_buffer+max_putback, + in_buffer+max_putback+num); + + return static_cast(*gptr()); + } + + std::streamsize xsgetn ( + char_type* s, + std::streamsize n + ) + { + std::streamsize temp = n; + while (n > 0) + { + int num = static_cast(egptr() - gptr()); + if (num >= n) + { + // copy data from our buffer + std::memcpy(s, gptr(), static_cast(n)); + gbump(static_cast(n)); + return temp; + } + + // read more data into our buffer + if (num == 0) + { + if (underflow() == EOF) + break; + continue; + } + + // copy all the data from our buffer + std::memcpy(s, gptr(), num); + n -= num; + gbump(num); + s += num; + } + return temp-n; + } + + private: + + // member data + int fd_in; + int fd_out; + static const std::streamsize max_putback = 4; + static const std::streamsize out_buffer_size = 10000; + static const std::streamsize in_buffer_size = 10000; + char* out_buffer; + char* in_buffer; + + }; + +// ---------------------------------------------------------------------------------------- + + subprocess_stream:: + subprocess_stream(const char* program_name) : stderr(NULL), std::iostream(NULL) + { + if (access(program_name, F_OK)) + throw dlib::error("Error: '" + std::string(program_name) + "' file does not exist."); + if (access(program_name, X_OK)) + throw dlib::error("Error: '" + std::string(program_name) + "' file is not executable."); + + child_pid = fork(); + if (child_pid == -1) + throw dlib::error("Failed to start child process"); + + if (child_pid == 0) + { + // In child process + dup2(write_pipe.read_fd(), STDIN_FILENO); + dup2(read_pipe.write_fd(), STDOUT_FILENO); + dup2(err_pipe.write_fd(), STDERR_FILENO); + write_pipe.close(); + read_pipe.close(); + err_pipe.close(); + + char* argv[] = {(char*)program_name, nullptr}; + char* envp[] = {nullptr}; + + execve(argv[0], argv, envp); + // If launching the child didn't work then bail immediately so the parent + // process has no chance to get tweaked out (*cough* MATLAB *cough*). + _Exit(1); + } + else + { + // In parent process + close(write_pipe.read_fd()); + close(read_pipe.write_fd()); + close(err_pipe.write_fd()); + inout_buf = std::unique_ptr(new filestreambuf(read_pipe.read_fd(), write_pipe.write_fd())); + err_buf = std::unique_ptr(new filestreambuf(err_pipe.read_fd(), 0)); + this->rdbuf(inout_buf.get()); + stderr.rdbuf(err_buf.get()); + this->tie(this); + stderr.tie(this); + } + } + +// ---------------------------------------------------------------------------------------- + + subprocess_stream:: + ~subprocess_stream() + { + try + { + wait(); + } + catch (dlib::error& e) + { + std::cerr << e.what() << std::endl; + } + } + +// ---------------------------------------------------------------------------------------- + + void subprocess_stream:: + wait() + { + if (!wait_called) + { + wait_called = true; + send_eof(); + + std::ostringstream sout; + sout << stderr.rdbuf(); + + int status; + waitpid(child_pid, &status, 0); + if (status) + throw dlib::error("Child process terminated with an error.\n" + sout.str()); + + if (sout.str().size() != 0) + throw dlib::error("Child process terminated with an error.\n" + sout.str()); + } + } + +// ---------------------------------------------------------------------------------------- + + void subprocess_stream:: + send_eof() { inout_buf->sync(); ::close(write_pipe.write_fd()); } + +// ---------------------------------------------------------------------------------------- + +} + + diff --git a/dlib/matlab/subprocess_stream.h b/dlib/matlab/subprocess_stream.h new file mode 100644 index 000000000..6b358bcb9 --- /dev/null +++ b/dlib/matlab/subprocess_stream.h @@ -0,0 +1,211 @@ +// Copyright (C) 2016 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#ifndef DLIB_SUBPROCeSS_STREAM_H_ +#define DLIB_SUBPROCeSS_STREAM_H_ + +#include +#include +#include +#include +#include + +namespace dlib +{ + +// -------------------------------------------------------------------------------------- + + // Call dlib's serialize and deserialize by default. The point of this version of + // serailize is to do something fast that normally we wouldn't do, like directly copy + // memory. This is safe since this is an interprocess communication happening the same + // machine. + template void interprocess_serialize ( const T& item, std::ostream& out) { serialize(item, out); } + template void interprocess_deserialize (T& item, std::istream& in) { deserialize(item, in); } + + // But have overloads for direct memory copies for some types since this is faster than + // their default serialization. + template + void interprocess_serialize(const dlib::matrix& item, std::ostream& out) + { + dlib::serialize(item.nr(), out); + dlib::serialize(item.nc(), out); + if (item.size() != 0) + out.write((const char*)&item(0,0), sizeof(T)*item.size()); + if (!out) + throw dlib::serialization_error("Error writing matrix to interprocess iostream."); + } + + template + void interprocess_deserialize(dlib::matrix& item, std::istream& in) + { + long nr, nc; + dlib::deserialize(nr, in); + dlib::deserialize(nc, in); + item.set_size(nr,nc); + if (item.size() != 0) + in.read((char*)&item(0,0), sizeof(T)*item.size()); + if (!in) + throw dlib::serialization_error("Error reading matrix from interprocess iostream."); + } + +// ---------------------------------------------------------------------------------------- + + inline void send_to_parent_process() {std::cout.flush();} + template + void send_to_parent_process(U&& arg1, T&& ...args) + /*! + ensures + - sends all the arguments to send_to_parent_process() to standard output (and + hence to the parent process) by serializing them with + interprocess_serialize(). + !*/ + { + interprocess_serialize(arg1, std::cout); + send_to_parent_process(std::forward(args)...); + if (!std::cout) + throw dlib::error("Error sending object to parent process."); + } + + inline void receive_from_parent_process() {} + template + void receive_from_parent_process(U&& arg1, T&& ...args) + /*! + ensures + - receives all the arguments to receive_from_parent_process() from standard + input (and hence from the parent process) by deserializing them with + interprocess_deserialize(). + !*/ + { + interprocess_deserialize(arg1, std::cin); + receive_from_parent_process(std::forward(args)...); + if (!std::cin) + throw dlib::error("Error receiving object from parent process."); + } + + +// ---------------------------------------------------------------------------------------- + + class filestreambuf; + + class subprocess_stream : public std::iostream + { + /*! + WHAT THIS OBJECT REPRESENTS + This is a tool for spawning a subprocess and communicating with it through + that processes standard input, output, and error. Here is an example: + + subprocess_stream s("/usr/bin/echo") + s << "echo me this!"; + string line; + getline(s, line); + cout << line << endl; + s.wait(); + + That example runs echo, sends it some text, gets it back, and prints it to + the screen. Then it waits for the subprocess to finish. + !*/ + + public: + + explicit subprocess_stream( + const char* program_name + ); + /*! + ensures + - spawns a sub process by executing the file with the given program_name. + !*/ + + ~subprocess_stream( + ); + /*! + ensures + - calls wait(). Note that the destructor never throws even though wait() can. + If an exception is thrown by wait() it is just logged to std::cerr. + !*/ + + void wait( + ); + /*! + ensures + - closes the standard input of the child process and then waits for the + child to terminate. + - If the child returns an error (by returning != 0 from its main) or + outputs to its standard error then wait() throws a dlib::error() with the + standard error output in it. + !*/ + + int get_child_pid() const { return child_pid; } + /*! + ensures + - returns the PID of the child process + !*/ + + template + void send(U&& arg1, T&& ...args) + /*! + ensures + - sends all the arguments to send() to the subprocess by serializing them + with interprocess_serialize(). + !*/ + { + interprocess_serialize(arg1, *this); + send(std::forward(args)...); + if (!this->good()) + { + std::ostringstream sout; + sout << stderr.rdbuf(); + throw dlib::error("Error sending object to child process.\n" + sout.str()); + } + } + void send() {this->flush();} + + template + void receive(U&& arg1, T&& ...args) + /*! + ensures + - receives all the arguments to receive() to the subprocess by deserializing + them with interprocess_deserialize(). + !*/ + { + interprocess_deserialize(arg1, *this); + receive(std::forward(args)...); + if (!this->good()) + { + std::ostringstream sout; + sout << stderr.rdbuf(); + throw dlib::error("Error receiving object from child process.\n" + sout.str() ); + } + } + void receive() {} + + + private: + + void send_eof(); + + class cpipe + { + private: + int fd[2]; + public: + cpipe() { if (pipe(fd)) throw dlib::error("Failed to create pipe"); } + ~cpipe() { close(); } + int read_fd() const { return fd[0]; } + int write_fd() const { return fd[1]; } + void close() { ::close(fd[0]); ::close(fd[1]); } + }; + + cpipe write_pipe; + cpipe read_pipe; + cpipe err_pipe; + bool wait_called = false; + std::unique_ptr inout_buf = NULL; + std::unique_ptr err_buf = NULL; + int child_pid = -1; + std::istream stderr; + }; +} + +// ---------------------------------------------------------------------------------------- + +#endif // DLIB_SUBPROCeSS_STREAM_H_ +