2
0

make logtap notice sooner when the shipper dies

fifo(7) says

       Under Linux, opening a FIFO for read and write will succeed both
       in blocking and nonblocking mode.  POSIX leaves this behavior
       undefined.  This can be used to open a FIFO for writing while
       there are no readers available.

       When a process tries to write to a FIFO that is not opened for
       read on the other side, the process is sent a SIGPIPE signal.

but that doesn't match what I saw: the writer carried on writing
until the pipe buffer filled, and only then did write() block or
return -1 (depending on O_NONBLOCK).

Switch to opening the fifo with O_WRONLY | O_NONBLOCK, which fails
straight away when no reader has it open, and retry periodically if it fails.
This commit is contained in:
2025-10-09 21:16:54 +01:00
parent 12704f1c4a
commit b49beb2c86
2 changed files with 77 additions and 81 deletions

View File

@@ -1,4 +1,5 @@
TARGETS=logtap
CFLAGS=-Wall
default: $(TARGETS)
install: $(TARGETS)

View File

@@ -24,22 +24,40 @@ static void error(int status, int errnum, const char* fmt, ...)
vfprintf(stderr, fmt, ap);
if (errnum)
fprintf(stderr, ": %s", strerror(errnum));
fprintf(stderr, "\n");
va_end(ap);
fprintf(stderr, "\n");
if (status)
exit(status);
}
#endif
/* value fifo_retry_timeout is set to whenever the shipper fifo could not
 * be opened or has been closed; main() counts it down and retries the
 * open when it reaches zero */
#define FIFO_RETRY_TIMEOUT_MAX (50)
/* descriptor table handed to poll() by main() */
struct pollfd fds[] = {
/* fd 0: the input that main() reads log data from */
{ .fd = 0, .events = POLLIN },
/* fd 1: every chunk read from fd 0 is copied here */
{ .fd = 1, .events = POLLERR },
/* shipper fifo: filled in from open_shipper_fifo(); stays negative
 * while the fifo is not open (see fifo_connected()) */
{ .fd = -1, .events = POLLERR },
};
/* marker strings built in main() from the cookie argument; written to
 * fd 1 when the fifo is opened (start) or dropped (stop) */
char *start_cookie, *stop_cookie;
/* Report whether the shipper fifo is currently open.
 *
 * Returns nonzero when fds[2] holds a usable (non-negative) descriptor,
 * zero while the slot is still negative.
 */
static int fifo_connected(void)
{
    const int shipper_fd = fds[2].fd;

    return !(shipper_fd < 0);
}
int fifo_retry_timeout = -1;
int open_shipper_fifo(char* pathname)
{
int fd = -1;
struct stat statbuf;
int fd;
if (stat(pathname, &statbuf)) {
switch (errno) {
case ENOENT:
if(mkfifo(pathname, 0700)) {
if (mkfifo(pathname, 0700)) {
error(1, errno, "mkfifo %s failed", pathname);
}
break;
@@ -52,119 +70,96 @@ int open_shipper_fifo(char* pathname)
error(1, errno, "%s exists already and is not a fifo", pathname);
}
}
if (fd < 0) {
fd = open(pathname, O_NONBLOCK | O_RDWR, 0);
if (fd < 0)
error(1, errno, "failed to open fifo %s", pathname);
};
fd = open(pathname, O_WRONLY | O_NONBLOCK, 0);
if (fd >= 0) {
fifo_retry_timeout = -1;
/* write cookie to stdout so that the backfill process knows
* we are now logging realtime
*/
write(fds[1].fd, start_cookie, strlen(start_cookie));
} else {
fifo_retry_timeout = FIFO_RETRY_TIMEOUT_MAX;
}
return fd;
}
struct pollfd fds[] = {
{ .fd = 0, .events = POLLIN },
{ .fd = 1, .events = POLLERR },
{ .fd = -1, .events = POLLERR },
};
#define FIFO_STATE_GOOD (-1)
#define FIFO_STATE_TIMEOUT_EXPIRED (0)
#define FIFO_STATE_TIMEOUT_MAX (50) /* ? probably going to depend on log volume */
char *start_cookie, *stop_cookie;
static int fifo_state = FIFO_STATE_TIMEOUT_EXPIRED;
/* Write count bytes from buf to the shipper fifo, tracking its health in
 * fifo_state.
 *
 * While the fifo is good the data is written; a failed write starts a
 * back-off countdown and emits the stop cookie on fd 1.  During the
 * countdown nothing is written.  Once the countdown has expired the write
 * is attempted again, and a success emits the start cookie on fd 1.
 *
 * Returns the number of bytes actually written while the fifo is good.
 * In every other state it returns count, i.e. it claims everything was
 * written so that the caller does not back up; the backfill process is
 * expected to deliver those entries once the shipper is online again.
 */
int write_fifo(int fd, char* buf, int count)
{
    int nwritten = 0;

    switch (fifo_state) {
    case FIFO_STATE_GOOD:
        nwritten = write(fd, buf, count);
        if (nwritten == -1) {
            /* shipper went away: start backing off and say so on stdout */
            fifo_state = FIFO_STATE_TIMEOUT_MAX;
            write(1, stop_cookie, strlen(stop_cookie));
        }
        break;

    case FIFO_STATE_TIMEOUT_EXPIRED:
        nwritten = write(fd, buf, count);
        if (nwritten < 0) {
            /* still broken: restart the countdown without logging again,
               the stop cookie was already written when it first failed */
            fifo_state = FIFO_STATE_TIMEOUT_MAX;
        } else {
            fifo_state = FIFO_STATE_GOOD;
            write(1, start_cookie, strlen(start_cookie));
        }
        break;

    default:
        if (fifo_state > 0) {
            /* backing off: just tick the countdown */
            fifo_state--;
        } else {
            error(1, 0, "impossible(sic) fifo state %d", fifo_state);
        }
        break;
    }

    if (fifo_state != FIFO_STATE_GOOD)
        return count;
    return nwritten;
}
#define WRITE_LITERAL(fd, c) write(fd, c, sizeof c)
int main(int argc, char* argv[])
{
char* buf = malloc(8192);
int out_bytes = 0;
int fifo_bytes = 0;
int tee_bytes = 0;
if (argc != 3) {
error(1, 0, "usage: " PROGRAM_NAME " /path/to/fifo cookie-text");
}
char* fifo_pathname = argv[1];
char* cookie = argv[2];
start_cookie = malloc(strlen(cookie) + 8);
stop_cookie = malloc(strlen(cookie) + 7);
start_cookie = malloc(strlen(cookie) + 9);
stop_cookie = malloc(strlen(cookie) + 8);
strcpy(start_cookie, cookie);
strcpy(start_cookie, "\n");
strcat(start_cookie, cookie);
strcat(start_cookie, " START\n");
strcpy(stop_cookie, cookie);
strcpy(stop_cookie, "\n");
strcat(stop_cookie, cookie);
strcat(stop_cookie, " STOP\n");
signal(SIGPIPE, SIG_IGN);
fds[2].fd = open_shipper_fifo(fifo_pathname);
int flags = fcntl(STDOUT_FILENO, F_GETFL);
fcntl(STDOUT_FILENO, F_SETFL, flags | O_NONBLOCK);
fds[2].fd = open_shipper_fifo(argv[1]);
int quitting = 0;
while (!quitting) {
int nfds = poll(fds, 3, 2000);
if (nfds > 0) {
if ((fds[0].revents & (POLLIN | POLLHUP)) && (out_bytes == 0) && (fifo_bytes == 0)) {
out_bytes = read(fds[0].fd, buf, 8192);
if (out_bytes == 0) {
quitting = 1;
WRITE_LITERAL(1, PROGRAM_NAME " detected eof of file on stdin, exiting\n");
};
fifo_bytes = out_bytes;
};
if (fds[1].revents & (POLLERR | POLLHUP)) {
exit(1); // can't even log an error if the logging stream fails
};
if (fds[2].revents & (POLLERR | POLLHUP)) {
error(1, 0, "error or hangup writing to log fifo (revents=%d)", fds[2].revents);
if (fifo_connected() && (fds[2].revents & (POLLERR | POLLHUP))) {
close(fds[2].fd);
fds[2].fd = -1;
tee_bytes = 0;
/* FIXME: nonblocking write, if stdout is not ready
* we will lose the message */
(void)write(1, stop_cookie, strlen(stop_cookie));
fifo_retry_timeout = FIFO_RETRY_TIMEOUT_MAX;
};
if ((fds[0].revents & (POLLIN | POLLHUP)) && (out_bytes == 0) && (tee_bytes == 0)) {
out_bytes = read(fds[0].fd, buf, 8192);
if (out_bytes == 0) {
quitting = 1;
buf = PROGRAM_NAME " detected eof of file on stdin, exiting\n";
out_bytes = strlen(buf);
};
if (fifo_connected())
tee_bytes = out_bytes;
};
if (out_bytes) {
out_bytes -= write(fds[1].fd, buf, out_bytes);
};
if (fifo_bytes) {
fifo_bytes -= write_fifo(fds[2].fd, buf, fifo_bytes);
if (tee_bytes) {
int written = write(fds[2].fd, buf, tee_bytes);
if (written >= 0)
tee_bytes -= written;
};
if (out_bytes == 0 && fifo_retry_timeout > 0) {
fifo_retry_timeout--;
if (fifo_retry_timeout == 0) {
fds[2].fd = open_shipper_fifo(argv[1]);
};
};
} else {
/* poll timed out, may as well try and see if the shipper
* is alive again
*/
if (fifo_state > FIFO_STATE_TIMEOUT_EXPIRED)
fifo_state = FIFO_STATE_TIMEOUT_EXPIRED;
if (!fifo_connected()) {
fds[2].fd = open_shipper_fifo(argv[1]);
}
};
};
}