1 /* $NetBSD: qmgr_message.c,v 1.5 2025/02/25 19:15:49 christos Exp $ */
2
3 /*++
4 /* NAME
5 /* qmgr_message 3
6 /* SUMMARY
7 /* in-core message structures
8 /* SYNOPSIS
9 /* #include "qmgr.h"
10 /*
11 /* int qmgr_message_count;
12 /* int qmgr_recipient_count;
13 /* int qmgr_vrfy_pend_count;
14 /*
15 /* QMGR_MESSAGE *qmgr_message_alloc(class, name, qflags, mode)
16 /* const char *class;
17 /* const char *name;
18 /* int qflags;
19 /* mode_t mode;
20 /*
21 /* QMGR_MESSAGE *qmgr_message_realloc(message)
22 /* QMGR_MESSAGE *message;
23 /*
24 /* void qmgr_message_free(message)
25 /* QMGR_MESSAGE *message;
26 /*
27 /* void qmgr_message_update_warn(message)
28 /* QMGR_MESSAGE *message;
29 /*
30 /* void qmgr_message_kill_record(message, offset)
31 /* QMGR_MESSAGE *message;
32 /* long offset;
33 /* DESCRIPTION
34 /* This module performs en-gross operations on queue messages.
35 /*
36 /* qmgr_message_count is a global counter for the total number
37 /* of in-core message structures (i.e. the total size of the
38 /* `active' message queue).
39 /*
40 /* qmgr_recipient_count is a global counter for the total number
41 /* of in-core recipient structures (i.e. the sum of all recipients
42 /* in all in-core message structures).
43 /*
44 /* qmgr_vrfy_pend_count is a global counter for the total
45 /* number of in-core message structures that are associated
46 /* with an address verification request. Requests that exceed
47 /* the address_verify_pending_limit are deferred immediately.
48 /* This is a backup mechanism for a more refined enforcement
49 /* mechanism in the verify(8) daemon.
50 /*
51 /* qmgr_message_alloc() creates an in-core message structure
52 /* with sender and recipient information taken from the named queue
53 /* file. A null result means the queue file could not be read or
54 /* that the queue file contained incorrect information. A result
55 /* QMGR_MESSAGE_LOCKED means delivery must be deferred. The number
56 /* of recipients read from a queue file is limited by the global
57 /* var_qmgr_rcpt_limit configuration parameter. When the limit
58 /* is reached, the \fIrcpt_offset\fR structure member is set to
59 /* the position where the read was terminated. Recipients are
60 /* run through the resolver, and are assigned to destination
61 /* queues. Recipients that cannot be assigned are deferred or
62 /* bounced. Mail that has bounced twice is silently absorbed.
63 /* A non-zero mode means change the queue file permissions.
64 /*
65 /* qmgr_message_realloc() resumes reading recipients from the queue
66 /* file, and updates the recipient list and \fIrcpt_offset\fR message
67 /* structure members. A null result means that the file could not be
68 /* read or that the file contained incorrect information. Recipient
69 /* limit imposed this time is based on the position of the message
70 /* job(s) on corresponding transport job list(s). It's considered
71 /* an error to call this when the recipient slots can't be allocated.
72 /*
73 /* qmgr_message_free() destroys an in-core message structure and makes
74 /* the resources available for reuse. It is an error to destroy
75 /* a message structure that is still referenced by queue entry structures.
76 /*
77 /* qmgr_message_update_warn() takes a closed message, opens it, updates
78 /* the warning field, and closes it again.
79 /*
80 /* qmgr_message_kill_record() takes a closed message, opens it, updates
81 /* the record type at the given offset to "killed", and closes the file.
82 /* A killed envelope record is ignored. Killed records are not allowed
83 /* inside the message content.
84 /* DIAGNOSTICS
85 /* Warnings: malformed message file. Fatal errors: out of memory.
86 /* SEE ALSO
87 /* envelope(3) message envelope parser
88 /* LICENSE
89 /* .ad
90 /* .fi
91 /* The Secure Mailer license must be distributed with this software.
92 /* AUTHOR(S)
93 /* Wietse Venema
94 /* IBM T.J. Watson Research
95 /* P.O. Box 704
96 /* Yorktown Heights, NY 10598, USA
97 /*
98 /* Wietse Venema
99 /* Google, Inc.
100 /* 111 8th Avenue
101 /* New York, NY 10011, USA
102 /*
103 /* Wietse Venema
104 /* porcupine.org
105 /*
106 /* Preemptive scheduler enhancements:
107 /* Patrik Rak
108 /* Modra 6
109 /* 155 00, Prague, Czech Republic
110 /*--*/
111
112 /* System library. */
113
114 #include <sys_defs.h>
115 #include <sys/stat.h>
116 #include <stdlib.h>
117 #include <stdio.h> /* sscanf() */
118 #include <fcntl.h>
119 #include <errno.h>
120 #include <unistd.h>
121 #include <string.h>
122 #include <ctype.h>
123
124 /* Utility library. */
125
126 #include <msg.h>
127 #include <mymalloc.h>
128 #include <vstring.h>
129 #include <vstream.h>
130 #include <split_at.h>
131 #include <valid_hostname.h>
132 #include <argv.h>
133 #include <stringops.h>
134 #include <myflock.h>
135 #include <sane_time.h>
136
137 /* Global library. */
138
139 #include <dict.h>
140 #include <mail_queue.h>
141 #include <mail_params.h>
142 #include <canon_addr.h>
143 #include <record.h>
144 #include <rec_type.h>
145 #include <sent.h>
146 #include <deliver_completed.h>
147 #include <opened.h>
148 #include <verp_sender.h>
149 #include <mail_proto.h>
150 #include <qmgr_user.h>
151 #include <split_addr.h>
152 #include <dsn_mask.h>
153 #include <rec_attr_map.h>
154 #include <sendopts.h>
155
156 /* Client stubs. */
157
158 #include <rewrite_clnt.h>
159 #include <resolve_clnt.h>
160
161 /* Application-specific. */
162
163 #include "qmgr.h"
164
165 int qmgr_message_count;
166 int qmgr_recipient_count;
167 int qmgr_vrfy_pend_count;
168
169 /* qmgr_message_create - create in-core message structure */
170
qmgr_message_create(const char * queue_name,const char * queue_id,int qflags)171 static QMGR_MESSAGE *qmgr_message_create(const char *queue_name,
172 const char *queue_id, int qflags)
173 {
174 QMGR_MESSAGE *message;
175
176 message = (QMGR_MESSAGE *) mymalloc(sizeof(QMGR_MESSAGE));
177 qmgr_message_count++;
178 message->flags = 0;
179 message->qflags = qflags;
180 message->tflags = 0;
181 message->tflags_offset = 0;
182 message->rflags = QMGR_READ_FLAG_DEFAULT;
183 message->fp = 0;
184 message->refcount = 0;
185 message->single_rcpt = 0;
186 message->arrival_time.tv_sec = message->arrival_time.tv_usec = 0;
187 message->create_time = 0;
188 GETTIMEOFDAY(&message->active_time);
189 message->queued_time = sane_time();
190 message->refill_time = 0;
191 message->data_offset = 0;
192 message->queue_id = mystrdup(queue_id);
193 message->queue_name = mystrdup(queue_name);
194 message->encoding = 0;
195 message->sender = 0;
196 message->dsn_envid = 0;
197 message->dsn_ret = 0;
198 message->sendopts = 0;
199 message->filter_xport = 0;
200 message->inspect_xport = 0;
201 message->redirect_addr = 0;
202 message->data_size = 0;
203 message->cont_length = 0;
204 message->warn_offset = 0;
205 message->warn_time = 0;
206 message->rcpt_offset = 0;
207 message->verp_delims = 0;
208 message->client_name = 0;
209 message->client_addr = 0;
210 message->client_port = 0;
211 message->client_proto = 0;
212 message->client_helo = 0;
213 message->sasl_method = 0;
214 message->sasl_username = 0;
215 message->sasl_sender = 0;
216 message->log_ident = 0;
217 message->rewrite_context = 0;
218 recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE);
219 message->rcpt_count = 0;
220 message->rcpt_limit = var_qmgr_msg_rcpt_limit;
221 message->rcpt_unread = 0;
222 QMGR_LIST_INIT(message->job_list);
223 return (message);
224 }
225
226 /* qmgr_message_close - close queue file */
227
qmgr_message_close(QMGR_MESSAGE * message)228 static void qmgr_message_close(QMGR_MESSAGE *message)
229 {
230 vstream_fclose(message->fp);
231 message->fp = 0;
232 }
233
234 /* qmgr_message_open - open queue file */
235
qmgr_message_open(QMGR_MESSAGE * message)236 static int qmgr_message_open(QMGR_MESSAGE *message)
237 {
238
239 /*
240 * Sanity check.
241 */
242 if (message->fp)
243 msg_panic("%s: queue file is open", message->queue_id);
244
245 /*
246 * Open this queue file. Skip files that we cannot open. Back off when
247 * the system appears to be running out of resources.
248 */
249 if ((message->fp = mail_queue_open(message->queue_name,
250 message->queue_id,
251 O_RDWR, 0)) == 0) {
252 if (errno != ENOENT)
253 msg_fatal("open %s %s: %m", message->queue_name, message->queue_id);
254 msg_warn("open %s %s: %m", message->queue_name, message->queue_id);
255 return (-1);
256 }
257 return (0);
258 }
259
260 /* qmgr_message_oldstyle_scan - support for Postfix < 1.0 queue files */
261
qmgr_message_oldstyle_scan(QMGR_MESSAGE * message)262 static void qmgr_message_oldstyle_scan(QMGR_MESSAGE *message)
263 {
264 VSTRING *buf;
265 long orig_offset, extra_offset;
266 int rec_type;
267 char *start;
268
269 /*
270 * Initialize. No early returns or we have a memory leak.
271 */
272 buf = vstring_alloc(100);
273 if ((orig_offset = vstream_ftell(message->fp)) < 0)
274 msg_fatal("vstream_ftell %s: %m", VSTREAM_PATH(message->fp));
275
276 /*
277 * Rewind to the very beginning to make sure we see all records.
278 */
279 if (vstream_fseek(message->fp, 0, SEEK_SET) < 0)
280 msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp));
281
282 /*
283 * Scan through the old style queue file. Count the total number of
284 * recipients and find the data/extra sections offsets. Note that the new
285 * queue files require that data_size equals extra_offset - data_offset,
286 * so we set data_size to this as well and ignore the size record itself
287 * completely.
288 */
289 message->rcpt_unread = 0;
290 for (;;) {
291 rec_type = rec_get(message->fp, buf, 0);
292 if (rec_type <= 0)
293 /* Report missing end record later. */
294 break;
295 start = vstring_str(buf);
296 if (msg_verbose > 1)
297 msg_info("old-style scan record %c %s", rec_type, start);
298 if (rec_type == REC_TYPE_END)
299 break;
300 if (rec_type == REC_TYPE_DONE
301 || rec_type == REC_TYPE_RCPT
302 || rec_type == REC_TYPE_DRCP) {
303 message->rcpt_unread++;
304 continue;
305 }
306 if (rec_type == REC_TYPE_MESG) {
307 if (message->data_offset == 0) {
308 if ((message->data_offset = vstream_ftell(message->fp)) < 0)
309 msg_fatal("vstream_ftell %s: %m", VSTREAM_PATH(message->fp));
310 if ((extra_offset = atol(start)) <= message->data_offset)
311 msg_fatal("bad extra offset %s file %s",
312 start, VSTREAM_PATH(message->fp));
313 if (vstream_fseek(message->fp, extra_offset, SEEK_SET) < 0)
314 msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp));
315 message->data_size = extra_offset - message->data_offset;
316 }
317 continue;
318 }
319 }
320
321 /*
322 * Clean up.
323 */
324 if (vstream_fseek(message->fp, orig_offset, SEEK_SET) < 0)
325 msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp));
326 vstring_free(buf);
327
328 /*
329 * Sanity checks. Verify that all required information was found,
330 * including the queue file end marker.
331 */
332 if (message->data_offset == 0 || rec_type != REC_TYPE_END)
333 msg_fatal("%s: envelope records out of order", message->queue_id);
334 }
335
336 /* qmgr_message_read - read envelope records */
337
qmgr_message_read(QMGR_MESSAGE * message)338 static int qmgr_message_read(QMGR_MESSAGE *message)
339 {
340 VSTRING *buf;
341 int rec_type;
342 long curr_offset;
343 long save_offset = message->rcpt_offset; /* save a flag */
344 int save_unread = message->rcpt_unread; /* save a count */
345 char *start;
346 int recipient_limit;
347 const char *error_text;
348 char *name;
349 char *value;
350 char *orig_rcpt = 0;
351 int count;
352 int dsn_notify = 0;
353 char *dsn_orcpt = 0;
354 int n;
355 int have_log_client_attr = 0;
356 static const char env_rec_types[] = REC_TYPE_ENVELOPE REC_TYPE_EXTRACT;
357 static const char extra_rec_type[] = {REC_TYPE_XTRA, 0};
358 const char *expected_rec_types;
359
360 /*
361 * Initialize. No early returns or we have a memory leak.
362 */
363 buf = vstring_alloc(100);
364
365 /*
366 * If we re-open this file, skip over on-file recipient records that we
367 * already looked at, and refill the in-core recipient address list.
368 *
369 * For the first time, the message recipient limit is calculated from the
370 * global recipient limit. This is to avoid reading little recipients
371 * when the active queue is near empty. When the queue becomes full, only
372 * the necessary amount is read in core. Such priming is necessary
373 * because there are no message jobs yet.
374 *
375 * For the next time, the recipient limit is based solely on the message
376 * jobs' positions in the job lists and/or job stacks.
377 */
378 if (message->rcpt_offset) {
379 if (message->rcpt_list.len)
380 msg_panic("%s: recipient list not empty on recipient reload",
381 message->queue_id);
382 if (vstream_fseek(message->fp, message->rcpt_offset, SEEK_SET) < 0)
383 msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp));
384 message->rcpt_offset = 0;
385 recipient_limit = message->rcpt_limit - message->rcpt_count;
386 } else {
387 recipient_limit = var_qmgr_rcpt_limit - qmgr_recipient_count;
388 if (recipient_limit < message->rcpt_limit)
389 recipient_limit = message->rcpt_limit;
390 }
391 /* Keep interrupt latency in check. */
392 if (recipient_limit > 5000)
393 recipient_limit = 5000;
394 if (recipient_limit <= 0)
395 msg_panic("%s: no recipient slots available", message->queue_id);
396 if (msg_verbose)
397 msg_info("%s: recipient limit %d", message->queue_id, recipient_limit);
398
399 /*
400 * Read envelope records. XXX Rely on the front-end programs to enforce
401 * record size limits. Read up to recipient_limit recipients from the
402 * queue file, to protect against memory exhaustion. Recipient records
403 * may appear before or after the message content, so we keep reading
404 * from the queue file until we have enough recipients (rcpt_offset != 0)
405 * and until we know all the non-recipient information.
406 *
407 * Note that the total recipient count record is accurate only for fresh
408 * queue files. After some of the recipients are marked as done and the
409 * queue file is deferred, it can be used as upper bound estimate only.
410 * Fortunately, this poses no major problem on the scheduling algorithm,
411 * as the only impact is that the already deferred messages are not
412 * chosen by qmgr_job_candidate() as often as they could.
413 *
414 * On the first open, we must examine all non-recipient records.
415 *
416 * Optimization: when we know that recipient records are not mixed with
417 * non-recipient records, as is typical with mailing list mail, then we
418 * can avoid having to examine all the queue file records before we can
419 * start deliveries. This avoids some file system thrashing with huge
420 * mailing lists.
421 */
422 for (;;) {
423 expected_rec_types = env_rec_types;
424 if ((curr_offset = vstream_ftell(message->fp)) < 0)
425 msg_fatal("vstream_ftell %s: %m", VSTREAM_PATH(message->fp));
426 if (curr_offset == message->data_offset && curr_offset > 0) {
427 if (vstream_fseek(message->fp, message->data_size, SEEK_CUR) < 0)
428 msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp));
429 curr_offset += message->data_size;
430 expected_rec_types = extra_rec_type;
431 }
432 rec_type = rec_get_raw(message->fp, buf, 0, REC_FLAG_NONE);
433 start = vstring_str(buf);
434 if (msg_verbose > 1)
435 msg_info("record %c %s", rec_type, start);
436 if (rec_type == REC_TYPE_PTR) {
437 if ((rec_type = rec_goto(message->fp, start)) == REC_TYPE_ERROR)
438 break;
439 /* Need to update curr_offset after pointer jump. */
440 continue;
441 }
442 if (rec_type <= 0) {
443 msg_warn("%s: message rejected: missing end record",
444 message->queue_id);
445 break;
446 }
447 if (strchr(expected_rec_types, rec_type) == 0) {
448 msg_warn("Unexpected record type '%c' at offset %ld",
449 rec_type, (long) curr_offset);
450 rec_type = REC_TYPE_ERROR;
451 break;
452 }
453 if (rec_type == REC_TYPE_END) {
454 message->rflags |= QMGR_READ_FLAG_SEEN_ALL_NON_RCPT;
455 break;
456 }
457
458 /*
459 * Map named attributes to pseudo record types, so that we don't have
460 * to pollute the queue file with records that are incompatible with
461 * past Postfix versions. Preferably, people should be able to back
462 * out from an upgrade without losing mail.
463 */
464 if (rec_type == REC_TYPE_ATTR) {
465 if ((error_text = split_nameval(start, &name, &value)) != 0) {
466 msg_warn("%s: bad attribute record: %s: %.200s",
467 message->queue_id, error_text, start);
468 rec_type = REC_TYPE_ERROR;
469 break;
470 }
471 if ((n = rec_attr_map(name)) != 0) {
472 start = value;
473 rec_type = n;
474 }
475 }
476
477 /*
478 * Process recipient records.
479 */
480 if (rec_type == REC_TYPE_RCPT) {
481 /* See also below for code setting orig_rcpt etc. */
482 if (message->rcpt_offset == 0) {
483 message->rcpt_unread--;
484 recipient_list_add(&message->rcpt_list, curr_offset,
485 dsn_orcpt ? dsn_orcpt : "",
486 dsn_notify ? dsn_notify : 0,
487 orig_rcpt ? orig_rcpt : "", start);
488 if (dsn_orcpt) {
489 myfree(dsn_orcpt);
490 dsn_orcpt = 0;
491 }
492 if (orig_rcpt) {
493 myfree(orig_rcpt);
494 orig_rcpt = 0;
495 }
496 if (dsn_notify)
497 dsn_notify = 0;
498 if (message->rcpt_list.len >= recipient_limit) {
499 if ((message->rcpt_offset = vstream_ftell(message->fp)) < 0)
500 msg_fatal("vstream_ftell %s: %m",
501 VSTREAM_PATH(message->fp));
502 if (message->rflags & QMGR_READ_FLAG_SEEN_ALL_NON_RCPT)
503 /* We already examined all non-recipient records. */
504 break;
505 if (message->rflags & QMGR_READ_FLAG_MIXED_RCPT_OTHER)
506 /* Examine all remaining non-recipient records. */
507 continue;
508 /* Optimizations for "pure recipient" record sections. */
509 if (curr_offset > message->data_offset) {
510 /* We already examined all non-recipient records. */
511 message->rflags |= QMGR_READ_FLAG_SEEN_ALL_NON_RCPT;
512 break;
513 }
514
515 /*
516 * Examine non-recipient records in the extracted
517 * segment. Note that this skips to the message start
518 * record, because the handler for that record changes
519 * the expectations for allowed record types.
520 */
521 if (vstream_fseek(message->fp, message->data_offset,
522 SEEK_SET) < 0)
523 msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp));
524 continue;
525 }
526 }
527 continue;
528 }
529 if (rec_type == REC_TYPE_DONE || rec_type == REC_TYPE_DRCP) {
530 if (message->rcpt_offset == 0) {
531 message->rcpt_unread--;
532 if (dsn_orcpt) {
533 myfree(dsn_orcpt);
534 dsn_orcpt = 0;
535 }
536 if (orig_rcpt) {
537 myfree(orig_rcpt);
538 orig_rcpt = 0;
539 }
540 if (dsn_notify)
541 dsn_notify = 0;
542 }
543 continue;
544 }
545 if (rec_type == REC_TYPE_DSN_ORCPT) {
546 /* See also above for code clearing dsn_orcpt. */
547 if (dsn_orcpt != 0) {
548 msg_warn("%s: ignoring out-of-order DSN original recipient address <%.200s>",
549 message->queue_id, dsn_orcpt);
550 myfree(dsn_orcpt);
551 dsn_orcpt = 0;
552 }
553 if (message->rcpt_offset == 0)
554 dsn_orcpt = mystrdup(start);
555 continue;
556 }
557 if (rec_type == REC_TYPE_DSN_NOTIFY) {
558 /* See also above for code clearing dsn_notify. */
559 if (dsn_notify != 0) {
560 msg_warn("%s: ignoring out-of-order DSN notify flags <%d>",
561 message->queue_id, dsn_notify);
562 dsn_notify = 0;
563 }
564 if (message->rcpt_offset == 0) {
565 if (!alldig(start) || (n = atoi(start)) == 0 || !DSN_NOTIFY_OK(n))
566 msg_warn("%s: ignoring malformed DSN notify flags <%.200s>",
567 message->queue_id, start);
568 else
569 dsn_notify = n;
570 continue;
571 }
572 }
573 if (rec_type == REC_TYPE_ORCP) {
574 /* See also above for code clearing orig_rcpt. */
575 if (orig_rcpt != 0) {
576 msg_warn("%s: ignoring out-of-order original recipient <%.200s>",
577 message->queue_id, orig_rcpt);
578 myfree(orig_rcpt);
579 orig_rcpt = 0;
580 }
581 if (message->rcpt_offset == 0)
582 orig_rcpt = mystrdup(start);
583 continue;
584 }
585
586 /*
587 * Process non-recipient records.
588 */
589 if (message->rflags & QMGR_READ_FLAG_SEEN_ALL_NON_RCPT)
590 /* We already examined all non-recipient records. */
591 continue;
592 if (rec_type == REC_TYPE_SIZE) {
593 if (message->data_offset == 0) {
594 if ((count = sscanf(start, "%ld %ld %d %d %ld %d",
595 &message->data_size, &message->data_offset,
596 &message->rcpt_unread, &message->rflags,
597 &message->cont_length,
598 &message->sendopts)) >= 3) {
599 /* Postfix >= 1.0 (a.k.a. 20010228). */
600 if (message->data_offset <= 0 || message->data_size <= 0) {
601 msg_warn("%s: invalid size record: %.100s",
602 message->queue_id, start);
603 rec_type = REC_TYPE_ERROR;
604 break;
605 }
606 if (message->rflags & ~QMGR_READ_FLAG_USER) {
607 msg_warn("%s: invalid flags in size record: %.100s",
608 message->queue_id, start);
609 rec_type = REC_TYPE_ERROR;
610 break;
611 }
612 /* Forward compatibility. */
613 message->sendopts &= SOPT_FLAG_ALL;
614 } else if (count == 1) {
615 /* Postfix < 1.0 (a.k.a. 20010228). */
616 qmgr_message_oldstyle_scan(message);
617 } else {
618 /* Can't happen. */
619 msg_warn("%s: message rejected: weird size record",
620 message->queue_id);
621 rec_type = REC_TYPE_ERROR;
622 break;
623 }
624 }
625 /* Postfix < 2.4 compatibility. */
626 if (message->cont_length == 0) {
627 message->cont_length = message->data_size;
628 } else if (message->cont_length < 0) {
629 msg_warn("%s: invalid size record: %.100s",
630 message->queue_id, start);
631 rec_type = REC_TYPE_ERROR;
632 break;
633 }
634 continue;
635 }
636 if (rec_type == REC_TYPE_TIME) {
637 if (message->arrival_time.tv_sec == 0)
638 REC_TYPE_TIME_SCAN(start, message->arrival_time);
639 continue;
640 }
641 if (rec_type == REC_TYPE_CTIME) {
642 if (message->create_time == 0)
643 message->create_time = atol(start);
644 continue;
645 }
646 if (rec_type == REC_TYPE_FILT) {
647 if (message->filter_xport != 0)
648 myfree(message->filter_xport);
649 message->filter_xport = mystrdup(start);
650 continue;
651 }
652 if (rec_type == REC_TYPE_INSP) {
653 if (message->inspect_xport != 0)
654 myfree(message->inspect_xport);
655 message->inspect_xport = mystrdup(start);
656 continue;
657 }
658 if (rec_type == REC_TYPE_RDR) {
659 if (message->redirect_addr != 0)
660 myfree(message->redirect_addr);
661 message->redirect_addr = mystrdup(start);
662 continue;
663 }
664 if (rec_type == REC_TYPE_FROM) {
665 if (message->sender == 0) {
666 message->sender = mystrdup(start);
667 opened(message->queue_id, message->sender,
668 message->cont_length, message->rcpt_unread,
669 "queue %s", message->queue_name);
670 }
671 continue;
672 }
673 if (rec_type == REC_TYPE_DSN_ENVID) {
674 /* Allow Milter override. */
675 if (message->dsn_envid != 0)
676 myfree(message->dsn_envid);
677 message->dsn_envid = mystrdup(start);
678 }
679 if (rec_type == REC_TYPE_DSN_RET) {
680 /* Allow Milter override. */
681 if (!alldig(start) || (n = atoi(start)) == 0 || !DSN_RET_OK(n))
682 msg_warn("%s: ignoring malformed DSN RET flags in queue file record:%.100s",
683 message->queue_id, start);
684 else
685 message->dsn_ret = n;
686 }
687 if (rec_type == REC_TYPE_ATTR) {
688 /* Allow extra segment to override envelope segment info. */
689 if (strcmp(name, MAIL_ATTR_ENCODING) == 0) {
690 if (message->encoding != 0)
691 myfree(message->encoding);
692 message->encoding = mystrdup(value);
693 }
694
695 /*
696 * Backwards compatibility. Before Postfix 2.3, the logging
697 * attributes were called client_name, etc. Now they are called
698 * log_client_name. etc., and client_name is used for the actual
699 * client information. To support old queue files we accept both
700 * names for the purpose of logging; the new name overrides the
701 * old one.
702 *
703 * XXX Do not use the "legacy" client_name etc. attribute values for
704 * initializing the logging attributes, when this file already
705 * contains the "modern" log_client_name etc. logging attributes.
706 * Otherwise, logging attributes that are not present in the
707 * queue file would be set with information from the real client.
708 */
709 else if (strcmp(name, MAIL_ATTR_ACT_CLIENT_NAME) == 0) {
710 if (have_log_client_attr == 0 && message->client_name == 0)
711 message->client_name = mystrdup(value);
712 } else if (strcmp(name, MAIL_ATTR_ACT_CLIENT_ADDR) == 0) {
713 if (have_log_client_attr == 0 && message->client_addr == 0)
714 message->client_addr = mystrdup(value);
715 } else if (strcmp(name, MAIL_ATTR_ACT_CLIENT_PORT) == 0) {
716 if (have_log_client_attr == 0 && message->client_port == 0)
717 message->client_port = mystrdup(value);
718 } else if (strcmp(name, MAIL_ATTR_ACT_PROTO_NAME) == 0) {
719 if (have_log_client_attr == 0 && message->client_proto == 0)
720 message->client_proto = mystrdup(value);
721 } else if (strcmp(name, MAIL_ATTR_ACT_HELO_NAME) == 0) {
722 if (have_log_client_attr == 0 && message->client_helo == 0)
723 message->client_helo = mystrdup(value);
724 }
725 /* Original client attributes. */
726 else if (strcmp(name, MAIL_ATTR_LOG_CLIENT_NAME) == 0) {
727 if (message->client_name != 0)
728 myfree(message->client_name);
729 message->client_name = mystrdup(value);
730 have_log_client_attr = 1;
731 } else if (strcmp(name, MAIL_ATTR_LOG_CLIENT_ADDR) == 0) {
732 if (message->client_addr != 0)
733 myfree(message->client_addr);
734 message->client_addr = mystrdup(value);
735 have_log_client_attr = 1;
736 } else if (strcmp(name, MAIL_ATTR_LOG_CLIENT_PORT) == 0) {
737 if (message->client_port != 0)
738 myfree(message->client_port);
739 message->client_port = mystrdup(value);
740 have_log_client_attr = 1;
741 } else if (strcmp(name, MAIL_ATTR_LOG_PROTO_NAME) == 0) {
742 if (message->client_proto != 0)
743 myfree(message->client_proto);
744 message->client_proto = mystrdup(value);
745 have_log_client_attr = 1;
746 } else if (strcmp(name, MAIL_ATTR_LOG_HELO_NAME) == 0) {
747 if (message->client_helo != 0)
748 myfree(message->client_helo);
749 message->client_helo = mystrdup(value);
750 have_log_client_attr = 1;
751 } else if (strcmp(name, MAIL_ATTR_SASL_METHOD) == 0) {
752 if (message->sasl_method == 0)
753 message->sasl_method = mystrdup(value);
754 else
755 msg_warn("%s: ignoring multiple %s attribute: %s",
756 message->queue_id, MAIL_ATTR_SASL_METHOD, value);
757 } else if (strcmp(name, MAIL_ATTR_SASL_USERNAME) == 0) {
758 if (message->sasl_username == 0)
759 message->sasl_username = mystrdup(value);
760 else
761 msg_warn("%s: ignoring multiple %s attribute: %s",
762 message->queue_id, MAIL_ATTR_SASL_USERNAME, value);
763 } else if (strcmp(name, MAIL_ATTR_SASL_SENDER) == 0) {
764 if (message->sasl_sender == 0)
765 message->sasl_sender = mystrdup(value);
766 else
767 msg_warn("%s: ignoring multiple %s attribute: %s",
768 message->queue_id, MAIL_ATTR_SASL_SENDER, value);
769 } else if (strcmp(name, MAIL_ATTR_LOG_IDENT) == 0) {
770 if (message->log_ident == 0)
771 message->log_ident = mystrdup(value);
772 else
773 msg_warn("%s: ignoring multiple %s attribute: %s",
774 message->queue_id, MAIL_ATTR_LOG_IDENT, value);
775 } else if (strcmp(name, MAIL_ATTR_RWR_CONTEXT) == 0) {
776 if (message->rewrite_context == 0)
777 message->rewrite_context = mystrdup(value);
778 else
779 msg_warn("%s: ignoring multiple %s attribute: %s",
780 message->queue_id, MAIL_ATTR_RWR_CONTEXT, value);
781 }
782
783 /*
784 * Optional tracing flags (verify, sendmail -v, sendmail -bv).
785 * This record is killed after a trace logfile report is sent and
786 * after the logfile is deleted.
787 */
788 else if (strcmp(name, MAIL_ATTR_TRACE_FLAGS) == 0) {
789 if (message->tflags == 0) {
790 message->tflags = DEL_REQ_TRACE_FLAGS(atoi(value));
791 if (message->tflags == DEL_REQ_FLAG_RECORD)
792 message->tflags_offset = curr_offset;
793 else
794 message->tflags_offset = 0;
795 if ((message->tflags & DEL_REQ_FLAG_MTA_VRFY) != 0)
796 qmgr_vrfy_pend_count++;
797 }
798 }
799 continue;
800 }
801 if (rec_type == REC_TYPE_WARN) {
802 if (message->warn_offset == 0) {
803 message->warn_offset = curr_offset;
804 REC_TYPE_WARN_SCAN(start, message->warn_time);
805 }
806 continue;
807 }
808 if (rec_type == REC_TYPE_VERP) {
809 if (message->verp_delims == 0) {
810 if (message->sender == 0 || message->sender[0] == 0) {
811 msg_warn("%s: ignoring VERP request for null sender",
812 message->queue_id);
813 } else if (verp_delims_verify(start) != 0) {
814 msg_warn("%s: ignoring bad VERP request: \"%.100s\"",
815 message->queue_id, start);
816 } else {
817 if (msg_verbose)
818 msg_info("%s: enabling VERP for sender \"%.100s\"",
819 message->queue_id, message->sender);
820 message->single_rcpt = 1;
821 message->verp_delims = mystrdup(start);
822 }
823 }
824 continue;
825 }
826 }
827
828 /*
829 * Grr.
830 */
831 if (dsn_orcpt != 0) {
832 if (rec_type > 0)
833 msg_warn("%s: ignoring out-of-order DSN original recipient <%.200s>",
834 message->queue_id, dsn_orcpt);
835 myfree(dsn_orcpt);
836 }
837 if (orig_rcpt != 0) {
838 if (rec_type > 0)
839 msg_warn("%s: ignoring out-of-order original recipient <%.200s>",
840 message->queue_id, orig_rcpt);
841 myfree(orig_rcpt);
842 }
843
844 /*
845 * After sending a "delayed" warning, request sender notification when
846 * message delivery is completed. While "mail delayed" notifications are
847 * bad enough because they multiply the amount of email traffic, "delay
848 * cleared" notifications are even worse because they come in a sudden
849 * burst when the queue drains after a network outage.
850 */
851 if (var_dsn_delay_cleared && message->warn_time < 0)
852 message->tflags |= DEL_REQ_FLAG_REC_DLY_SENT;
853
854 /*
855 * Remember when we have read the last recipient batch. Note that we do
856 * it here after reading as reading might have used considerable amount
857 * of time.
858 */
859 message->refill_time = sane_time();
860
861 /*
862 * Avoid clumsiness elsewhere in the program. When sending data across an
863 * IPC channel, sending an empty string is more convenient than sending a
864 * null pointer.
865 */
866 if (message->dsn_envid == 0)
867 message->dsn_envid = mystrdup("");
868 if (message->encoding == 0)
869 message->encoding = mystrdup(MAIL_ATTR_ENC_NONE);
870 if (message->client_name == 0)
871 message->client_name = mystrdup("");
872 if (message->client_addr == 0)
873 message->client_addr = mystrdup("");
874 if (message->client_port == 0)
875 message->client_port = mystrdup("");
876 if (message->client_proto == 0)
877 message->client_proto = mystrdup("");
878 if (message->client_helo == 0)
879 message->client_helo = mystrdup("");
880 if (message->sasl_method == 0)
881 message->sasl_method = mystrdup("");
882 if (message->sasl_username == 0)
883 message->sasl_username = mystrdup("");
884 if (message->sasl_sender == 0)
885 message->sasl_sender = mystrdup("");
886 if (message->log_ident == 0)
887 message->log_ident = mystrdup("");
888 if (message->rewrite_context == 0)
889 message->rewrite_context = mystrdup(MAIL_ATTR_RWR_LOCAL);
890 /* Postfix < 2.3 compatibility. */
891 if (message->create_time == 0)
892 message->create_time = message->arrival_time.tv_sec;
893
894 /*
895 * Clean up.
896 */
897 vstring_free(buf);
898
899 /*
900 * Sanity checks. Verify that all required information was found,
901 * including the queue file end marker.
902 */
903 if (message->rcpt_unread < 0
904 || (message->rcpt_offset == 0 && message->rcpt_unread != 0)) {
905 msg_warn("%s: rcpt count mismatch (%d)",
906 message->queue_id, message->rcpt_unread);
907 message->rcpt_unread = 0;
908 }
909 if (rec_type <= 0) {
910 /* Already logged warning. */
911 } else if (message->arrival_time.tv_sec == 0) {
912 msg_warn("%s: message rejected: missing arrival time record",
913 message->queue_id);
914 } else if (message->sender == 0) {
915 msg_warn("%s: message rejected: missing sender record",
916 message->queue_id);
917 } else if (message->data_offset == 0) {
918 msg_warn("%s: message rejected: missing size record",
919 message->queue_id);
920 } else {
921 return (0);
922 }
923 message->rcpt_offset = save_offset; /* restore flag */
924 message->rcpt_unread = save_unread; /* restore count */
925 recipient_list_free(&message->rcpt_list);
926 recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE);
927 return (-1);
928 }
929
930 /* qmgr_message_update_warn - update the time of next delay warning */
931
qmgr_message_update_warn(QMGR_MESSAGE * message)932 void qmgr_message_update_warn(QMGR_MESSAGE *message)
933 {
934
935 /*
936 * After the "mail delayed" warning, optionally send a "delay cleared"
937 * notification.
938 */
939 if (qmgr_message_open(message)
940 || vstream_fseek(message->fp, message->warn_offset, SEEK_SET) < 0
941 || rec_fprintf(message->fp, REC_TYPE_WARN, REC_TYPE_WARN_FORMAT,
942 REC_TYPE_WARN_ARG(-1)) < 0
943 || vstream_fflush(message->fp))
944 msg_fatal("update queue file %s: %m", VSTREAM_PATH(message->fp));
945 qmgr_message_close(message);
946 }
947
948 /* qmgr_message_kill_record - mark one message record as killed */
949
qmgr_message_kill_record(QMGR_MESSAGE * message,long offset)950 void qmgr_message_kill_record(QMGR_MESSAGE *message, long offset)
951 {
952 if (offset <= 0)
953 msg_panic("qmgr_message_kill_record: bad offset 0x%lx", offset);
954 if (qmgr_message_open(message)
955 || rec_put_type(message->fp, REC_TYPE_KILL, offset) < 0
956 || vstream_fflush(message->fp))
957 msg_fatal("update queue file %s: %m", VSTREAM_PATH(message->fp));
958 qmgr_message_close(message);
959 }
960
961 /* qmgr_message_sort_compare - compare recipient information */
962
qmgr_message_sort_compare(const void * p1,const void * p2)963 static int qmgr_message_sort_compare(const void *p1, const void *p2)
964 {
965 RECIPIENT *rcpt1 = (RECIPIENT *) p1;
966 RECIPIENT *rcpt2 = (RECIPIENT *) p2;
967 QMGR_QUEUE *queue1;
968 QMGR_QUEUE *queue2;
969 char *at1;
970 char *at2;
971 int result;
972
973 /*
974 * Compare most significant to least significant recipient attributes.
975 * The comparison function must be transitive, so NULL values need to be
976 * assigned an ordinal (we set NULL last).
977 */
978
979 queue1 = rcpt1->u.queue;
980 queue2 = rcpt2->u.queue;
981 if (queue1 != 0 && queue2 == 0)
982 return (-1);
983 if (queue1 == 0 && queue2 != 0)
984 return (1);
985 if (queue1 != 0 && queue2 != 0) {
986
987 /*
988 * Compare message transport.
989 */
990 if ((result = strcmp(queue1->transport->name,
991 queue2->transport->name)) != 0)
992 return (result);
993
994 /*
995 * Compare queue name (nexthop or recipient@nexthop).
996 */
997 if ((result = strcmp(queue1->name, queue2->name)) != 0)
998 return (result);
999 }
1000
1001 /*
1002 * Compare recipient domain.
1003 */
1004 at1 = strrchr(rcpt1->address, '@');
1005 at2 = strrchr(rcpt2->address, '@');
1006 if (at1 == 0 && at2 != 0)
1007 return (1);
1008 if (at1 != 0 && at2 == 0)
1009 return (-1);
1010 if (at1 != 0 && at2 != 0
1011 && (result = strcasecmp_utf8(at1, at2)) != 0)
1012 return (result);
1013
1014 /*
1015 * Compare recipient address.
1016 */
1017 return (strcmp(rcpt1->address, rcpt2->address));
1018 }
1019
1020 /* qmgr_message_sort - sort message recipient addresses by domain */
1021
qmgr_message_sort(QMGR_MESSAGE * message)1022 static void qmgr_message_sort(QMGR_MESSAGE *message)
1023 {
1024 qsort((void *) message->rcpt_list.info, message->rcpt_list.len,
1025 sizeof(message->rcpt_list.info[0]), qmgr_message_sort_compare);
1026 if (msg_verbose) {
1027 RECIPIENT_LIST list = message->rcpt_list;
1028 RECIPIENT *rcpt;
1029
1030 msg_info("start sorted recipient list");
1031 for (rcpt = list.info; rcpt < list.info + list.len; rcpt++)
1032 msg_info("qmgr_message_sort: %s", rcpt->address);
1033 msg_info("end sorted recipient list");
1034 }
1035 }
1036
1037 /* qmgr_resolve_one - resolve or skip one recipient */
1038
qmgr_resolve_one(QMGR_MESSAGE * message,RECIPIENT * recipient,const char * addr,RESOLVE_REPLY * reply)1039 static int qmgr_resolve_one(QMGR_MESSAGE *message, RECIPIENT *recipient,
1040 const char *addr, RESOLVE_REPLY *reply)
1041 {
1042 #define QMGR_REDIRECT(rp, tp, np) do { \
1043 (rp)->flags = 0; \
1044 vstring_strcpy((rp)->transport, (tp)); \
1045 vstring_strcpy((rp)->nexthop, (np)); \
1046 } while (0)
1047
1048 if ((message->tflags & DEL_REQ_FLAG_MTA_VRFY) == 0)
1049 resolve_clnt_query_from(message->sender, addr, reply);
1050 else
1051 resolve_clnt_verify_from(message->sender, addr, reply);
1052 if (reply->flags & RESOLVE_FLAG_FAIL) {
1053 QMGR_REDIRECT(reply, MAIL_SERVICE_RETRY,
1054 "4.3.0 address resolver failure");
1055 return (0);
1056 } else if (reply->flags & RESOLVE_FLAG_ERROR) {
1057 QMGR_REDIRECT(reply, MAIL_SERVICE_ERROR,
1058 "5.1.3 bad address syntax");
1059 return (0);
1060 } else {
1061 return (0);
1062 }
1063 }
1064
1065 /* qmgr_message_resolve - resolve recipients */
1066
qmgr_message_resolve(QMGR_MESSAGE * message)1067 static void qmgr_message_resolve(QMGR_MESSAGE *message)
1068 {
1069 static ARGV *defer_xport_argv;
1070 RECIPIENT_LIST list = message->rcpt_list;
1071 RECIPIENT *recipient;
1072 QMGR_TRANSPORT *transport = 0;
1073 QMGR_QUEUE *queue = 0;
1074 RESOLVE_REPLY reply;
1075 VSTRING *queue_name;
1076 char *at;
1077 char **cpp;
1078 char *nexthop;
1079 ssize_t len;
1080 int status;
1081 DSN dsn;
1082 MSG_STATS stats;
1083 DSN *saved_dsn;
1084
1085 #define STREQ(x,y) (strcmp(x,y) == 0)
1086 #define STR vstring_str
1087 #define LEN VSTRING_LEN
1088
1089 resolve_clnt_init(&reply);
1090 queue_name = vstring_alloc(1);
1091 for (recipient = list.info; recipient < list.info + list.len; recipient++) {
1092
1093 /*
1094 * Redirect overrides all else. But only once (per entire message).
1095 * For consistency with the remainder of Postfix, rewrite the address
1096 * to canonical form before resolving it.
1097 */
1098 if (message->redirect_addr) {
1099 if (recipient > list.info) {
1100 recipient->u.queue = 0;
1101 continue;
1102 }
1103 message->rcpt_offset = 0;
1104 message->rcpt_unread = 0;
1105
1106 rewrite_clnt_internal(REWRITE_CANON, message->redirect_addr,
1107 reply.recipient);
1108 RECIPIENT_UPDATE(recipient->address, STR(reply.recipient));
1109 if (qmgr_resolve_one(message, recipient,
1110 recipient->address, &reply) < 0)
1111 continue;
1112 if (!STREQ(recipient->address, STR(reply.recipient)))
1113 RECIPIENT_UPDATE(recipient->address, STR(reply.recipient));
1114 }
1115
1116 /*
1117 * Content filtering overrides the address resolver.
1118 *
1119 * XXX Bypass content_filter inspection for user-generated probes
1120 * (sendmail -bv). MTA-generated probes never have the "please filter
1121 * me" bits turned on, but we handle them here anyway for the sake of
1122 * future proofing.
1123 */
1124 #define FILTER_WITHOUT_NEXTHOP(filter, next) \
1125 (((next) = split_at((filter), ':')) == 0 || *(next) == 0)
1126
1127 #define RCPT_WITHOUT_DOMAIN(rcpt, next) \
1128 ((next = strrchr(rcpt, '@')) == 0 || *++(next) == 0)
1129
1130 else if (message->filter_xport
1131 && (message->tflags & DEL_REQ_TRACE_ONLY_MASK) == 0) {
1132 reply.flags = 0;
1133 vstring_strcpy(reply.transport, message->filter_xport);
1134 if (FILTER_WITHOUT_NEXTHOP(STR(reply.transport), nexthop)
1135 && *(nexthop = var_def_filter_nexthop) == 0
1136 && RCPT_WITHOUT_DOMAIN(recipient->address, nexthop))
1137 nexthop = var_myhostname;
1138 vstring_strcpy(reply.nexthop, nexthop);
1139 vstring_strcpy(reply.recipient, recipient->address);
1140 }
1141
1142 /*
1143 * Resolve the destination to (transport, nexthop, address). The
1144 * result address may differ from the one specified by the sender.
1145 */
1146 else {
1147 if (qmgr_resolve_one(message, recipient,
1148 recipient->address, &reply) < 0)
1149 continue;
1150 if (!STREQ(recipient->address, STR(reply.recipient)))
1151 RECIPIENT_UPDATE(recipient->address, STR(reply.recipient));
1152 }
1153
1154 /*
1155 * Bounce null recipients. This should never happen, but is most
1156 * likely the result of a fault in a different program, so aborting
1157 * the queue manager process does not help.
1158 */
1159 if (recipient->address[0] == 0) {
1160 QMGR_REDIRECT(&reply, MAIL_SERVICE_ERROR,
1161 "5.1.3 null recipient address");
1162 }
1163
1164 /*
1165 * Redirect a forced-to-expire message without defer log to the retry
1166 * service, so that its defer log will contain an appropriate reason.
1167 * Do not redirect such a message to the error service, because if
1168 * that request fails, a defer log would be created with reason
1169 * "bounce or trace service failure" which would make no sense. Note
1170 * that if the bounce service fails to create a defer log, the
1171 * message will be returned as undeliverable anyway, because it is
1172 * expired.
1173 */
1174 if ((message->qflags & QMGR_FORCE_EXPIRE) != 0) {
1175 QMGR_REDIRECT(&reply, MAIL_SERVICE_RETRY,
1176 "4.7.0 message is administratively expired");
1177 }
1178
1179 /*
1180 * Discard mail to the local double bounce address here, so this
1181 * system can run without a local delivery agent. They'd still have
1182 * to configure something for mail directed to the local postmaster,
1183 * though, but that is an RFC requirement anyway.
1184 *
1185 * XXX This lookup should be done in the resolver, and the mail should
1186 * be directed to a general-purpose null delivery agent.
1187 */
1188 if (reply.flags & RESOLVE_CLASS_LOCAL) {
1189 at = strrchr(STR(reply.recipient), '@');
1190 len = (at ? (at - STR(reply.recipient))
1191 : strlen(STR(reply.recipient)));
1192 if (strncasecmp_utf8(STR(reply.recipient),
1193 var_double_bounce_sender, len) == 0
1194 && !var_double_bounce_sender[len]) {
1195 status = sent(message->tflags, message->queue_id,
1196 QMGR_MSG_STATS(&stats, message), recipient,
1197 "none", DSN_SIMPLE(&dsn, "2.0.0",
1198 "undeliverable postmaster notification discarded"));
1199 if (status == 0) {
1200 deliver_completed(message->fp, recipient->offset);
1201 #if 0
1202 /* It's the default verification probe sender address. */
1203 msg_warn("%s: undeliverable postmaster notification discarded",
1204 message->queue_id);
1205 #endif
1206 } else
1207 message->flags |= status;
1208 continue;
1209 }
1210 }
1211
1212 /*
1213 * Optionally defer deliveries over specific transports, unless the
1214 * restriction is lifted temporarily.
1215 */
1216 if (*var_defer_xports && (message->qflags & QMGR_FLUSH_DFXP) == 0) {
1217 if (defer_xport_argv == 0)
1218 defer_xport_argv = argv_split(var_defer_xports, CHARS_COMMA_SP);
1219 for (cpp = defer_xport_argv->argv; *cpp; cpp++)
1220 if (strcmp(*cpp, STR(reply.transport)) == 0)
1221 break;
1222 if (*cpp) {
1223 QMGR_REDIRECT(&reply, MAIL_SERVICE_RETRY,
1224 "4.3.2 deferred transport");
1225 }
1226 }
1227
1228 /*
1229 * Safety: defer excess address verification requests.
1230 */
1231 if ((message->tflags & DEL_REQ_FLAG_MTA_VRFY) != 0
1232 && qmgr_vrfy_pend_count > var_vrfy_pend_limit)
1233 QMGR_REDIRECT(&reply, MAIL_SERVICE_RETRY,
1234 "4.3.2 Too many address verification requests");
1235
1236 /*
1237 * Look up or instantiate the proper transport.
1238 */
1239 if (transport == 0 || !STREQ(transport->name, STR(reply.transport))) {
1240 if ((transport = qmgr_transport_find(STR(reply.transport))) == 0)
1241 transport = qmgr_transport_create(STR(reply.transport));
1242 queue = 0;
1243 }
1244
1245 /*
1246 * This message is being flushed. If need-be unthrottle the
1247 * transport.
1248 */
1249 if ((message->qflags & QMGR_FLUSH_EACH) != 0
1250 && QMGR_TRANSPORT_THROTTLED(transport))
1251 qmgr_transport_unthrottle(transport);
1252
1253 /*
1254 * This transport is dead. Defer delivery to this recipient.
1255 */
1256 if (QMGR_TRANSPORT_THROTTLED(transport)) {
1257 saved_dsn = transport->dsn;
1258 if ((transport = qmgr_error_transport(MAIL_SERVICE_RETRY)) != 0) {
1259 nexthop = qmgr_error_nexthop(saved_dsn);
1260 vstring_strcpy(reply.nexthop, nexthop);
1261 myfree(nexthop);
1262 queue = 0;
1263 } else {
1264 qmgr_defer_recipient(message, recipient, saved_dsn);
1265 continue;
1266 }
1267 }
1268
1269 /*
1270 * The nexthop destination provides the default name for the
1271 * per-destination queue. When the delivery agent accepts only one
1272 * recipient per delivery, give each recipient its own queue, so that
1273 * deliveries to different recipients of the same message can happen
1274 * in parallel, and so that we can enforce per-recipient concurrency
1275 * limits and prevent one recipient from tying up all the delivery
1276 * agent resources. We use recipient@nexthop as queue name rather
1277 * than the actual recipient domain name, so that one recipient in
1278 * multiple equivalent domains cannot evade the per-recipient
1279 * concurrency limit. Split the address on the recipient delimiter if
1280 * one is defined, so that extended addresses don't get extra
1281 * delivery slots.
1282 *
1283 * Fold the result to lower case so that we don't have multiple queues
1284 * for the same name.
1285 *
1286 * Important! All recipients in a queue must have the same nexthop
1287 * value. It is OK to have multiple queues with the same nexthop
1288 * value, but only when those queues are named after recipients.
1289 *
1290 * The single-recipient code below was written for local(8) like
1291 * delivery agents, and assumes that all domains that deliver to the
1292 * same (transport + nexthop) are aliases for $nexthop. Delivery
1293 * concurrency is changed from per-domain into per-recipient, by
1294 * changing the queue name from nexthop into localpart@nexthop.
1295 *
1296 * XXX This assumption is incorrect when different destinations share
1297 * the same (transport + nexthop). In reality, such transports are
1298 * rarely configured to use single-recipient deliveries. The fix is
1299 * to decouple the per-destination recipient limit from the
1300 * per-destination concurrency.
1301 */
1302 vstring_strcpy(queue_name, STR(reply.nexthop));
1303 if (strcmp(transport->name, MAIL_SERVICE_ERROR) != 0
1304 && strcmp(transport->name, MAIL_SERVICE_RETRY) != 0
1305 && transport->recipient_limit == 1) {
1306 /* Copy the recipient localpart. */
1307 at = strrchr(STR(reply.recipient), '@');
1308 len = (at ? (at - STR(reply.recipient))
1309 : strlen(STR(reply.recipient)));
1310 vstring_strncpy(queue_name, STR(reply.recipient), len);
1311 /* Remove the address extension from the recipient localpart. */
1312 if (*var_rcpt_delim && split_addr(STR(queue_name), var_rcpt_delim))
1313 vstring_truncate(queue_name, strlen(STR(queue_name)));
1314 /* Assume the recipient domain is equivalent to nexthop. */
1315 vstring_sprintf_append(queue_name, "@%s", STR(reply.nexthop));
1316 }
1317 lowercase(STR(queue_name));
1318
1319 /*
1320 * This transport is alive. Find or instantiate a queue for this
1321 * recipient.
1322 */
1323 if (queue == 0 || !STREQ(queue->name, STR(queue_name))) {
1324 if ((queue = qmgr_queue_find(transport, STR(queue_name))) == 0)
1325 queue = qmgr_queue_create(transport, STR(queue_name),
1326 STR(reply.nexthop));
1327 }
1328
1329 /*
1330 * This message is being flushed. If need-be unthrottle the queue.
1331 */
1332 if ((message->qflags & QMGR_FLUSH_EACH) != 0
1333 && QMGR_QUEUE_THROTTLED(queue))
1334 qmgr_queue_unthrottle(queue);
1335
1336 /*
1337 * This queue is dead. Defer delivery to this recipient.
1338 */
1339 if (QMGR_QUEUE_THROTTLED(queue)) {
1340 saved_dsn = queue->dsn;
1341 if ((queue = qmgr_error_queue(MAIL_SERVICE_RETRY, saved_dsn)) == 0) {
1342 qmgr_defer_recipient(message, recipient, saved_dsn);
1343 continue;
1344 }
1345 }
1346
1347 /*
1348 * This queue is alive. Bind this recipient to this queue instance.
1349 */
1350 recipient->u.queue = queue;
1351 }
1352 resolve_clnt_free(&reply);
1353 vstring_free(queue_name);
1354 }
1355
1356 /* qmgr_message_assign - assign recipients to specific delivery requests */
1357
qmgr_message_assign(QMGR_MESSAGE * message)1358 static void qmgr_message_assign(QMGR_MESSAGE *message)
1359 {
1360 RECIPIENT_LIST list = message->rcpt_list;
1361 RECIPIENT *recipient;
1362 QMGR_ENTRY *entry = 0;
1363 QMGR_QUEUE *queue;
1364 QMGR_JOB *job = 0;
1365 QMGR_PEER *peer = 0;
1366
1367 /*
1368 * Try to bundle as many recipients in a delivery request as we can. When
1369 * the recipient resolves to the same site and transport as an existing
1370 * recipient, do not create a new queue entry, just move that recipient
1371 * to the recipient list of the existing queue entry. All this provided
1372 * that we do not exceed the transport-specific limit on the number of
1373 * recipients per transaction.
1374 */
1375 #define LIMIT_OK(limit, count) ((limit) == 0 || ((count) < (limit)))
1376
1377 for (recipient = list.info; recipient < list.info + list.len; recipient++) {
1378
1379 /*
1380 * Skip recipients with a dead transport or destination.
1381 */
1382 if ((queue = recipient->u.queue) == 0)
1383 continue;
1384
1385 /*
1386 * Lookup or instantiate the message job if necessary.
1387 */
1388 if (job == 0 || queue->transport != job->transport) {
1389 job = qmgr_job_obtain(message, queue->transport);
1390 peer = 0;
1391 }
1392
1393 /*
1394 * Lookup or instantiate job peer if necessary.
1395 */
1396 if (peer == 0 || queue != peer->queue)
1397 peer = qmgr_peer_obtain(job, queue);
1398
1399 /*
1400 * Lookup old or instantiate new recipient entry. We try to reuse the
1401 * last existing entry whenever the recipient limit permits.
1402 */
1403 entry = peer->entry_list.prev;
1404 if (message->single_rcpt || entry == 0
1405 || !LIMIT_OK(queue->transport->recipient_limit, entry->rcpt_list.len))
1406 entry = qmgr_entry_create(peer, message);
1407
1408 /*
1409 * Add the recipient to the current entry and increase all those
1410 * recipient counters accordingly.
1411 */
1412 recipient_list_add(&entry->rcpt_list, recipient->offset,
1413 recipient->dsn_orcpt, recipient->dsn_notify,
1414 recipient->orig_addr, recipient->address);
1415 job->rcpt_count++;
1416 message->rcpt_count++;
1417 qmgr_recipient_count++;
1418 }
1419
1420 /*
1421 * Release the message recipient list and reinitialize it for the next
1422 * time.
1423 */
1424 recipient_list_free(&message->rcpt_list);
1425 recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE);
1426
1427 /*
1428 * Note that even if qmgr_job_obtain() reset the job candidate cache of
1429 * all transports to which we assigned new recipients, this message may
1430 * have other jobs which we didn't touch at all this time. But the number
1431 * of unread recipients affecting the candidate selection might have
1432 * changed considerably, so we must invalidate the caches if it might be
1433 * of some use.
1434 */
1435 for (job = message->job_list.next; job; job = job->message_peers.next)
1436 if (job->selected_entries < job->read_entries
1437 && job->blocker_tag != job->transport->blocker_tag)
1438 job->transport->candidate_cache_current = 0;
1439 }
1440
1441 /* qmgr_message_move_limits - recycle unused recipient slots */
1442
qmgr_message_move_limits(QMGR_MESSAGE * message)1443 static void qmgr_message_move_limits(QMGR_MESSAGE *message)
1444 {
1445 QMGR_JOB *job;
1446
1447 for (job = message->job_list.next; job; job = job->message_peers.next)
1448 qmgr_job_move_limits(job);
1449 }
1450
1451 /* qmgr_message_free - release memory for in-core message structure */
1452
qmgr_message_free(QMGR_MESSAGE * message)1453 void qmgr_message_free(QMGR_MESSAGE *message)
1454 {
1455 QMGR_JOB *job;
1456
1457 if (message->refcount != 0)
1458 msg_panic("qmgr_message_free: reference len: %d", message->refcount);
1459 if (message->fp)
1460 msg_panic("qmgr_message_free: queue file is open");
1461 while ((job = message->job_list.next) != 0)
1462 qmgr_job_free(job);
1463 myfree(message->queue_id);
1464 myfree(message->queue_name);
1465 if (message->dsn_envid)
1466 myfree(message->dsn_envid);
1467 if (message->encoding)
1468 myfree(message->encoding);
1469 if (message->sender)
1470 myfree(message->sender);
1471 if (message->verp_delims)
1472 myfree(message->verp_delims);
1473 if (message->filter_xport)
1474 myfree(message->filter_xport);
1475 if (message->inspect_xport)
1476 myfree(message->inspect_xport);
1477 if (message->redirect_addr)
1478 myfree(message->redirect_addr);
1479 if (message->client_name)
1480 myfree(message->client_name);
1481 if (message->client_addr)
1482 myfree(message->client_addr);
1483 if (message->client_port)
1484 myfree(message->client_port);
1485 if (message->client_proto)
1486 myfree(message->client_proto);
1487 if (message->client_helo)
1488 myfree(message->client_helo);
1489 if (message->sasl_method)
1490 myfree(message->sasl_method);
1491 if (message->sasl_username)
1492 myfree(message->sasl_username);
1493 if (message->sasl_sender)
1494 myfree(message->sasl_sender);
1495 if (message->log_ident)
1496 myfree(message->log_ident);
1497 if (message->rewrite_context)
1498 myfree(message->rewrite_context);
1499 recipient_list_free(&message->rcpt_list);
1500 qmgr_message_count--;
1501 if ((message->tflags & DEL_REQ_FLAG_MTA_VRFY) != 0)
1502 qmgr_vrfy_pend_count--;
1503 myfree((void *) message);
1504 }
1505
1506 /* qmgr_message_alloc - create in-core message structure */
1507
qmgr_message_alloc(const char * queue_name,const char * queue_id,int qflags,mode_t mode)1508 QMGR_MESSAGE *qmgr_message_alloc(const char *queue_name, const char *queue_id,
1509 int qflags, mode_t mode)
1510 {
1511 const char *myname = "qmgr_message_alloc";
1512 QMGR_MESSAGE *message;
1513 struct stat st;
1514
1515 if (msg_verbose)
1516 msg_info("%s: %s %s", myname, queue_name, queue_id);
1517
1518 /*
1519 * Create an in-core message structure.
1520 */
1521 message = qmgr_message_create(queue_name, queue_id, qflags);
1522
1523 /*
1524 * Extract message envelope information: time of arrival, sender address,
1525 * recipient addresses. Skip files with malformed envelope information.
1526 */
1527 #define QMGR_LOCK_MODE (MYFLOCK_OP_EXCLUSIVE | MYFLOCK_OP_NOWAIT)
1528
1529 if (qmgr_message_open(message) < 0) {
1530 qmgr_message_free(message);
1531 return (0);
1532 }
1533 if (myflock(vstream_fileno(message->fp), INTERNAL_LOCK, QMGR_LOCK_MODE) < 0) {
1534 msg_info("%s: skipped, still being delivered", queue_id);
1535 qmgr_message_close(message);
1536 qmgr_message_free(message);
1537 return (QMGR_MESSAGE_LOCKED);
1538 }
1539 if (qmgr_message_read(message) < 0) {
1540 qmgr_message_close(message);
1541 qmgr_message_free(message);
1542 return (0);
1543 } else {
1544
1545 /*
1546 * We have validated the queue file content, so it is safe to modify
1547 * the file properties now.
1548 */
1549 if (mode != 0 && fchmod(vstream_fileno(message->fp), mode) < 0)
1550 msg_fatal("fchmod %s: %m", VSTREAM_PATH(message->fp));
1551
1552 /*
1553 * If this message is forced to expire, use the existing defer
1554 * logfile records and do not assign any deliveries, leaving the
1555 * refcount at zero. If this message is forced to expire, but no
1556 * defer logfile records are available, assign deliveries to the
1557 * retry transport so that the sender will still find out what
1558 * recipients are affected and why. Either way, do not assign normal
1559 * deliveries because that would be undesirable especially with mail
1560 * that was expired in the 'hold' queue.
1561 */
1562 if ((message->qflags & QMGR_FORCE_EXPIRE) != 0
1563 && stat(mail_queue_path((VSTRING *) 0, MAIL_QUEUE_DEFER,
1564 queue_id), &st) == 0 && st.st_size > 0) {
1565 /* Use this defer log; don't assign deliveries (refcount == 0). */
1566 message->flags = 1; /* simplify downstream code */
1567 qmgr_message_close(message);
1568 return (message);
1569 }
1570
1571 /*
1572 * Reset the defer log. This code should not be here, but we must
1573 * reset the defer log *after* acquiring the exclusive lock on the
1574 * queue file and *before* resolving new recipients. Since all those
1575 * operations are encapsulated so nicely by this routine, the defer
1576 * log reset has to be done here as well.
1577 *
1578 * Note: it is safe to remove the defer logfile from a previous queue
1579 * run of this queue file, because the defer log contains information
1580 * about recipients that still exist in this queue file.
1581 */
1582 if (mail_queue_remove(MAIL_QUEUE_DEFER, queue_id) && errno != ENOENT)
1583 msg_fatal("%s: %s: remove %s %s: %m", myname,
1584 queue_id, MAIL_QUEUE_DEFER, queue_id);
1585 qmgr_message_sort(message);
1586 qmgr_message_resolve(message);
1587 qmgr_message_sort(message);
1588 qmgr_message_assign(message);
1589 qmgr_message_close(message);
1590 if (message->rcpt_offset == 0)
1591 qmgr_message_move_limits(message);
1592 return (message);
1593 }
1594 }
1595
1596 /* qmgr_message_realloc - refresh in-core message structure */
1597
qmgr_message_realloc(QMGR_MESSAGE * message)1598 QMGR_MESSAGE *qmgr_message_realloc(QMGR_MESSAGE *message)
1599 {
1600 const char *myname = "qmgr_message_realloc";
1601
1602 /*
1603 * Sanity checks.
1604 */
1605 if (message->rcpt_offset <= 0)
1606 msg_panic("%s: invalid offset: %ld", myname, message->rcpt_offset);
1607 if (msg_verbose)
1608 msg_info("%s: %s %s offset %ld", myname, message->queue_name,
1609 message->queue_id, message->rcpt_offset);
1610
1611 /*
1612 * Extract recipient addresses. Skip files with malformed envelope
1613 * information.
1614 */
1615 if (qmgr_message_open(message) < 0)
1616 return (0);
1617 if (qmgr_message_read(message) < 0) {
1618 qmgr_message_close(message);
1619 return (0);
1620 } else {
1621 qmgr_message_sort(message);
1622 qmgr_message_resolve(message);
1623 qmgr_message_sort(message);
1624 qmgr_message_assign(message);
1625 qmgr_message_close(message);
1626 if (message->rcpt_offset == 0)
1627 qmgr_message_move_limits(message);
1628 return (message);
1629 }
1630 }
1631