Compare commits

...

13 Commits

Author SHA1 Message Date
635590d37a implement log shipping config
to use this, you need config like this, for example:

+  logging.shipping = {
+    enable = true;
+    service = longrun {
+      name = "ship-logs";
+      run = let path = lib.makeBinPath (with pkgs; [ s6 s6-networking s6 execline ]);
+            in ''
+        PATH=${path}:$PATH
+        s6-ipcserver -1 ${config.logging.shipping.socket} \
+        s6-tcpclient 10.0.2.2 19612 \
+        fdmove -c 1 7 cat
+      '';
+    };
+  };

but I think we can reduce the noise a bit if we use an s6-rc pipeline
with an s6-ipcserver on one side and whatever the user wants on the
other (see the sketch below)
2024-09-18 22:14:34 +01:00
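A hypothetical sketch of the reduced configuration that pipeline approach might allow. Nothing here is defined by the commits in this range: the "command" option is imagined purely for illustration, on the assumption that the module itself would run s6-ipcserver on the shipping socket and pipe its output into whatever the user supplies:

  logging.shipping = {
    enable = true;
    # imagined option: the module would supply the s6-ipcserver end of
    # the pipeline; the user only configures the downstream consumer
    # (here, the same s6-tcpclient/cat chain as the example above)
    command = ''
      s6-tcpclient 10.0.2.2 19612 \
      fdmove -c 1 7 cat
    '';
  };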
17630f2678 rename logtee->logtap 2024-09-18 20:58:02 +01:00
707a471bc2 add logtee to catchall logger 2024-09-16 21:30:06 +01:00
d3fce5edd4 implement error() for musl 2024-09-16 20:35:23 +01:00
5771108fed improve logtee socket connection warning
* print it less often
* to the correct stream (stdout not stderr)
2024-09-16 20:34:26 +01:00
9e5f2d663d close socket fd if we can't connect it 2024-09-15 22:09:31 +01:00
21eeb1671e print diagnostic when eof on stderr 2024-09-15 21:59:24 +01:00
44762d38fc write start cookie when socket connect succeeds 2024-09-15 21:54:21 +01:00
1f6cfc3679 extract method is_connected 2024-09-15 21:40:05 +01:00
8ec00f1710 improve error message 2024-09-15 21:37:04 +01:00
6a6dd32dea make pollfd array global 2024-09-15 21:32:48 +01:00
9b1fc11a59 logshipper/logtee: copy stdin to stdout & to a unix socket if present
first draft
2024-09-15 19:33:21 +01:00
aaa6e353db incz is a very rudimentary log shipper for zinc search
although it would probably work with elasticsearch as well,
as zinc is alleged to be ES-compatible

this is just the package and needs hooking into the service/log
infrastructure somehow
2024-09-08 16:38:37 +01:00
7 changed files with 296 additions and 3 deletions

(modified file)

@ -1,4 +1,4 @@
{ config, pkgs, lib, ... }:
{ config, pkgs, lib, lim, ... }:
let
inherit (pkgs)
execline
@ -9,7 +9,8 @@ let
inherit (lib.lists) unique concatMap;
inherit (pkgs.pseudofile) dir symlink;
inherit (pkgs.liminix.services) oneshot bundle;
inherit (lib) mkIf mkEnableOption mkOption types;
cfg = config.logging;
s6-rc-db =
let
# In the default bundle we need to have all the services
@ -110,7 +111,13 @@ let
#!${execline}/bin/execlineb -P
${execline}/bin/redirfd -w 1 /dev/null
${execline}/bin/redirfd -rnb 0 fifo
${s6}/bin/s6-log -bpd3 -- t /run/uncaught-logs
${if cfg.shipping.enable then ''
pipeline { ${s6}/bin/s6-log -bpd3 -- ${cfg.script} 1 }
pipeline { ${pkgs.logshipper}/bin/logtap ${cfg.shipping.socket} logshipper-socket-event }
${s6}/bin/s6-log -- ${cfg.directory}
'' else ''
${s6}/bin/s6-log -bpd3 -- ${cfg.script} ${cfg.directory}
''}
'';
mode = "0755";
};
@ -203,6 +210,39 @@ let
};
};
in {
options = {
logging = {
shipping = {
enable = mkEnableOption "unix socket for log shipping";
socket = mkOption {
description = "socket pathname"; type = types.path;
default = "/run/.log-shipping.sock";
};
service = mkOption {
description = "log shipper service";
type = pkgs.liminix.lib.types.service;
};
};
script = mkOption {
description = "\"log script\" used by fallback s6-log process";
type = types.str;
default = "p${config.hostname} t";
};
directory = mkOption {
description = "default log directory";
default = "/run/log";
type = types.path;
};
};
};
imports = [
( {config, pkgs, lib, ...}:
let cfg = config.logging;
in mkIf cfg.shipping.enable {
services.${cfg.shipping.service.name} = cfg.shipping.service;
}
)];
config = {
filesystem = dir {
etc = dir {

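For reference, a minimal sketch of how the options added in this file fit together. Only enable and service need to be set; the binding name shipLogs is hypothetical and stands for any liminix service that reads from the shipping socket, such as the s6-ipcserver longrun shown in the first commit message:

  logging.shipping = {
    enable = true;
    # socket defaults to /run/.log-shipping.sock
    service = shipLogs;
  };
  # logging.script (default "p${config.hostname} t") and logging.directory
  # (default /run/log) are used in both modes: without shipping they drive a
  # single s6-log process; with shipping enabled the stream is teed through
  # logtap and the directory still receives the local copy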
(modified file)

@ -71,12 +71,14 @@ in {
hi = callPackage ./hi { };
ifwait = callPackage ./ifwait { };
initramfs-peek = callPackage ./initramfs-peek { };
incz = callPackage ./incz { };
json-to-fstree = callPackage ./json-to-fstree { };
kernel-backport = callPackage ./kernel-backport { };
kmodloader = callPackage ./kmodloader { };
levitate = callPackage ./levitate { };
libubootenv = callPackage ./libubootenv { };
linotify = callPackage ./linotify { };
logshipper = callPackage ./logshipper { };
lualinux = callPackage ./lualinux { };
# we need to build real lzma instead of using xz, because the lzma

pkgs/incz/default.nix (new file, 38 lines)

@ -0,0 +1,38 @@
{
fetchurl,
writeFennel,
fennel,
fennelrepl,
runCommand,
lua,
anoia,
lualinux,
stdenv
}:
let name = "incz";
in stdenv.mkDerivation {
inherit name;
src = ./.;
buildInputs = [lua];
nativeBuildInputs = [fennelrepl];
buildPhase = ''
fennelrepl --test ./incz.fnl
cp -p ${writeFennel name {
packages = [
anoia
lualinux
fennel
];
macros = [
anoia.dev
];
mainFunction = "run";
} ./incz.fnl } ${name}
'';
installPhase = ''
install -D ${name} $out/bin/${name}
'';
}

pkgs/incz/incz.fnl (new file, 58 lines)

@ -0,0 +1,58 @@
(local { : base64 } (require :anoia))
(local ll (require :lualinux))
(fn index [str indexname]
(string.format "{\"index\":{\"_index\":%q}}\n%s" indexname str))
(local crlf "\r\n")
(fn chunk [str]
(let [len (# str)]
(string.format "%x%s%s%s" len crlf str crlf)))
(fn http-header [host path auth]
(string.format
"POST %s HTTP/1.1\r
Host: %s\
Authorization: basic %s
Transfer-Encoding: chunked\r
\r
"
path host
(let [b64 (base64 :url)] (b64:encode auth))))
(fn format-timestamp [timestamp]
;; I can't make zincsearch understand any epoch-based timestamp
;; formats, so we are formatting dates as iso-8601 and damn the leap
;; seconds :-(
(let [secs (- (tonumber (string.sub timestamp 1 16) 16)
(lshift 1 62))
nano (tonumber (string.sub timestamp 16 24) 16)
ts (+ (* secs 1000) (math.floor (/ nano 1000000)))]
(.. (os.date "!%FT%T" secs) "." nano "Z")))
(fn process-line [indexname hostname line]
(let [(timestamp service msg) (string.match line "@(%x+) (%g+) (.+)$")]
(->
(string.format
"{%q:%q,%q:%q,%q:%q,%q:%q}\n"
"@timestamp" (format-timestamp timestamp)
:service service
:message msg
:host hostname)
(index indexname)
chunk)))
(fn run []
(let [myhostname (with-open [h (io.popen "hostname" :r)] (h:read "l"))
(filename loghost credentials indexname) (table.unpack arg)]
(with-open [infile (assert (io.open filename :r))]
(ll.write 1 (http-header loghost "/api/_bulk" credentials))
(while (case (infile:read "l")
line (ll.write 1 (process-line indexname myhostname line))))
(ll.write 1 (chunk "")))
(io.stderr:write (ll.read 0))))
{ : run }

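The last commit message notes that incz still needs hooking into the service/log infrastructure; a hypothetical manual invocation, wiring its stdin/stdout to a TCP connection the way the shipping example above does, might look like this. The host, credentials and index name are made up, 4080 is assumed to be zinc's default listen port, and /run/log/current is the live s6-log output file under the default logging.directory:

  # argument order, per incz.fnl: filename loghost credentials indexname;
  # incz writes its HTTP request to stdout and reads the reply on stdin,
  # so both are pointed at the connection fds (6/7) set up by s6-tcpclient
  s6-tcpclient 10.0.2.2 4080 \
  fdmove -c 0 6 fdmove -c 1 7 \
  incz /run/log/current 10.0.2.2 admin:admin liminix-logs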
pkgs/logshipper/Makefile (new file, 5 lines)

@ -0,0 +1,5 @@
TARGETS=logtap
default: $(TARGETS)
install: $(TARGETS)
install -Dt $(PREFIX)/bin/ logtap

(new file)

@ -0,0 +1,8 @@
{
stdenv
}:
stdenv.mkDerivation {
name = "logshipper";
makeFlags = [ "PREFIX=${placeholder "out"}" ];
src = ./.;
}

pkgs/logshipper/logtap.c (new file, 142 lines)

@ -0,0 +1,142 @@
#include <poll.h>
#include <sys/timerfd.h>
#include <time.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <signal.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <errno.h>
#define PROGRAM_NAME "logtap"
#ifdef _GNU_SOURCE
#include <error.h>
#else
#include <stdarg.h>
static void error(int status, int errnum, const char * fmt, ...) {
va_list ap;
va_start(ap, fmt);
fprintf(stderr, PROGRAM_NAME ": ");
vfprintf(stderr, fmt, ap);
if(errnum) fprintf(stderr, ": %s", strerror(errnum));
fprintf(stderr, "\n");
if(status) exit(status);
}
#endif
int open_shipper_socket(char *pathname) {
int fd;
static int fail_count = 0;
struct sockaddr_un sa = {
.sun_family = AF_LOCAL
};
strncpy(sa.sun_path, pathname, sizeof(sa.sun_path) - 1);
fd = socket(AF_LOCAL, SOCK_STREAM, 0);
if(fd >= 0) {
if(connect(fd, (struct sockaddr *) &sa, sizeof sa)) {
if((fail_count % 30) == 0)
printf(PROGRAM_NAME ": cannot connect socket \"%s\": %s\n",
pathname,
strerror(errno));
fail_count++;
close(fd);
return -1;
}
int flags = fcntl(fd, F_GETFL);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
return fd;
}
struct pollfd fds[] = {
{ .fd = 0, .events = POLLIN },
{ .fd = 1, .events = POLLERR },
{ .fd = -1, .events = POLLERR },
};
int is_connected(void) {
return (fds[2].fd >= 0);
}
int main(int argc, char * argv[]) {
char * buf = malloc(8192);
int out_bytes = 0;
int tee_bytes = 0;
if(argc != 3) {
error(1, 0, "usage: " PROGRAM_NAME " /path/to/socket cookie-text");
}
char * socket_pathname = argv[1];
char * cookie = argv[2];
char * start_cookie = malloc(strlen(cookie) + 8);
char * stop_cookie = malloc(strlen(cookie) + 7);
if(strlen(socket_pathname) > 108) {
error(1, 0, "socket pathname \"%s\" is too long, max 108 bytes",
socket_pathname);
};
strcpy(start_cookie, cookie); strcat(start_cookie, " START\n");
strcpy(stop_cookie, cookie); strcat(stop_cookie, " STOP\n");
signal(SIGPIPE, SIG_IGN);
int flags = fcntl(STDOUT_FILENO, F_GETFL);
fcntl(STDOUT_FILENO, F_SETFL, flags | O_NONBLOCK);
int quitting = 0;
while(! quitting) {
int nfds = poll(fds, 3, 2000);
if(nfds > 0) {
if((fds[0].revents & (POLLIN|POLLHUP)) &&
(out_bytes == 0) &&
(tee_bytes == 0)) {
out_bytes = read(fds[0].fd, buf, 8192);
if(out_bytes == 0) {
quitting = 1;
buf = PROGRAM_NAME " detected eof of file on stdin, exiting\n";
out_bytes = strlen(buf);
};
if(is_connected()) tee_bytes = out_bytes;
};
if(out_bytes) {
out_bytes -= write(fds[1].fd, buf, out_bytes);
};
if(fds[1].revents & (POLLERR|POLLHUP)) {
exit(1); // can't even log an error if the logging stream fails
};
if(is_connected()) {
if(tee_bytes) {
tee_bytes -= write(fds[2].fd, buf, tee_bytes);
};
if(fds[2].revents & (POLLERR|POLLHUP)) {
close(fds[2].fd);
fds[2].fd = -1;
(void) write(1, stop_cookie, strlen(stop_cookie));
};
};
} else {
if(! is_connected()) {
fds[2].fd = open_shipper_socket(argv[1]);
if(is_connected()) {
/* write cookie to stdout so that the backfill
* process knows we are now logging realtime
*/
write(fds[1].fd, start_cookie, strlen(start_cookie));
}
}
};
};
}
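For context on the cookie mechanism implemented above: logtap copies stdin to stdout unconditionally, tees the same bytes to the socket only while it is connected, and brackets each connected period with cookie lines on stdout. With the cookie text the module passes (logshipper-socket-event), the local log stream would contain something like the following; the surrounding log lines are invented for illustration:

  daemon.info foo: not shipped, the socket was down
  logshipper-socket-event START
  daemon.info foo: this line was also copied to the shipping socket
  logshipper-socket-event STOP
  daemon.info foo: socket went away again, local-only from here on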