A Pitfall in Spymemcached


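Spymemcached is a popular Java client library for Memcached. (Another well-known client is XMemcached, written by Boyan/Zhuang Xiaodan, formerly of Taobao, who also developed Metamorphosis, another Taobao open-source project.) It performs well and is widely used in Java + Memcached projects.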

Spymemcached was originally developed by Dustin Sallings, who later co-founded Couchbase (formerly NorthScale) and served as its chief architect. He joined Google in 2014.



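Memcached itself has no clustering support; it is the client that sends each set/get to a particular Memcached node depending on the key. A consistent hashing algorithm spreads data evenly across the nodes, and when a node joins or leaves it spreads the failed node's data evenly over the remaining nodes. Spymemcached uses the Ketama algorithm.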
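To make the idea concrete, here is a minimal sketch of a consistent-hash ring in Java. It is a toy model and not spymemcached's KetamaNodeLocator: the class name HashRingSketch, the pointsPerNode parameter, and the hash helper (the first four bytes of an MD5 digest) are assumptions chosen for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy consistent-hash ring (hypothetical; not spymemcached's implementation). */
public class HashRingSketch {
    // Ring position -> node name. Each node owns several virtual points.
    private final TreeMap<Long, String> ring = new TreeMap<>();

    /** Builds the ring; nodes must be non-empty. */
    public HashRingSketch(List<String> nodes, int pointsPerNode) {
        for (String node : nodes) {
            for (int i = 0; i < pointsPerNode; i++) {
                ring.put(hash(node + "-" + i), node);
            }
        }
    }

    /** Returns the node owning the first ring point at or after the key's hash, wrapping around. */
    public String nodeFor(String key) {
        Map.Entry<Long, String> e = ring.ceilingEntry(hash(key));
        return (e != null ? e : ring.firstEntry()).getValue();
    }

    /** First four bytes of the MD5 digest, read as an unsigned 32-bit value. */
    public static long hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5")
                    .digest(s.getBytes(StandardCharsets.UTF_8));
            return ((long) (d[3] & 0xFF) << 24) | ((long) (d[2] & 0xFF) << 16)
                    | ((long) (d[1] & 0xFF) << 8) | (d[0] & 0xFF);
        } catch (NoSuchAlgorithmException ex) {
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        HashRingSketch ring = new HashRingSketch(
                Arrays.asList("127.0.0.1:11211", "127.0.0.1:11311"), 160);
        System.out.println(ring.nodeFor("ff-108552"));
    }
}
```

In this sketch, removing a node only remaps the keys that node owned; keys owned by the surviving nodes keep their placement, which is the property consistent hashing is used for.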

However, when one node of the memcached cluster goes down for whatever reason, spymemcached does not correctly pick another live node. It simply fails:

2015-11-23 05:56:20.942 WARN net.spy.memcached.MemcachedConnection:  Could not redistribute to another node, retrying primary node for ff-108182.
2015-11-23 05:56:20.944 WARN net.spy.memcached.MemcachedConnection:  Could not redistribute to another node, retrying primary node for ff-108254.
2015-11-23 05:56:20.946 WARN net.spy.memcached.MemcachedConnection:  Could not redistribute to another node, retrying primary node for ff-108341.
2015-11-23 05:56:20.947 WARN net.spy.memcached.MemcachedConnection:  Could not redistribute to another node, retrying primary node for ff-108352.
2015-11-23 05:56:20.947 WARN net.spy.memcached.MemcachedConnection:  Could not redistribute to another node, retrying primary node for ff-108381.
2015-11-23 05:56:20.948 WARN net.spy.memcached.MemcachedConnection:  Could not redistribute to another node, retrying primary node for ff-108407.
2015-11-23 05:56:20.950 WARN net.spy.memcached.MemcachedConnection:  Could not redistribute to another node, retrying primary node for ff-108480.
2015-11-23 05:56:20.952 WARN net.spy.memcached.MemcachedConnection:  Could not redistribute to another node, retrying primary node for ff-108552.
2015-11-23 05:56:20.954 WARN net.spy.memcached.MemcachedConnection:  Could not redistribute to another node, retrying primary node for ff-108608.

XMemcached does not show this behavior.

spymemcached had already been configured for consistent hashing:

......
ConnectionFactoryBuilder builder = new ConnectionFactoryBuilder();
builder.setHashAlg(DefaultHashAlgorithm.KETAMA_HASH);
builder.setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT);
builder.setFailureMode(FailureMode.Redistribute);
......

The cause is that when the node a key maps to (called the primary node) is down, spymemcached makes only a limited number of attempts to pick another node:

if (primary.isActive() || failureMode == FailureMode.Retry) {
    placeIn = primary;
} else if (failureMode == FailureMode.Cancel) {
    o.cancel();
} else {
    Iterator<MemcachedNode> i = locator.getSequence(key);
    while (placeIn == null && i.hasNext()) {
        MemcachedNode n = i.next();
        if (n.isActive()) {
            placeIn = n;
        }
    }
    if (placeIn == null) {
        placeIn = primary;
        this.getLogger().warn("Could not redistribute to another node, " + "retrying primary node for %s.", key);
    }
}

Here, locator.getSequence(key) supplies at most 7 candidate virtual nodes.

public Iterator<MemcachedNode> getSequence(String k) {
  // Seven searches gives us a 1 in 2^7 chance of hitting the
  // same dead node all of the time.
  return new KetamaIterator(k, 7, getKetamaNodes(), hashAlg);
}

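According to the comment in that code, though, when the primary node is down there is still roughly a 1/128 chance that all seven virtual nodes point back to that same dead primary node.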
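The 1/128 figure can be reproduced with a little arithmetic. The sketch below assumes that each attempt picks a node independently and uniformly, which is what the source comment seems to imply for a two-node cluster; the class and method names are hypothetical, and nothing here claims that the real iterator behaves this way.

```java
/** Back-of-the-envelope odds under an independence assumption (hypothetical helper). */
public class DeadNodeOdds {
    /**
     * Probability that `tries` independent, uniform picks among `nodes`
     * equally weighted nodes all land on one particular (dead) node.
     */
    public static double allTriesHitSameNode(int nodes, int tries) {
        double p = 1.0;
        for (int i = 0; i < tries; i++) {
            p /= nodes;
        }
        return p;
    }

    public static void main(String[] args) {
        System.out.println(allTriesHitSameNode(2, 7)); // prints 0.0078125, i.e. 1/128
    }
}
```

If the attempts are not independent for a given key, the real failure rate for that key can be very different from this estimate.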

In fact, the following code fails to select the live node ("127.0.0.1:11211") 100% of the time:

@Test
public void testMissingNode2() {
    List<MemcachedNode> nodes = new ArrayList<MemcachedNode>();
    nodes.add(createMockNode(new InetSocketAddress("127.0.0.1", 11211)));
    nodes.add(createMockNode(new InetSocketAddress("127.0.0.1", 11311)));
    KetamaNodeLocator locator = new KetamaNodeLocator(nodes, DefaultHashAlgorithm.KETAMA_HASH);
    Iterator<MemcachedNode> i = locator.getSequence("ff-108552");
    Set<MemcachedNode> foundNodes = new HashSet<MemcachedNode>();
    while (i.hasNext()) {
        foundNodes.add(i.next());
    }
    // This fails. 127.0.0.1:11211 is never found.
    for (MemcachedNode node : nodes) {
        Assert.assertTrue(foundNodes.contains(node));
    }
}

private MemcachedNode createMockNode(InetSocketAddress sock) {
    MemcachedNode mockNode = EasyMock.createMock(MemcachedNode.class);
    EasyMock.expect(mockNode.getSocketAddress()).andReturn(sock).anyTimes();
    EasyMock.replay(mockNode);
    return mockNode;
}

This has in fact been discussed on Google Groups, and there are related bug reports in spymemcached's original bug tracker, but the problem has not been fixed.

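The consequence is that when a memcached node goes down, some cache entries can no longer be served from the cache and have to be fetched from another persistent store, such as a database.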

With the problem identified, the fix follows: modify the getSequence method so that it offers more candidate nodes to choose from:

public Iterator<MemcachedNode> getSequence(String k) {
    // return new KetamaIterator(k, 7, getKetamaNodes(), hashAlg);
    int maxTry = config.getNodeRepetitions() + 1;
    if (maxTry < 20) {
       maxTry = 20;
    }
    return new KetamaIterator(k, maxTry, getKetamaNodes(), hashAlg);
}
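The effect of raising the attempt cap can be illustrated with a toy model. This is a sketch under assumptions, not a reproduction of KetamaIterator: FallbackSketch, candidates, firstLive and mix are hypothetical names, and hashing uses String.hashCode plus a bit mixer instead of Ketama. Each attempt rehashes the key together with an attempt counter and looks up the ring again.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Toy fallback lookup with a capped number of rehash attempts (hypothetical). */
public class FallbackSketch {
    private final TreeMap<Integer, String> ring = new TreeMap<>();

    /** Builds the ring; nodes must be non-empty. */
    public FallbackSketch(List<String> nodes, int pointsPerNode) {
        for (String node : nodes) {
            for (int i = 0; i < pointsPerNode; i++) {
                ring.put(mix((node + "-" + i).hashCode()), node);
            }
        }
    }

    /** Distinct nodes reached within maxTries rehash attempts, in discovery order. */
    public Set<String> candidates(String key, int maxTries) {
        Set<String> seen = new LinkedHashSet<>();
        for (int attempt = 0; attempt < maxTries; attempt++) {
            Map.Entry<Integer, String> e =
                    ring.ceilingEntry(mix((attempt + key).hashCode()));
            seen.add((e != null ? e : ring.firstEntry()).getValue());
        }
        return seen;
    }

    /** First candidate not in `dead`, or null if every attempt hit a dead node. */
    public String firstLive(String key, int maxTries, Set<String> dead) {
        for (String node : candidates(key, maxTries)) {
            if (!dead.contains(node)) {
                return node;
            }
        }
        return null;
    }

    /** Integer bit mixer (murmur3-style finalizer) to spread hashCode values. */
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("127.0.0.1:11211", "127.0.0.1:11311");
        FallbackSketch sketch = new FallbackSketch(nodes, 160);
        Set<String> dead = new HashSet<>(Arrays.asList("127.0.0.1:11311"));
        System.out.println(sketch.firstLive("ff-108552", 7, dead));
        System.out.println(sketch.firstLive("ff-108552", 20, dead));
    }
}
```

In this model a larger cap can only add candidates, never remove them, so raising it never makes a key less likely to reach a live node. Whether a fixed cap is sufficient in the real library depends on how its iterator generates successive hashes.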

Separately, how can new memcached nodes be added dynamically at runtime? This article gives one solution.

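You have to extend MemcachedClient, MemcachedConnection, and DefaultConnectionFactory. The author has not tested this, so there is no guarantee that it works. Either way, it is one possible approach.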

ExtMemCachedConnection.java

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import net.spy.memcached.ConnectionFactory;
import net.spy.memcached.ConnectionObserver;
import net.spy.memcached.FailureMode;
import net.spy.memcached.MemcachedConnection;
import net.spy.memcached.MemcachedNode;
import net.spy.memcached.OperationFactory;
import net.spy.memcached.ops.KeyedOperation;
import net.spy.memcached.ops.Operation;

public class ExtMemCachedConnection extends MemcachedConnection {

  protected final OperationFactory opFact;

  /**
   * Construct a memcached connection.
   *
   * @param bufSize the size of the buffer used for reading from the server
   * @param f       the factory that will provide an operation queue
   * @param a       the addresses of the servers to connect to
   * @throws java.io.IOException if a connection attempt fails early
   */
  public ExtMemCachedConnection(int bufSize, ConnectionFactory f,
                                List<InetSocketAddress> a,
                                Collection<ConnectionObserver> obs,
                                FailureMode fm, OperationFactory opfactory)
      throws IOException {
    super(bufSize, f, a, obs, fm, opfactory);
    this.opFact = opfactory;
  }

  // Connects to the new node and rebuilds the locator with the new node plus all existing ones.
  public void add(InetSocketAddress nodeAddress) throws IOException {
    final List<InetSocketAddress> nodeToAdd = new ArrayList<InetSocketAddress>(1);
    nodeToAdd.add(nodeAddress);
    List<MemcachedNode> newNodesList = createConnections(nodeToAdd);
    newNodesList.addAll(getLocator().getAll());
    getLocator().updateLocator(newNodesList);
  }

  // The node should be obtained from the locator to ensure currentNode.equals(node) returns true.
  public void remove(MemcachedNode node) throws IOException {
    for (MemcachedNode currentNode : getLocator().getAll()) {
      if (currentNode.equals(node)) {
        Collection<Operation> notCompletedOperations = currentNode.destroyInputQueue();
        if (currentNode.getChannel() != null) {
          currentNode.getChannel().close();
          currentNode.setSk(null);
          if (currentNode.getBytesRemainingToWrite() > 0) {
            getLogger().warn("Shut down with %d bytes remaining to write",
                             currentNode.getBytesRemainingToWrite());
          }
          getLogger().debug("Shut down channel %s", currentNode.getChannel());
        }
        // Unfortunately, redistributeOperations is private, so it cannot be used or
        // overridden. The implementation is copied and pasted below.
        redistributeOperations(notCompletedOperations);
      }
    }
  }

  protected void redistributeOperations(Collection<Operation> ops) {
    for (Operation op : ops) {
      if (op.isCancelled() || op.isTimedOut()) {
        continue;
      }
      if (op instanceof KeyedOperation) {
        KeyedOperation ko = (KeyedOperation) op;
        int added = 0;
        for (String k : ko.getKeys()) {
          for (Operation newop : opFact.clone(ko)) {
            addOperation(k, newop);
            added++;
          }
        }
        assert added > 0 : "Didn't add any new operations when redistributing";
      } else {
        // Cancel things that don't have definite targets.
        op.cancel();
      }
    }
  }
}

ExtMemcachedClient.java

public void add(InetSocketAddress nodeAddress) throws IOException {
  if (mconn instanceof ExtMemCachedConnection) {
    ((ExtMemCachedConnection) mconn).add(nodeAddress);
  }
}

// Returns true if the removal was delegated to the extended connection.
public boolean remove(MemcachedNode node) throws IOException {
  if (mconn instanceof ExtMemCachedConnection) {
    ((ExtMemCachedConnection) mconn).remove(node);
    return true;
  }
  return false;
}

ExtMemcachedConnectionfactory.java

@Override
public MemcachedConnection createConnection(List<InetSocketAddress> addrs) throws IOException {
  return new ExtMemCachedConnection(getReadBufSize(), this, addrs,
                                    getInitialObservers(), getFailureMode(), getOperationFactory());
}
