mirror of
https://git.savannah.gnu.org/git/guile.git
synced 2025-07-06 01:30:22 +02:00
add examples/ethreads/memcached-{client,server}
* examples/ethreads/memcached-client.scm: * examples/ethreads/memcached-server.scm: Two new ethreads examples. Not really optimized, no consideration for memory use, but perhaps instructive.
This commit is contained in:
parent
253dc1a711
commit
183f3db22e
2 changed files with 299 additions and 0 deletions
148
examples/ethreads/memcached-client.scm
Normal file
148
examples/ethreads/memcached-client.scm
Normal file
|
@ -0,0 +1,148 @@
|
|||
;;; Simple memcached client implementation
|
||||
|
||||
;; Copyright (C) 2012 Free Software Foundation, Inc.
|
||||
|
||||
;; This library is free software; you can redistribute it and/or
|
||||
;; modify it under the terms of the GNU Lesser General Public
|
||||
;; License as published by the Free Software Foundation; either
|
||||
;; version 3 of the License, or (at your option) any later version.
|
||||
;;
|
||||
;; This library is distributed in the hope that it will be useful,
|
||||
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
;; Lesser General Public License for more details.
|
||||
;;
|
||||
;; You should have received a copy of the GNU Lesser General Public
|
||||
;; License along with this library; if not, write to the Free Software
|
||||
;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
|
||||
;; 02110-1301 USA
|
||||
|
||||
(use-modules (rnrs bytevectors)
|
||||
(ice-9 ethreads)
|
||||
(ice-9 binary-ports)
|
||||
(ice-9 textual-ports)
|
||||
(ice-9 rdelim)
|
||||
(ice-9 match))
|
||||
|
||||
(define (set-nonblocking! port)
|
||||
(fcntl port F_SETFL (logior O_NONBLOCK (fcntl port F_GETFL)))
|
||||
(setvbuf port 'block 1024))
|
||||
|
||||
(define (server-error port msg . args)
|
||||
(apply format (current-error-port) msg args)
|
||||
(newline (current-error-port))
|
||||
(close-port port)
|
||||
(suspend))
|
||||
|
||||
(define (parse-int port val)
|
||||
(let ((num (string->number val)))
|
||||
(unless (and num (integer? num) (exact? num) (>= num 0))
|
||||
(server-error port "Expected a non-negative integer: ~s" val))
|
||||
num))
|
||||
|
||||
(define (make-item flags bv)
|
||||
(vector flags bv))
|
||||
(define (item-flags item)
|
||||
(vector-ref item 0))
|
||||
(define (item-bv item)
|
||||
(vector-ref item 1))
|
||||
|
||||
(define (get port . keys)
|
||||
(put-string port "get ")
|
||||
(put-string port (string-join keys " "))
|
||||
(put-string port "\r\n")
|
||||
(force-output port)
|
||||
(let lp ((vals '()))
|
||||
(let ((line (read-line port)))
|
||||
(when (eof-object? line)
|
||||
(server-error port "Expected a response to 'get', got EOF"))
|
||||
(match (string-split (string-trim-right line) #\space)
|
||||
(("VALUE" key flags length)
|
||||
(let* ((flags (parse-int port flags))
|
||||
(length (parse-int port length)))
|
||||
(unless (member key keys)
|
||||
(server-error port "Unknown key: ~a" key))
|
||||
(when (assoc key vals)
|
||||
(server-error port "Already have response for key: ~a" key))
|
||||
(let ((bv (get-bytevector-n port length)))
|
||||
(unless (= (bytevector-length bv) length)
|
||||
(server-error port "Expected ~A bytes, got ~A" length bv))
|
||||
(when (eqv? (peek-char port) #\return)
|
||||
(read-char port))
|
||||
(unless (eqv? (read-char port) #\newline)
|
||||
(server-error port "Expected \\n"))
|
||||
(lp (acons key (make-item flags bv) vals)))))
|
||||
(("END")
|
||||
(reverse vals))
|
||||
(_
|
||||
(server-error port "Bad line: ~A" line))))))
|
||||
|
||||
(define* (set port key flags exptime bytes #:key noreply?)
|
||||
(put-string port "set ")
|
||||
(put-string port key)
|
||||
(put-char port #\space)
|
||||
(put-string port (number->string flags))
|
||||
(put-char port #\space)
|
||||
(put-string port (number->string exptime))
|
||||
(put-char port #\space)
|
||||
(put-string port (number->string (bytevector-length bytes)))
|
||||
(when noreply?
|
||||
(put-string port " noreply"))
|
||||
(put-string port "\r\n")
|
||||
(put-bytevector port bytes)
|
||||
(put-string port "\r\n")
|
||||
(force-output port)
|
||||
(let ((line (read-line port)))
|
||||
(match line
|
||||
((? eof-object?)
|
||||
(server-error port "EOF while expecting response from server"))
|
||||
("STORED\r" #t)
|
||||
("NOT_STORED\r" #t)
|
||||
(_
|
||||
(server-error port "Unexpected response from server: ~A" line)))))
|
||||
|
||||
(define (connect-to-server addrinfo)
|
||||
(let ((port (socket (addrinfo:fam addrinfo)
|
||||
(addrinfo:socktype addrinfo)
|
||||
(addrinfo:protocol addrinfo))))
|
||||
;; Disable Nagle's algorithm. We buffer ourselves.
|
||||
(setsockopt port IPPROTO_TCP TCP_NODELAY 0)
|
||||
(set-nonblocking! port)
|
||||
(connect port (addrinfo:addr addrinfo))
|
||||
port))
|
||||
|
||||
(define *active-clients* 0)
|
||||
|
||||
(define (client-loop addrinfo n num-connections)
|
||||
(set! *active-clients* (1+ *active-clients*))
|
||||
(let ((port (connect-to-server addrinfo))
|
||||
(key (string-append "test-" (number->string n))))
|
||||
(let lp ((m 0))
|
||||
(when (< m num-connections)
|
||||
(let ((v (string->utf8 (number->string m))))
|
||||
(set port key 0 0 v)
|
||||
(let* ((response (get port key))
|
||||
(item (assoc-ref response key)))
|
||||
(unless item
|
||||
(server-error port "Not found: ~A" key))
|
||||
(unless (equal? (item-bv item) v)
|
||||
(server-error port "Bad response: ~A (expected ~A)" (item-bv item) v))
|
||||
(lp (1+ m))))))
|
||||
(close-port port))
|
||||
(set! *active-clients* (1- *active-clients*))
|
||||
(when (zero? *active-clients*)
|
||||
(exit 0)))
|
||||
|
||||
(define (run-memcached-test num-clients num-connections)
|
||||
;; The getaddrinfo call blocks, unfortunately. Call it once before
|
||||
;; spawning clients.
|
||||
(let ((addrinfo (car (getaddrinfo "localhost" (number->string 11211)))))
|
||||
(let lp ((n 0))
|
||||
(when (< n num-clients)
|
||||
(spawn
|
||||
(lambda ()
|
||||
(client-loop addrinfo n num-connections)))
|
||||
(lp (1+ n)))))
|
||||
(run))
|
||||
|
||||
(apply run-memcached-test (map string->number (cdr (program-arguments))))
|
151
examples/ethreads/memcached-server.scm
Normal file
151
examples/ethreads/memcached-server.scm
Normal file
|
@ -0,0 +1,151 @@
|
|||
;;; Simple memcached server implementation
|
||||
|
||||
;; Copyright (C) 2012 Free Software Foundation, Inc.
|
||||
|
||||
;; This library is free software; you can redistribute it and/or
|
||||
;; modify it under the terms of the GNU Lesser General Public
|
||||
;; License as published by the Free Software Foundation; either
|
||||
;; version 3 of the License, or (at your option) any later version.
|
||||
;;
|
||||
;; This library is distributed in the hope that it will be useful,
|
||||
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
;; Lesser General Public License for more details.
|
||||
;;
|
||||
;; You should have received a copy of the GNU Lesser General Public
|
||||
;; License along with this library; if not, write to the Free Software
|
||||
;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
|
||||
;; 02110-1301 USA
|
||||
|
||||
(use-modules (rnrs bytevectors)
|
||||
(ice-9 ethreads)
|
||||
(ice-9 binary-ports)
|
||||
(ice-9 textual-ports)
|
||||
(ice-9 rdelim)
|
||||
(ice-9 match))
|
||||
|
||||
(define (set-nonblocking! port)
|
||||
(fcntl port F_SETFL (logior O_NONBLOCK (fcntl port F_GETFL)))
|
||||
(setvbuf port 'block 1024))
|
||||
|
||||
(define (make-default-socket family addr port)
|
||||
(let ((sock (socket PF_INET SOCK_STREAM 0)))
|
||||
(setsockopt sock SOL_SOCKET SO_REUSEADDR 1)
|
||||
(fcntl sock F_SETFD FD_CLOEXEC)
|
||||
(bind sock family addr port)
|
||||
(set-nonblocking! sock)
|
||||
sock))
|
||||
|
||||
(define (client-error port msg . args)
|
||||
(put-string port (apply format #f msg args))
|
||||
(put-string port "\r\n")
|
||||
(close-port port)
|
||||
(suspend))
|
||||
|
||||
(define (parse-int port val)
|
||||
(let ((num (string->number val)))
|
||||
(unless (and num (integer? num) (exact? num) (>= num 0))
|
||||
(client-error port "Expected a non-negative integer: ~s" val))
|
||||
num))
|
||||
|
||||
(define (make-item flags bv)
|
||||
(vector flags bv))
|
||||
(define (item-flags item)
|
||||
(vector-ref item 0))
|
||||
(define (item-bv item)
|
||||
(vector-ref item 1))
|
||||
|
||||
(define *commands* (make-hash-table))
|
||||
|
||||
(define-syntax-rule (define-command (name port store . pat)
|
||||
body body* ...)
|
||||
(begin
|
||||
(define (name port store args)
|
||||
(match args
|
||||
(pat body body* ...)
|
||||
(else
|
||||
(client-error port "Bad line: ~A ~S" 'name (string-join args " ")))))
|
||||
(hashq-set! *commands* 'name name)))
|
||||
|
||||
(define-command (get port store key* ...)
|
||||
(let lp ((key* key*))
|
||||
(match key*
|
||||
((key key* ...)
|
||||
(let ((item (hash-ref store key)))
|
||||
(when item
|
||||
(put-string port "VALUE ")
|
||||
(put-string port key)
|
||||
(put-char port #\space)
|
||||
(put-string port (number->string (item-flags item)))
|
||||
(put-char port #\space)
|
||||
(put-string port (number->string
|
||||
(bytevector-length (item-bv item))))
|
||||
(put-char port #\return)
|
||||
(put-char port #\newline)
|
||||
(put-bytevector port (item-bv item))
|
||||
(put-string port "\r\n"))
|
||||
(lp key*)))
|
||||
(()
|
||||
(put-string port "END\r\n")))))
|
||||
|
||||
(define-command (set port store key flags exptime bytes
|
||||
. (and noreply (or ("noreply") ())))
|
||||
(let* ((flags (parse-int port flags))
|
||||
(exptime (parse-int port exptime))
|
||||
(bytes (parse-int port bytes)))
|
||||
(let ((bv (get-bytevector-n port bytes)))
|
||||
(unless (= (bytevector-length bv) bytes)
|
||||
(client-error port "Tried to read ~A bytes, only read ~A"
|
||||
bytes (bytevector-length bv)))
|
||||
(hash-set! store key (make-item flags bv))
|
||||
(when (eqv? (peek-char port) #\return)
|
||||
(read-char port))
|
||||
(when (eqv? (peek-char port) #\newline)
|
||||
(read-char port)))
|
||||
(put-string port "STORED\r\n")))
|
||||
|
||||
(define (client-loop port addr store)
|
||||
(let loop ()
|
||||
;; TODO: Restrict read-line to 512 chars.
|
||||
(let ((line (read-line port)))
|
||||
(cond
|
||||
((eof-object? line)
|
||||
(close-port port))
|
||||
(else
|
||||
(match (string-split (string-trim-right line) #\space)
|
||||
((verb . args)
|
||||
(let ((proc (hashq-ref *commands* (string->symbol verb))))
|
||||
(unless proc
|
||||
(client-error port "Bad command: ~a" verb))
|
||||
(proc port store args)
|
||||
(force-output port)
|
||||
(loop)))
|
||||
(else (client-error port "Bad command line" line))))))))
|
||||
|
||||
;; todo: accept and connect
|
||||
(define (socket-loop socket store)
|
||||
(let loop ()
|
||||
(match (accept socket)
|
||||
((client . addr)
|
||||
(set-nonblocking! client)
|
||||
;; Disable Nagle's algorithm. We buffer ourselves.
|
||||
(setsockopt client IPPROTO_TCP TCP_NODELAY 0)
|
||||
(spawn (lambda () (client-loop client addr store)))
|
||||
(loop)))))
|
||||
|
||||
(define* (run-memcached #:key
|
||||
(host #f)
|
||||
(family AF_INET)
|
||||
(addr (if host
|
||||
(inet-pton family host)
|
||||
INADDR_LOOPBACK))
|
||||
(port 11211)
|
||||
(socket (make-default-socket family addr port)))
|
||||
(listen socket 128)
|
||||
(sigaction SIGPIPE SIG_IGN)
|
||||
(spawn
|
||||
(lambda ()
|
||||
(socket-loop socket (make-hash-table))))
|
||||
(run))
|
||||
|
||||
(run-memcached)
|
Loading…
Add table
Add a link
Reference in a new issue