LINUX.ORG.RU

История изменений

Исправление constin, (текущая версия) :

Судя по result, лучше всего заказать ldapsearch-у аттрибут uid, и достать его по [1]['uid']

не могу, я потом этот резалт много где использую, а запросами не хочу ldap сервер нагружать.

почти готовый скрипт, надо тесты прогнать.


#!/usr/bin/python2.7 -u
#
# by Konstantin Frank


import univention.uldap
import optparse
import sys
import re
import traceback
import syslog
from ldap.filter import filter_format
from univention.config_registry import ConfigRegistry
import univention.admin.modules

usage = "help"
parser = optparse.OptionParser(usage=usage)
parser.add_option("-b", "--ldap_base", dest="ldap_base", help="ldap base")
parser.add_option("-s", "--sender", dest="send_as", help="send_as address (for use with -t)")
parser.add_option("-u", "--user", dest="sasl_username", help="sasl username (for use with -t)")
parser.add_option("-t", "--test", dest="test", help="test run", action="store_true", default=False)
options, args = parser.parse_args()

syslog.openlog(ident="kopano_send_as_filter", logoption=syslog.LOG_PID, facility=syslog.LOG_MAIL)
ucr = ConfigRegistry()
ucr.load()
univention.admin.modules.update()
usersmod = univention.admin.modules.get("users/user")
_do_debug = True

# should we pass mails from external mail servers without checking?  If this case we will have potential mailforged SPAM to us.
pass_not_authenticated = False


def debug(msg, *args):
	if _do_debug:
		msg = "kopano_send_as_filter: {}".format(msg % args)
		if options.test:
			sys.stderr.write("{}\n".format(msg))
		else:
			syslog.syslog(syslog.LOG_DEBUG, msg)


def send_as_filter(attrib):
	## TEST## sasl_username = attrib.get("sasl_username", "kfrank")
	####!!!!!send_as = attrib.get("sasl_username", None)
	# TEST# send_as = attrib.get("send_as", None)


	sasl_username = attrib.get("sasl_username", None)
        send_as = attrib.get("sender", None)

	

	debug("sasl=%r send_as=%r", sasl_username, send_as)
	debug("attrib=%r", attrib)
	
	#####################################################################
	# PASS if no sasl auth send_as email is not from out mail domains.
	#####################################################################
	
	
	ucs_mail_domains = ucr.get("mail/hosteddomains").split(' ')
	if ucs_mail_domains and not sasl_username and not send_as.split('@')[1] in ucs_mail_domains:
	    #if not send_as.split('@')[1] in ucs_mail_domains:
	    return "DUNNO not our domain"
	elif ucs_mail_domains and sasl_username and not send_as.split('@')[1] in ucs_mail_domains:
	    return "REJECT not allowed for authenticated user to send from not our domain"
	else:
	    return "DUNNO Error: cannot get ucs domain list"

	#####################################################################
	#special exeption for user with login "relay"
	#####################################################################
	if sasl_username == "relay":
	    return "DUNNO local relay server"
	#####################################################################

	if pass_not_authenticated:
		if not sasl_username:
			return "DUNNO not sasl_authenticated. pass_not_authenticated = True option is enabled"

	if not options.ldap_base:
		return "443 LDAP base not set."
	elif not send_as:
		return "REJECT Access denied for empty send_as."
	else:
		try:
			ldap = univention.uldap.getMachineConnection(ldap_master=False, secret_file="/etc/listfilter.secret")

			user_dn = ""
			users_groups = []
			allowed_user_dns = []
			alternative_user_address = []




			# try the ldap stuff, if that fails send email anyway
			# get send_as restriction
			ldap_attr = ["kopanoSendAsPrivilege","mailPrimaryAddress"]
			ldap_filter = filter_format(
				'(|(mailPrimaryAddress=%s)(mailAlternativeAddress=%s)(mail=%s))',
                                (send_as, send_as, send_as))
			result = ldap.search(base=options.ldap_base, filter=ldap_filter, attr=ldap_attr)



			if result:
    
				######################################################################
				#check if sasl_uid == send_as uid, if yes = pass.
				######################################################################


				send_as_uid = result[0][0].split(',')[0].split('=')[1]
				debug("send_as_uid=%r", send_as_uid)
				debug("sasl_username=%r", sasl_username)
				
				if send_as_uid:
				    if not sasl_username:
                                                return "REJECT Access denied for not authenticated sasl_username to send as %s" % send_as
				    if send_as_uid == sasl_username:
                            		debug("send_as_uid=%r, sasl_username=%r", send_as_uid, sasl_username)
                            		return "DUNNO user is mail owner"



				######################################################################
                                #check if send_as_uid has kopanoSendAsPrivileg and compare kopanoSendAsPrivileg list with sasl_user uid
                                ######################################################################

				
				# get allowed user dns
				for u in result[0][1].get("kopanoSendAsPrivilege", []):
				    allowed_user_dns.append(u)

				# check if there are restrictions, check sasl_username first
				if allowed_user_dns:
					debug("allowed_user_dns=%r", allowed_user_dns)
					if not sasl_username:
						return "REJECT Access denied for not authenticated sasl_username to send as user %s" % send_as 

					# get dn of sasl_username
					user_filter = filter_format(
						'(uid=%s)',
						(sasl_username,)
					)
					ldap_filter = usersmod.lookup_filter(user_filter)
					user_result = ldap.search(base=options.ldap_base, filter=str(ldap_filter), attr=["uidNumber"])
					if user_result:
						user_dn = user_result[0][1].values()[0][0]
						debug("user_dn=%r", user_dn)

						# check useruidNiumber in kopanoSendAsPrivilege
						if allowed_user_dns and \
								user_dn and \
								user_dn in allowed_user_dns:
							return "DUNNO allowed per user uidNumber"

					return "REJECT Access denied for %s to send as user  %s" % (sasl_username, send_as)
				else:
					return "REJECT Access denied for %s to send as user  %s" % (sasl_username, send_as)
			else:
				return "DUNNO %s is not our email address" % send_as
		except Exception:
			return "DUNNO Error with sasl_username={} send_as={} attrib={}, traceback={}".format(
				sasl_username, send_as, attrib, traceback.format_exc().replace("\n", " "))

# main
attr = {}

# testing
if options.test:
	_do_debug = True
	if not options.sasl_username or not options.send_as:
		sys.stderr.write("sasl_username and send_as are required\n")
		parser.print_help()
		sys.exit(1)
	attr["sasl_username"] = options.sasl_username
	attr["send_as"] = options.send_as
	action = send_as_filter(attr)
	print("action={}\n".format(action))
else:
	# read from stdin python -u is required for unbufferd streams
	while True:
		data = sys.stdin.readline()
		m = re.match(r'([^=]+)=(.*)\n', data)
		if m:
			attr[m.group(1).strip()] = m.group(2).strip()

		elif data == "\n":
			if attr.get("request", None) == "smtpd_access_policy":
				action = listfilter(attr)
				debug("action=%s", action)
				sys.stdout.write("action=%s\n\n" % action)
			else:
				sys.stderr.write("unknown action in %s\n" % attr)
				sys.stdout.write("defer_if_permit Service temporarily unavailable\n")
				syslog.syslog(syslog.LOG_ERR, "unknown action in '{}', exiting.".format(attr))
				sys.exit(1)
			attr = {}
		elif data == "":
			# Postfix telling us to shutdown (max_idle).
			debug("shutting down (max_idle)")
			sys.exit(0)
		else:
			syslog.syslog(syslog.LOG_ERR, "received bad data: '{}', exiting.".format(data))
			sys.exit(1)

Исходная версия constin, :

Судя по result, лучше всего заказать ldapsearch-у аттрибут uid, и достать его по [1]['uid']

не могу, я потом этот резалт много где использую, а запросами не хочу ldap сервер нагружать.

почти готовый скрипт, надо тесты прогнать.


#!/usr/bin/python2.7 -u
#
# by Konstantin Frank


import univention.uldap
import optparse
import sys
import re
import traceback
import syslog
from ldap.filter import filter_format
from univention.config_registry import ConfigRegistry
import univention.admin.modules

usage = "help"
parser = optparse.OptionParser(usage=usage)
parser.add_option("-b", "--ldap_base", dest="ldap_base", help="ldap base")
parser.add_option("-s", "--sender", dest="send_as", help="send_as address (for use with -t)")
parser.add_option("-u", "--user", dest="sasl_username", help="sasl username (for use with -t)")
parser.add_option("-t", "--test", dest="test", help="test run", action="store_true", default=False)
options, args = parser.parse_args()

syslog.openlog(ident="kopano_send_as_filter", logoption=syslog.LOG_PID, facility=syslog.LOG_MAIL)
ucr = ConfigRegistry()
ucr.load()
univention.admin.modules.update()
usersmod = univention.admin.modules.get("users/user")
_do_debug = True

# should we pass mails from external mail servers without checking?  If this case we will have potential mailforged SPAM to us.
pass_not_authenticated = False


def debug(msg, *args):
	if _do_debug:
		msg = "kopano_send_as_filter: {}".format(msg % args)
		if options.test:
			sys.stderr.write("{}\n".format(msg))
		else:
			syslog.syslog(syslog.LOG_DEBUG, msg)


def send_as_filter(attrib):
	sasl_username = attrib.get("sasl_username", "kfrank")
	####!!!!!send_as = attrib.get("sasl_username", None)
	send_as = attrib.get("send_as", None)


	#sasl_username = attrib.get("sasl_username", None)
        #send_as = attrib.get("sender", None)

	

	debug("sasl=%r send_as=%r", sasl_username, send_as)
	debug("attrib=%r", attrib)
	
	#####################################################################
	# PASS if no sasl auth send_as email is not from out mail domains.
	#####################################################################
	
	
	ucs_mail_domains = ucr.get("mail/hosteddomains").split(' ')
	if ucs_mail_domains and not sasl_username and not send_as.split('@')[1] in ucs_mail_domains:
	    #if not send_as.split('@')[1] in ucs_mail_domains:
	    return "DUNNO not our domain"
	elif ucs_mail_domains and sasl_username and not send_as.split('@')[1] in ucs_mail_domains:
	    return "REJECT not allowed for authenticated user to send from not our domain"
	else:
	    return "DUNNO Error: cannot get ucs domain list"

	#####################################################################
	#special exeption for user with login "relay"
	#####################################################################
	if sasl_username == "relay":
	    return "DUNNO local relay server"
	#####################################################################

	if pass_not_authenticated:
		if not sasl_username:
			return "DUNNO not sasl_authenticated. pass_not_authenticated = True option is enabled"

	if not options.ldap_base:
		return "443 LDAP base not set."
	elif not send_as:
		return "REJECT Access denied for empty send_as."
	else:
		try:
			ldap = univention.uldap.getMachineConnection(ldap_master=False, secret_file="/etc/listfilter.secret")

			user_dn = ""
			users_groups = []
			allowed_user_dns = []
			alternative_user_address = []




			# try the ldap stuff, if that fails send email anyway
			# get send_as restriction
			ldap_attr = ["kopanoSendAsPrivilege","mailPrimaryAddress"]
			ldap_filter = filter_format(
				'(|(mailPrimaryAddress=%s)(mailAlternativeAddress=%s)(mail=%s))',
                                (send_as, send_as, send_as))
			result = ldap.search(base=options.ldap_base, filter=ldap_filter, attr=ldap_attr)



			if result:
    
				######################################################################
				#check if sasl_uid == send_as uid, if yes = pass.
				######################################################################


				send_as_uid = result[0][0].split(',')[0].split('=')[1]
				debug("send_as_uid=%r", send_as_uid)
				debug("sasl_username=%r", sasl_username)
				
				if send_as_uid:
				    if not sasl_username:
                                                return "REJECT Access denied for not authenticated sasl_username to send as %s" % send_as
				    if send_as_uid == sasl_username:
                            		debug("send_as_uid=%r, sasl_username=%r", send_as_uid, sasl_username)
                            		return "DUNNO user is mail owner"



				######################################################################
                                #check if send_as_uid has kopanoSendAsPrivileg and compare kopanoSendAsPrivileg list with sasl_user uid
                                ######################################################################

				
				# get allowed user dns
				for u in result[0][1].get("kopanoSendAsPrivilege", []):
				    allowed_user_dns.append(u)

				# check if there are restrictions, check sasl_username first
				if allowed_user_dns:
					debug("allowed_user_dns=%r", allowed_user_dns)
					if not sasl_username:
						return "REJECT Access denied for not authenticated sasl_username to send as user %s" % send_as 

					# get dn of sasl_username
					user_filter = filter_format(
						'(uid=%s)',
						(sasl_username,)
					)
					ldap_filter = usersmod.lookup_filter(user_filter)
					user_result = ldap.search(base=options.ldap_base, filter=str(ldap_filter), attr=["uidNumber"])
					if user_result:
						user_dn = user_result[0][1].values()[0][0]
						debug("user_dn=%r", user_dn)

						# check useruidNiumber in kopanoSendAsPrivilege
						if allowed_user_dns and \
								user_dn and \
								user_dn in allowed_user_dns:
							return "DUNNO allowed per user uidNumber"

					return "REJECT Access denied for %s to send as user  %s" % (sasl_username, send_as)
				else:
					return "REJECT Access denied for %s to send as user  %s" % (sasl_username, send_as)
			else:
				return "DUNNO %s is not our email address" % send_as
		except Exception:
			return "DUNNO Error with sasl_username={} send_as={} attrib={}, traceback={}".format(
				sasl_username, send_as, attrib, traceback.format_exc().replace("\n", " "))

# main
attr = {}

# testing
if options.test:
	_do_debug = True
	if not options.sasl_username or not options.send_as:
		sys.stderr.write("sasl_username and send_as are required\n")
		parser.print_help()
		sys.exit(1)
	attr["sasl_username"] = options.sasl_username
	attr["send_as"] = options.send_as
	action = send_as_filter(attr)
	print("action={}\n".format(action))
else:
	# read from stdin python -u is required for unbufferd streams
	while True:
		data = sys.stdin.readline()
		m = re.match(r'([^=]+)=(.*)\n', data)
		if m:
			attr[m.group(1).strip()] = m.group(2).strip()

		elif data == "\n":
			if attr.get("request", None) == "smtpd_access_policy":
				action = listfilter(attr)
				debug("action=%s", action)
				sys.stdout.write("action=%s\n\n" % action)
			else:
				sys.stderr.write("unknown action in %s\n" % attr)
				sys.stdout.write("defer_if_permit Service temporarily unavailable\n")
				syslog.syslog(syslog.LOG_ERR, "unknown action in '{}', exiting.".format(attr))
				sys.exit(1)
			attr = {}
		elif data == "":
			# Postfix telling us to shutdown (max_idle).
			debug("shutting down (max_idle)")
			sys.exit(0)
		else:
			syslog.syslog(syslog.LOG_ERR, "received bad data: '{}', exiting.".format(data))
			sys.exit(1)