/* $OpenBSD: queue_proc.c,v 1.11 2024/05/07 12:10:06 op Exp $ */ /* * Copyright (c) 2013 Eric Faurot * * Permission to use, copy, modify, and distribute this software for any * purpose with or without fee is hereby granted, provided that the above * copyright notice and this permission notice appear in all copies. * * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ #include #include #include #include "smtpd.h" #include "log.h" static struct imsgbuf ibuf; static struct imsg imsg; static size_t rlen; static char *rdata; static void queue_proc_call(void) { ssize_t n; if (imsg_flush(&ibuf) == -1) { log_warn("warn: queue-proc: imsg_flush"); fatalx("queue-proc: exiting"); } while (1) { if ((n = imsg_get(&ibuf, &imsg)) == -1) { log_warn("warn: queue-proc: imsg_get"); break; } if (n) { rlen = imsg.hdr.len - IMSG_HEADER_SIZE; rdata = imsg.data; if (imsg.hdr.type != PROC_QUEUE_OK) { log_warnx("warn: queue-proc: bad response"); break; } return; } if ((n = imsg_read(&ibuf)) == -1 && errno != EAGAIN) { log_warn("warn: queue-proc: imsg_read"); break; } if (n == 0) { log_warnx("warn: queue-proc: pipe closed"); break; } } fatalx("queue-proc: exiting"); } static void queue_proc_read(void *dst, size_t len) { if (len > rlen) { log_warnx("warn: queue-proc: bad msg len"); fatalx("queue-proc: exiting"); } memmove(dst, rdata, len); rlen -= len; rdata += len; } static void queue_proc_end(void) { if (rlen) { log_warnx("warn: queue-proc: bogus data"); fatalx("queue-proc: exiting"); } imsg_free(&imsg); } /* * API */ static int queue_proc_close(void) { int r; imsg_compose(&ibuf, PROC_QUEUE_CLOSE, 0, 0, -1, NULL, 0); queue_proc_call(); queue_proc_read(&r, sizeof(r)); queue_proc_end(); return (r); } static int queue_proc_message_create(uint32_t *msgid) { int r; imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_CREATE, 0, 0, -1, NULL, 0); queue_proc_call(); queue_proc_read(&r, sizeof(r)); if (r == 1) queue_proc_read(msgid, sizeof(*msgid)); queue_proc_end(); return (r); } static int queue_proc_message_commit(uint32_t msgid, const char *path) { int r, fd; fd = open(path, O_RDONLY); if (fd == -1) { log_warn("queue-proc: open: %s", path); return (0); } imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_COMMIT, 0, 0, fd, &msgid, sizeof(msgid)); queue_proc_call(); queue_proc_read(&r, sizeof(r)); queue_proc_end(); return (r); } static int queue_proc_message_delete(uint32_t msgid) { int r; imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_DELETE, 0, 0, -1, &msgid, sizeof(msgid)); queue_proc_call(); queue_proc_read(&r, sizeof(r)); queue_proc_end(); return (r); } static int queue_proc_message_fd_r(uint32_t msgid) { imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_FD_R, 0, 0, -1, &msgid, sizeof(msgid)); queue_proc_call(); queue_proc_end(); return (imsg_get_fd(&imsg)); } static int queue_proc_envelope_create(uint32_t msgid, const char *buf, size_t len, uint64_t *evpid) { struct ibuf *b; int r; msgid = evpid_to_msgid(*evpid); b = imsg_create(&ibuf, PROC_QUEUE_ENVELOPE_CREATE, 0, 0, sizeof(msgid) + len); if (imsg_add(b, &msgid, sizeof(msgid)) == -1 || imsg_add(b, buf, len) == -1) return (0); imsg_close(&ibuf, b); queue_proc_call(); queue_proc_read(&r, sizeof(r)); if (r == 1) queue_proc_read(evpid, sizeof(*evpid)); queue_proc_end(); return (r); } static int queue_proc_envelope_delete(uint64_t evpid) { int r; imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_DELETE, 0, 0, -1, &evpid, sizeof(evpid)); queue_proc_call(); queue_proc_read(&r, sizeof(r)); queue_proc_end(); return (r); } static int queue_proc_envelope_update(uint64_t evpid, const char *buf, size_t len) { struct ibuf *b; int r; b = imsg_create(&ibuf, PROC_QUEUE_ENVELOPE_UPDATE, 0, 0, len + sizeof(evpid)); if (imsg_add(b, &evpid, sizeof(evpid)) == -1 || imsg_add(b, buf, len) == -1) return (0); imsg_close(&ibuf, b); queue_proc_call(); queue_proc_read(&r, sizeof(r)); queue_proc_end(); return (r); } static int queue_proc_envelope_load(uint64_t evpid, char *buf, size_t len) { int r; imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_LOAD, 0, 0, -1, &evpid, sizeof(evpid)); queue_proc_call(); if (rlen > len) { log_warnx("warn: queue-proc: buf too small"); fatalx("queue-proc: exiting"); } r = rlen; queue_proc_read(buf, rlen); queue_proc_end(); return (r); } static int queue_proc_envelope_walk(uint64_t *evpid, char *buf, size_t len) { int r; imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_WALK, 0, 0, -1, NULL, 0); queue_proc_call(); queue_proc_read(&r, sizeof(r)); if (r > 0) { queue_proc_read(evpid, sizeof(*evpid)); if (rlen > len) { log_warnx("warn: queue-proc: buf too small"); fatalx("queue-proc: exiting"); } if (r != (int)rlen) { log_warnx("warn: queue-proc: len mismatch"); fatalx("queue-proc: exiting"); } queue_proc_read(buf, rlen); } queue_proc_end(); return (r); } static int queue_proc_init(struct passwd *pw, int server, const char *conf) { uint32_t version; int fd; fd = fork_proc_backend("queue", conf, "queue-proc", 0); if (fd == -1) fatalx("queue-proc: exiting"); imsg_init(&ibuf, fd); version = PROC_QUEUE_API_VERSION; imsg_compose(&ibuf, PROC_QUEUE_INIT, 0, 0, -1, &version, sizeof(version)); queue_api_on_close(queue_proc_close); queue_api_on_message_create(queue_proc_message_create); queue_api_on_message_commit(queue_proc_message_commit); queue_api_on_message_delete(queue_proc_message_delete); queue_api_on_message_fd_r(queue_proc_message_fd_r); queue_api_on_envelope_create(queue_proc_envelope_create); queue_api_on_envelope_delete(queue_proc_envelope_delete); queue_api_on_envelope_update(queue_proc_envelope_update); queue_api_on_envelope_load(queue_proc_envelope_load); queue_api_on_envelope_walk(queue_proc_envelope_walk); queue_proc_call(); queue_proc_end(); return (1); } struct queue_backend queue_backend_proc = { queue_proc_init, };