LINUX.ORG.RU

[postgresql][LISTEN][NOTYFY][py-postgresql] мучаюсь с межпроцессными сообщениями...

 


0

1

Есть в новом постгресе-9 такая фича: когда один процесс меняет базу он может в спец созданный канал слать спец сообщение, а другие процессы прослушивают этот канал и получают уведомления о изменениях....

Postgres-9 Прочёл: http://www.postgresql.org/docs/current/static/sql-notify.html http://www.postgresql.org/docs/current/static/sql-listen.html http://www.postgresql.org/docs/current/static/sql-unlisten.html

В psql оно работает, но в принимающем процессе я получаю уведомления только после команды UNLISTEN <канал>

py-postgres-1.0 Прочёл: http://python.projects.postgresql.org/docs/1.0/notifyman.html но не смог разобраться.. Есть, допустим, канал signals, как с него получить переменную payload??

Кто уже работал с LISTEN NOTYFY?


Ответ на: комментарий от sdh

PSQL:

monitoring=> LISTEN signals;
LISTEN
monitoring=> UNLISTEN signals;
UNLISTEN
Asynchronous notification "signals" with payload "33974" received from server process with PID 12250.
Asynchronous notification "signals" with payload "33975" received from server process with PID 12361.
monitoring=> 

PY-POSTGRESQL:

#!/usr/bin/python3
"""-*- coding: <UTF-8> -*-"""
import postgresql

monitoring = postgresql.open('pq://user@server/monitoring')
monitoring.listen('signals')

for x in monitoring.iternotifies(20):
        if x is None:
                break

print (x)

За этот же промежуток времени python выводит х:

None

Хотя в канале signals в это время сообщения были...

sdh
() автор топика
Ответ на: комментарий от x905

И где и как селектом слушать внутренний сигнал постгреса?

PSQL:

monitoring=> NOTIFY signals, '1111';

Где его ловить?

Если с py-postgres не получится то напишу на psql...

sdh
() автор топика
Ответ на: комментарий от sdh

psql может «увидеть» посланный notify и без отключения unlisten - после любого запроса, например 'select 1'
подавать select 1 нужно в том же подключении, которое и встало на прослушку через listen, затем смотреть состояние соединения - посмотри исходника psql для уточнения

но это подход ущербен т.к. загружает канал и будет задержка равная интервалу опроса select; правильнее делать через прослушивание через poll(select) сокета соединения или взять готовое решение, например qt4 - там при получении сигнала будет событие; в qt3 нет этой поддержки - там я писал select на сокет соединения

x905 ★★★★★
()
Ответ на: комментарий от x905

psql может «увидеть» посланный notify и без отключения unlisten - после любого запроса, например 'select 1'

подавать select 1 нужно в том же подключении, которое и встало на прослушку через listen, затем смотреть состояние соединения - посмотри исходника psql для уточнения

Спасибо, помогло.

но это подход ущербен т.к. загружает канал и будет задержка равная интервалу опроса select; правильнее делать через прослушивание через poll(select) сокета соединения или взять готовое решение, например qt4 - там при получении сигнала будет событие;


Пока черновичок пишу, с qt4 разбираться не очень хочу... Вот бы с питонным менеджером сообщений разобраться и этого хватит.

http://python.projects.postgresql.org/docs/1.0/notifyman.html

Примеры:
http://goo.gl/m9QMK
http://goo.gl/K4cDj
Не работают!

sdh
() автор топика
#!/usr/bin/python
# -*- coding: utf-8 -*-

import sys
import select
import psycopg2
import psycopg2.extensions

def plain_version(s):
    a = s.split('.')
    return int(a[0])*1000+int(a[1])*100+int(a[2][:2])

ASYNCHRONOUS_SUPPORT = plain_version(psycopg2.__version__) >= plain_version('2.2.0')


def main():
    try:
        conn = psycopg2.connect("host=%(host)s dbname=%(dbname)s user=%(user)s password=%(password)s" % {'dbname': dbname, 'user': user, 'password': password, 'host': host})
        conn.set_isolation_level( psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    except psycopg2.OperationalError, e:
        print "Connection error: %s" % str(e)
        sys.exit(1)
    curs = conn.cursor()
    curs.execute("LISTEN att_get_text;")

    rlist = []
    if ASYNCHRONOUS_SUPPORT:
       rlist = [self.conn]
    else:
       rlist = [curs]

    while True:
       if select.select(rlist,[],[],5) == ([],[],[]):
           timeout()
       else:
           if ASYNCHRONOUS_SUPPORT:
              conn.poll()
              while conn.notifies:
                    notify = conn.notifies.pop()
                    got_notify()
           elif curs.isready():
                got_notify()


def got_nitify():
    pass

def timeout():
    pass

bukaka
()
Ответ на: комментарий от sdh

Нашёл ошибку!

#!/usr/bin/python3
"""-*- coding: <UTF-8> -*-"""
import postgresql

monitoring = postgresql.open('pq://user@server/monitoring')
monitoring.listen('signals')

for x in monitoring.iternotifies():
	print (x[0], x[1])
quit ()
sdh
() автор топика

и, к стати, LISTEN NOTIFY появились за долго до 9 версии. в 9ке добавился параметр у сигнала

bukaka
()
Ответ на: комментарий от bukaka

Этим параметром удобно нужную инфу передавать.

В общем для прослушки многих каналов с многих баз:

#!/usr/bin/python3
"""-*- coding: <UTF-8> -*-"""
import postgresql
from postgresql.notifyman import NotificationManager

monitoring0 = postgresql.open('pq://user0@server0/monitoring0')
monitoring1 = postgresql.open('pq://user1@server1/monitoring1')

monitoring0.listen('monitoring0_signals0')
monitoring0.listen('monitoring0_signals1')
monitoring1.listen('monitoring1_signals0')
monitoring1.listen('monitoring1_signals1')

nm = NotificationManager(monitoring0, monitoring1)
nm.settimeout(0.1)
for x in nm:    
    if x != None:
       monitoring, notifies = x
       for channel, payload, pid in notifies:
          print (channel, payload)
quit ()
sdh
() автор топика
Ответ на: комментарий от sdh

Оба примера имеют баг, сообщение выводится ровно через одно!

Половину сообщений пропадает!!!

sdh
() автор топика
Ответ на: комментарий от sdh

В примерах бага нет, но если в цыкле есть хотя бы одно обращение к базе, сигналы выводятся сразу через один:

#!/usr/bin/python3
"""-*- coding: <UTF-8> -*-"""
import postgresql

monitoring = postgresql.open('pq://user@server/monitoring')
signal_one = monitoring.prepare("SELECT x_y FROM signals WHERE id = $1")
monitoring.listen('signals')

signal = ['0']
for x in monitoring.iternotifies():
#	signal = signal_one(int(x[1]))
	print (x[0], x[1], signal[0][0])
quit ()
Сообщение выводит по порядку.

#!/usr/bin/python3
"""-*- coding: <UTF-8> -*-"""
import postgresql

monitoring = postgresql.open('pq://user@server/monitoring')
signal_one = monitoring.prepare("SELECT x_y FROM signals WHERE id = $1")
monitoring.listen('signals')

signal = ['0']
for x in monitoring.iternotifies():
	signal = signal_one(int(x[1]))
	print (x[0], x[1], signal[0][0])
quit ()

Сообщение выводит ровно через одно!!!

Помогите действенным советом...

sdh
() автор топика
Ответ на: комментарий от bukaka

Все проблемы у меня, почему то, решаются только с утра.

#!/usr/bin/python3
"""-*- coding: <UTF-8> -*-"""
import postgresql

monitoring = postgresql.open('pq://user@server/monitoring')
signal_one = monitoring.prepare("SELECT x_y FROM signals WHERE id = $1")
monitoring.listen('signals')

signal = ['0']
for x in monitoring.iternotifies():
   signal = signal_one(int(x[1]))
   monitoring.execute('SELECT 1') # без этой строки уведомления выводятся через одно!!!
   print (x[0], x[1], signal[0][0])
quit ()

Почему обращение к базе monitoring.execute('SELECT 1') влияет на итератор уведомлений я не знаю, может маны не дочитал, может баг py-postgresql... я им в список рассылки письмо напишу, пусть разбираются...

А вот:

#!/usr/bin/python3
"""-*- coding: <UTF-8> -*-"""
import postgresql

monitoring = postgresql.open('pq://user@server/monitoring')
monitoring.listen('signals')

for x in monitoring.iternotifies():
   print (x[0], x[1])
quit ()
если в цикле вообще не обращаться к базе, то уведомления выводятся все...

sdh
() автор топика
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.