Conversation

@az7arul (Contributor) commented Jun 19, 2015

No description provided.

@az7arul (Contributor, Author) commented Jun 19, 2015

Deals with #642

@behrad (Collaborator) commented:

Is this backward compatible? node_redis officially hasn't documented the callback on the .subscribe method.

@az7arul (Contributor, Author) commented:

The subscribe method has accepted an optional callback since v0.7.0,

and yes, it's not documented (there is a TODO for it).

ioredis clients do not emit the subscribe event, and I am not aware of any other way to do this.
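
For example (a minimal sketch; the channel name is just illustrative):

var client = queue.client;

// both node_redis (>= 0.7.0) and ioredis accept an optional callback
// on subscribe, so we don't have to rely on a 'subscribe' event
client.subscribe('q:events', function (err) {
    if (err) return console.error(err);
    // subscription confirmed
});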

@behrad (Collaborator) commented Jun 19, 2015

You should be setting createClientFactory in Kue settings when creating the queue; that should be documented after this is merged in.
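
For instance (a minimal sketch; the ioredis connection options are placeholders):

var kue = require('kue');
var Redis = require('ioredis');

var queue = kue.createQueue({
    prefix: 'q',
    redis: {
        createClientFactory: function () {
            // return any ioredis client here, e.g. new Redis.Cluster([...])
            return new Redis();
        }
    }
});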

@az7arul (Contributor, Author) commented Jun 19, 2015

Doesn't this take care of the createClientFactory documentation for passing an ioredis client?

@behrad (Collaborator) commented Jun 19, 2015

Sure, but I'd prefer mentioning ioredis support for Redis Cluster users and providing a basic sample of injecting ioredis :)

@az7arul (Contributor, Author) commented Jun 19, 2015

Got it, adding the documentation.

@behrad (Collaborator) commented Jun 19, 2015

How long have you been using Kue with ioredis? Is your PR widely tested across all Kue features?

@az7arul (Contributor, Author) commented Jun 19, 2015

I haven't used ioredis with Kue before, and no, it is not widely tested. I could create jobs, process them, and add a complete event callback. But when I pass an ioredis cluster client, it's not working.

I changed the redis client in the specs to use the ioredis client and all the tests pass (though a few of them throw a Connection closed error).
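
For reference, the smoke test I ran looks roughly like this (a sketch, assuming a queue created with the ioredis factory above):

var job = queue.create('email', { title: 'smoke test' }).save(function (err) {
    if (err) console.error(err);
});

queue.process('email', function (job, done) {
    done();
});

job.on('complete', function () {
    console.log('job completed');
});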

@behrad (Collaborator) commented Jun 19, 2015

> But when I pass an ioredis cluster client, it's not working

Then we should not merge this in until that is resolved.
We should also make sure that delayed jobs (promotion) and TTL work fine.

@az7arul (Contributor, Author) commented Jun 19, 2015

Yeah, we need to make sure everything works.

By the way, the reason the cluster is not working is that it throws:

[Error: All keys in the pipeline should belong to the same slot(expect "q:jobs:inactive" belongs to slot 11113).]

One way to fix it is this (using Redis Cluster hash tags):

    client.prefix = '{' + options.prefix + '}';

@az7arul (Contributor, Author) commented Jun 19, 2015

Updated Readme.md with ioredis cluster docs.

I have found similar results with the default createQueue method and with passing the cluster-enabled client using https://gist.github.com/anonymous/2a33c28006553bbe7e08

@behrad (Collaborator) commented Jun 19, 2015

> One way to fix it is this (using Redis Cluster hash tags)

Can you elaborate on the problem and explain how that fixes it?

@az7arul (Contributor, Author) commented Jun 19, 2015

From the Redis Cluster spec: "Redis Cluster implements a concept called hash tags that can be used in order to force certain keys to be stored in the same node."

So for {q}:jobs and {q}:ids, only the part inside the curly braces (here q) is used to calculate the hash, so all the keys end up in the same slot, which is needed for multi-key operations.
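
You can check it with CLUSTER KEYSLOT (a quick sketch; host/port are placeholders):

var Redis = require('ioredis');
var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: 7000 }]);

// without hash tags the keys hash to different slots...
cluster.cluster('keyslot', 'q:jobs:inactive', console.log);
cluster.cluster('keyslot', 'q:ids', console.log);

// ...with hash tags only "q" is hashed, so both land in the same slot
cluster.cluster('keyslot', '{q}:jobs:inactive', console.log);
cluster.cluster('keyslot', '{q}:ids', console.log);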

@dg3feiko commented:

The critical part should be Lua scripting. I migrated node-warlock, and it is not only about syntax; it is about the sequence of Lua scripting. For old Redis you might just load-then-use, but for Redis Cluster you need to either load the script to all nodes, which is hardly possible because new nodes are likely added afterwards, or use the defineCommand pattern of ioredis.
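
For reference, the defineCommand pattern looks like this (a minimal sketch; the command name and script are made up):

var Redis = require('ioredis');
var redis = new Redis();

// defineCommand registers the script on the client; ioredis sends EVALSHA
// and transparently re-sends the script with EVAL on a NOSCRIPT error,
// so nodes added later pick it up on first use
redis.defineCommand('getIfExists', {
    numberOfKeys: 1,
    lua: 'return redis.call("GET", KEYS[1])'
});

redis.getIfExists('somekey', function (err, value) {
    console.log(err, value);
});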

@az7arul (Contributor, Author) commented Jul 23, 2015

@behrad @dg3feiko any suggestions on how to make sure this is not breaking anything?

@behrad (Collaborator) commented Jul 23, 2015

I haven't had enough time to test this yet @az7arul. Aren't you testing with ioredis?

@hunt commented Jul 23, 2015

I already tested it. The tests passed without any error, but it's not working... I'll try to dig into the problem, maybe this weekend.

@phlip9 commented Aug 5, 2015

+1 It would be awesome if kue worked with ioredis

@leepa commented Aug 14, 2015

From reading this change, can it only be deployed on an empty queue?

It might be an idea to make the prefix change optional.

@behrad (Collaborator) commented Aug 14, 2015

> From reading this change, can it only be deployed on an empty queue?

I didn't get your point.

This patch changes prefix:jobs -> {prefix}:jobs so that keys are forced onto the same Redis instance!

@behrad (Collaborator) commented Aug 14, 2015

This should mean that you can scale by using different Kue namespaces, but not within a single Kue namespace. Right @az7arul?

@leepa commented Aug 14, 2015

My point is:

If there's stuff in prefix:jobs when you deploy this... it's lost.

@behrad (Collaborator) commented Aug 14, 2015

Why should this happen?

@behrad (Collaborator) commented Aug 14, 2015

Ahaaa, got you just now... Pardon @leepa.
Sure, you can't seamlessly upgrade to this unless you write an index migration script for your current data...
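
A rough migration sketch for a single (non-cluster) Redis with the default q prefix, to be run while the queue is idle (untested, just the idea):

var Redis = require('ioredis');
var redis = new Redis();

// rename every q:* key to {q}:* so existing jobs survive the upgrade
var stream = redis.scanStream({ match: 'q:*' });
stream.on('data', function (keys) {
    keys.forEach(function (key) {
        // 'q:jobs:inactive' -> '{q}:jobs:inactive'
        redis.rename(key, '{q}' + key.slice(1));
    });
});
stream.on('end', function () {
    redis.quit();
});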

@leepa commented Aug 14, 2015

In a production environment, this kind of thing really matters. It'd surely be wise to allow for a seamless migration...?

@behrad (Collaborator) commented Aug 14, 2015

And of course we should think of a workaround for single-Redis deployments, where you stick with prefix:jobs and only want to use ioredis. The first thing, as you said, could be introducing an option like isClustered or useClusterPrefix...
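
Something like this inside Kue's client setup (hypothetical; useClusterPrefix is not an existing setting):

// only wrap the prefix in cluster hash tags when the user opts in,
// so single-instance deployments keep their existing prefix:jobs keys
client.prefix = options.useClusterPrefix
    ? '{' + options.prefix + '}'
    : options.prefix;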

Just be warned that I'm not yet testing/accepting this PR since I've done no tests on it. Testers are welcome :)

@dandanknight commented:

I've been testing this PR with ioredis in sentinel mode for a couple of days now and, so far, have had no issues at all. I obviously had to do a FLUSHALL on Redis beforehand. I'll be moving the code out of dev and into our test environment in the next few days, where it should get a good workout.

Also, one small gotcha I found: because ioredis only emits an error event if something is listening for it, I couldn't work out why I was getting uncaught exceptions during Redis failover. It turns out that Kue attaches to the ioredis error event by default, so the Kue error event needs to be caught and handled for the case of a redis-server going down and ioredis attempting reconnection to a slave.
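
i.e. something like this (a minimal sketch) keeps a failover from crashing the process:

var queue = kue.createQueue({ /* ioredis createClientFactory here */ });

// Kue re-emits the redis client's errors on the queue; without a listener
// they surface as uncaught exceptions during failover
queue.on('error', function (err) {
    console.error('queue error:', err);
});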

I've attached directly to the ioredis events and can now get some good logging out of it for lost connections, failovers, new connections, ready, etc.

I'll keep testing and let you know if I find any issues. Cheers.

@behrad (Collaborator) commented Oct 1, 2015

@dandanknight great to hear this from you... Can you please give us more feedback, and also create a PR for any improvements you are working on?

@dandanknight commented:

Quick update: I'm successfully using ioredis in our test environment, which consists of two web servers, each running 4 Express threads (load balanced by PM2), which push jobs (via Kue) onto a master/slave Redis store (watched by redis-sentinels), and two worker servers, each with two consumer threads (also PM2 balanced) consuming the jobs. I've had no issues to report so far. I have been shutting down redis-servers and sentinels, restarting them, etc., and ioredis does a great job of picking the connections back up when the services become available. It also has an offline queue which allows it to keep taking in jobs and cache them until a server becomes available again.

I've written an ioredis wrapper that both my producer and consumer apps use: you require it, create an instance, and it basically just returns a new ioredis object, but along the way it also subscribes to all the extra events that ioredis offers.

I can share my code if anyone wants it; it uses ioredis's connection retry strategies etc., but I'm quite new to Node.js, so there are probably much better ways to do it than I have!

@behrad (Collaborator) commented Oct 12, 2015

Valuable comment @dandanknight. Are you using this PR for the ioredis integration with Kue?
I'd like to see any related code for this, since I strongly prefer replacing node_redis with ioredis.

@dandanknight commented:

Hi @behrad, yes, I'm using this PR. I haven't had to make any changes to it. I published it to a local private repo (Sinopia) so it can be installed across our servers using npm for now, until you guys merge it. My connection module looks like:

/**
 * Created by dan on 01/10/15.
 */

var Redis = require('ioredis');
var log = require('./log');

var _redisConfig;
var notified = false;

function IoRedis(redisConfig) {
    this._redisConfig = _redisConfig = redisConfig;
    this._lastConnectionAttempt = 0;
    this._lastSentinelConnectionAttempt = 0;
    log.debug("Redis config: ", this._redisConfig);
}

module.exports = IoRedis;

IoRedis.prototype.createClient = function () {

    var _this = this;

    var redis = new Redis(
        {
            sentinels: _redisConfig.sentinels,
            name     : _redisConfig.name,
            enableOfflineQueue: _redisConfig.enableOfflineQueue,
            retryStrategy : function(times){
                var delay = Math.min(times * 5 * 1000, 60000);

                if(_this._lastConnectionAttempt !== times){
                    log.warn("REDIS: Connection attempt %s, will retry in %s seconds...", times, delay / 1000);
                    _this._lastConnectionAttempt = times;
                }

                return delay;
            },
            sentinelRetryStrategy : function(times){
                var delay = Math.min(times * 5 * 1000, 60000);
                if(_this._lastSentinelConnectionAttempt !== times) {
                    //console.log("Attempt Sentinel reconnection in %s seconds....", delay);
                    _this._lastSentinelConnectionAttempt = times;
                }
                return delay;
            }
        });

    redis.on('ready', function () {
        log.info("REDIS: Connection ready: %s:%s - ID:%s", this.stream.remoteAddress, this.stream.remotePort, this.stream._handle.fd);
    });

    redis.on('reconnecting', function () {
        if (!notified) {
            log.warn("REDIS: Attempting reconnection...");
            notified = true;
        }
    });

    redis.on('close', function () {
        if (!notified) {
            log.warn("REDIS: Connection closed: %s", this.connector.stream._host);
        }
    });

    // Allow Kue to subscribe to this event (as it does anyway) and handle the errors there.

    //redis.on('error', function (err) {
    //        log.error("REDIS: ", err);
    //});

    redis.on('end', function(){
        log.debug("REDIS: No more connections will be made");
        notified = false;
        _this._lastConnectionAttempt = 0;
    });

    redis.on('connect', function () {
        log.info("REDIS: Connected to: %s:%s - ID:", this.stream.remoteAddress, this.stream.remotePort, this.stream._handle.fd);
        notified = false;
        _this._lastConnectionAttempt = 0;
    });

    return redis;
};

Redis.Promise.onPossiblyUnhandledRejection(function (error) {
    log.error("REDIS ERROR:", error);
});

And then simply connect the normal way using:

var IoRedis = require('./ioRedis');

KueUtils.prototype.connect = function () {
    var ioRedis = new IoRedis(this._redisConfig);
    this._q = q = kue.createQueue({
        redis: {
            // bind so createClient keeps the IoRedis instance as its `this`
            createClientFactory: ioRedis.createClient.bind(ioRedis)
        }
    });

    if(this._q.listeners('error').length === 0){
        this._q.on('error', function (err) {
            if (err.code === "ECONNREFUSED" && !notified) {
                log.error("Redis lost connection, attempting reconnect");
                notified = true;
            }
        });
    }
};

Like I said, I'm quite new to Node, so please don't take any of this code as gospel! Also, I've straight copied and pasted from my IDE, so there may be a few specific bits in there that are only relevant to my project. Cheers.

behrad mentioned this pull request on Nov 17, 2015.
behrad merged commit b53e475 into Automattic:master on Nov 20, 2015.