LINUX.ORG.RU

питонщики, помогите избавиться от нагромождения строк.

 


1

1
result=[('uid=canscan,cn=mail,cn=users,dc=domain,dc=localnet', {'mailPrimaryAddress': ['canscan@domain.com']})]

из этого мне надо получить canscan.

Я Питоне как корова на льду, и вот какой п-ц у меня получился.

                       if result:


                                send_as_uid = result[0][0]
                                send_as_uid = send_as_uid.split(',')
                                send_as_uid = str(send_as_uid[0])
                                send_as_uid = send_as_uid.split('=')
                                send_as_uid = str(send_as_uid[1])
                                debug("send_as_uid=%r", send_as_uid)

output:

send_as_uid='canscan'

наверно можно это сделать покороче?:)

★★★★

Последнее исправление: constin (всего исправлений: 2)
dict(el.split('=', 1) for el in result[0][0].split(','))['uid']

или регуляркой
pawnhearts ★★★★★
()
Последнее исправление: pawnhearts (всего исправлений: 1)
Ответ на: комментарий от Yorween

result[0][0].split(',')[0].split('=')[1]

ну это тоже самое, только в одну строчку. Так я тоже могу.

а нельзя как-то красиво типа

result[0][1].get("mailPrimaryAddress", []):

только с uid?

constin ★★★★
() автор топика
Ответ на: комментарий от constin

Не знаю, как тут применить «красиво» :)

Попробуй как предложил pawnhearts через регулярку.

import re
re.search('(uid=)([a-z]+)(,cn=)', str(result)).group(2)

Yorween
()
Ответ на: комментарий от constin

хотя, пойдет

Заверни хотя бы в функцию, типа extract_data(result, 'uid'), или как-то так.

no-such-file ★★★★★
()
(dev)  sergey@sergey-pc  ~/Downloads/gnatsd-v1.4.0-linux-amd64  ptpython
>>> s = 'uid=canscan,cn=mail,cn=users,dc=domain,dc=localnet'

>>> s.split('uid=')[1].split(',')[0]
'canscan'
tz4678 ★★
()

странно что никто не предложил взять слайс до запятой :3 наверно эффективней чем с find не получится

anonymous
()
result=[('uid=canscan,cn=mail,cn=users,dc=domain,dc=localnet', {'mailPrimaryAddress': ['canscan@domain.com']})]

def convert(data):
    def f(row):
        ldap, options = row

        return {
            'ldap': dict(re.findall(r'(\w+)=(\w+),*', ldap)),
            'options': options}

    return map(f, data)

list(convert(result))

Например

anonymous
()
Ответ на: комментарий от anonymous

При желании содержимое f можешь впихнуть в лямбду map'а

anonymous
()
Ответ на: комментарий от anonymous
result=[('uid=canscan,cn=mail,cn=users,dc=domain,dc=localnet', {'mailPrimaryAddress': ['canscan@domain.com']})]

list(map(lambda row: {'ldap': dict(re.findall(r'(\w+)=(\w+),*', row[0])), 'options': row[1]}, result))

Вот вариант в одну строку

anonymous
()
Ответ на: комментарий от anonymous
list(map(lambda row: {'ldap': dict(re.findall(r'(\w+)=(\w+),*', row[0])), 'options': row[1]}, result))[0]['ldap']['uid']

Что бы было понятно

anonymous
()
def extract_uid(result):
    r = result[0][0]         # ?
    si = r.find('uid=') + 4
    ei = r.find(',', si)
    return r[si:ei]

...

if result:
    uid = extract_uid(result)
    debug("send_as_uid=%r", uid)

anonymous
()
Ответ на: комментарий от DonkeyHot

Судя по result, лучше всего заказать ldapsearch-у аттрибут uid, и достать его по [1]['uid']

не могу, я потом этот резалт много где использую, а запросами не хочу ldap сервер нагружать.

почти готовый скрипт, надо тесты прогнать.


#!/usr/bin/python2.7 -u
#
# by Konstantin Frank


import univention.uldap
import optparse
import sys
import re
import traceback
import syslog
from ldap.filter import filter_format
from univention.config_registry import ConfigRegistry
import univention.admin.modules

usage = "help"
parser = optparse.OptionParser(usage=usage)
parser.add_option("-b", "--ldap_base", dest="ldap_base", help="ldap base")
parser.add_option("-s", "--sender", dest="send_as", help="send_as address (for use with -t)")
parser.add_option("-u", "--user", dest="sasl_username", help="sasl username (for use with -t)")
parser.add_option("-t", "--test", dest="test", help="test run", action="store_true", default=False)
options, args = parser.parse_args()

syslog.openlog(ident="kopano_send_as_filter", logoption=syslog.LOG_PID, facility=syslog.LOG_MAIL)
ucr = ConfigRegistry()
ucr.load()
univention.admin.modules.update()
usersmod = univention.admin.modules.get("users/user")
_do_debug = True

# should we pass mails from external mail servers without checking?  If this case we will have potential mailforged SPAM to us.
pass_not_authenticated = False


def debug(msg, *args):
	if _do_debug:
		msg = "kopano_send_as_filter: {}".format(msg % args)
		if options.test:
			sys.stderr.write("{}\n".format(msg))
		else:
			syslog.syslog(syslog.LOG_DEBUG, msg)


def send_as_filter(attrib):
	## TEST## sasl_username = attrib.get("sasl_username", "kfrank")
	####!!!!!send_as = attrib.get("sasl_username", None)
	# TEST# send_as = attrib.get("send_as", None)


	sasl_username = attrib.get("sasl_username", None)
        send_as = attrib.get("sender", None)

	

	debug("sasl=%r send_as=%r", sasl_username, send_as)
	debug("attrib=%r", attrib)
	
	#####################################################################
	# PASS if no sasl auth send_as email is not from out mail domains.
	#####################################################################
	
	
	ucs_mail_domains = ucr.get("mail/hosteddomains").split(' ')
	if ucs_mail_domains and not sasl_username and not send_as.split('@')[1] in ucs_mail_domains:
	    #if not send_as.split('@')[1] in ucs_mail_domains:
	    return "DUNNO not our domain"
	elif ucs_mail_domains and sasl_username and not send_as.split('@')[1] in ucs_mail_domains:
	    return "REJECT not allowed for authenticated user to send from not our domain"
	else:
	    return "DUNNO Error: cannot get ucs domain list"

	#####################################################################
	#special exeption for user with login "relay"
	#####################################################################
	if sasl_username == "relay":
	    return "DUNNO local relay server"
	#####################################################################

	if pass_not_authenticated:
		if not sasl_username:
			return "DUNNO not sasl_authenticated. pass_not_authenticated = True option is enabled"

	if not options.ldap_base:
		return "443 LDAP base not set."
	elif not send_as:
		return "REJECT Access denied for empty send_as."
	else:
		try:
			ldap = univention.uldap.getMachineConnection(ldap_master=False, secret_file="/etc/listfilter.secret")

			user_dn = ""
			users_groups = []
			allowed_user_dns = []
			alternative_user_address = []




			# try the ldap stuff, if that fails send email anyway
			# get send_as restriction
			ldap_attr = ["kopanoSendAsPrivilege","mailPrimaryAddress"]
			ldap_filter = filter_format(
				'(|(mailPrimaryAddress=%s)(mailAlternativeAddress=%s)(mail=%s))',
                                (send_as, send_as, send_as))
			result = ldap.search(base=options.ldap_base, filter=ldap_filter, attr=ldap_attr)



			if result:
    
				######################################################################
				#check if sasl_uid == send_as uid, if yes = pass.
				######################################################################


				send_as_uid = result[0][0].split(',')[0].split('=')[1]
				debug("send_as_uid=%r", send_as_uid)
				debug("sasl_username=%r", sasl_username)
				
				if send_as_uid:
				    if not sasl_username:
                                                return "REJECT Access denied for not authenticated sasl_username to send as %s" % send_as
				    if send_as_uid == sasl_username:
                            		debug("send_as_uid=%r, sasl_username=%r", send_as_uid, sasl_username)
                            		return "DUNNO user is mail owner"



				######################################################################
                                #check if send_as_uid has kopanoSendAsPrivileg and compare kopanoSendAsPrivileg list with sasl_user uid
                                ######################################################################

				
				# get allowed user dns
				for u in result[0][1].get("kopanoSendAsPrivilege", []):
				    allowed_user_dns.append(u)

				# check if there are restrictions, check sasl_username first
				if allowed_user_dns:
					debug("allowed_user_dns=%r", allowed_user_dns)
					if not sasl_username:
						return "REJECT Access denied for not authenticated sasl_username to send as user %s" % send_as 

					# get dn of sasl_username
					user_filter = filter_format(
						'(uid=%s)',
						(sasl_username,)
					)
					ldap_filter = usersmod.lookup_filter(user_filter)
					user_result = ldap.search(base=options.ldap_base, filter=str(ldap_filter), attr=["uidNumber"])
					if user_result:
						user_dn = user_result[0][1].values()[0][0]
						debug("user_dn=%r", user_dn)

						# check useruidNiumber in kopanoSendAsPrivilege
						if allowed_user_dns and \
								user_dn and \
								user_dn in allowed_user_dns:
							return "DUNNO allowed per user uidNumber"

					return "REJECT Access denied for %s to send as user  %s" % (sasl_username, send_as)
				else:
					return "REJECT Access denied for %s to send as user  %s" % (sasl_username, send_as)
			else:
				return "DUNNO %s is not our email address" % send_as
		except Exception:
			return "DUNNO Error with sasl_username={} send_as={} attrib={}, traceback={}".format(
				sasl_username, send_as, attrib, traceback.format_exc().replace("\n", " "))

# main
attr = {}

# testing
if options.test:
	_do_debug = True
	if not options.sasl_username or not options.send_as:
		sys.stderr.write("sasl_username and send_as are required\n")
		parser.print_help()
		sys.exit(1)
	attr["sasl_username"] = options.sasl_username
	attr["send_as"] = options.send_as
	action = send_as_filter(attr)
	print("action={}\n".format(action))
else:
	# read from stdin python -u is required for unbufferd streams
	while True:
		data = sys.stdin.readline()
		m = re.match(r'([^=]+)=(.*)\n', data)
		if m:
			attr[m.group(1).strip()] = m.group(2).strip()

		elif data == "\n":
			if attr.get("request", None) == "smtpd_access_policy":
				action = listfilter(attr)
				debug("action=%s", action)
				sys.stdout.write("action=%s\n\n" % action)
			else:
				sys.stderr.write("unknown action in %s\n" % attr)
				sys.stdout.write("defer_if_permit Service temporarily unavailable\n")
				syslog.syslog(syslog.LOG_ERR, "unknown action in '{}', exiting.".format(attr))
				sys.exit(1)
			attr = {}
		elif data == "":
			# Postfix telling us to shutdown (max_idle).
			debug("shutting down (max_idle)")
			sys.exit(0)
		else:
			syslog.syslog(syslog.LOG_ERR, "received bad data: '{}', exiting.".format(data))
			sys.exit(1)
constin ★★★★
() автор топика
Последнее исправление: constin (всего исправлений: 1)
Ответ на: комментарий от constin

str.find() возвращает индекс подстроки. 4 — это количество символов подстроки «uid=», то есть начальный индекс равен индексу подстроки «uid=» + 4 её символа.

если подстрока не найдена, str.find() вернёт -1, второй вызов, соответственно, тоже. результат r[-1:-1] — пустая строка.

единственное, надо смотреть, чтобы в начальной строке не встретилось, например, «guid=...» перед «uid=». ну и [[0]][[0]] эти дурацкие бы расписать, а то непонятно. а хотя...

anonymous
()
Ответ на: комментарий от oldstable

Это иллюстрация того что нужно немного уметь в пограммирование. выше есть код который парсит всю строку ldap'а в словарь.

anonymous
()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.