So, my old post made it to Reddit and, as I expected, lots of people complained about my conclusion. I still find it damaging that the Clojure community spends this huge energy defending “core” solutions, even when they are low-level, impractical, or (in this case) do not work well, if they work at all. But, well, I decided to clarify some points of the post, and answer some of the comments that people told me would, supposedly, “solve” the problem. I already answered these questions on other forums, so I’ll try to re-use some of my comments:

Interop with <p!

Some people asked about interop with <p!. Honestly, when I wrote the article, this option did not exist. Even then, a Promise can be rejected and resolved with arbitrary data, so translating this code to ClojureScript means capturing the original result of <p!, checking if it’s an ExceptionInfo with the right type, and extracting the right part. It’s also harder to compose IMHO (with Promises, you can chain actions, and if any of them fails, you can catch the error and the promise is back in the “success” phase). But at least it’s less complicated now – although I would say, if you’re going to add a library to your project, why not add funcool/promesa instead of core.async? Remember that promesa is faster than core.async, and its constructs work with arbitrary data, not only promises, so you can treat all code as if it’s async (or sync) without needing to remember which values are promises and which ones are not…

Just .pause and .resume constantly

This works, indeed. For example, with sockets:

(.on socket "data" (fn [data]
                     ;; stop the flow until the channel accepts this value
                     (.pause socket)
                     (async/put! chan data #(.resume socket))))
(.on socket "end" #(async/close! chan))

But not all callback-based async code in ClojureScript allows you to pause events. Node.js sockets do; WebSockets in the browser don’t. The NPM package pg, when you query with a cursor, will not allow it either. In these cases, there’s nothing you can do, really – you will either drop messages, or an exception will happen.

Use offer!

Both (go (>! ...)) and put! are fragile with callbacks, because you can’t really “park” or “block” threads in Javascript. Keep put!ing messages in a channel, and you’ll hit an exception “No more than 1024 pending operations” (or something like that). So people asked me to use offer!, or make a sliding buffer, etc. This doesn’t really solve the problem if you can’t drop messages – and I do have lots of cases where this happens. Some people told me that dropping messages is part of life, and I wholeheartedly disagree – there are LOTS of situations when you can’t lose anything: bank transactions, socket messages (you can’t reconstruct the payload if you lost part of it), downloads, etc. There are ways to mitigate this, for example:

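;; Supposing onData is my async callback
;; and that I can pause/resume things
;; NOTE: the original bindings were lost; c, resume and last-vals below
;; are assumptions reconstructed from how the rest of the code uses them
(let [c (a/chan 100)        ; any fixed-size buffer (size is an assumption)
      resume (a/chan)       ; the consumer signals here when it can take more
      last-vals (atom [])   ; values that offer! could not deliver
      send-data (fn [data]
                  (when-not (a/offer! c data)
                    (swap! last-vals conj data)
                    (.pause socket)))]
  (a/go-loop []
    (a/<! resume)
    (.resume socket)
    (let [pending @last-vals]
      (reset! last-vals [])
      (doseq [d pending] (send-data d)))
    (recur))
  (.onData socket send-data))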

Now, does this solve the problem? Partially – because we still have to decide when to resume. Let’s try not to use core.async at all, with the same problem:

;; buffer is top-level so resume-soon and the watch can both see it
(def buffer (atom []))

(defn- resume-soon [socket]
  (if (-> @buffer count (< 500))
    (.resume socket)
    (js/setTimeout #(resume-soon socket) 200)))

(.on socket "data" (fn [data]
                     (swap! buffer conj data)
                     (when (-> @buffer count (> 1024))
                       (.pause socket)
                       (resume-soon socket))))

;; To "read" you just add a watch on the buffer:
(add-watch buffer ::stream
           (fn [_ _ _ new-data]
             (when (seq new-data)
               ;; do whatever you want
               ;; with the current fragments
               ;; (resetting only when non-empty, so the watch
               ;; does not keep re-triggering itself)
               (reset! buffer []))))

It’s not as “elegant” as other solutions, but hey, it works, it’s fast, and you don’t drop messages. But “do whatever you want” doesn’t mean “add into a channel”, because if you do, instead of one buffer to manage you will now have two – the second one being the channel itself – and remember, we don’t know how many pending messages are inside the channel, so you can’t predict when an error will occur, and you’ll lose the ability to decide when to safely .resume your code.

Backpressure handling is hard, and you’re shifting the problem to the server

The argument is that, if the server is sending messages anyway, you’ll have the problem on the server – because it’s expecting clients to consume messages and you’re not handling them, so it becomes a bottleneck on the server side. The trouble with this argument is that it only works for a specific client/server configuration, and there are LOTS of cases where async code happens in ClojureScript – SQL queries with cursors, messaging systems like Kafka and RabbitMQ, event handlers in browsers, etc. In most, if not all, of these examples you can’t really drop messages. For example, in a SQL query, each callback receives a single SQL result, or a batch of them (depending on the library you’re using). If you lose a single “row” of SQL, it looks like the data was lost on the server, which is not true. Good luck trying to find out where the problem is…

On SQL queries with cursors, the connection stays open until the last row is handled by a callback, but no data is sent until you actually handle messages. Most libraries can, and probably will, implement some kind of buffer mechanism that keeps asking for messages until the buffer is full – and in that case, it’ll just stop polling until you handle the pending messages. It’s exactly the same code I wrote above, but in Javascript. Again, there’s no way to convert callbacks to core.async without relying on external buffers like an atom or a mutable reference; worse yet, not all libraries offer a way to manually pause the consumption – I’m sure the PostgreSQL library that I used does not offer any pausing of cursors, and I was able to handle millions of rows (some querying external data) just fine. But, as I was querying external data and had to wait for it to return, if I had been using core.async it’s 100% certain that I would have hit the 1024 pending messages limit anyway. Kafka.js is another library that controls the polling in the background, and again, it works fine with callbacks (even considering that Kafka itself is a pull-based system, and Kafka.js is not).
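To make the “buffer that stops polling when it’s full” idea concrete, here is a minimal sketch in plain Javascript. It is NOT how pg or Kafka.js are actually implemented: PollingBuffer, poll and highWaterMark are hypothetical names, and polling is synchronous here only to keep the example short.

```javascript
// Hypothetical sketch of a driver-side buffer (not taken from any real library).
// `poll` stands in for "ask the server for the next batch of rows".
class PollingBuffer {
  constructor(poll, highWaterMark) {
    this.poll = poll;                   // () => array of rows, [] when exhausted
    this.highWaterMark = highWaterMark; // stop polling at or above this size
    this.rows = [];
    this.done = false;
  }

  // Keep asking for rows until the buffer is full or the source is exhausted.
  fill() {
    while (!this.done && this.rows.length < this.highWaterMark) {
      const batch = this.poll();
      if (batch.length === 0) this.done = true;
      else this.rows.push(...batch);
    }
  }

  // The consumer takes one row; more data is requested only when there is room.
  next() {
    this.fill();
    return this.rows.length > 0 ? this.rows.shift() : undefined;
  }
}

// Usage: three batches on the "server", buffer limited to 3 rows.
const batches = [[1, 2], [3, 4], [5]];
let polls = 0;
const buffer = new PollingBuffer(() => {
  polls += 1;
  return batches.shift() || [];
}, 3);

console.log(buffer.next(), polls); // 1 2  (two polls were needed to reach 3+ rows)
console.log(buffer.next(), polls); // 2 2  (buffer still full, no new poll)
```

Nothing is dropped and nothing is requested while the buffer is full; the consumer’s pace decides when the next poll happens. Note that the buffer can briefly exceed highWaterMark by up to one batch, which is fine for a sketch.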

You can only have backpressure on pull semantics, so we need a pull library on Node.JS

There is one, as someone pointed out to me on Reddit, called Readable Stream. Honestly, I don’t know any library, or any part of Node.js, that implements this, and even the documentation itself uses getReadableStreamSomehow(), so I would have to investigate this. Other than that, I don’t know any library that implements pull semantics, really, as it can (and it will) block the main thread. In fact, that’s probably the only situation where core.async in ClojureScript is acceptable: you have blocking code (some calculation, some rare sync code, or anything like that), you don’t want to block the main thread, and you can process fragments little by little. I would argue that’s kind of a narrow use-case, but it’s doable. As a replacement for callbacks, not really, because of the problems I mentioned before (and in my original post).

In fact, most of my “solutions” to the core.async problems are ways to implement “pull” on top of a “push” system (that’s the async-callback thing that exists in Node.js). I argue that this just complicates the problem: it is not portable (not every consumer can be easily paused), you’re fighting against the library’s implementation (whatever it is, as you’re basically implementing a second buffer over the first one), and it’ll probably be slower and consume more memory, all for… what, exactly? Just for the sake of using core.async?

There is error treatment…

I never said that it didn’t. What I said is that Javascript, with promises, handles this gracefully. Promises are almost an Either monad (with the exception that they are “flattened”, meaning you can’t have a promise inside a promise – it will flatten out), in the sense that when they complete, they can be either “resolved” (got a value) or “rejected” (got an error). You can put any value in both resolved and rejected, so it’s mostly a convention that if your promise is in the “rejected” state, it’ll not run subsequent .then callbacks because something failed before. You can .catch the error to make it go back to the “resolved” state, too. Now, what happens if you use the <p! macro on a rejected promise? You get back an ex-info value. This means that, if you have two pieces of async code that return numbers, one of them fails, and you try to sum both, this will work in Javascript and give you back a string. Good luck debugging that. Now, what happens if I don’t use core.async and either use promesa’s p/let macro or pure interop? You get a “rejected” promise. That’s what I meant.
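A minimal plain-Javascript sketch of the two behaviors described above, using only the built-in Promise and Error. fetchA and fetchB are made-up names for two async operations; the second half only shows what + does once an error shows up as an ordinary value instead of a rejection.

```javascript
// Hypothetical async operations: one succeeds, one fails.
const fetchA = () => Promise.resolve(10);
const fetchB = () => Promise.reject(new Error("boom"));

// With promises, a rejection skips every .then until a .catch,
// and .catch puts the chain back in the "resolved" state.
const viaPromises = fetchA()
  .then((a) => fetchB().then((b) => a + b)) // the sum never runs: fetchB rejects
  .then((sum) => sum * 2)                   // skipped as well
  .catch((err) => `recovered from: ${err.message}`);

// If the error arrives as a plain *value* instead, `+` does not fail:
// Javascript converts the Error to a string and concatenates.
const errorAsValue = new Error("boom");
const sum = 10 + errorAsValue;

console.log(typeof sum);                 // string
console.log(sum);                        // 10Error: boom
viaPromises.then((v) => console.log(v)); // recovered from: boom
```

The first chain fails loudly and can be recovered in one place; the second silently produces garbage that only shows up wherever the “number” is used next.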

If you use core.async in place of promises, what you have to do is check, every time, if a promise-chan returned an error, and if it did, you’ll need to treat it. That means a lot of boilerplate, because ClojureScript does not have return to exit early from a function.

Or wrap everything in try-catch and hope that JS implicit conversions (or ClojureScript ones) don’t bite you. And remember, if you throw an exception inside a go block in Javascript, it can either throw an exception on the main thread or have the same behavior as in Clojure (that is, it will silently not work).

I’m not convinced / you must prove me…

You don’t need to be. I also don’t need to prove anything, that’s the beauty of things. I’m just sharing that it did not work in multiple implementations, and I’m inviting someone to show me code that works purely with core.async in ClojureScript, or a library that’s not prone to the No more than 1024 pending puts error. I did not find any article about how to avoid this error; I did ask on StackOverflow, and got no answer. I found a single blog post (the one that gave me the idea to .pause and .resume constantly) that also does not solve the problem for every possible implementation (only for streams that allow pausing), and I imagine it would be slower for things that are really async, like RabbitMQ (in the blog’s example, the author used /dev/zero, which is mostly synchronous).

What I want to argue is that, if you want to pull in a library, try to pull in one that will not hit you in the back. Because JS is non-blocking, core.async’s main ability (to move tasks to different threads) is mostly lost, and there’s almost no code in the JS world that blocks the main thread anyway, so why use it? Also, most async code in JS is moving to promises, so by using core.async you have to transform promises into channels and use them in go, and that will make your whole function a go block, that is, async. Every interop with JS is lost – you now need to transform a go or chan back into a promise before passing it to JS again. As an example, the VSCode API wants you to return a promise-like thing in multiple situations, for example, on “goto var definition”. Here’s a way to do it, extracted and adapted from Clover:

(defn- var-definition [document position]
  (p/let [data (vs/get-document-data document position)
          [_ curr-var] (helpers/current-var (:contents data) (:position data))
          [_ curr-ns] (helpers/ns-range-for (:contents data) (:position data))
          nrepl (get-nrepl-connection!)
          {:keys [file-name line]} (get-definition nrepl curr-var curr-ns)]
    (when file-name
      (Location. (. Uri parse file-name) (Position. line 0)))))

It’s using funcool/promesa. Do you know which of these are async, and which are not? The answer is: it doesn’t matter. If something is sync, it’ll just use the value; if it’s async, it’ll await the result, then run the rest of the code and return a promise. If the promise fails (maybe the nrepl is not connected, maybe the namespace does not exist, or the code it generates to send to the REPL is not valid), the failure will be captured by the VSCode implementation and treated accordingly.
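For comparison, the closest plain-Javascript analogue of this “don’t care which ones are async” style is await, which accepts promises and ordinary values alike. This is only a sketch: currentVar and getDefinition are hypothetical stand-ins, not Clover’s functions, and the returned string is just a placeholder for a real Location.

```javascript
// Hypothetical stand-ins: one synchronous helper, one asynchronous.
const currentVar = (contents) => contents.trim();   // sync: returns a string
const getDefinition = (name) =>                      // async: returns a promise
  name === "missing"
    ? Promise.reject(new Error(`no definition for ${name}`))
    : Promise.resolve({ fileName: `${name}.cljs`, line: 3 });

// `await` works the same on a plain value and on a promise, so the
// function body does not need to know which helper is which.
async function varDefinition(contents) {
  const name = await currentVar(contents);
  const { fileName, line } = await getDefinition(name);
  return `${fileName}:${line}`;
}

// Success resolves the returned promise...
varDefinition("  foo ").then((loc) => console.log(loc)); // foo.cljs:3

// ...and a failure in any step rejects it, for the caller to handle.
varDefinition("missing").catch((err) => console.log(err.message)); // no definition for missing
```

Either way the caller gets a single promise back, which is exactly the shape an API like VSCode’s expects.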

Now, suppose VSCode does not treat the error. Well, it’s again very easy to fix the issue: (def safe-var-definition (comp #(p/catch % (constantly nil)) var-definition)) and then use this safe version. The core.async version is left as an exercise for the reader. If you want to try, assume get-nrepl-connection! and get-definition are async.
