server in Clojure, fourth attempt

Getting closer!

(ns exvarcis.core
  (:import [java.net InetAddress InetSocketAddress Socket]
           [java.nio ByteBuffer]
           [java.nio.charset Charset]
           [java.nio.channels
            ServerSocketChannel SocketChannel Selector SelectionKey]))

(defn buf-seq
  ([buf] (buf-seq buf 0))
  ([buf i]
   (lazy-seq  
      (when (< i (.limit buf))
        (cons (.get buf i) (buf-seq buf (inc i)))))))

(let [c (atom 0)]
  (defn new-connection []
    {:id (swap! c inc)
     :writebuf (ByteBuffer/allocate 8096)
     :readbuf (ByteBuffer/allocate 8096)}))

(defmulti select-op
  (fn [k] (.readyOps k)))

(defmethod select-op :default [k]
  (println "Default op."))

(defmethod select-op (SelectionKey/OP_ACCEPT) [k]
  (-> 
    (.accept (.channel k))
    (.configureBlocking false)
    (.register (.selector k) (SelectionKey/OP_READ))
    (.attach (new-connection)))
  (println "Client registered"))

(defn close-connection [k]
  (println "closing connection" (.channel k))
  (.cancel k)
  (.close (.channel k)))

(defmethod select-op (SelectionKey/OP_READ) [k]
  (let
      [readbuf (-> (.attachment k) (:readbuf))
       bytes-read (.read (.channel k) readbuf)]
    (when (= bytes-read -1)
      (close-connection k))))

(defn select! [selector]
  (.select selector)
  (let [ks (.selectedKeys selector)]
    (doseq [k ks]
      (select-op k))
    (.clear ks)))

(defn get-lines [buf]
  (let
    [dup-buf (.duplicate buf)
     original-limit (-> dup-buf .flip .limit)
     last-newline (.lastIndexOf (vec (buf-seq dup-buf)) 10)]
    (when-not (= last-newline -1)
      (.flip buf)
      (.limit buf (inc last-newline))
      (let
        [lines (->>
                  buf
                  (.decode (Charset/defaultCharset))
                  .toString
                  clojure.string/split-lines)]
        (.limit buf original-limit)
        (.compact buf)
        lines))))

(defn write-lines [lines k]
  (let
      [encoded-lines (->> lines (.encode (Charset/defaultCharset)))
       buf (-> (.attachment k) (:writebuf))]
    (when (> (.limit encoded-lines) 0)
      (.put buf encoded-lines)
      (.flip buf)
      (.write (.channel k) buf)
      (.compact buf))))

(defn echo-all [ks]
  (let
    [readbufs (map #(-> (.attachment %) (:readbuf)) ks)
     lines (mapcat get-lines readbufs)
     lines-with-crlf (str (clojure.string/join "\r\n" lines) "\r\n")]
    (doseq [k ks]
      (write-lines lines-with-crlf k))))

(defn run [selector]
 (while true
   (select! selector)
   (echo-all (filter #(and (.isValid %) (.attachment %)) (.keys selector)))
   (Thread/sleep 3000)))

(defn start [handler]
  (let [selector (Selector/open)
        acceptor (->
                  (ServerSocketChannel/open)
                  (.configureBlocking false)
                  (.bind (InetSocketAddress. "127.0.0.1" 0)))]
    (.register acceptor selector (SelectionKey/OP_ACCEPT))
    (println "Starting Ex v Arcis on" (.getLocalAddress acceptor))
    (handler selector)))

(defn -main []
  (start run)
)

Manipulating ByteBuffers is a little tricky but also quite satisfying! Slowly adding error handling but also punting on a lot of stuff at the moment. The big breakthrough here is echo’ing input to all connected clients. The functions that do that, get-lines, write-lines, and echo-all, definitely need work though. I also borrowed a coercion function, buf-seq, from Paul Stadig’s https://github.com/pjstadig/nio to make it easy to scan the buffer for newlines. Thanks Paul!

Advertisements

No comments yet

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: