billywhizz
5/24/2011 - 12:17 AM

sendfile to a socket

sendfile to a socket

var http = require("http");
var fs = require("fs");
var constants = process.binding("constants");

function f2s(fd, socket, off, len, chunked, chunksize, cb) {
	var twrite = 0;
	var chunksize = len < chunksize?len:chunksize;
	function chunk() {
		try {
			if(chunked) {
				if(socket.fd) {
					console.log("[" + len + ":" + off + ":" + chunksize + "]");
					socket.write(chunksize.toString(16) + "\r\n", "ascii", function() {
						fs.sendfile(socket.fd, fd, off, chunksize, function(ex, written) {
							if(ex && (ex.errno == constants.EINTR || ex.errno == constants.EAGAIN)) {
								socket.once("drain", chunk);
								socket._writeWatcher.start();
							}
							else if(ex) {
								cb(ex, twrite);
							}
							else {
								socket.write("\r\n", "ascii", function() {
									twrite += written;
									if(twrite < len) {
										off += written;
										process.nextTick(chunk);
									}
									else {
										socket.write("0\r\n\r\n");
										cb(null, twrite);
									}
								});
							}
						});
					});
				}
				else {
					cb(new Error("socket not connected"), twrite);
				}
			}
			else {
				if(socket.fd) {
					fs.sendfile(socket.fd, fd, off, len, function(ex, written) {
						if(ex && (ex.errno == constants.EINTR || ex.errno == constants.EAGAIN)) {
							socket.once("drain", chunk);
							socket._writeWatcher.start();
						}
						else if(ex) {
							cb(ex, twrite);
						}
						else {
							twrite += written;
							if(written < len) {
								off += written;
								len -= written;
								process.nextTick(chunk);
							}
							else {
								cb(null, twrite);
							}
						}
					});
				}
				else {
					cb(new Error("socket not connected"), twrite);
				}
			}
		}
		catch(ex) {
			cb(ex, twrite);
		}
	}
	chunk();
}

http.OutgoingMessage.prototype.sendfile = function(fd, off, len, chunksize, cb) {
	if (!this._header) {
		this._implicitHeader();
	}
	if (!this._headerSent) {
		this.output.unshift(this._header);
		this.outputEncodings.unshift('ascii');
		this._headerSent = true;
		this.finished = false;
		this.once("drain", function() {
			var _response = this;
			f2s(fd, this.connection, off, len, this.chunkedEncoding, chunksize, function(err, written) {
				_response.finished = true;
				_response._headerSent = false;
				_response.connection._httpMessage = null;
				cb(err, written);
			});
		});
		this._flush();
	}
};
var fs = require("fs");
var constants = process.binding("constants");

exports.f2s = function(file, socket, off, len, cb, chunked) {
	var twrite = 0;
	var chunksize = len < 4096?len:4096;
	function chunk() {
		try {
			if(chunked) {
				if(off + chunksize > len) chunksize = len - off;
				socket.write(chunksize.toString(16) + "\r\n");
				if(socket.fd) {
					fs.sendfile(socket.fd, file.fd, off, chunksize, function(ex, written) {
						if(ex && (ex.errno == constants.EINTR || ex.errno == constants.EAGAIN)) {
							socket.once("drain", chunk);
							socket._writeWatcher.start();
						}
						else if(ex) {
							socket.end();
							cb(ex, twrite);
						}
						else {
							socket.write("\r\n");
							twrite += written;
							if(twrite < len) {
								off += written;
								process.nextTick(chunk);
							}
							else {
								socket.write("0\r\n\r\n");
								cb(null, twrite);
							}
						}
					});
				}
				else {
					cb(new Error("socket not connected"), twrite);
				}
			}
			else {
				if(socket.fd) {
					fs.sendfile(socket.fd, file.fd, off, len, function(ex, written) {
						if(ex && (ex.errno == constants.EINTR || ex.errno == constants.EAGAIN)) {
							socket.once("drain", chunk);
							socket._writeWatcher.start();
						}
						else if(ex) {
							socket.end();
							cb(ex, twrite);
						}
						else {
							twrite += written;
							if(written < len) {
								off += written;
								len -= written;
								process.nextTick(chunk);
							}
							else {
								cb(null, twrite);
							}
						}
					});
				}
				else {
					cb(new Error("socket not connected"), twrite);
				}
			}
		}
		catch(ex) {
			socket.end();
			cb(ex, twrite);
		}
	}
	chunk();
}