Compare commits
13 Commits
69bf6cb5fb
...
635590d37a
Author | SHA1 | Date | |
---|---|---|---|
635590d37a | |||
17630f2678 | |||
707a471bc2 | |||
d3fce5edd4 | |||
5771108fed | |||
9e5f2d663d | |||
21eeb1671e | |||
44762d38fc | |||
1f6cfc3679 | |||
8ec00f1710 | |||
6a6dd32dea | |||
9b1fc11a59 | |||
aaa6e353db |
@ -1,4 +1,4 @@
|
||||
{ config, pkgs, lib, ... }:
|
||||
{ config, pkgs, lib, lim, ... }:
|
||||
let
|
||||
inherit (pkgs)
|
||||
execline
|
||||
@ -9,7 +9,8 @@ let
|
||||
inherit (lib.lists) unique concatMap;
|
||||
inherit (pkgs.pseudofile) dir symlink;
|
||||
inherit (pkgs.liminix.services) oneshot bundle;
|
||||
|
||||
inherit (lib) mkIf mkEnableOption mkOption types;
|
||||
cfg = config.logging;
|
||||
s6-rc-db =
|
||||
let
|
||||
# In the default bundle we need to have all the services
|
||||
@ -110,7 +111,13 @@ let
|
||||
#!${execline}/bin/execlineb -P
|
||||
${execline}/bin/redirfd -w 1 /dev/null
|
||||
${execline}/bin/redirfd -rnb 0 fifo
|
||||
${s6}/bin/s6-log -bpd3 -- t /run/uncaught-logs
|
||||
${if cfg.shipping.enable then ''
|
||||
pipeline { ${s6}/bin/s6-log -bpd3 -- ${cfg.script} 1 }
|
||||
pipeline { ${pkgs.logshipper}/bin/logtap ${cfg.shipping.socket} logshipper-socket-event }
|
||||
${s6}/bin/s6-log -- ${cfg.directory}
|
||||
'' else ''
|
||||
${s6}/bin/s6-log -bpd3 -- ${cfg.script} ${cfg.directory}
|
||||
''}
|
||||
'';
|
||||
mode = "0755";
|
||||
};
|
||||
@ -203,6 +210,39 @@ let
|
||||
};
|
||||
};
|
||||
in {
|
||||
options = {
|
||||
logging = {
|
||||
shipping = {
|
||||
enable = mkEnableOption "unix socket for log shipping";
|
||||
socket = mkOption {
|
||||
description = "socket pathname"; type = types.path;
|
||||
default = "/run/.log-shipping.sock";
|
||||
};
|
||||
service = mkOption {
|
||||
description = "log shipper service";
|
||||
type = pkgs.liminix.lib.types.service;
|
||||
};
|
||||
};
|
||||
script = mkOption {
|
||||
description = "\"log script\" used by fallback s6-log process";
|
||||
type = types.str;
|
||||
default = "p${config.hostname} t";
|
||||
};
|
||||
directory = mkOption {
|
||||
description = "default log directory";
|
||||
default = "/run/log";
|
||||
type = types.path;
|
||||
};
|
||||
};
|
||||
};
|
||||
imports = [
|
||||
( {config, pkgs, lib, ...}:
|
||||
let cfg = config.logging;
|
||||
in mkIf cfg.shipping.enable {
|
||||
services.${cfg.shipping.service.name} = cfg.shipping.service;
|
||||
}
|
||||
)];
|
||||
|
||||
config = {
|
||||
filesystem = dir {
|
||||
etc = dir {
|
||||
|
@ -71,12 +71,14 @@ in {
|
||||
hi = callPackage ./hi { };
|
||||
ifwait = callPackage ./ifwait { };
|
||||
initramfs-peek = callPackage ./initramfs-peek { };
|
||||
incz = callPackage ./incz { };
|
||||
json-to-fstree = callPackage ./json-to-fstree { };
|
||||
kernel-backport = callPackage ./kernel-backport { };
|
||||
kmodloader = callPackage ./kmodloader { };
|
||||
levitate = callPackage ./levitate { };
|
||||
libubootenv = callPackage ./libubootenv { };
|
||||
linotify = callPackage ./linotify { };
|
||||
logshipper = callPackage ./logshipper { };
|
||||
lualinux = callPackage ./lualinux { };
|
||||
|
||||
# we need to build real lzma instead of using xz, because the lzma
|
||||
|
38
pkgs/incz/default.nix
Normal file
38
pkgs/incz/default.nix
Normal file
@ -0,0 +1,38 @@
|
||||
{
|
||||
fetchurl,
|
||||
writeFennel,
|
||||
fennel,
|
||||
fennelrepl,
|
||||
runCommand,
|
||||
lua,
|
||||
anoia,
|
||||
lualinux,
|
||||
stdenv
|
||||
}:
|
||||
let name = "incz";
|
||||
in stdenv.mkDerivation {
|
||||
inherit name;
|
||||
src = ./.;
|
||||
|
||||
buildInputs = [lua];
|
||||
nativeBuildInputs = [fennelrepl];
|
||||
|
||||
buildPhase = ''
|
||||
fennelrepl --test ./incz.fnl
|
||||
cp -p ${writeFennel name {
|
||||
packages = [
|
||||
anoia
|
||||
lualinux
|
||||
fennel
|
||||
];
|
||||
macros = [
|
||||
anoia.dev
|
||||
];
|
||||
mainFunction = "run";
|
||||
} ./incz.fnl } ${name}
|
||||
'';
|
||||
|
||||
installPhase = ''
|
||||
install -D ${name} $out/bin/${name}
|
||||
'';
|
||||
}
|
58
pkgs/incz/incz.fnl
Normal file
58
pkgs/incz/incz.fnl
Normal file
@ -0,0 +1,58 @@
|
||||
(local { : base64 } (require :anoia))
|
||||
(local ll (require :lualinux))
|
||||
|
||||
(fn index [str indexname]
|
||||
(string.format "{\"index\":{\"_index\":%q}}\n%s" indexname str))
|
||||
|
||||
(local crlf "\r\n")
|
||||
|
||||
(fn chunk [str]
|
||||
(let [len (# str)]
|
||||
(string.format "%x%s%s%s" len crlf str crlf)))
|
||||
|
||||
(fn http-header [host path auth]
|
||||
(string.format
|
||||
"POST %s HTTP/1.1\r
|
||||
Host: %s\
|
||||
Authorization: basic %s
|
||||
Transfer-Encoding: chunked\r
|
||||
\r
|
||||
"
|
||||
path host
|
||||
(let [b64 (base64 :url)] (b64:encode auth))))
|
||||
|
||||
(fn format-timestamp [timestamp]
|
||||
;; I can't make zincsearch understand any epoch-based timestamp
|
||||
;; formats, so we are formatting dates as iso-8601 and damn the leap
|
||||
;; seconds :-(
|
||||
(let [secs (- (tonumber (string.sub timestamp 1 16) 16)
|
||||
(lshift 1 62))
|
||||
nano (tonumber (string.sub timestamp 16 24) 16)
|
||||
ts (+ (* secs 1000) (math.floor (/ nano 1000000)))]
|
||||
(.. (os.date "!%FT%T" secs) "." nano "Z")))
|
||||
|
||||
(fn process-line [indexname hostname line]
|
||||
(let [(timestamp service msg) (string.match line "@(%x+) (%g+) (.+)$")]
|
||||
(->
|
||||
(string.format
|
||||
"{%q:%q,%q:%q,%q:%q,%q:%q}\n"
|
||||
"@timestamp" (format-timestamp timestamp)
|
||||
:service service
|
||||
:message msg
|
||||
:host hostname)
|
||||
(index indexname)
|
||||
chunk)))
|
||||
|
||||
(fn run []
|
||||
(let [myhostname (with-open [h (io.popen "hostname" :r)] (h:read "l"))
|
||||
(filename loghost credentials indexname) (table.unpack arg)]
|
||||
|
||||
(with-open [infile (assert (io.open filename :r))]
|
||||
(ll.write 1 (http-header loghost "/api/_bulk" credentials))
|
||||
(while (case (infile:read "l")
|
||||
line (ll.write 1 (process-line indexname myhostname line))))
|
||||
(ll.write 1 (chunk "")))
|
||||
(io.stderr:write (ll.read 0))))
|
||||
|
||||
|
||||
{ : run }
|
5
pkgs/logshipper/Makefile
Normal file
5
pkgs/logshipper/Makefile
Normal file
@ -0,0 +1,5 @@
|
||||
TARGETS=logtap
|
||||
default: $(TARGETS)
|
||||
|
||||
install: $(TARGETS)
|
||||
install -Dt $(PREFIX)/bin/ logtap
|
8
pkgs/logshipper/default.nix
Normal file
8
pkgs/logshipper/default.nix
Normal file
@ -0,0 +1,8 @@
|
||||
{
|
||||
stdenv
|
||||
}:
|
||||
stdenv.mkDerivation {
|
||||
name = "logshipper";
|
||||
makeFlags = [ "PREFIX=${placeholder "out"}" ];
|
||||
src = ./.;
|
||||
}
|
142
pkgs/logshipper/logtap.c
Normal file
142
pkgs/logshipper/logtap.c
Normal file
@ -0,0 +1,142 @@
|
||||
#include <poll.h>
|
||||
#include <sys/timerfd.h>
|
||||
#include <time.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <stdlib.h>
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
#include <signal.h>
|
||||
#include <sys/socket.h>
|
||||
#include <sys/un.h>
|
||||
#include <errno.h>
|
||||
|
||||
#define PROGRAM_NAME "logtap"
|
||||
|
||||
#ifdef _GNU_SOURCE
|
||||
#include <error.h>
|
||||
#else
|
||||
#include <stdarg.h>
|
||||
static void error(int status, int errnum, const char * fmt, ...) {
|
||||
va_list ap;
|
||||
va_start(ap, fmt);
|
||||
|
||||
fprintf(stderr, PROGRAM_NAME ": ");
|
||||
vfprintf(stderr, fmt, ap);
|
||||
if(errnum) fprintf(stderr, ": %s", strerror(errnum));
|
||||
fprintf(stderr, "\n");
|
||||
if(status) exit(status);
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
int open_shipper_socket(char *pathname) {
|
||||
int fd;
|
||||
static int fail_count = 0;
|
||||
|
||||
struct sockaddr_un sa = {
|
||||
.sun_family = AF_LOCAL
|
||||
};
|
||||
strncpy(sa.sun_path, pathname, sizeof(sa.sun_path) - 1);
|
||||
|
||||
fd = socket(AF_LOCAL, SOCK_STREAM, 0);
|
||||
if(fd >= 0) {
|
||||
if(connect(fd, (struct sockaddr *) &sa, sizeof sa)) {
|
||||
if((fail_count % 30) == 0)
|
||||
printf(PROGRAM_NAME ": cannot connect socket \"%s\": %s\n",
|
||||
pathname,
|
||||
strerror(errno));
|
||||
|
||||
fail_count++;
|
||||
close(fd);
|
||||
return -1;
|
||||
}
|
||||
int flags = fcntl(fd, F_GETFL);
|
||||
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
|
||||
}
|
||||
return fd;
|
||||
}
|
||||
|
||||
struct pollfd fds[] = {
|
||||
{ .fd = 0, .events = POLLIN },
|
||||
{ .fd = 1, .events = POLLERR },
|
||||
{ .fd = -1, .events = POLLERR },
|
||||
};
|
||||
|
||||
int is_connected(void) {
|
||||
return (fds[2].fd >= 0);
|
||||
}
|
||||
|
||||
|
||||
int main(int argc, char * argv[]) {
|
||||
|
||||
char * buf = malloc(8192);
|
||||
int out_bytes = 0;
|
||||
int tee_bytes = 0;
|
||||
|
||||
if(argc != 3) {
|
||||
error(1, 0, "usage: " PROGRAM_NAME " /path/to/socket cookie-text");
|
||||
}
|
||||
char * socket_pathname = argv[1];
|
||||
char * cookie = argv[2];
|
||||
char * start_cookie = malloc(strlen(cookie) + 8);
|
||||
char * stop_cookie = malloc(strlen(cookie) + 7);
|
||||
|
||||
if(strlen(socket_pathname) > 108) {
|
||||
error(1, 0, "socket pathname \"%s\" is too long, max 108 bytes",
|
||||
socket_pathname);
|
||||
};
|
||||
|
||||
strcpy(start_cookie, cookie); strcat(start_cookie, " START\n");
|
||||
strcpy(stop_cookie, cookie); strcat(stop_cookie, " STOP\n");
|
||||
|
||||
signal(SIGPIPE, SIG_IGN);
|
||||
|
||||
int flags = fcntl(STDOUT_FILENO, F_GETFL);
|
||||
fcntl(STDOUT_FILENO, F_SETFL, flags | O_NONBLOCK);
|
||||
|
||||
int quitting = 0;
|
||||
while(! quitting) {
|
||||
int nfds = poll(fds, 3, 2000);
|
||||
if(nfds > 0) {
|
||||
if((fds[0].revents & (POLLIN|POLLHUP)) &&
|
||||
(out_bytes == 0) &&
|
||||
(tee_bytes == 0)) {
|
||||
out_bytes = read(fds[0].fd, buf, 8192);
|
||||
if(out_bytes == 0) {
|
||||
quitting = 1;
|
||||
buf = PROGRAM_NAME " detected eof of file on stdin, exiting\n";
|
||||
out_bytes = strlen(buf);
|
||||
};
|
||||
if(is_connected()) tee_bytes = out_bytes;
|
||||
};
|
||||
|
||||
if(out_bytes) {
|
||||
out_bytes -= write(fds[1].fd, buf, out_bytes);
|
||||
};
|
||||
if(fds[1].revents & (POLLERR|POLLHUP)) {
|
||||
exit(1); // can't even log an error if the logging stream fails
|
||||
};
|
||||
if(is_connected()) {
|
||||
if(tee_bytes) {
|
||||
tee_bytes -= write(fds[2].fd, buf, tee_bytes);
|
||||
};
|
||||
if(fds[2].revents & (POLLERR|POLLHUP)) {
|
||||
close(fds[2].fd);
|
||||
fds[2].fd = -1;
|
||||
(void) write(1, stop_cookie, strlen(stop_cookie));
|
||||
};
|
||||
};
|
||||
} else {
|
||||
if(! is_connected()) {
|
||||
fds[2].fd = open_shipper_socket(argv[1]);
|
||||
if(is_connected()) {
|
||||
/* write cookie to stdout so that the backfill
|
||||
* process knows we are now logging realtime
|
||||
*/
|
||||
write(fds[1].fd, start_cookie, strlen(start_cookie));
|
||||
}
|
||||
}
|
||||
};
|
||||
};
|
||||
}
|
Loading…
Reference in New Issue
Block a user