LINUX.ORG.RU

вызов generic_make_request из обработчика запросов блочного устройства

 , , , ,


0

2

Есть проект: https://github.com/OrenKishon/stackbd - это блочное устройство, которое перенаправляет все запросы на некоторое другое устройство, попутно добавляя информацию об этих запросах в syslog. В качестве подхода к реализации обмена данными выбрано прямое выполнение запроса без использование очереди. Если же переделать код таким образом, что бы очередь использовалась, модуль повисает в том случае, если в устройство записать больше 8 секторов за раз (с помощью dd например, при count=9). С чтением при этом проблем нет.

Есть идеи, почему так, как лечить?

#include <linux/version.h>
#include <linux/module.h>
#include <linux/moduleparam.h>
#include <linux/init.h>
 
#include <linux/module.h>
#include <linux/kernel.h> /* printk() */
#include <linux/fs.h>     /* everything... */
#include <linux/errno.h>  /* error codes */
#include <linux/types.h>  /* size_t */
#include <linux/vmalloc.h>
#include <linux/genhd.h>
#include <linux/blkdev.h>
#include <linux/hdreg.h>
#include <linux/kthread.h>
 
#include <trace/events/block.h>
 
#include "../common/stackbd.h"
 
#define USE_BLKMQ 1
 
#if USE_BLKMQ
#include <linux/blk-mq.h>
#endif
 
#define LOGOUT(lvl, fmt, args...) printk(lvl "%s [task=%p] %s: " fmt, THIS_MODULE->name, current, __func__, ## args)
#define PINFO(fmt, args...) LOGOUT(KERN_INFO, fmt, ## args)
#define PWARN(fmt, args...) LOGOUT(KERN_WARNING, fmt, ## args)
#define PERROR(fmt, args...) LOGOUT(KERN_ERR, fmt, ## args)
 
#define STACKBD_BDEV_MODE (FMODE_READ | FMODE_WRITE | FMODE_EXCL)
#define DEBUGGG printk("stackbd: %d\n", __LINE__);
/*
 * We can tweak our hardware sector size, but the kernel talks to us
 * in terms of small sectors, always.
 */
#define KERNEL_SECTOR_SHIFT 9
#define KERNEL_SECTOR_SIZE (1 << KERNEL_SECTOR_SHIFT)
#define KERNEL_PAGE_SHIFT 12
#define KERNEL_PAGE_SIZE (1 << KERNEL_PAGE_SHIFT)
 
#if LINUX_VERSION_CODE >= KERNEL_VERSION(3, 16, 0)
#   define DECLARE_BIO_VEC struct bio_vec
#   define ACCESS_BIO_VEC(x) (x)
#else
#   define DECLARE_BIO_VEC struct bio_vec *
#   define ACCESS_BIO_VEC(x) (*(x))
#endif
 
MODULE_LICENSE("Dual BSD/GPL");
 
static int major_num = 0;
module_param(major_num, int, 0);
static int LOGICAL_BLOCK_SIZE = 4096;
module_param(LOGICAL_BLOCK_SIZE, int, 0);
 
/*
 * The internal representation of our device.
 */
static struct stackbd_t {
    sector_t capacity; /* Sectors */
    struct gendisk *gd;
    spinlock_t lock;
    struct bio_list bio_list;    
    struct task_struct *thread;
    int is_active;
    struct block_device *bdev_raw;
    /* Our request queue */
    struct request_queue *queue;
#if USE_BLKMQ
    struct blk_mq_tag_set tag_set;
#endif
} stackbd;
 
struct bio_private
{
    void *bi_private_old;
    void *data;
    bool is_ready;
};
 
typedef struct hidden_cmd_s
{
    long ret;
} hidden_cmd_t;
 
 
static DECLARE_WAIT_QUEUE_HEAD(req_event);
 
int ald_buffer_read(
    unsigned long sector,
    unsigned long nsect,
    char *buffer
)
{
    int result = 0;
    unsigned nsize = nsect << KERNEL_SECTOR_SHIFT;
    int npages = ((nsize - 1) >> KERNEL_PAGE_SHIFT) + 1;
    struct bio *bio = bio_alloc(GFP_ATOMIC, npages);
    struct block_device *bdev = stackbd.bdev_raw;
 
    PINFO("begin; sector=%ld; nsect=%ld; buffer=%p\n", sector, nsect, buffer);
 
    if(unlikely(!bio))
    {
        PINFO("bio_alloc failed!\n");
        result = -ENOMEM;
        return result;
    }
    bio_set_dev(bio, bdev);
    bio->bi_iter.bi_sector = sector;
    bio_set_op_attrs(bio, REQ_OP_READ, 0);
    {
        char *ptr = buffer;
        do
        {
            struct page *page;
            page = virt_to_page(ptr);
            if(unlikely(!page))
            {
                PINFO("virt_to_page failed!\n");
                result = -ENOMEM;
                break;
            }
 
            {
                unsigned this_step = min((unsigned)(PAGE_SIZE - offset_in_page(ptr)), nsize);
                bio_add_page(bio, page, this_step, offset_in_page(ptr));
                nsize -= this_step;
                ptr += this_step;
            }
        } while(nsize > 0);
 
        if(likely(!result))
        {
            result = submit_bio_wait(bio);
        }
        bio_put(bio);
    }
    PINFO("end (%d)\n", result);
    return result;
}
 
int ald_buffer_write(
    unsigned long sector,
    unsigned long nsect,
    char *buffer
)
{
    int result = 0;
    unsigned nsize = nsect << KERNEL_SECTOR_SHIFT;
    int npages = ((nsize - 1) >> KERNEL_PAGE_SHIFT) + 1;
    struct bio *bio = bio_alloc(GFP_ATOMIC, npages);
    struct block_device *bdev = stackbd.bdev_raw;
 
    PINFO("begin; sector=%ld; nsect=%ld; buffer=%p\n", sector, nsect, buffer);
 
    if(unlikely(!bio))
    {
        PINFO("bio_alloc failed!\n");
        result = -ENOMEM;
        return result;
    }
    bio_set_dev(bio, bdev);
    bio->bi_iter.bi_sector = sector;
    bio_set_op_attrs(bio, REQ_OP_WRITE, 0);
    {
        char *ptr = buffer;
        do
        {
            struct page *page = virt_to_page(ptr);
 
            if(unlikely(!page))
            {
                PINFO("alloc page failed!\n");
                result = -ENOMEM;
                break;
            }
 
            {
                unsigned op = offset_in_page(ptr);
                unsigned this_step = min((unsigned)(KERNEL_PAGE_SIZE - op), nsize);
                bio_add_page(bio, page, this_step, op);
                nsize -= this_step;
                ptr += this_step;
            }
        } while(nsize > 0);
 
        if(likely(!result))
        {
            result = submit_bio_wait(bio);
        }
        bio_put(bio);
    }
    PINFO("end (%d)\n", result);
    return result;
}
 
#if USE_BLKMQ
static void pb_alloc(struct bio *bio, void *data)
{
    struct bio_private *pb = kmalloc(sizeof(struct bio_private), GFP_ATOMIC);
 
    pb->bi_private_old = bio->bi_private;
    pb->data = data;
    pb->is_ready = false;
    bio->bi_private = pb;
}
 
static void pb_free(struct bio *bio)
{
    struct bio_private *pb = bio->bi_private;
    void *t = bio->bi_private;
    bio->bi_private = pb->bi_private_old;
    kfree(t);
}
#endif
 
static void my_bio_complete(struct bio *bio, int ret)
{
#if USE_BLKMQ
   struct bio_private *pb = bio->bi_private;
   struct request *rq = pb->data;
 
   pb_free(bio);
#if LINUX_VERSION_CODE >= KERNEL_VERSION(3, 18, 0)
   blk_mq_end_request(rq, ret ? BLK_STS_IOERR : BLK_STS_OK);
#else
   blk_mq_end_io(rq, ret ? BLK_STS_IOERR : BLK_STS_OK);
#endif
 
#else // #if USE_BLKMQ
    bio_endio(bio);
#endif // #if USE_BLKMQ
}
 
 
static void stackbd_io_fn(struct bio *bio)
{
    sector_t sector = bio->bi_iter.bi_sector;
    int size = bio->bi_iter.bi_size;
    int nsect = size >> KERNEL_SECTOR_SHIFT;
    DECLARE_BIO_VEC bvec;
    struct bvec_iter iter;
    u8 *buffer = kmalloc(size, GFP_ATOMIC);
    u8 *ptr = buffer;
 
    if (bio_data_dir(bio) == READ)
    {
        ald_buffer_read(sector, nsect, ptr);
 
        bio_for_each_segment(bvec, bio, iter)
        {
            u8 *dst = page_address(ACCESS_BIO_VEC(bvec).bv_page) + ACCESS_BIO_VEC(bvec).bv_offset;
            int len = ACCESS_BIO_VEC(bvec).bv_len;
            memcpy(dst, ptr, len);
            ptr += len;
        }
    }
    else
    {
        bio_for_each_segment(bvec, bio, iter)
        {
            u8 *src = page_address(ACCESS_BIO_VEC(bvec).bv_page) + ACCESS_BIO_VEC(bvec).bv_offset;
            int len = ACCESS_BIO_VEC(bvec).bv_len;
            memcpy(ptr, src, len);
            ptr += len;
        }
        ald_buffer_write(sector, nsect, buffer);
    }
    kfree(buffer);
    my_bio_complete(bio, 0);
}
 
static int stackbd_threadfn(void *data)
{
    struct bio *bio;
 
    set_user_nice(current, -20);
 
    while (!kthread_should_stop())
    {
        /* wake_up() is after adding bio to list. No need for condition */ 
        wait_event_interruptible(req_event, kthread_should_stop() ||
                !bio_list_empty(&stackbd.bio_list));
 
        spin_lock_irq(&stackbd.lock);
        if (bio_list_empty(&stackbd.bio_list))
        {
            spin_unlock_irq(&stackbd.lock);
            continue;
        }
 
        bio = bio_list_pop(&stackbd.bio_list);
        spin_unlock_irq(&stackbd.lock);
 
        stackbd_io_fn(bio);
    }
 
    return 0;
}
 
#if USE_BLKMQ
//#if LINUX_VERSION_CODE >= KERNEL_VERSION(3, 13, 3)
#if LINUX_VERSION_CODE >= KERNEL_VERSION(3, 19, 0)
static blk_status_t hidden_queue_rq(struct blk_mq_hw_ctx *hctx, const struct blk_mq_queue_data* bd)
#elif LINUX_VERSION_CODE >= KERNEL_VERSION(3, 18, 0)
static blk_status_t hidden_queue_rq(struct blk_mq_hw_ctx *hctx, struct request* rq, bool last)
#else
static blk_status_t hidden_queue_rq(struct blk_mq_hw_ctx *hctx, struct request* rq)
#endif
{
#if LINUX_VERSION_CODE >= KERNEL_VERSION(3, 19, 0)
    struct request *rq = bd->rq;
#endif
    struct bio *bio = rq->bio;
    pb_alloc(bio, rq);
    spin_lock_irq(&stackbd.lock);
    if (!stackbd.bdev_raw)
    {
        printk("stackbd: Request before bdev_raw is ready, aborting\n");
        goto abort;
    }
    if (!stackbd.is_active)
    {
        printk("stackbd: Device not active yet, aborting\n");
        goto abort;
    }
 
#if LINUX_VERSION_CODE >= KERNEL_VERSION(3, 18, 0)
    blk_mq_start_request(rq);
#endif
 
    bio_list_add(&stackbd.bio_list, bio);
    wake_up(&req_event);
exit:
    spin_unlock_irq(&stackbd.lock);
 
//exit:
    return BLK_STS_OK; //always return ok
abort:
    my_bio_complete(bio, -EIO);
    goto exit;
}
 
static struct blk_mq_ops _mq_ops =
{
    .queue_rq = hidden_queue_rq,
#if LINUX_VERSION_CODE <= KERNEL_VERSION(4, 2, 0)
    .map_queue = blk_mq_map_queue
#endif
};
#else // #if USE_BLKMQ
/*
 * Handle an I/O request.
 */
static blk_qc_t stackbd_make_request(struct request_queue *q, struct bio *bio)
{
    printk("stackbd: make request %-5s block %-12lu #pages %-4hu total-size "
            "%-10u\n", bio_data_dir(bio) == WRITE ? "write" : "read",
            bio->bi_iter.bi_sector, bio->bi_vcnt, bio->bi_iter.bi_size);
 
//    printk("<%p> Make request %s %s %s\n", bio,
//           bio->bi_rw & REQ_SYNC ? "SYNC" : "",
//           bio->bi_rw & REQ_FLUSH ? "FLUSH" : "",
//           bio->bi_rw & REQ_NOIDLE ? "NOIDLE" : "");
//
    spin_lock_irq(&stackbd.lock);
    if (!stackbd.bdev_raw)
    {
        printk("stackbd: Request before bdev_raw is ready, aborting\n");
        goto abort;
    }
    if (!stackbd.is_active)
    {
        printk("stackbd: Device not active yet, aborting\n");
        goto abort;
    }
    bio_list_add(&stackbd.bio_list, bio);
    wake_up(&req_event);
    spin_unlock_irq(&stackbd.lock);
 
    goto exit;
 
abort:
    spin_unlock_irq(&stackbd.lock);
    printk("<%p> Abort request\n\n", bio);
    bio_io_error(bio);
exit:
    return BLK_QC_T_NONE;
}
#endif // #if USE_BLKMQ
 
static struct block_device *stackbd_bdev_open(char dev_path[])
{
    /* Open underlying device */
    struct block_device *bdev_raw = lookup_bdev(dev_path);
    printk("Opened %s\n", dev_path);
 
    if (IS_ERR(bdev_raw))
    {
        printk("stackbd: error opening raw device <%lu>\n", PTR_ERR(bdev_raw));
        return NULL;
    }
 
    if (!bdget(bdev_raw->bd_dev))
    {
        printk("stackbd: error bdget()\n");
        return NULL;
    }
 
    if (blkdev_get(bdev_raw, STACKBD_BDEV_MODE, &stackbd))
    {
        printk("stackbd: error blkdev_get()\n");
        bdput(bdev_raw);
        return NULL;
    }
 
    return bdev_raw;
}
 
static int stackbd_start(char dev_path[])
{
    unsigned max_sectors;
 
    if (!(stackbd.bdev_raw = stackbd_bdev_open(dev_path)))
        return -EFAULT;
 
    /* Set up our internal device */
    stackbd.capacity = get_capacity(stackbd.bdev_raw->bd_disk);
    printk("stackbd: Device real capacity: %lu\n", stackbd.capacity);
 
    set_capacity(stackbd.gd, stackbd.capacity);
 
    max_sectors = queue_max_hw_sectors(bdev_get_queue(stackbd.bdev_raw));
    blk_queue_max_hw_sectors(stackbd.queue, max_sectors);
    printk("stackbd: Max sectors: %u\n", max_sectors);
 
    stackbd.thread = kthread_create(stackbd_threadfn, NULL,
           stackbd.gd->disk_name);
    if (IS_ERR(stackbd.thread))
    {
        printk("stackbd: error kthread_create <%lu>\n",
               PTR_ERR(stackbd.thread));
        goto error_after_bdev;
    }
 
    printk("stackbd: done initializing successfully\n");
    stackbd.is_active = 1;
    wake_up_process(stackbd.thread);
 
    return 0;
 
error_after_bdev:
    blkdev_put(stackbd.bdev_raw, STACKBD_BDEV_MODE);
    bdput(stackbd.bdev_raw);
 
    return -EFAULT;
}
 
static int stackbd_ioctl(struct block_device *bdev, fmode_t mode,
             unsigned int cmd, unsigned long arg)
{
    char dev_path[80];
    void __user *argp = (void __user *)arg;    
 
    switch (cmd)
    {
    case STACKBD_DO_IT:
        printk("\n*** DO IT!!!!!!! ***\n\n");
 
        if (copy_from_user(dev_path, argp, sizeof(dev_path)))
            return -EFAULT;
 
        return stackbd_start(dev_path);
    default:
        return -ENOTTY;
    }
}
 
/*
 * The HDIO_GETGEO ioctl is handled in blkdev_ioctl(), which
 * calls this. We need to implement getgeo, since we can't
 * use tools such as fdisk to partition the drive otherwise.
 */
int stackbd_getgeo(struct block_device * block_device, struct hd_geometry * geo)
{
    long size;
 
    /* We have no real geometry, of course, so make something up. */
    size = stackbd.capacity * (LOGICAL_BLOCK_SIZE / KERNEL_SECTOR_SIZE);
    geo->cylinders = (size & ~0x3f) >> 6;
    geo->heads = 4;
    geo->sectors = 16;
    geo->start = 0;
    return 0;
}
 
/*
 * The device operations structure.
 */
static struct block_device_operations stackbd_ops = {
        .owner  = THIS_MODULE,
        .getgeo = stackbd_getgeo,
        .ioctl  = stackbd_ioctl,
};
 
static int __init stackbd_init(void)
{
    /* Set up our internal device */
    spin_lock_init(&stackbd.lock);
 
    /* blk_alloc_queue() instead of blk_init_queue() so it won't set up the
     * queue for requests.
     */
#if USE_BLKMQ
    stackbd.tag_set.ops = &_mq_ops;
    stackbd.tag_set.nr_hw_queues = 1;
    stackbd.tag_set.queue_depth = 128;
    stackbd.tag_set.numa_node = NUMA_NO_NODE;
    stackbd.tag_set.cmd_size = sizeof(hidden_cmd_t);
    stackbd.tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE | BLK_MQ_F_BLOCKING;
    stackbd.tag_set.driver_data = &stackbd;
 
    {
        int res = blk_mq_alloc_tag_set(&stackbd.tag_set);
        if (res)
        {
            PWARN("unable to allocate tag set (%d)\n", res);
            return -EFAULT;
        }
    }
    stackbd.queue = blk_mq_init_queue(&stackbd.tag_set);
    if (IS_ERR(stackbd.queue))
    {
        int res = PTR_ERR(stackbd.queue);
        PWARN("Failed to allocate queue (%d)", res);
        return -EFAULT;
    }
#else
    if (!(stackbd.queue = blk_alloc_queue(GFP_KERNEL)))
    {
        printk("stackbd: alloc_queue failed\n");
        return -EFAULT;
    }
 
    blk_queue_make_request(stackbd.queue, stackbd_make_request);
#endif
    blk_queue_logical_block_size(stackbd.queue, LOGICAL_BLOCK_SIZE);
 
    /* Get registered */
    if ((major_num = register_blkdev(major_num, STACKBD_NAME)) < 0)
    {
        printk("stackbd: unable to get major number\n");
        goto error_after_alloc_queue;
    }
 
    /* Gendisk structure */
    if (!(stackbd.gd = alloc_disk(16)))
        goto error_after_redister_blkdev;
    stackbd.gd->major = major_num;
    stackbd.gd->first_minor = 0;
    stackbd.gd->fops = &stackbd_ops;
    stackbd.gd->private_data = &stackbd;
    strcpy(stackbd.gd->disk_name, STACKBD_NAME_0);
    stackbd.gd->queue = stackbd.queue;
    add_disk(stackbd.gd);
 
    printk("stackbd: init done\n");
 
    return 0;
 
error_after_redister_blkdev:
    unregister_blkdev(major_num, STACKBD_NAME);
error_after_alloc_queue:
    blk_cleanup_queue(stackbd.queue);
 
    return -EFAULT;
}
 
static void __exit stackbd_exit(void)
{
    printk("stackbd: exit\n");
 
    if (stackbd.is_active)
    {
        kthread_stop(stackbd.thread);
        blkdev_put(stackbd.bdev_raw, STACKBD_BDEV_MODE);
        bdput(stackbd. bdev_raw);
    }
 
    del_gendisk(stackbd.gd);
    put_disk(stackbd.gd);
    unregister_blkdev(major_num, STACKBD_NAME);
    blk_cleanup_queue(stackbd.queue);
#if USE_BLKMQ
    if (stackbd.tag_set.tags)
        blk_mq_free_tag_set(&stackbd.tag_set);
#endif
}
 
module_init(stackbd_init);
module_exit(stackbd_exit);



Последнее исправление: zenbooster (всего исправлений: 1)

Если же переделать код таким образом, что бы очередь использовалась, модуль повисает в том случае, если в устройство записать больше 8 секторов за раз

<КО>Похоже, проблема в том, что переделывать нужно правильно, а ты где-то допустил ошибку</КО>

попутно добавляя информацию об этих запросах в syslog

Отличная архитектура, чтобы своими руками заДДОСить printk

ttnl ★★★★★
()

Как минимум в одном месте, при ошибке спинлок не освобождается.

А лечить отладкой, как в голове, так и на машине. Логично собрать кастомное отладочное ядро и красноглазить на виртуалке.

Deleted
()

Повисает возможно потому, что page cache сбрасывается после записи 8 секторов (4096, страница).

Попробуй вот так:

$ dd if=/dev/zero of=<dev> bs=4096 count=1 oflag=direct
$ sync
kirk_johnson ★☆
()
Последнее исправление: kirk_johnson (всего исправлений: 3)

Плюс, у тебя spin_lock_irq() в hidden_queue_rq(). Я хз почему ты не хочешь, следуя логике mq, сделать lockless структуру (привязать список запросов к CPU и просто вырубать preempt, вставлять в список и врубать preempt обратно), но суть в том, что возможно тебе нужен spin_lock_irqsave() в этом контексте.

И complete у тебя довольно странный. Вместо blk_mq_end_io() вроде как стоит позвать blk_mq_complete_request().

kirk_johnson ★☆
()
Последнее исправление: kirk_johnson (всего исправлений: 2)
Ответ на: комментарий от Deleted

Как минимум в одном месте, при ошибке спинлок не освобождается.

В hidden_queue_rq перенёс метку exit перед spin_unlock_irq.

zenbooster
() автор топика
Ответ на: комментарий от kirk_johnson

Попробовал, так работает с любым количеством.

zenbooster
() автор топика
Ответ на: комментарий от kirk_johnson

Я ориентировался на https://habr.com/ru/company/veeam/blog/446148/ и подобные примеры. Можно сделать и с blk_mq_complete_request(), но может ли это быть причиной бага?

lockless структуру надо бы попробовать сделать, только надо прояснить несколько моментов... Я так понял, что привязать список запросов к CPU можно с помощью DEFINE_PER_CPU или alloc_percpu(), free_percpu(). Как можно вырубать / врубать preempt?

zenbooster
() автор топика
Ответ на: комментарий от ttnl

Отличная архитектура, чтобы своими руками заДДОСить printk

Для опытов на виртуалке, почему бы и нет?

zenbooster
() автор топика
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.