LINUX.ORG.RU

Сообщения scisearcher

 

перезапустить DAG, который нормально завершился

Форум — Development

Пусть у нас имеется DAG, который запускается один раз в сутки. Он работает без выбрасывания исключений. Но, выясняется, что DAG имеет багу и не совсем верно выполняет действия. Ошибка подправлена, но даг не стал перевыполнять расчеты для старых временных отрезков. Подскажите, как перезапустить даги, отработавшие без выброса исключений? Может в интерфейсе есть кнопочка какая? Переименовать может нужно (как крайний вариант)?

 , ,

scisearcher
()

Как вы отключаете DAGs Airflow

Форум — Development

Есть одноразовые даги. Они отработали и не нужны сейчас, и даже вредны - случайный запуск подпортит данные. Но, в будующем их код может потребоваться. Какие лучшие практики отлючения таких дагов известны?

Из идей приходит только - сделать архивный каталог и просто перемещать туда даги.

P.S. в другом каталоге даги успешно находятся и запускаются - не получится их просто перемещать

 , ,

scisearcher
()

DAG застрял

Форум — Development

Нужно, что бы даг запускался в 5 утра МСК (1 UTC). Выкатил вечером и снял с паузы сразу, часов в 20-ть - даг автоматом не запустился в 5 утра, и я запустил его днем руками - один раз. Далее, происходят не понятные вещи - уже несколько суток даг не запускается, но интервал Next Run меняется - сейчас

[2022-10-29, 01:00:00 ... 2022-10-30, 01:00:00]
вчера было
[2022-10-28, 01:00:00 ... 2022-10-29, 01:00:00]
...

Код был такой:

default_args = {
    'retries': 3,
    'retry_delay': dt.timedelta(minutes=30),
}

with DAG(
    dag_id='my_dag_id',
    start_date=days_ago(0),
    schedule_interval="0 1 * * *",
    catchup=False,
    is_paused_upon_creation=True,
    default_args=default_args
) as dag:

Я хотел поставить запуск день назад, чтобы даг сработал на интервале. Хорошо, правим. Где-то в 12 МСК (9UTC) - 2022-10-29 поменял:

    start_date=days_ago(1),

Даг запустился автоматом и отработал интервал день назад:

(LogicalDate=2022-10-28, 01:00:00, 
RunType=scheduled, 
QueuedAt=2022-10-29, 09:02:56, 
StartDate=2022-10-29, 09:02:56, 
EndDate=2022-10-29, 09:12:10) 

Но, это не изменило его состояния, по прежднему

NextRun = [2022-10-29, 01:00:00 UTC; 2022-10-30, 01:00:00 UTC]
Run After = [2022-10-30, 01:00:00 UTC]
Т.е. сам он не запустится автоматом.

Как теперь вывести DAG из ступора? Чтобы он сам в 1UTC запускался.

 ,

scisearcher
()

запускать DAG каждый час в заданное число минут

Форум — Development

Нужно чтобы даг запускался каждый час в 15 минут. При этом не нужно просчитавыть интервал назад в прошлое. Казалось бы вот так нужно конфигурировать даг:

with DAG(
    # ...
    schedule_interval=timedelta(hours=1),
    start_date=days_ago(0, hour=0, minute=15),
    catchup=False,
    is_paused_upon_creation=False,
    # ...

Но, если даг запускается, например, в 15:43 то, он начинает выполняться в 15:43, 16:43, 17:43 .... Но, хотелось бы увидеть 16:15, 17:15, 18:15 ... Как это можно сделать?

 , ,

scisearcher
()

отключить примеры в контейнере с airflow в правилах docker

Форум — Development

Помогите отключить примеры airflow правилах сборки docker.

Проблема такая. Если пишу в правилах сборки

RUN airflow db init
RUN export AIRFLOW__CORE__LOAD_EXAMPLES="false" && airflow db reset -y
то примеры не отключаются.

Если запущу докер и там выполню

export AIRFLOW__CORE__LOAD_EXAMPLES="false" && airflow db reset -y
то примеры отключатся - и это хотелось бы сделать в правилах докера.

 , ,

scisearcher
()

pandas вычислить time_local

Форум — Development

Имеется датафрейм с временем UTC и timezone в секундах. Как без циклов вычислить столбец с локальным временем:

import pandas as pd
import datetime as dt

time_sec = 1665170104
offset_utc = dt.timedelta(seconds=0)
tz_utc = dt.timezone(offset_utc) 


df = pd.DataFrame({
    'time_utc': [
        dt.datetime.fromtimestamp(time_sec).astimezone(tz_utc), 
        dt.datetime.fromtimestamp(time_sec).astimezone(tz_utc)], 
    'timezone': [60*60*3, 60*60*2]
})
 	time_utc 	                 timezone
0 	2022-10-07 19:15:04+00:00 	 10800
1 	2022-10-07 19:15:04+00:00 	 7200

Нужно посчитать time_local для каждой строчки. Но не получется что-то

df.time_utc.dt.tz_convert(что_то.timedelta(seconds=df.timezone))

 , ,

scisearcher
()

sqlalchemy Session в разных процессах (потоках)

Форум — Development

Имеется модуль с session_scope. Могу ли я смело его подключать к своим отдельным скриптам, содержимое которых будет выполняться параллельно в отдельных airflow dag?

engine_mydb = create_engine(mydb_setting)

@contextmanager
def session_scope():
    try:
        session = Session(bind=engine_mydb)
        yield session
        session.commit()
    except Exception as e:
        send_error(e)
        session.rollback()
    finally:
        session.close()

 ,

scisearcher
()

Как вы передаете DataFrame между Airflow операторами?

Форум — Development

Разбираюсь с airflow, сделал Python-операторы, 1) загрузил pandas DataFrame в Питон операторе, 2) почистил DataFrame, 3) трансформировал … . А как передать операторы? В csv или json сохранять на диск? Метаданные (хотя они точто не для датафреймов)? Библиотеки может есть какие?

Загрузку точно нужно отделить, т.к. частенько в коннекте с источником данных сбои.

 , ,

scisearcher
()

ускорить в рамках Python вставки INSERT ON CONFLICT

Форум — Development

Когда реализовал загрузку таблички в Postgres таким образом: с помощью pandas считал все данные и сохранил их в csv-файл и затем загрузил с помощью engine.raw_connection().cursor().copy_from() от sqlalchemy. При это табличка дропается и создается заново. Работает быстро (несколько минимут), но возникла проблема - кончается оперативная память. Датафрейм уже занимает под 10 G и вынужден перейти к дописыванию таблички.

При дописывании возникает проблема. Строки таблицы с последнего момента записывания меняются (глубина несколько суток). Поэтому использую «INSERT ON CONFLICT UPDATE» в Питоновском цикле с execute от sqlalchemy. Все работает, но бесконечно долго. 10 тыс. записей висят несколько часов. Порекомендуйте как ускорится можно?

 , ,

scisearcher
()

поставить python3.6 в Ubuntu20.04

Форум — Development

Привет, ЛОР! Подскажите, как поставить в Ubuntu20.04 Python версии 3.6? Затягивает 3.8. Вот что пробовал собрать в docker:

FROM ubuntu:20.04
RUN apt-get update
RUN apt-get install -y python3.6

и вот так:

FROM ubuntu:20.04
RUN add-apt-repository -y ppa:jblgf0/python  #  Unable to locate package software-properties-common

RUN apt-get update
RUN apt-get install -y python3.6

FROM ubuntu:20.04
RUN apt-get install -y software-properties-common #  Unable to locate package software-properties-common
RUN add-apt-repository -y ppa:jblgf0/python
RUN apt-get update
RUN apt-get install -y python3.6
FROM ubuntu:20.04
RUN apt-get update
RUN apt-get install -y software-properties-common
RUN add-apt-repository -y ppa:jblgf0/python  # The repository 'http://ppa.launchpad.net/jblgf0/python/ubuntu focal Release' does not have a Release file.

и кучу других рецептов из гугля, но пока ничего не получилось. В Убунту 18 Питон 3.6 ставиться по умолчанию, но 18-я сильно не подходит. Нужно в 20-й Убунте установиться.

 , ,

scisearcher
()

выборка актуальных по дате данных

Форум — Development

Имеется табличка, в которой тип группы меняется с течением времени:

select * from user_groups_by_types ;
 id | change_date | user_group | group_type 
----+-------------+------------+------------
  1 | 2022-05-01  | A          | type1
  2 | 2022-05-05  | A          | type2
  3 | 2022-05-06  | B          | type1

Требуется выбрать для каждой группы актуальный тип (т.е. самый последний по дате тип). Решение для одной конкретной группы очевидно. Как это обобщить на все группы?

select * from user_groups_by_types where user_group = 'A' and change_date = (select max(change_date) from user_groups_by_types where user_group = 'A');
 id | change_date | user_group | group_type 
----+-------------+------------+------------
  2 | 2022-05-05  | A          | type2

 ,

scisearcher
()

добавить MetaData к существующей таблице (SQLAlchemy)

Форум — Development

Существуют таблицы, они заполнены и используются. И, возникла необходимость в миграции. Выбрал alembic и он требует MetaData object.

Как можно добавить к существующей таблице объект MetaData?

Таблица создавалась средствами SQLAlchemy:


class ATable(Base):
    
    __tablename__ = 'atable'

    id = Column(Integer, primary_key=True)
    type = Column(String, nullable=False)
    name = Column(String, nullable=False)
    # ...
    def __init__(self, **kwa):
        self.__dict__.update(kwa)
    
    def __repr__(self):
        return str(self.__dict__)

# ...
from sqlalchemy import create_engine
from sqlalchemy import MetaData
    
engine = create_engine(dbsetting, echo=True)
metaData = MetaData(bind=engine)
metaData.create_all(tables=[ATable.__table__])

P.S. Хотя, может эти метаданные есть, но где их искать?

ERROR [alembic.util.messaging] Can't proceed with --autogenerate option; environment script /home/user/project/alembic/env.py does not provide a MetaData object or sequence of objects to the context.
  FAILED: Can't proceed with --autogenerate option; environment script /home/user/project/alembic/env.py does not provide a
  MetaData object or sequence of objects to the context.

 ,

scisearcher
()

pandas to_sql делать записи порциями

Форум — Development

Помогите решить проблему записи большого датафрейма в postgres таблицу. Есть датафрейм df, который с небольшим количеством данных (несколько сот записей тестирвовал) успешно и быстро сохраняется в postgres таблицу:

df.to_sql(
    'my_table', my_engine, chunksize=100, if_exists='replace')
Но, в требуемом режиме (продовском), когда число записей достигает приблизительно 500 тыс. строк ничего не происходит. Оператор висит несколько часов и молча завершается без всяких сообщений. Данные при это не попадают в postgres - нет ни одной записи. Табличка создается, если ее удалить предварительно. При этом датафрейм имеется и его строки успешно печатаются в консоли ...

Что можно сделать? Почему нет ни одной записи, хотяюбы первые 100 штук?

 ,

scisearcher
()

сджойинить датафреймы по приблизительно одинаковым временным меткам

Форум — Development

Как сджойнить df1 и df2 с точностью 1 минута? Нужно, чтобы близкие с точностью до 1 минуты временные метки совпадали.

df1 = pd.DataFrame(
    {'type': ['x', 'y', 'z']},
    index=pd.to_datetime(['2021-01-21 11:00:00.001', '2021-01-21 12:12:00.999',  '2021-01-21 13:13:01.888']))
df1.index.name='date1'
df1.reset_index()

df2 = pd.DataFrame(
    {'value': [10, 15, 12]},
    index=pd.to_datetime(['2021-01-21 10:59:59.999', '2021-01-21 12:12:00.999',  '2021-01-21 13:13:02.001']))
df2.index.name='date2'
df2.reset_index()

P.S. попытка с resample('1h') не удалась, т.к. метку 2021-01-21 10:59:59.999 относит к 10 часам, а не к 11

 , ,

scisearcher
()

RSS подписка на новые темы