История изменений
Исправление constin, (текущая версия) :
Судя по result, лучше всего запросить у ldapsearch атрибут uid и получить его через [1]['uid'].
Не могу: я потом этот результат много где использую, а лишними запросами не хочу нагружать LDAP-сервер.
Почти готовый скрипт, надо прогнать тесты.
#!/usr/bin/python2.7 -u
#
# by Konstantin Frank
import univention.uldap
import optparse
import sys
import re
import traceback
import syslog
from ldap.filter import filter_format
from univention.config_registry import ConfigRegistry
import univention.admin.modules
# Command-line interface. -b supplies the LDAP search base; -s and -u are
# only meaningful together with the -t test switch.
usage = "help"
parser = optparse.OptionParser(usage=usage)
parser.add_option(
    "-b", "--ldap_base",
    dest="ldap_base",
    help="ldap base",
)
parser.add_option(
    "-s", "--sender",
    dest="send_as",
    help="send_as address (for use with -t)",
)
parser.add_option(
    "-u", "--user",
    dest="sasl_username",
    help="sasl username (for use with -t)",
)
parser.add_option(
    "-t", "--test",
    dest="test",
    help="test run",
    action="store_true",
    default=False,
)
options, args = parser.parse_args()

# Log to the mail facility and tag every line with the process id.
syslog.openlog(
    ident="kopano_send_as_filter",
    logoption=syslog.LOG_PID,
    facility=syslog.LOG_MAIL,
)

# Config registry handle; read later for "mail/hosteddomains".
ucr = ConfigRegistry()
ucr.load()

# UDM "users/user" module; its lookup_filter() is used to build user searches.
univention.admin.modules.update()
usersmod = univention.admin.modules.get("users/user")

# Debug output is switched on unconditionally.
_do_debug = True

# Should mail from clients that did not authenticate via SASL be passed
# without any check? Enabling this opens the door to forged-sender spam.
pass_not_authenticated = False
def debug(msg, *args):
    """Write one debug line, prefixed with the filter name.

    ``msg`` is a %-style format string that is filled in with ``args``.
    Nothing is written unless the module-level ``_do_debug`` flag is set.
    In test mode (``options.test``) the line goes to stderr, otherwise it
    is sent to syslog with priority LOG_DEBUG.
    """
    if not _do_debug:
        return
    line = "kopano_send_as_filter: {}".format(msg % args)
    if not options.test:
        syslog.syslog(syslog.LOG_DEBUG, line)
        return
    sys.stderr.write("{}\n".format(line))
def _check_send_as_in_ldap(sasl_username, send_as):
    """Look up ``send_as`` in LDAP and decide whether ``sasl_username`` may use it.

    Returns the action string. Any exception raised here is handled by the
    caller, ``send_as_filter``.
    """
    lo = univention.uldap.getMachineConnection(ldap_master=False, secret_file="/etc/listfilter.secret")

    # Find the object that owns the sender address.
    owner_filter = filter_format(
        '(|(mailPrimaryAddress=%s)(mailAlternativeAddress=%s)(mail=%s))',
        (send_as, send_as, send_as))
    result = lo.search(
        base=options.ldap_base,
        filter=owner_filter,
        attr=["kopanoSendAsPrivilege", "mailPrimaryAddress"])
    if not result:
        return "DUNNO %s is not our email address" % send_as

    owner_dn = result[0][0]
    owner_attrs = result[0][1]

    # The value of the first RDN of the owner's DN is taken as its uid.
    # NOTE(review): this breaks on RDN values containing ',' or '='; asking
    # the same search for the "uid" attribute would avoid parsing the DN.
    send_as_uid = owner_dn.split(',')[0].split('=')[1]
    debug("send_as_uid=%r", send_as_uid)
    debug("sasl_username=%r", sasl_username)
    if send_as_uid:
        if not sasl_username:
            return "REJECT Access denied for not authenticated sasl_username to send as %s" % send_as
        if send_as_uid == sasl_username:
            debug("send_as_uid=%r, sasl_username=%r", send_as_uid, sasl_username)
            return "DUNNO user is mail owner"

    # Not the owner: the owner's kopanoSendAsPrivilege values are compared
    # with the uidNumber of the authenticated user.
    denied = "REJECT Access denied for %s to send as user %s" % (sasl_username, send_as)
    allowed_uid_numbers = list(owner_attrs.get("kopanoSendAsPrivilege", []))
    if not allowed_uid_numbers:
        return denied
    debug("allowed_user_dns=%r", allowed_uid_numbers)
    if not sasl_username:
        return "REJECT Access denied for not authenticated sasl_username to send as user %s" % send_as

    user_filter = filter_format('(uid=%s)', (sasl_username,))
    user_result = lo.search(
        base=options.ldap_base,
        filter=str(usersmod.lookup_filter(user_filter)),
        attr=["uidNumber"])
    sasl_uid_number = None
    if user_result:
        # Read the attribute by name instead of relying on dict value order.
        sasl_uid_number = (user_result[0][1].get("uidNumber") or [None])[0]
        debug("user_dn=%r", sasl_uid_number)
    if sasl_uid_number and sasl_uid_number in allowed_uid_numbers:
        return "DUNNO allowed per user uidNumber"
    return denied


def send_as_filter(attrib):
    """Return the policy action for one request.

    ``attrib`` is a dict of request attributes. The keys read are
    ``sasl_username`` and ``sender``; ``send_as`` is accepted as a fallback
    for ``sender`` so that dicts built by the test mode keep working.

    The result is an action string: "DUNNO ...", "REJECT ..." or
    "443 LDAP base not set.". LDAP errors are reported as "DUNNO Error ...",
    i.e. the mail is let through when the lookup itself fails.
    """
    sasl_username = attrib.get("sasl_username", None)
    send_as = attrib.get("sender", None) or attrib.get("send_as", None)
    debug("sasl=%r send_as=%r", sasl_username, send_as)
    debug("attrib=%r", attrib)

    # Hosted mail domains. An unset or empty UCR variable yields an empty
    # list instead of an AttributeError or a list holding one empty string.
    ucs_mail_domains = [
        domain.lower()
        for domain in (ucr.get("mail/hosteddomains") or "").split()
    ]
    if not ucs_mail_domains:
        return "DUNNO Error: cannot get ucs domain list"

    # Senders outside our domains: pass them when unauthenticated, refuse
    # them for authenticated users. Senders inside our domains fall through
    # to the checks below (the original returned here unconditionally).
    # An empty sender skips this step and is handled further down.
    if send_as:
        sender_domain = send_as.rpartition("@")[2].lower() if "@" in send_as else ""
        if sender_domain not in ucs_mail_domains:
            if sasl_username:
                return "REJECT not allowed for authenticated user to send from not our domain"
            return "DUNNO not our domain"

    # Special exception for the user with login "relay".
    if sasl_username == "relay":
        return "DUNNO local relay server"

    if pass_not_authenticated and not sasl_username:
        return "DUNNO not sasl_authenticated. pass_not_authenticated = True option is enabled"

    if not options.ldap_base:
        return "443 LDAP base not set."
    if not send_as:
        return "REJECT Access denied for empty send_as."

    try:
        return _check_send_as_in_ldap(sasl_username, send_as)
    except Exception:
        # Deliberate best effort: if the LDAP part fails, send the mail anyway.
        return "DUNNO Error with sasl_username={} send_as={} attrib={}, traceback={}".format(
            sasl_username, send_as, attrib, traceback.format_exc().replace("\n", " "))
# main
# Either run a single check from the command line (-t) or serve policy
# requests read from stdin until end of input.
attr = {}
if options.test:
    # testing
    _do_debug = True
    if not options.sasl_username or not options.send_as:
        sys.stderr.write("sasl_username and send_as are required\n")
        parser.print_help()
        sys.exit(1)
    attr["sasl_username"] = options.sasl_username
    # send_as_filter() reads the address from the "sender" key.
    attr["sender"] = options.send_as
    action = send_as_filter(attr)
    print("action={}\n".format(action))
else:
    # read from stdin; python -u is required for unbuffered streams
    while True:
        data = sys.stdin.readline()
        m = re.match(r'([^=]+)=(.*)\n', data)
        if m:
            # one "name=value" line of the current request
            attr[m.group(1).strip()] = m.group(2).strip()
        elif data == "\n":
            # an empty line ends the request
            if attr.get("request", None) == "smtpd_access_policy":
                action = send_as_filter(attr)
                debug("action=%s", action)
                sys.stdout.write("action=%s\n\n" % action)
                sys.stdout.flush()
            else:
                sys.stderr.write("unknown action in %s\n" % attr)
                # A reply has to be "action=..." followed by an empty line.
                sys.stdout.write("action=defer_if_permit Service temporarily unavailable\n\n")
                sys.stdout.flush()
                syslog.syslog(syslog.LOG_ERR, "unknown action in '{}', exiting.".format(attr))
                sys.exit(1)
            attr = {}
        elif data == "":
            # Postfix telling us to shutdown (max_idle).
            debug("shutting down (max_idle)")
            sys.exit(0)
        else:
            syslog.syslog(syslog.LOG_ERR, "received bad data: '{}', exiting.".format(data))
            sys.exit(1)
Исходная версия constin, :
Судя по result, лучше всего запросить у ldapsearch атрибут uid и получить его через [1]['uid'].
Не могу: я потом этот результат много где использую, а лишними запросами не хочу нагружать LDAP-сервер.
Почти готовый скрипт, надо прогнать тесты.
#!/usr/bin/python2.7 -u
#
# by Konstantin Frank
import univention.uldap
import optparse
import sys
import re
import traceback
import syslog
from ldap.filter import filter_format
from univention.config_registry import ConfigRegistry
import univention.admin.modules
# Command-line interface. -b supplies the LDAP search base; -s and -u are
# only meaningful together with the -t test switch.
usage = "help"
parser = optparse.OptionParser(usage=usage)
parser.add_option(
    "-b", "--ldap_base",
    dest="ldap_base",
    help="ldap base",
)
parser.add_option(
    "-s", "--sender",
    dest="send_as",
    help="send_as address (for use with -t)",
)
parser.add_option(
    "-u", "--user",
    dest="sasl_username",
    help="sasl username (for use with -t)",
)
parser.add_option(
    "-t", "--test",
    dest="test",
    help="test run",
    action="store_true",
    default=False,
)
options, args = parser.parse_args()

# Log to the mail facility and tag every line with the process id.
syslog.openlog(
    ident="kopano_send_as_filter",
    logoption=syslog.LOG_PID,
    facility=syslog.LOG_MAIL,
)

# Config registry handle; read later for "mail/hosteddomains".
ucr = ConfigRegistry()
ucr.load()

# UDM "users/user" module; its lookup_filter() is used to build user searches.
univention.admin.modules.update()
usersmod = univention.admin.modules.get("users/user")

# Debug output is switched on unconditionally.
_do_debug = True

# Should mail from clients that did not authenticate via SASL be passed
# without any check? Enabling this opens the door to forged-sender spam.
pass_not_authenticated = False
def debug(msg, *args):
    """Write one debug line, prefixed with the filter name.

    ``msg`` is a %-style format string that is filled in with ``args``.
    Nothing is written unless the module-level ``_do_debug`` flag is set.
    In test mode (``options.test``) the line goes to stderr, otherwise it
    is sent to syslog with priority LOG_DEBUG.
    """
    if not _do_debug:
        return
    line = "kopano_send_as_filter: {}".format(msg % args)
    if not options.test:
        syslog.syslog(syslog.LOG_DEBUG, line)
        return
    sys.stderr.write("{}\n".format(line))
def _check_send_as_in_ldap(sasl_username, send_as):
    """Look up ``send_as`` in LDAP and decide whether ``sasl_username`` may use it.

    Returns the action string. Any exception raised here is handled by the
    caller, ``send_as_filter``.
    """
    lo = univention.uldap.getMachineConnection(ldap_master=False, secret_file="/etc/listfilter.secret")

    # Find the object that owns the sender address.
    owner_filter = filter_format(
        '(|(mailPrimaryAddress=%s)(mailAlternativeAddress=%s)(mail=%s))',
        (send_as, send_as, send_as))
    result = lo.search(
        base=options.ldap_base,
        filter=owner_filter,
        attr=["kopanoSendAsPrivilege", "mailPrimaryAddress"])
    if not result:
        return "DUNNO %s is not our email address" % send_as

    owner_dn = result[0][0]
    owner_attrs = result[0][1]

    # The value of the first RDN of the owner's DN is taken as its uid.
    # NOTE(review): this breaks on RDN values containing ',' or '='; asking
    # the same search for the "uid" attribute would avoid parsing the DN.
    send_as_uid = owner_dn.split(',')[0].split('=')[1]
    debug("send_as_uid=%r", send_as_uid)
    debug("sasl_username=%r", sasl_username)
    if send_as_uid:
        if not sasl_username:
            return "REJECT Access denied for not authenticated sasl_username to send as %s" % send_as
        if send_as_uid == sasl_username:
            debug("send_as_uid=%r, sasl_username=%r", send_as_uid, sasl_username)
            return "DUNNO user is mail owner"

    # Not the owner: the owner's kopanoSendAsPrivilege values are compared
    # with the uidNumber of the authenticated user.
    denied = "REJECT Access denied for %s to send as user %s" % (sasl_username, send_as)
    allowed_uid_numbers = list(owner_attrs.get("kopanoSendAsPrivilege", []))
    if not allowed_uid_numbers:
        return denied
    debug("allowed_user_dns=%r", allowed_uid_numbers)
    if not sasl_username:
        return "REJECT Access denied for not authenticated sasl_username to send as user %s" % send_as

    user_filter = filter_format('(uid=%s)', (sasl_username,))
    user_result = lo.search(
        base=options.ldap_base,
        filter=str(usersmod.lookup_filter(user_filter)),
        attr=["uidNumber"])
    sasl_uid_number = None
    if user_result:
        # Read the attribute by name instead of relying on dict value order.
        sasl_uid_number = (user_result[0][1].get("uidNumber") or [None])[0]
        debug("user_dn=%r", sasl_uid_number)
    if sasl_uid_number and sasl_uid_number in allowed_uid_numbers:
        return "DUNNO allowed per user uidNumber"
    return denied


def send_as_filter(attrib):
    """Return the policy action for one request.

    ``attrib`` is a dict of request attributes. The keys read are
    ``sasl_username`` and ``sender``; ``send_as`` is accepted as a fallback
    for ``sender`` so that dicts built by the test mode keep working.

    The result is an action string: "DUNNO ...", "REJECT ..." or
    "443 LDAP base not set.". LDAP errors are reported as "DUNNO Error ...",
    i.e. the mail is let through when the lookup itself fails.
    """
    # No default user name here: a missing sasl_username means the client
    # is not authenticated and must not be treated as some fixed account.
    sasl_username = attrib.get("sasl_username", None)
    send_as = attrib.get("sender", None) or attrib.get("send_as", None)
    debug("sasl=%r send_as=%r", sasl_username, send_as)
    debug("attrib=%r", attrib)

    # Hosted mail domains. An unset or empty UCR variable yields an empty
    # list instead of an AttributeError or a list holding one empty string.
    ucs_mail_domains = [
        domain.lower()
        for domain in (ucr.get("mail/hosteddomains") or "").split()
    ]
    if not ucs_mail_domains:
        return "DUNNO Error: cannot get ucs domain list"

    # Senders outside our domains: pass them when unauthenticated, refuse
    # them for authenticated users. Senders inside our domains fall through
    # to the checks below (the original returned here unconditionally).
    # An empty sender skips this step and is handled further down.
    if send_as:
        sender_domain = send_as.rpartition("@")[2].lower() if "@" in send_as else ""
        if sender_domain not in ucs_mail_domains:
            if sasl_username:
                return "REJECT not allowed for authenticated user to send from not our domain"
            return "DUNNO not our domain"

    # Special exception for the user with login "relay".
    if sasl_username == "relay":
        return "DUNNO local relay server"

    if pass_not_authenticated and not sasl_username:
        return "DUNNO not sasl_authenticated. pass_not_authenticated = True option is enabled"

    if not options.ldap_base:
        return "443 LDAP base not set."
    if not send_as:
        return "REJECT Access denied for empty send_as."

    try:
        return _check_send_as_in_ldap(sasl_username, send_as)
    except Exception:
        # Deliberate best effort: if the LDAP part fails, send the mail anyway.
        return "DUNNO Error with sasl_username={} send_as={} attrib={}, traceback={}".format(
            sasl_username, send_as, attrib, traceback.format_exc().replace("\n", " "))
# main
# Either run a single check from the command line (-t) or serve policy
# requests read from stdin until end of input.
attr = {}
if options.test:
    # testing
    _do_debug = True
    if not options.sasl_username or not options.send_as:
        sys.stderr.write("sasl_username and send_as are required\n")
        parser.print_help()
        sys.exit(1)
    attr["sasl_username"] = options.sasl_username
    attr["send_as"] = options.send_as
    action = send_as_filter(attr)
    print("action={}\n".format(action))
else:
    # read from stdin; python -u is required for unbuffered streams
    while True:
        data = sys.stdin.readline()
        m = re.match(r'([^=]+)=(.*)\n', data)
        if m:
            # one "name=value" line of the current request
            attr[m.group(1).strip()] = m.group(2).strip()
        elif data == "\n":
            # an empty line ends the request
            if attr.get("request", None) == "smtpd_access_policy":
                action = send_as_filter(attr)
                debug("action=%s", action)
                sys.stdout.write("action=%s\n\n" % action)
                sys.stdout.flush()
            else:
                sys.stderr.write("unknown action in %s\n" % attr)
                # A reply has to be "action=..." followed by an empty line.
                sys.stdout.write("action=defer_if_permit Service temporarily unavailable\n\n")
                sys.stdout.flush()
                syslog.syslog(syslog.LOG_ERR, "unknown action in '{}', exiting.".format(attr))
                sys.exit(1)
            attr = {}
        elif data == "":
            # Postfix telling us to shutdown (max_idle).
            debug("shutting down (max_idle)")
            sys.exit(0)
        else:
            syslog.syslog(syslog.LOG_ERR, "received bad data: '{}', exiting.".format(data))
            sys.exit(1)