Commit 479c778a authored by Pavel Emelyanov, committed by Andrei Vagin

page-xfer: Introduce fully asynchronous read

Add a queue of async-read jobs into page-xfer. When the
page_server_sk gets a read event from epoll it reads as
many bytes into page_server_iov + page buffer as recv
allows and returns.

Once the full iov+data is ready the requestor is notified
and the next async read is started.

This patch removes calls to recv(...MSG_WAITALL) from all
remote async paths.
Signed-off-by: Pavel Emelyanov <xemul@virtuozzo.com>
Acked-by: Mike Rapoport <rppt@linux.vnet.ibm.com>
parent adea705b
......@@ -57,5 +57,9 @@ extern int request_remote_pages(int pid, unsigned long addr, int nr_pages);
extern int receive_remote_pages_info(int *nr_pages, unsigned long *addr, int *pid);
extern int receive_remote_pages(int len, void *buf);
/*
 * Completion callback for an asynchronous page read. It is invoked with the
 * dst_id, vaddr and nr_pages fields of the received page_server_iov header
 * (as pid, vaddr and nr_pages) plus the priv pointer that was supplied when
 * the read was queued.
 */
typedef int (*ps_async_read_complete)(int pid, unsigned long vaddr, int nr_pages, void *);
/*
 * Queue an asynchronous read of nr_pages pages into buf. complete() is
 * called with priv once the header and all page data have been received.
 * Returns 0 on success, -1 if the request could not be allocated.
 */
extern int page_server_start_async_read(void *buf, int nr_pages,
ps_async_read_complete complete, void *priv);
/*
 * Make one non-blocking receive attempt for the oldest queued request.
 * Returns 0 while that request is still incomplete, -1 on failure, and
 * otherwise the return value of the request's completion callback.
 */
extern int page_server_async_read(void);
#endif /* __CR_PAGE_XFER__H__ */
......@@ -950,6 +950,91 @@ out:
return ret ? : status;
}
/*
 * One queued asynchronous read: a page_server_iov header followed by the
 * page data it describes.
 */
struct ps_async_read {
unsigned long rb; /* bytes received so far, header and data together */
unsigned long goal; /* total bytes to receive: sizeof(pi) + nr_pages * PAGE_SIZE */
struct page_server_iov pi; /* header, filled in from the socket */
void *pages; /* caller's buffer that receives the page data */
ps_async_read_complete complete; /* called once rb reaches goal */
void *priv; /* opaque argument handed back to complete() */
struct list_head l; /* link in async_reads */
};
/* Pending requests in submission order; the first entry is the one being read */
static LIST_HEAD(async_reads);
/*
 * Queue an asynchronous read of @nr_pages pages into @buf.
 *
 * The request is appended to the tail of async_reads and is serviced by
 * page_server_async_read(). Once the page_server_iov header and all page
 * data have arrived, @complete is called with @priv.
 *
 * Returns 0 on success, -1 if the request cannot be allocated.
 */
int page_server_start_async_read(void *buf, int nr_pages,
		ps_async_read_complete complete, void *priv)
{
	struct ps_async_read *req = xmalloc(sizeof(*req));

	if (!req)
		return -1;

	/* Nothing received yet; the header comes first, then the pages */
	req->rb = 0;
	req->goal = sizeof(req->pi) + nr_pages * PAGE_SIZE;

	req->pages = buf;
	req->priv = priv;
	req->complete = complete;

	list_add_tail(&req->l, &async_reads);
	return 0;
}
/*
* There are two possible event types we need to handle:
* - page info is available as a reply to request_remote_page
* - page data is available, and it follows page info we've just received
* Since the on dump side communications are completely synchronous,
* we can return to epoll right after the reception of page info and
* for sure the next time socket event will occur we'll get page data
* related to info we've just received
*/
int page_server_async_read(void)
{
struct ps_async_read *ar;
int ret, need;
void *buf;
BUG_ON(list_empty(&async_reads));
ar = list_first_entry(&async_reads, struct ps_async_read, l);
if (ar->rb < sizeof(ar->pi)) {
/* Header */
buf = ((void *)&ar->pi) + ar->rb;
need = sizeof(ar->pi) - ar->rb;
} else {
/* Page(s) data itself */
buf = ar->pages + (ar->rb - sizeof(ar->pi));
need = ar->goal - ar->rb;
}
ret = recv(page_server_sk, buf, need, MSG_DONTWAIT);
if (ret < 0) {
pr_perror("Error reading async data from page server\n");
return -1;
}
ar->rb += ret;
if (ar->rb < ar->goal)
return 0;
/*
* IO complete -- notify the caller and drop the request
*/
BUG_ON(ar->rb > ar->goal);
ret = ar->complete((int)ar->pi.dst_id, (unsigned long)ar->pi.vaddr,
(int)ar->pi.nr_pages, ar->priv);
list_del(&ar->l);
xfree(ar);
return ret;
}
int request_remote_pages(int pid, unsigned long addr, int nr_pages)
{
struct page_server_iov pi = {
......
......@@ -420,6 +420,23 @@ static int maybe_read_page_local(struct page_read *pr, unsigned long vaddr,
return ret;
}
/*
 * Completion hook passed to page_server_start_async_read().
 *
 * @priv is the struct page_read the request was issued for. A completion
 * whose pid does not match that page_read is rejected with -1. Otherwise
 * the page_read's io_complete hook, if set, is called and its result is
 * returned; with no hook set the result is 0.
 */
static int read_page_complete(int pid, unsigned long vaddr, int nr_pages, void *priv)
{
	struct page_read *reader = priv;

	if (reader->pid != pid) {
		pr_err("Out of order read completed (want %d have %d)\n",
		       reader->pid, pid);
		return -1;
	}

	if (!reader->io_complete)
		return 0;

	return reader->io_complete(reader, vaddr, nr_pages);
}
static int maybe_read_page_remote(struct page_read *pr, unsigned long vaddr,
int nr, void *buf, unsigned flags)
{
......@@ -427,8 +444,10 @@ static int maybe_read_page_remote(struct page_read *pr, unsigned long vaddr,
/* We always do PR_ASAP mode here (FIXME?) */
ret = request_remote_pages(pr->pid, vaddr, nr);
if ((ret < 0) || (flags & PR_ASYNC))
if (ret < 0)
return ret;
if (flags & PR_ASYNC)
return page_server_start_async_read(buf, nr, read_page_complete, pr);
/*
* Note, that for async remote page_read, the actual
......
......@@ -55,16 +55,9 @@ struct lazy_iovec {
struct lazy_pages_info;
/*
 * Page info remembered between two page server socket events: the info is
 * stored when it is received and consumed when the page data is read.
 */
struct sk_event_data {
int nr_pages; /* number of pages announced by the page info */
unsigned long addr; /* address announced by the page info */
struct lazy_pages_info *lpi; /* target of the pending data; NULL when no page info is pending */
};
/* A file descriptor paired with the handler for events on it */
struct lazy_pages_fd {
int fd; /* the descriptor itself */
int (*event)(struct lazy_pages_fd *); /* event handler; receives this structure */
struct sk_event_data *ev_data; /* state the handler keeps between events */
};
struct lazy_pages_info {
......@@ -120,18 +113,6 @@ static void lpi_fini(struct lazy_pages_info *lpi)
free(lpi);
}
static struct lazy_pages_info *pid2lpi(int pid)
{
struct lazy_pages_info *lpi;
list_for_each_entry(lpi, &lpis, l) {
if (lpi->pid == pid)
return lpi;
}
return NULL;
}
static int epoll_nr_fds(int nr_tasks)
{
if (opts.use_page_server)
......@@ -921,56 +902,12 @@ close_uffd:
return -1;
}
/*
 * There are two possible event types we need to handle:
 * - page info is available as a reply to request_remote_pages
 * - page data is available, and it follows the page info we've just received
 * Since communications on the dump side are completely synchronous,
 * we can return to epoll right after receiving the page info, and the
 * next socket event is guaranteed to carry the page data that belongs
 * to the info we've just received.
 */
/*
 * Handler for events on the page server socket.
 *
 * NOTE(review): as written, this body contains both a two-phase receive
 * (page info on one event, page data on the next) and a trailing call to
 * page_server_async_read(). Both branches of the if/else return, so the
 * trailing return is unreachable here. This looks like the old and the new
 * version of the function merged together -- confirm which one is intended.
 */
static int page_server_event(struct lazy_pages_fd *lpfd)
{
struct lazy_pages_info *lpi;
int pid, nr_pages;
unsigned long addr;
lpi = lpfd->ev_data->lpi;
if (!lpi) {
/* No page info pending: receive it and remember it for the next event */
if (receive_remote_pages_info(&nr_pages, &addr, &pid))
return -1;
lpi = pid2lpi(pid);
if (!lpi)
return -1;
lpfd->ev_data->lpi = lpi;
lpfd->ev_data->nr_pages = nr_pages;
lpfd->ev_data->addr = addr;
return 0;
} else {
/* Page info was stored by the previous event: take it and reset the state */
lpi = lpfd->ev_data->lpi;
nr_pages = lpfd->ev_data->nr_pages;
addr = lpfd->ev_data->addr;
memset(lpfd->ev_data, 0, sizeof(*lpfd->ev_data));
/* Read the page data into lpi->buf, then complete the fault */
if (receive_remote_pages(nr_pages * PAGE_SIZE, lpi->buf))
return -1;
return complete_page_fault(lpi, addr, nr_pages);
}
return page_server_async_read();
}
/* State kept between successive page_server_event() calls */
static struct sk_event_data sk_event_data;
/* Descriptor entry for the page server socket */
static struct lazy_pages_fd page_server_sk_fd = {
.event = page_server_event,
.ev_data = &sk_event_data,
};
/*
 * NOTE(review): second declaration of the same object, without the
 * initializers above -- this looks like the old and the new form side by
 * side; confirm which one is intended.
 */
static struct lazy_pages_fd page_server_sk_fd;
static int prepare_page_server_socket(int epollfd)
{
......@@ -980,6 +917,7 @@ static int prepare_page_server_socket(int epollfd)
if (sk < 0)
return -1;
page_server_sk_fd.event = page_server_event;
page_server_sk_fd.fd = sk;
return epoll_add_lpfd(epollfd, &page_server_sk_fd);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment